summaryrefslogtreecommitdiffstats
path: root/simple/simple-transport/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'simple/simple-transport/src/test')
-rw-r--r--simple/simple-transport/src/test/java/org/simpleframework/transport/MockSocket.java45
-rw-r--r--simple/simple-transport/src/test/java/org/simpleframework/transport/ServerBuffer.java75
-rw-r--r--simple/simple-transport/src/test/java/org/simpleframework/transport/SocketBufferTest.java86
-rw-r--r--simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java92
-rw-r--r--simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportTest.java51
-rw-r--r--simple/simple-transport/src/test/java/org/simpleframework/transport/StreamTransport.java66
-rw-r--r--simple/simple-transport/src/test/java/org/simpleframework/transport/TransportCursorTest.java83
-rw-r--r--simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java404
-rw-r--r--simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java269
-rw-r--r--simple/simple-transport/src/test/java/org/simpleframework/transport/trace/CompareQueueTest.java174
-rw-r--r--simple/simple-transport/src/test/java/org/simpleframework/transport/trace/MockTrace.java6
11 files changed, 1351 insertions, 0 deletions
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/MockSocket.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/MockSocket.java
new file mode 100644
index 0000000..bd1b582
--- /dev/null
+++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/MockSocket.java
@@ -0,0 +1,45 @@
+
+package org.simpleframework.transport;
+
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.ssl.SSLEngine;
+
+import org.simpleframework.transport.trace.MockTrace;
+import org.simpleframework.transport.trace.Trace;
+
+public class MockSocket implements Socket {
+
+ private SocketChannel socket;
+ private SSLEngine engine;
+ private Map map;
+
+ public MockSocket(SocketChannel socket) {
+ this(socket, null);
+ }
+
+ public MockSocket(SocketChannel socket, SSLEngine engine) {
+ this.map = new HashMap();
+ this.engine = engine;
+ this.socket = socket;
+ }
+
+ public SSLEngine getEngine() {
+ return engine;
+ }
+
+ public SocketChannel getChannel() {
+ return socket;
+ }
+
+ public Map getAttributes() {
+ return map;
+ }
+
+ public Trace getTrace() {
+ return new MockTrace();
+ }
+}
+
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/ServerBuffer.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/ServerBuffer.java
new file mode 100644
index 0000000..34cdc0c
--- /dev/null
+++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/ServerBuffer.java
@@ -0,0 +1,75 @@
+package org.simpleframework.transport;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.util.concurrent.CountDownLatch;
+
+public class ServerBuffer extends Thread {
+
+ private ByteArrayOutputStream buffer;
+ private ServerSocket server;
+ private CountDownLatch latch;
+
+ public ServerBuffer() throws Exception {
+ this.buffer = new ByteArrayOutputStream();
+ this.latch = new CountDownLatch(1);
+ this.server = getSocket();
+ this.start();
+ }
+
+ public ByteArrayOutputStream getBuffer(){
+ return buffer;
+ }
+
+ public void awaitClose() throws Exception {
+ latch.await();
+ }
+
+ public int getPort() {
+ return server.getLocalPort();
+ }
+
+ private ServerSocket getSocket() throws Exception {
+ // Scan the ephemeral port range
+ for(int i = 2000; i < 10000; i++) { // keep trying to grab the socket
+ try {
+ ServerSocket socket = new ServerSocket(i);
+ System.out.println("port=["+socket.getLocalPort()+"]");
+ return socket;
+ } catch(Exception e) {
+ Thread.sleep(200);
+ }
+ }
+ // Scan a second time for good measure, maybe something got freed up
+ for(int i = 2000; i < 10000; i++) { // keep trying to grab the socket
+ try {
+ ServerSocket socket = new ServerSocket(i);
+ System.out.println("port=["+socket.getLocalPort()+"]");
+ return socket;
+ } catch(Exception e) {
+ Thread.sleep(200);
+ }
+ }
+ throw new IOException("Could not create a client socket");
+ }
+
+ public void run() {
+ try {
+ java.net.Socket socket = server.accept();
+ InputStream in = socket.getInputStream();
+ int count = 0;
+
+ while((count = in.read()) != -1) {
+ buffer.write(count);
+ System.err.write(count);
+ System.err.flush();
+ }
+ } catch(Exception e) {
+ e.printStackTrace();
+ } finally {
+ latch.countDown();
+ }
+ }
+} \ No newline at end of file
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketBufferTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketBufferTest.java
new file mode 100644
index 0000000..a893f04
--- /dev/null
+++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketBufferTest.java
@@ -0,0 +1,86 @@
+package org.simpleframework.transport;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import junit.framework.TestCase;
+
+import org.simpleframework.transport.trace.MockTrace;
+import org.simpleframework.transport.trace.Trace;
+
+public class SocketBufferTest extends TestCase {
+
+
+ public void testBulkWrite() throws Exception {
+ ServerBuffer reader = new ServerBuffer();
+ SocketAddress address = new InetSocketAddress("localhost", reader.getPort());
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false); // underlying socket must be non-blocking
+ channel.connect(address);
+
+ while(!channel.finishConnect()) { // wait to finish connection
+ Thread.sleep(10);
+ };
+ Trace trace = new MockTrace();
+ SocketWrapper wrapper = new SocketWrapper(channel, trace);
+ SocketBuffer builder = new SocketBuffer(wrapper, 100, 4096);
+
+ for(int i = 0; i < 10000; i++){
+ ByteBuffer buf = ByteBuffer.wrap(("message-"+i+"\n").getBytes());
+
+ if(i > 18) {
+ System.err.println("FAIL......."+i);
+ }
+ if(!builder.write(buf)){
+ while(!builder.flush()) {
+ System.err.println("FLUSHING!!!");
+ Thread.sleep(1);
+ }
+ }
+ }
+ while(!builder.flush()) {
+ System.err.println("FLUSHING!!!");
+ }
+ builder.close();
+ reader.awaitClose();
+
+ String data = reader.getBuffer().toString();
+ String[] list = data.split("\\n");
+
+ for(int i = 0; i < 10000; i++){
+ String msg = list[i];
+ if(!msg.equals("message-"+i)) {
+ System.err.println(list[i]);
+ }
+ assertEquals("At index " + i + " value="+list[i] +" expect message-"+i, list[i], "message-"+i);
+ }
+ }
+
+ public void testSimpleWrite() throws Exception {
+ ServerBuffer reader = new ServerBuffer();
+ SocketAddress address = new InetSocketAddress("localhost", reader.getPort());
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false); // underlying socket must be non-blocking
+ channel.connect(address);
+
+ while(!channel.finishConnect()) { // wait to finish connection
+ Thread.sleep(10);
+ };
+ Trace trace = new MockTrace();
+ SocketWrapper wrapper = new SocketWrapper(channel, trace);
+ SocketBuffer builder = new SocketBuffer(wrapper, 100, 4096);
+
+ builder.write(ByteBuffer.wrap("hello there ".getBytes()));
+ builder.write(ByteBuffer.wrap("this ".getBytes()));
+ builder.write(ByteBuffer.wrap("is ".getBytes()));
+ builder.write(ByteBuffer.wrap("a ".getBytes()));
+ builder.write(ByteBuffer.wrap("test".getBytes()));
+ builder.flush();
+ builder.close();
+ reader.awaitClose();
+
+ assertEquals(reader.getBuffer().toString(), "hello there this is a test");
+ }
+}
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java
new file mode 100644
index 0000000..6654f31
--- /dev/null
+++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java
@@ -0,0 +1,92 @@
+package org.simpleframework.transport;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import junit.framework.TestCase;
+
+import org.simpleframework.common.thread.ConcurrentExecutor;
+import org.simpleframework.transport.reactor.ExecutorReactor;
+import org.simpleframework.transport.reactor.Reactor;
+import org.simpleframework.transport.trace.MockTrace;
+import org.simpleframework.transport.trace.Trace;
+
+public class SocketTransportPipeTest extends TestCase {
+
+ private static final int ITERATIONS = 100000;
+
+ public void testPipe() throws Exception {
+ ServerSocket server = new ServerSocket(0);
+ SocketAddress address = new InetSocketAddress("localhost", server.getLocalPort());
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false); // underlying socket must be non-blocking
+ channel.connect(address);
+ while(!channel.finishConnect()) { // wait to finish connection
+ Thread.sleep(10);
+ };
+ Trace trace = new MockTrace();
+ SocketWrapper wrapper = new SocketWrapper(channel, trace);
+ Executor executor = new ConcurrentExecutor(Runnable.class);
+ Reactor reactor = new ExecutorReactor(executor);
+ SocketTransport transport = new SocketTransport(wrapper,reactor);
+ java.net.Socket socket = server.accept();
+ final InputStream read = socket.getInputStream();
+ final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ final LinkedBlockingQueue<String> sent = new LinkedBlockingQueue<String>();
+ final LinkedBlockingQueue<String> received = new LinkedBlockingQueue<String>();
+ Thread thread = new Thread(new Runnable() {
+ public void run(){
+ try {
+ byte[] token = new byte[]{'\r','\n'};
+ int pos = 0;
+ int count = 0;
+ while((count = read.read()) != -1){
+ if(count != token[pos++]){
+ pos = 0;
+ }
+ if(pos == token.length) {
+ String value = buffer.toString().trim();
+ String expect = sent.take();
+
+ if(!value.equals(expect)) {
+ throw new Exception("Out of sequence expected " + expect + " but got " + value);
+ }
+ received.offer(value);
+ buffer.reset();
+ pos = 0;
+ } else {
+ buffer.write(count);
+ System.err.write(count);
+ System.err.flush();
+ }
+
+ }
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+ }
+ });
+ thread.start();
+ for(int i = 0; i < ITERATIONS; i++) {
+ String message = "message-"+i;
+ transport.write(ByteBuffer.wrap((message+"\r\n").getBytes()));
+ sent.offer(message);
+ }
+ transport.flush();
+ transport.close();
+
+ for(int i = 0; i < ITERATIONS; i++) {
+ assertEquals(received.take(), "message-"+i);
+ }
+ assertTrue(sent.isEmpty());
+ assertTrue(received.isEmpty());
+
+ }
+}
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportTest.java
new file mode 100644
index 0000000..1e09cd0
--- /dev/null
+++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportTest.java
@@ -0,0 +1,51 @@
+package org.simpleframework.transport;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.Executor;
+
+import junit.framework.TestCase;
+
+import org.simpleframework.common.thread.ConcurrentExecutor;
+import org.simpleframework.transport.reactor.ExecutorReactor;
+import org.simpleframework.transport.reactor.Reactor;
+import org.simpleframework.transport.trace.MockTrace;
+import org.simpleframework.transport.trace.Trace;
+
+public class SocketTransportTest extends TestCase {
+
+
+ public void testBulkWrite() throws Exception {
+ ServerBuffer reader = new ServerBuffer();
+ SocketAddress address = new InetSocketAddress("localhost", reader.getPort());
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false); // underlying socket must be non-blocking
+ channel.connect(address);
+
+ while(!channel.finishConnect()) { // wait to finish connection
+ Thread.sleep(10);
+ };
+ Trace trace = new MockTrace();
+ SocketWrapper wrapper = new SocketWrapper(channel, trace);
+ Executor executor = new ConcurrentExecutor(Runnable.class);
+ Reactor reactor = new ExecutorReactor(executor);
+ SocketTransport transport = new SocketTransport(wrapper,reactor);
+ for(int i = 0; i < 10000; i++){
+ transport.write(ByteBuffer.wrap(("message-"+i+"\n").getBytes()));
+ }
+ transport.close();
+ reader.awaitClose();
+
+ String data = reader.getBuffer().toString();
+ String[] list = data.split("\\n");
+
+ for(int i = 0; i < 10000; i++){
+ if(!list[i].equals("message-"+i)) {
+ System.err.println(list[i]);
+ }
+ assertEquals("At index " + i + " value="+list[i] +" expect message-"+i, list[i], "message-"+i);
+ }
+ }
+}
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/StreamTransport.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/StreamTransport.java
new file mode 100644
index 0000000..75f999e
--- /dev/null
+++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/StreamTransport.java
@@ -0,0 +1,66 @@
+package org.simpleframework.transport;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Map;
+
+import javax.net.ssl.SSLEngine;
+
+import org.simpleframework.transport.trace.MockTrace;
+import org.simpleframework.transport.trace.Trace;
+
+public class StreamTransport implements Transport {
+
+ private final WritableByteChannel write;
+ private final ReadableByteChannel read;
+ private final OutputStream out;
+
+ public StreamTransport(InputStream in, OutputStream out) {
+ this.write = Channels.newChannel(out);
+ this.read = Channels.newChannel(in);
+ this.out = out;
+ }
+
+ public void close() throws IOException {
+ write.close();
+ read.close();
+ }
+
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ public int read(ByteBuffer buffer) throws IOException {
+ return read.read(buffer);
+ }
+
+ public void write(ByteBuffer buffer) throws IOException {
+ write.write(buffer);
+ }
+
+ public Map getAttributes() {
+ return null;
+ }
+
+ public SocketChannel getChannel() {
+ return null;
+ }
+
+ public SSLEngine getEngine() {
+ return null;
+ }
+
+ public Certificate getCertificate() {
+ return null;
+ }
+
+ public Trace getTrace() {
+ return new MockTrace();
+ }
+}
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportCursorTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportCursorTest.java
new file mode 100644
index 0000000..161115f
--- /dev/null
+++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportCursorTest.java
@@ -0,0 +1,83 @@
+package org.simpleframework.transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import junit.framework.TestCase;
+
+public class TransportCursorTest extends TestCase {
+
+ private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz";
+ private static final String SOURCE = ALPHABET + "\r\n";
+
+ public void testCursor() throws IOException {
+ byte[] data = SOURCE.getBytes("ISO-8859-1");
+ InputStream source = new ByteArrayInputStream(data);
+ Transport transport = new StreamTransport(source, System.out);
+ ByteCursor cursor = new TransportCursor(transport);
+ byte[] buffer = new byte[1024];
+
+ assertEquals(cursor.ready(), data.length);
+ assertEquals(26, cursor.read(buffer, 0, 26));
+ assertEquals(26, cursor.reset(26));
+ assertEquals(new String(buffer, 0, 26), ALPHABET);
+
+ assertEquals(cursor.ready(), data.length);
+ assertEquals(26, cursor.read(buffer, 0, 26));
+ assertEquals(26, cursor.reset(26));
+ assertEquals(new String(buffer, 0, 26), ALPHABET);
+
+ assertEquals(cursor.ready(), data.length);
+ assertEquals(4, cursor.read(buffer, 0, 4));
+ assertEquals(4, cursor.reset(26));
+ assertEquals(new String(buffer, 0, 4), "abcd");
+
+ assertEquals(cursor.ready(), data.length);
+ assertEquals(4, cursor.read(buffer, 0, 4));
+ assertEquals(4, cursor.reset(26));
+ assertEquals(new String(buffer, 0, 4), "abcd");
+
+ assertEquals(cursor.ready(), data.length);
+ assertEquals(4, cursor.read(buffer, 0, 4));
+ assertEquals(new String(buffer, 0, 4), "abcd");
+
+ assertEquals(cursor.ready(), data.length - 4);
+ assertEquals(4, cursor.read(buffer, 0, 4));
+ assertEquals(new String(buffer, 0, 4), "efgh");
+
+ assertEquals(cursor.ready(), data.length - 8);
+ assertEquals(4, cursor.read(buffer, 0, 4));
+ assertEquals(new String(buffer, 0, 4), "ijkl");
+
+ assertEquals(cursor.ready(), data.length - 12);
+ assertEquals(12, cursor.reset(12));
+ assertEquals(10, cursor.read(buffer, 0, 10));
+ assertEquals(new String(buffer, 0, 10), "abcdefghij");
+
+ cursor.push("1234".getBytes("ISO-8859-1"));
+ cursor.push("5678".getBytes("ISO-8859-1"));
+ cursor.push("90".getBytes("ISO-8859-1"));
+
+ assertEquals(cursor.ready(), 10);
+ assertEquals(2, cursor.read(buffer, 0, 2));
+ assertEquals(new String(buffer, 0, 2), "90");
+
+ assertEquals(cursor.ready(), 8);
+ assertEquals(4, cursor.read(buffer, 0, 4));
+ assertEquals(new String(buffer, 0, 4), "5678");
+
+ assertEquals(cursor.ready(), 4);
+ assertEquals(4, cursor.read(buffer, 0, 4));
+ assertEquals(new String(buffer, 0, 4), "1234");
+
+ assertEquals(4, cursor.reset(4));
+ assertEquals(cursor.ready(), 4);
+ assertEquals(4, cursor.read(buffer, 0, 4));
+ assertEquals(new String(buffer, 0, 4), "1234");
+
+ assertEquals(8, cursor.read(buffer, 0, 8));
+ assertEquals(new String(buffer, 0, 8), "klmnopqr");
+ }
+
+}
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java
new file mode 100644
index 0000000..1a14431
--- /dev/null
+++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java
@@ -0,0 +1,404 @@
+package org.simpleframework.transport;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.TestCase;
+
+import org.simpleframework.transport.reactor.ExecutorReactor;
+import org.simpleframework.transport.reactor.Reactor;
+
+/**
+ * Measure the performance of the transports to ensure that the perform
+ * well and that they send the correct sequence of bytes and that the
+ * blocks sent are in the correct order. This also performs a comparison
+ * with direct socket output streams to ensure there is a reasonable
+ * performance difference.
+ *
+ * @author Niall Gallagher
+ */
+public class TransportTest extends TestCase {
+
+ private static final int REPEAT = 1000;
+
+ public void testTransport() throws Exception {
+ testTransport(REPEAT);
+ }
+
+ public void testTransport(int repeat) throws Exception {
+ for(int i = 1; i < 7; i++) { // just do some random sizes
+ testTransport(i, 100);
+ }
+ for(int i = 4092; i < 4102; i++) {
+ testTransport(i, 100);
+ }
+ for(int i = 8190; i < 8200; i++) {
+ testTransport(i, 100);
+ }
+ for(int i = 11282; i < 11284; i++) {
+ testTransport(i, 1000);
+ }
+ for(int i = 204800; i < 204805; i++) {
+ testTransport(i, 1000);
+ }
+ testTransport(16, repeat);
+ testTransport(64, repeat);
+ testTransport(256, repeat);
+ testTransport(1024, repeat);
+ testTransport(2048, repeat);
+ testTransport(4096, repeat);
+ testTransport(4098, repeat);
+ testTransport(8192, repeat);
+ testTransport(8197, repeat);
+ }
+
+ // Test blocking transport
+ private void testTransport(int size, int repeat) throws Exception {
+ // ThreadDumper dumper = new ThreadDumper();
+ SocketConsumer consumer = new SocketConsumer(size, repeat);
+ SocketAddress address = new InetSocketAddress("localhost", consumer.getPort());
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false); // underlying socket must be non-blocking
+ channel.connect(address);
+
+ while(!channel.finishConnect()) { // wait to finish connection
+ Thread.sleep(10);
+ };
+ ExecutorService executor = Executors.newFixedThreadPool(20);
+ Reactor reactor = new ExecutorReactor(executor);
+ // Transport transport = new SocketTransport(channel, reactor, 2, 3);//XXX bug
+ MockSocket pipeline = new MockSocket(channel);
+ Transport transport = new SocketTransport(pipeline, reactor, 8192);
+ OutputStream out = new TransportOutputStream(transport);
+
+ // dumper.start();
+ testOutputStream(consumer, out, size, repeat);
+
+ out.close();
+ executor.shutdown();
+ channel.close();
+ reactor.stop();
+ // dumper.kill();
+ Thread.sleep(100);
+ }
+
+ public void s_testSocket() throws Exception {
+ s_testSocket(REPEAT);
+ }
+
+ public void s_testSocket(int repeat) throws Exception {
+ testSocket(16, repeat);
+ testSocket(64, repeat);
+ testSocket(256, repeat);
+ testSocket(1024, repeat);
+ testSocket(2048, repeat);
+ testSocket(4098, repeat);
+ testSocket(8192, repeat);
+ }
+
+ // Test blocking socket
+ private void testSocket(int size, int repeat) throws Exception {
+ // ThreadDumper dumper = new ThreadDumper();
+ SocketConsumer consumer = new SocketConsumer(size, repeat);
+ Socket socket = new Socket("localhost", consumer.getPort());
+ OutputStream out = socket.getOutputStream();
+
+ //dumper.start();
+ testOutputStream(consumer, out, size, repeat);
+
+ out.close();
+ socket.close();
+ //dumper.kill();
+ Thread.sleep(100);
+ }
+
+ private class AlpahbetIterator {
+
+ private byte[] alphabet = "abcdefghijklmnopqstuvwxyz".getBytes();
+
+ private int off;
+
+ public byte next() {
+ if(off == alphabet.length) {
+ off = 0;
+ }
+ return alphabet[off++];
+ }
+
+ public void reset() {
+ off = 0;
+ }
+ }
+
+ private void testOutputStream(SocketConsumer consumer, OutputStream out, int size, int repeat) throws Exception {
+ byte[] block = new byte[size]; // write size
+ AlpahbetIterator it = new AlpahbetIterator(); // write known data
+
+ for(int i = 1; i < block.length; i++) {
+ block[i] = it.next();
+ }
+ AtomicLong count = new AtomicLong();
+ PerformanceMonitor monitor = new PerformanceMonitor(consumer, count, out.getClass().getSimpleName(), size);
+
+ for(int i = 0; i < repeat; i++) {
+ block[0] = (byte) i; // mark the first byte in the block to be sure we get blocks in sequence
+ //System.err.println("["+i+"]"+new String(block,"ISO-8859-1"));
+ out.write(block); // manipulation of the underlying buffer is taking place when the compact is invoked, this is causing major problems as the next packet will be out of sequence
+ count.addAndGet(block.length);
+ }
+ Thread.sleep(2000); // wait for all bytes to flush through to consumer
+ monitor.kill();
+ }
+
+ private class PerformanceMonitor extends Thread {
+ private AtomicLong count;
+
+ private volatile boolean dead;
+
+ private SocketConsumer consumer;
+
+ private String name;
+
+ private int size;
+
+ public PerformanceMonitor(SocketConsumer consumer, AtomicLong count, String name, int size) {
+ this.consumer = consumer;
+ this.count = count;
+ this.name = name;
+ this.size = size;
+ this.start();
+ }
+
+ public void run() {
+ int second = 0;
+ while(!dead) {
+ try {
+ long octets = count.longValue();
+ System.out.printf("%s,%s,%s,%s,%s%n", name, size, second++, octets, consumer.getWindow());
+ Thread.sleep(1000);
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void kill() throws Exception {
+ dead = true;
+ }
+ }
+
+ private class SocketConsumer extends Thread {
+
+ private ServerSocket server;
+
+ private Window window;
+
+ private long repeat;
+
+ private long size;
+
+ public SocketConsumer(int size, int repeat) throws Exception {
+ this.window = new Window(20);
+ this.server = getSocket();
+ this.repeat = repeat;
+ this.size = size;
+ this.start();
+ }
+
+ public int getPort() {
+ return server.getLocalPort();
+ }
+
+ public String getWindow() {
+ return window.toString();
+ }
+
+ private ServerSocket getSocket() throws Exception {
+ // Scan the ephemeral port range
+ for(int i = 2000; i < 10000; i++) { // keep trying to grab the socket
+ try {
+ ServerSocket socket = new ServerSocket(i);
+ System.out.println("port=["+socket.getLocalPort()+"]");
+ return socket;
+ } catch(Exception e) {
+ Thread.sleep(200);
+ }
+ }
+ // Scan a second time for good measure, maybe something got freed up
+ for(int i = 2000; i < 10000; i++) { // keep trying to grab the socket
+ try {
+ ServerSocket socket = new ServerSocket(i);
+ System.out.println("port=["+socket.getLocalPort()+"]");
+ return socket;
+ } catch(Exception e) {
+ Thread.sleep(200);
+ }
+ }
+ throw new IOException("Could not create a client socket");
+ }
+
+ public void run() {
+ long count = 0;
+ int windowOctet = 0;
+ int expectWindowOctet = 0;
+
+ try {
+ Socket socket = server.accept();
+ InputStream in = socket.getInputStream();
+ InputStream source = new BufferedInputStream(in);
+ AlpahbetIterator it = new AlpahbetIterator();
+
+ scan: for(int i = 0; i < repeat; i++) {
+ int octet = source.read(); // check first byte in the block to make sure its correct in sequence
+
+ if(octet == -1) {
+ break scan;
+ }
+ count++; // we have read another byte
+ windowOctet = octet & 0x000000ff;
+ expectWindowOctet = i & 0x000000ff;
+
+ if((byte) octet != (byte) i) {
+ throw new Exception("Wrong sequence of blocks sent, was "
+ + (byte)octet + " should have been " + (byte)i + " count is "+count+" window is "+window+" compare "+explore(it, source, 5));
+ }
+ window.recieved(octet);
+
+ for(int j = 1, k = 0; j < size; j++, k++) {
+ octet = source.read();
+
+ if(octet == -1) {
+ break scan;
+ }
+ byte next = it.next();
+
+ if((byte) octet != next) {
+ throw new Exception("Invalid data received expected "+((byte)octet)+"("+((char)octet)+
+ ") but was "+next+"("+((char)next)+") total count is "+count+" block count is "+k+" window is expected "+
+ expectWindowOctet+"("+((char)expectWindowOctet)+")("+((byte)expectWindowOctet)+") got "+windowOctet+"("+
+ ((char)windowOctet)+")("+((byte)windowOctet)+") "+window+" compare "+explore(it, source, 5));
+ }
+ count++;
+ }
+ it.reset();
+ }
+ } catch(Throwable e) {
+ e.printStackTrace();
+ }
+ if(count != size * repeat) {
+ new Exception("Invalid number of bytes read, was " + count
+ + " should have been " + (size * repeat)).printStackTrace();
+ }
+ try {
+ // server.close();
+ }catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private String explore(AlpahbetIterator it, InputStream source, int count) throws IOException {
+ StringBuffer buf = new StringBuffer();
+ buf.append("expected (");
+ for(int i = 0; i < count; i++) {
+ buf.append( (char)it.next() );
+ }
+ buf.append(") is (");
+ for(int i = 0; i < count; i++) {
+ buf.append( (char)source.read() );
+ }
+ buf.append(")");
+ return buf.toString();
+ }
+ }
+
+
+ private static class TransportOutputStream extends OutputStream {
+
+ private Transport transport;
+
+ public TransportOutputStream(Transport transport) {
+ this.transport = transport;
+ }
+
+ public void write(int octet) throws IOException {
+ byte[] data = new byte[] { (byte) octet };
+ write(data);
+ }
+
+ public void write(byte[] data, int off, int len) throws IOException {
+ try {
+ ByteBuffer buffer = ByteBuffer.wrap(data, off, len);
+ ByteBuffer safe = buffer.asReadOnlyBuffer();
+
+ if(len > 0) {
+ transport.write(safe);
+ }
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new IOException("Write failed");
+ }
+ }
+
+ public void flush() throws IOException {
+ try {
+ transport.flush();
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new IOException("Flush failed");
+ }
+ }
+
+ public void close() throws IOException {
+ try {
+ transport.close();
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new IOException("Close failed");
+ }
+ }
+
+ }
+
+ private static class Window {
+
+ private final LinkedList<String> window;
+ private final int size;
+
+ public Window(int size) {
+ this.window = new LinkedList<String>();
+ this.size = size;
+ }
+
+ public synchronized void recieved(int sequence) {
+ window.addLast(String.valueOf(sequence));
+
+ if(window.size() > size) {
+ window.removeFirst();
+ }
+ }
+
+ public synchronized String toString() {
+ StringBuilder builder = new StringBuilder("[");
+ String delim = "";
+ for(String b : window) {
+ builder.append(delim).append(b);
+ delim=", ";
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+ }
+
+}
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java
new file mode 100644
index 0000000..3032692
--- /dev/null
+++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java
@@ -0,0 +1,269 @@
+package org.simpleframework.transport.reactor;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.simpleframework.transport.trace.MockTrace;
+import org.simpleframework.transport.trace.Trace;
+
+
+public class DistributorTest extends TestCase {
+
+ private static final String PAYLOAD =
+ "POST /index.html HTTP/1.0\r\n"+
+ "Content-Type: multipart/form-data; boundary=AaB03x\r\n"+
+ "Accept: image/gif;q=1.0,\r\n image/jpeg;q=0.8,\r\n"+
+ " \t\t image/png;\t\r\n\t"+
+ " q=1.0,*;q=0.1\r\n"+
+ "Accept-Language: fr;q=0.1, en-us;q=0.4, en-gb; q=0.8, en;q=0.7\r\n"+
+ "Host: some.host.com \r\n"+
+ "Cookie: $Version=1; UID=1234-5678; $Path=/; $Domain=.host.com\r\n"+
+ "Cookie: $Version=1; NAME=\"Niall Gallagher\"; $path=\"/\"\r\n"+
+ "\r\n" +
+ "--AaB03x\r\n"+
+ "Content-Disposition: form-data; name='pics'; filename='file1.txt'\r\n"+
+ "Content-Type: text/plain\r\n\r\n"+
+ "example contents of file1.txt\r\n"+
+ "--AaB03x\r\n"+
+ "Content-Type: multipart/mixed; boundary=BbC04y\r\n\r\n"+
+ "--BbC04y\r\n"+
+ "Content-Disposition: form-data; name='pics'; filename='file2.txt'\r\n"+
+ "Content-Type: text/plain\r\n\r\n"+
+ "example contents of file3.txt ...\r\n"+
+ "--BbC04y\r\n"+
+ "Content-Disposition: form-data; name='pics'; filename='file3.txt'\r\n"+
+ "Content-Type: text/plain\r\n\r\n"+
+ "example contents of file4.txt ...\r\n"+
+ "--BbC04y\r\n"+
+ "Content-Disposition: form-data; name='pics'; filename='file4.txt'\r\n"+
+ "Content-Type: text/plain\r\n\r\n"+
+ "example contents of file4.txt ...\r\n"+
+ "--BbC04y--\r\n"+
+ "--AaB03x--\r\n";
+
+ public class Client extends Thread {
+
+ private CountDownLatch latch;
+ private String message;
+ private int requests;
+ private int port;
+
+ public Client(CountDownLatch latch, String message, int port, int requests) throws Exception {
+ this.message = message;
+ this.requests = requests;
+ this.port = port;
+ this.latch = latch;
+ this.start();
+ }
+
+ public void run() {
+ try {
+ latch.await();
+
+ Socket socket = new Socket("localhost", port);
+ OutputStream out = socket.getOutputStream();
+ byte[] payload = message.getBytes();
+
+ for(int i = 0; i < requests; i++){
+ out.write(payload);
+ }
+ out.close();
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public class Worker implements Operation {
+
+ private BlockingQueue<Worker> done;
+ private Reactor reactor;
+ private SocketChannel channel;
+ private ByteBuffer buffer;
+ private String payload;
+ private int accumulate;
+ private long finish;
+ private long start;
+ private int id;
+
+ public Worker(BlockingQueue<Worker> done, Reactor reactor, SocketChannel channel, String payload, int id) throws Exception {
+ this.buffer = ByteBuffer.allocate(8192);
+ this.start = System.currentTimeMillis();
+ this.finish = start + 60000;
+ this.payload = payload;
+ this.channel = channel;
+ this.reactor = reactor;
+ this.done = done;
+ this.id = id;
+ }
+
+ public Trace getTrace() {
+ return new MockTrace();
+ }
+
+ public long getExpiry(TimeUnit unit) {
+ return unit.convert(finish - System.currentTimeMillis(), MILLISECONDS);
+ }
+
+ public int getAccumulate() {
+ return accumulate;
+ }
+
+ // XXX should this be executed in a thread!!!!???? yes...
+ public void cancel() {
+ System.err.println("############################# Worker has been canceled");
+ }
+
+ public void run() {
+ try {
+ // N.B Fundamental to performance
+ buffer.clear();
+
+ if(channel.isOpen()) {
+ int count = channel.read(buffer);
+ accumulate += count;
+
+ System.err.println("Worker-"+id+" read ["+count +"] of payload sized ["+payload.length()+"] took ["+(System.currentTimeMillis() -start)+"]");
+
+ if(count != -1) {
+ reactor.process(this, SelectionKey.OP_READ);
+ } else {
+ channel.close();
+ done.offer(this);
+ System.err.println("Worker-"+id+" Channel is closed after time ["+(System.currentTimeMillis() - start)+"] and read ["+accumulate+"]");
+ }
+ } else {
+ System.err.println("Worker-"+id+" Channel is closed after time ["+(System.currentTimeMillis() - start)+"] and read ["+accumulate+"]");
+ done.offer(this);
+ }
+ }catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public SocketChannel getChannel() {
+ return channel;
+ }
+
+ }
+
+ public class Server extends Thread {
+
+ private BlockingQueue<SocketChannel> ready;
+ private CountDownLatch latch;
+ private ServerSocketChannel server;
+ private Selector selector;
+ private int port;
+
+ public Server(CountDownLatch latch, BlockingQueue<SocketChannel> ready, int port) throws Exception {
+ this.server = ServerSocketChannel.open();
+ this.selector = Selector.open();
+ this.latch = latch;
+ this.port = port;
+ this.ready = ready;
+ this.start();
+ }
+
+ private void configure() throws Exception {
+ server.socket().bind(new InetSocketAddress(port));
+ server.configureBlocking(false);
+ }
+
+ public void run() {
+ try {
+ configure();
+ execute();
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void execute() throws Exception {
+ SelectionKey serverKey = server.register(selector, SelectionKey.OP_ACCEPT);
+
+ latch.countDown();
+
+ while(true){
+ selector.select();
+ Set keys = selector.selectedKeys();
+
+ for(Iterator i = keys.iterator(); i.hasNext();){
+ SelectionKey key = (SelectionKey) i.next();
+ i.remove();
+
+ if(key != serverKey) {
+ return;
+ }
+ if(key.isAcceptable()) {
+ SocketChannel channel = server.accept();
+ channel.configureBlocking(false);
+ ready.offer(channel);
+ }
+ }
+ }
+ }
+ }
+
+ public static void main(String[] list) throws Exception {
+ new DistributorTest().testReactor();
+ }
+
+ public void testReactor() throws Exception {
+ testReactor(PAYLOAD, 200, 100, 10, 8123);
+ }
+
+ private void testReactor(String payload, int clients, int requests, int threads, int port) throws Exception {
+ BlockingQueue<Worker> done = new LinkedBlockingQueue<Worker>();
+ BlockingQueue<SocketChannel> ready = new LinkedBlockingQueue<SocketChannel>();
+ CountDownLatch latch = new CountDownLatch(1);
+ Server server = new Server(latch, ready, port);
+ Executor executor = Executors.newFixedThreadPool(10);
+ Reactor reactor = new ExecutorReactor(executor, 1);
+
+ long start = System.currentTimeMillis();
+
+ for(int i = 0; i < clients; i++) {
+ new Client(latch, payload, port, requests);
+ }
+ for(int i = 0; i < clients; i++) {
+ SocketChannel channel = ready.take();
+ Worker worker = new Worker(done, reactor, channel, payload, i);
+
+ reactor.process(worker);
+ }
+ int total = 0;
+
+ for(int i = 0; i < clients; i++) {
+ Worker worker = done.take();
+ int accumulate = worker.getAccumulate();
+ total += accumulate;
+ System.err.println("Accumulated ["+accumulate+"] of ["+(requests*payload.length())+"] closed ["+worker.getChannel().socket().isClosed()+"]");
+ }
+ System.err.println("Accumulated ["+total+"] of ["+(clients*requests*payload.length())+"]");
+ System.err.println("Total time to process ["+(clients*requests)+"] payloads from ["+clients+"] clients took ["+(System.currentTimeMillis() - start)+"]");
+ }
+
+
+
+
+}
+
+
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/CompareQueueTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/CompareQueueTest.java
new file mode 100644
index 0000000..ca2f5db
--- /dev/null
+++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/CompareQueueTest.java
@@ -0,0 +1,174 @@
+package org.simpleframework.transport.trace;
+
+import java.text.DecimalFormat;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+import junit.framework.TestCase;
+
+import org.simpleframework.common.thread.ConcurrentExecutor;
+
+public class CompareQueueTest extends TestCase {
+
+ private static final int TEST_DURATION = 10000;
+ private static final int THREAD_COUNT = 100;
+
+ private final Executor blockingReadExecutor = new ConcurrentExecutor(BlockingConsumer.class, THREAD_COUNT);
+ private final Executor concurrentReadExecutor = new ConcurrentExecutor(ConcurrentConsumer.class, THREAD_COUNT);
+ private final Executor writeExecutor = new ConcurrentExecutor(Producer.class, THREAD_COUNT);
+
+ public void testLinkedBlockingQueue() throws Exception {
+ BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
+ AtomicBoolean active = new AtomicBoolean(true);
+ AtomicLong writeCount = new AtomicLong();
+ AtomicLong readCount = new AtomicLong();
+ CountDownLatch startLatch = new CountDownLatch(THREAD_COUNT);
+ CountDownLatch stopLatch = new CountDownLatch(THREAD_COUNT);
+ DecimalFormat format = new DecimalFormat("###,###,###");
+
+ for(int i = 0; i < THREAD_COUNT; i++) {
+ BlockingConsumer consumer = new BlockingConsumer(queue, stopLatch, active, readCount);
+ blockingReadExecutor.execute(consumer);
+ }
+ Thread.sleep(1000);
+
+ for(int i = 0; i < THREAD_COUNT; i++) {
+ Producer producer = new Producer(queue, startLatch, active, writeCount);
+ writeExecutor.execute(producer);
+ }
+ Thread.sleep(TEST_DURATION);
+ active.set(false);
+ stopLatch.await();
+
+ System.err.printf("read=%s write=%s%n", format.format(readCount.get()), format.format(writeCount.get()));
+ }
+
+ public void testConcurrentQueue() throws Exception {
+ Queue<Object> queue = new ConcurrentLinkedQueue<Object>();
+ AtomicBoolean active = new AtomicBoolean(true);
+ AtomicLong writeCount = new AtomicLong();
+ AtomicLong readCount = new AtomicLong();
+ CountDownLatch startLatch = new CountDownLatch(THREAD_COUNT);
+ CountDownLatch stopLatch = new CountDownLatch(THREAD_COUNT);
+ DecimalFormat format = new DecimalFormat("###,###,###");
+
+ for(int i = 0; i < THREAD_COUNT; i++) {
+ ConcurrentConsumer consumer = new ConcurrentConsumer(queue, stopLatch, active, readCount);
+ concurrentReadExecutor.execute(consumer);
+ }
+ Thread.sleep(1000);
+
+ for(int i = 0; i < THREAD_COUNT; i++) {
+ Producer producer = new Producer(queue, startLatch, active, writeCount);
+ writeExecutor.execute(producer);
+ }
+ Thread.sleep(TEST_DURATION);
+ active.set(false);
+ stopLatch.await();
+
+ System.err.printf("read=%s write=%s%n", format.format(readCount.get()), format.format(writeCount.get()));
+ }
+
+ private static class Producer implements Runnable {
+
+ private final Queue<Object> queue;
+ private final AtomicBoolean active;
+ private final AtomicLong count;
+ private final CountDownLatch latch;
+
+ public Producer(Queue<Object> queue, CountDownLatch latch, AtomicBoolean active, AtomicLong count) {
+ this.queue = queue;
+ this.active = active;
+ this.count = count;
+ this.latch = latch;
+ }
+
+ public void run() {
+ try {
+ latch.countDown();
+ latch.await();
+
+ while(active.get()) {
+ Long value = count.getAndIncrement();
+ queue.offer(value);
+ }
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ private static class ConcurrentConsumer implements Runnable {
+
+ private final Queue<Object> queue;
+ private final AtomicBoolean active;
+ private final AtomicLong count;
+ private final CountDownLatch latch;
+
+ public ConcurrentConsumer(Queue<Object> queue, CountDownLatch latch, AtomicBoolean active, AtomicLong count) {
+ this.queue = queue;
+ this.active = active;
+ this.count = count;
+ this.latch = latch;
+ }
+
+ public void run() {
+ try {
+ while(active.get()) {
+ Object value = queue.poll();
+ if(value != null) {
+ count.getAndIncrement();
+ } else {
+ LockSupport.parkNanos(100);
+ }
+ }
+ latch.countDown();
+ latch.await();
+ }catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private static class BlockingConsumer implements Runnable {
+
+ private final BlockingQueue<Object> queue;
+ private final AtomicBoolean active;
+ private final AtomicLong count;
+ private final CountDownLatch latch;
+
+ public BlockingConsumer(BlockingQueue<Object> queue, CountDownLatch latch, AtomicBoolean active, AtomicLong count) {
+ this.queue = queue;
+ this.active = active;
+ this.count = count;
+ this.latch = latch;
+ }
+
+ public void run() {
+ try {
+ while(active.get()) {
+ try {
+ Object value = queue.take();
+ if(value != null) {
+ count.getAndIncrement();
+ }
+ }catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+ latch.countDown();
+ latch.await();
+ }catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/MockTrace.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/MockTrace.java
new file mode 100644
index 0000000..583326e
--- /dev/null
+++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/MockTrace.java
@@ -0,0 +1,6 @@
+package org.simpleframework.transport.trace;
+
+public class MockTrace implements Trace{
+ public void trace(Object event) {}
+ public void trace(Object event, Object value) {}
+}