diff options
Diffstat (limited to 'simple/simple-transport/src/test/java')
11 files changed, 1351 insertions, 0 deletions
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/MockSocket.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/MockSocket.java new file mode 100644 index 0000000..bd1b582 --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/MockSocket.java @@ -0,0 +1,45 @@ + +package org.simpleframework.transport; + +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.Map; + +import javax.net.ssl.SSLEngine; + +import org.simpleframework.transport.trace.MockTrace; +import org.simpleframework.transport.trace.Trace; + +public class MockSocket implements Socket { + + private SocketChannel socket; + private SSLEngine engine; + private Map map; + + public MockSocket(SocketChannel socket) { + this(socket, null); + } + + public MockSocket(SocketChannel socket, SSLEngine engine) { + this.map = new HashMap(); + this.engine = engine; + this.socket = socket; + } + + public SSLEngine getEngine() { + return engine; + } + + public SocketChannel getChannel() { + return socket; + } + + public Map getAttributes() { + return map; + } + + public Trace getTrace() { + return new MockTrace(); + } +} + diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/ServerBuffer.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/ServerBuffer.java new file mode 100644 index 0000000..34cdc0c --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/ServerBuffer.java @@ -0,0 +1,75 @@ +package org.simpleframework.transport; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.util.concurrent.CountDownLatch; + +public class ServerBuffer extends Thread { + + private ByteArrayOutputStream buffer; + private ServerSocket server; + private CountDownLatch latch; + + public ServerBuffer() throws Exception { + this.buffer = new ByteArrayOutputStream(); + this.latch = new CountDownLatch(1); + this.server = getSocket(); + this.start(); + } + + public ByteArrayOutputStream getBuffer(){ + return buffer; + } + + public void awaitClose() throws Exception { + latch.await(); + } + + public int getPort() { + return server.getLocalPort(); + } + + private ServerSocket getSocket() throws Exception { + // Scan the ephemeral port range + for(int i = 2000; i < 10000; i++) { // keep trying to grab the socket + try { + ServerSocket socket = new ServerSocket(i); + System.out.println("port=["+socket.getLocalPort()+"]"); + return socket; + } catch(Exception e) { + Thread.sleep(200); + } + } + // Scan a second time for good measure, maybe something got freed up + for(int i = 2000; i < 10000; i++) { // keep trying to grab the socket + try { + ServerSocket socket = new ServerSocket(i); + System.out.println("port=["+socket.getLocalPort()+"]"); + return socket; + } catch(Exception e) { + Thread.sleep(200); + } + } + throw new IOException("Could not create a client socket"); + } + + public void run() { + try { + java.net.Socket socket = server.accept(); + InputStream in = socket.getInputStream(); + int count = 0; + + while((count = in.read()) != -1) { + buffer.write(count); + System.err.write(count); + System.err.flush(); + } + } catch(Exception e) { + e.printStackTrace(); + } finally { + latch.countDown(); + } + } +}
\ No newline at end of file diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketBufferTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketBufferTest.java new file mode 100644 index 0000000..a893f04 --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketBufferTest.java @@ -0,0 +1,86 @@ +package org.simpleframework.transport; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import junit.framework.TestCase; + +import org.simpleframework.transport.trace.MockTrace; +import org.simpleframework.transport.trace.Trace; + +public class SocketBufferTest extends TestCase { + + + public void testBulkWrite() throws Exception { + ServerBuffer reader = new ServerBuffer(); + SocketAddress address = new InetSocketAddress("localhost", reader.getPort()); + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); // underlying socket must be non-blocking + channel.connect(address); + + while(!channel.finishConnect()) { // wait to finish connection + Thread.sleep(10); + }; + Trace trace = new MockTrace(); + SocketWrapper wrapper = new SocketWrapper(channel, trace); + SocketBuffer builder = new SocketBuffer(wrapper, 100, 4096); + + for(int i = 0; i < 10000; i++){ + ByteBuffer buf = ByteBuffer.wrap(("message-"+i+"\n").getBytes()); + + if(i > 18) { + System.err.println("FAIL......."+i); + } + if(!builder.write(buf)){ + while(!builder.flush()) { + System.err.println("FLUSHING!!!"); + Thread.sleep(1); + } + } + } + while(!builder.flush()) { + System.err.println("FLUSHING!!!"); + } + builder.close(); + reader.awaitClose(); + + String data = reader.getBuffer().toString(); + String[] list = data.split("\\n"); + + for(int i = 0; i < 10000; i++){ + String msg = list[i]; + if(!msg.equals("message-"+i)) { + System.err.println(list[i]); + } + assertEquals("At index " + i + " value="+list[i] +" expect message-"+i, list[i], "message-"+i); + } + } + + public void testSimpleWrite() throws Exception { + ServerBuffer reader = new ServerBuffer(); + SocketAddress address = new InetSocketAddress("localhost", reader.getPort()); + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); // underlying socket must be non-blocking + channel.connect(address); + + while(!channel.finishConnect()) { // wait to finish connection + Thread.sleep(10); + }; + Trace trace = new MockTrace(); + SocketWrapper wrapper = new SocketWrapper(channel, trace); + SocketBuffer builder = new SocketBuffer(wrapper, 100, 4096); + + builder.write(ByteBuffer.wrap("hello there ".getBytes())); + builder.write(ByteBuffer.wrap("this ".getBytes())); + builder.write(ByteBuffer.wrap("is ".getBytes())); + builder.write(ByteBuffer.wrap("a ".getBytes())); + builder.write(ByteBuffer.wrap("test".getBytes())); + builder.flush(); + builder.close(); + reader.awaitClose(); + + assertEquals(reader.getBuffer().toString(), "hello there this is a test"); + } +} diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java new file mode 100644 index 0000000..6654f31 --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java @@ -0,0 +1,92 @@ +package org.simpleframework.transport; + +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; + +import junit.framework.TestCase; + +import org.simpleframework.common.thread.ConcurrentExecutor; +import org.simpleframework.transport.reactor.ExecutorReactor; +import org.simpleframework.transport.reactor.Reactor; +import org.simpleframework.transport.trace.MockTrace; +import org.simpleframework.transport.trace.Trace; + +public class SocketTransportPipeTest extends TestCase { + + private static final int ITERATIONS = 100000; + + public void testPipe() throws Exception { + ServerSocket server = new ServerSocket(0); + SocketAddress address = new InetSocketAddress("localhost", server.getLocalPort()); + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); // underlying socket must be non-blocking + channel.connect(address); + while(!channel.finishConnect()) { // wait to finish connection + Thread.sleep(10); + }; + Trace trace = new MockTrace(); + SocketWrapper wrapper = new SocketWrapper(channel, trace); + Executor executor = new ConcurrentExecutor(Runnable.class); + Reactor reactor = new ExecutorReactor(executor); + SocketTransport transport = new SocketTransport(wrapper,reactor); + java.net.Socket socket = server.accept(); + final InputStream read = socket.getInputStream(); + final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + final LinkedBlockingQueue<String> sent = new LinkedBlockingQueue<String>(); + final LinkedBlockingQueue<String> received = new LinkedBlockingQueue<String>(); + Thread thread = new Thread(new Runnable() { + public void run(){ + try { + byte[] token = new byte[]{'\r','\n'}; + int pos = 0; + int count = 0; + while((count = read.read()) != -1){ + if(count != token[pos++]){ + pos = 0; + } + if(pos == token.length) { + String value = buffer.toString().trim(); + String expect = sent.take(); + + if(!value.equals(expect)) { + throw new Exception("Out of sequence expected " + expect + " but got " + value); + } + received.offer(value); + buffer.reset(); + pos = 0; + } else { + buffer.write(count); + System.err.write(count); + System.err.flush(); + } + + } + }catch(Exception e){ + e.printStackTrace(); + } + } + }); + thread.start(); + for(int i = 0; i < ITERATIONS; i++) { + String message = "message-"+i; + transport.write(ByteBuffer.wrap((message+"\r\n").getBytes())); + sent.offer(message); + } + transport.flush(); + transport.close(); + + for(int i = 0; i < ITERATIONS; i++) { + assertEquals(received.take(), "message-"+i); + } + assertTrue(sent.isEmpty()); + assertTrue(received.isEmpty()); + + } +} diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportTest.java new file mode 100644 index 0000000..1e09cd0 --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportTest.java @@ -0,0 +1,51 @@ +package org.simpleframework.transport; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.concurrent.Executor; + +import junit.framework.TestCase; + +import org.simpleframework.common.thread.ConcurrentExecutor; +import org.simpleframework.transport.reactor.ExecutorReactor; +import org.simpleframework.transport.reactor.Reactor; +import org.simpleframework.transport.trace.MockTrace; +import org.simpleframework.transport.trace.Trace; + +public class SocketTransportTest extends TestCase { + + + public void testBulkWrite() throws Exception { + ServerBuffer reader = new ServerBuffer(); + SocketAddress address = new InetSocketAddress("localhost", reader.getPort()); + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); // underlying socket must be non-blocking + channel.connect(address); + + while(!channel.finishConnect()) { // wait to finish connection + Thread.sleep(10); + }; + Trace trace = new MockTrace(); + SocketWrapper wrapper = new SocketWrapper(channel, trace); + Executor executor = new ConcurrentExecutor(Runnable.class); + Reactor reactor = new ExecutorReactor(executor); + SocketTransport transport = new SocketTransport(wrapper,reactor); + for(int i = 0; i < 10000; i++){ + transport.write(ByteBuffer.wrap(("message-"+i+"\n").getBytes())); + } + transport.close(); + reader.awaitClose(); + + String data = reader.getBuffer().toString(); + String[] list = data.split("\\n"); + + for(int i = 0; i < 10000; i++){ + if(!list[i].equals("message-"+i)) { + System.err.println(list[i]); + } + assertEquals("At index " + i + " value="+list[i] +" expect message-"+i, list[i], "message-"+i); + } + } +} diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/StreamTransport.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/StreamTransport.java new file mode 100644 index 0000000..75f999e --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/StreamTransport.java @@ -0,0 +1,66 @@ +package org.simpleframework.transport; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SocketChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Map; + +import javax.net.ssl.SSLEngine; + +import org.simpleframework.transport.trace.MockTrace; +import org.simpleframework.transport.trace.Trace; + +public class StreamTransport implements Transport { + + private final WritableByteChannel write; + private final ReadableByteChannel read; + private final OutputStream out; + + public StreamTransport(InputStream in, OutputStream out) { + this.write = Channels.newChannel(out); + this.read = Channels.newChannel(in); + this.out = out; + } + + public void close() throws IOException { + write.close(); + read.close(); + } + + public void flush() throws IOException { + out.flush(); + } + + public int read(ByteBuffer buffer) throws IOException { + return read.read(buffer); + } + + public void write(ByteBuffer buffer) throws IOException { + write.write(buffer); + } + + public Map getAttributes() { + return null; + } + + public SocketChannel getChannel() { + return null; + } + + public SSLEngine getEngine() { + return null; + } + + public Certificate getCertificate() { + return null; + } + + public Trace getTrace() { + return new MockTrace(); + } +} diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportCursorTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportCursorTest.java new file mode 100644 index 0000000..161115f --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportCursorTest.java @@ -0,0 +1,83 @@ +package org.simpleframework.transport; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +import junit.framework.TestCase; + +public class TransportCursorTest extends TestCase { + + private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz"; + private static final String SOURCE = ALPHABET + "\r\n"; + + public void testCursor() throws IOException { + byte[] data = SOURCE.getBytes("ISO-8859-1"); + InputStream source = new ByteArrayInputStream(data); + Transport transport = new StreamTransport(source, System.out); + ByteCursor cursor = new TransportCursor(transport); + byte[] buffer = new byte[1024]; + + assertEquals(cursor.ready(), data.length); + assertEquals(26, cursor.read(buffer, 0, 26)); + assertEquals(26, cursor.reset(26)); + assertEquals(new String(buffer, 0, 26), ALPHABET); + + assertEquals(cursor.ready(), data.length); + assertEquals(26, cursor.read(buffer, 0, 26)); + assertEquals(26, cursor.reset(26)); + assertEquals(new String(buffer, 0, 26), ALPHABET); + + assertEquals(cursor.ready(), data.length); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(4, cursor.reset(26)); + assertEquals(new String(buffer, 0, 4), "abcd"); + + assertEquals(cursor.ready(), data.length); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(4, cursor.reset(26)); + assertEquals(new String(buffer, 0, 4), "abcd"); + + assertEquals(cursor.ready(), data.length); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(new String(buffer, 0, 4), "abcd"); + + assertEquals(cursor.ready(), data.length - 4); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(new String(buffer, 0, 4), "efgh"); + + assertEquals(cursor.ready(), data.length - 8); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(new String(buffer, 0, 4), "ijkl"); + + assertEquals(cursor.ready(), data.length - 12); + assertEquals(12, cursor.reset(12)); + assertEquals(10, cursor.read(buffer, 0, 10)); + assertEquals(new String(buffer, 0, 10), "abcdefghij"); + + cursor.push("1234".getBytes("ISO-8859-1")); + cursor.push("5678".getBytes("ISO-8859-1")); + cursor.push("90".getBytes("ISO-8859-1")); + + assertEquals(cursor.ready(), 10); + assertEquals(2, cursor.read(buffer, 0, 2)); + assertEquals(new String(buffer, 0, 2), "90"); + + assertEquals(cursor.ready(), 8); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(new String(buffer, 0, 4), "5678"); + + assertEquals(cursor.ready(), 4); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(new String(buffer, 0, 4), "1234"); + + assertEquals(4, cursor.reset(4)); + assertEquals(cursor.ready(), 4); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(new String(buffer, 0, 4), "1234"); + + assertEquals(8, cursor.read(buffer, 0, 8)); + assertEquals(new String(buffer, 0, 8), "klmnopqr"); + } + +} diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java new file mode 100644 index 0000000..1a14431 --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java @@ -0,0 +1,404 @@ +package org.simpleframework.transport; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.LinkedList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +import junit.framework.TestCase; + +import org.simpleframework.transport.reactor.ExecutorReactor; +import org.simpleframework.transport.reactor.Reactor; + +/** + * Measure the performance of the transports to ensure that the perform + * well and that they send the correct sequence of bytes and that the + * blocks sent are in the correct order. This also performs a comparison + * with direct socket output streams to ensure there is a reasonable + * performance difference. + * + * @author Niall Gallagher + */ +public class TransportTest extends TestCase { + + private static final int REPEAT = 1000; + + public void testTransport() throws Exception { + testTransport(REPEAT); + } + + public void testTransport(int repeat) throws Exception { + for(int i = 1; i < 7; i++) { // just do some random sizes + testTransport(i, 100); + } + for(int i = 4092; i < 4102; i++) { + testTransport(i, 100); + } + for(int i = 8190; i < 8200; i++) { + testTransport(i, 100); + } + for(int i = 11282; i < 11284; i++) { + testTransport(i, 1000); + } + for(int i = 204800; i < 204805; i++) { + testTransport(i, 1000); + } + testTransport(16, repeat); + testTransport(64, repeat); + testTransport(256, repeat); + testTransport(1024, repeat); + testTransport(2048, repeat); + testTransport(4096, repeat); + testTransport(4098, repeat); + testTransport(8192, repeat); + testTransport(8197, repeat); + } + + // Test blocking transport + private void testTransport(int size, int repeat) throws Exception { + // ThreadDumper dumper = new ThreadDumper(); + SocketConsumer consumer = new SocketConsumer(size, repeat); + SocketAddress address = new InetSocketAddress("localhost", consumer.getPort()); + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); // underlying socket must be non-blocking + channel.connect(address); + + while(!channel.finishConnect()) { // wait to finish connection + Thread.sleep(10); + }; + ExecutorService executor = Executors.newFixedThreadPool(20); + Reactor reactor = new ExecutorReactor(executor); + // Transport transport = new SocketTransport(channel, reactor, 2, 3);//XXX bug + MockSocket pipeline = new MockSocket(channel); + Transport transport = new SocketTransport(pipeline, reactor, 8192); + OutputStream out = new TransportOutputStream(transport); + + // dumper.start(); + testOutputStream(consumer, out, size, repeat); + + out.close(); + executor.shutdown(); + channel.close(); + reactor.stop(); + // dumper.kill(); + Thread.sleep(100); + } + + public void s_testSocket() throws Exception { + s_testSocket(REPEAT); + } + + public void s_testSocket(int repeat) throws Exception { + testSocket(16, repeat); + testSocket(64, repeat); + testSocket(256, repeat); + testSocket(1024, repeat); + testSocket(2048, repeat); + testSocket(4098, repeat); + testSocket(8192, repeat); + } + + // Test blocking socket + private void testSocket(int size, int repeat) throws Exception { + // ThreadDumper dumper = new ThreadDumper(); + SocketConsumer consumer = new SocketConsumer(size, repeat); + Socket socket = new Socket("localhost", consumer.getPort()); + OutputStream out = socket.getOutputStream(); + + //dumper.start(); + testOutputStream(consumer, out, size, repeat); + + out.close(); + socket.close(); + //dumper.kill(); + Thread.sleep(100); + } + + private class AlpahbetIterator { + + private byte[] alphabet = "abcdefghijklmnopqstuvwxyz".getBytes(); + + private int off; + + public byte next() { + if(off == alphabet.length) { + off = 0; + } + return alphabet[off++]; + } + + public void reset() { + off = 0; + } + } + + private void testOutputStream(SocketConsumer consumer, OutputStream out, int size, int repeat) throws Exception { + byte[] block = new byte[size]; // write size + AlpahbetIterator it = new AlpahbetIterator(); // write known data + + for(int i = 1; i < block.length; i++) { + block[i] = it.next(); + } + AtomicLong count = new AtomicLong(); + PerformanceMonitor monitor = new PerformanceMonitor(consumer, count, out.getClass().getSimpleName(), size); + + for(int i = 0; i < repeat; i++) { + block[0] = (byte) i; // mark the first byte in the block to be sure we get blocks in sequence + //System.err.println("["+i+"]"+new String(block,"ISO-8859-1")); + out.write(block); // manipulation of the underlying buffer is taking place when the compact is invoked, this is causing major problems as the next packet will be out of sequence + count.addAndGet(block.length); + } + Thread.sleep(2000); // wait for all bytes to flush through to consumer + monitor.kill(); + } + + private class PerformanceMonitor extends Thread { + private AtomicLong count; + + private volatile boolean dead; + + private SocketConsumer consumer; + + private String name; + + private int size; + + public PerformanceMonitor(SocketConsumer consumer, AtomicLong count, String name, int size) { + this.consumer = consumer; + this.count = count; + this.name = name; + this.size = size; + this.start(); + } + + public void run() { + int second = 0; + while(!dead) { + try { + long octets = count.longValue(); + System.out.printf("%s,%s,%s,%s,%s%n", name, size, second++, octets, consumer.getWindow()); + Thread.sleep(1000); + } catch(Exception e) { + e.printStackTrace(); + } + } + } + + public void kill() throws Exception { + dead = true; + } + } + + private class SocketConsumer extends Thread { + + private ServerSocket server; + + private Window window; + + private long repeat; + + private long size; + + public SocketConsumer(int size, int repeat) throws Exception { + this.window = new Window(20); + this.server = getSocket(); + this.repeat = repeat; + this.size = size; + this.start(); + } + + public int getPort() { + return server.getLocalPort(); + } + + public String getWindow() { + return window.toString(); + } + + private ServerSocket getSocket() throws Exception { + // Scan the ephemeral port range + for(int i = 2000; i < 10000; i++) { // keep trying to grab the socket + try { + ServerSocket socket = new ServerSocket(i); + System.out.println("port=["+socket.getLocalPort()+"]"); + return socket; + } catch(Exception e) { + Thread.sleep(200); + } + } + // Scan a second time for good measure, maybe something got freed up + for(int i = 2000; i < 10000; i++) { // keep trying to grab the socket + try { + ServerSocket socket = new ServerSocket(i); + System.out.println("port=["+socket.getLocalPort()+"]"); + return socket; + } catch(Exception e) { + Thread.sleep(200); + } + } + throw new IOException("Could not create a client socket"); + } + + public void run() { + long count = 0; + int windowOctet = 0; + int expectWindowOctet = 0; + + try { + Socket socket = server.accept(); + InputStream in = socket.getInputStream(); + InputStream source = new BufferedInputStream(in); + AlpahbetIterator it = new AlpahbetIterator(); + + scan: for(int i = 0; i < repeat; i++) { + int octet = source.read(); // check first byte in the block to make sure its correct in sequence + + if(octet == -1) { + break scan; + } + count++; // we have read another byte + windowOctet = octet & 0x000000ff; + expectWindowOctet = i & 0x000000ff; + + if((byte) octet != (byte) i) { + throw new Exception("Wrong sequence of blocks sent, was " + + (byte)octet + " should have been " + (byte)i + " count is "+count+" window is "+window+" compare "+explore(it, source, 5)); + } + window.recieved(octet); + + for(int j = 1, k = 0; j < size; j++, k++) { + octet = source.read(); + + if(octet == -1) { + break scan; + } + byte next = it.next(); + + if((byte) octet != next) { + throw new Exception("Invalid data received expected "+((byte)octet)+"("+((char)octet)+ + ") but was "+next+"("+((char)next)+") total count is "+count+" block count is "+k+" window is expected "+ + expectWindowOctet+"("+((char)expectWindowOctet)+")("+((byte)expectWindowOctet)+") got "+windowOctet+"("+ + ((char)windowOctet)+")("+((byte)windowOctet)+") "+window+" compare "+explore(it, source, 5)); + } + count++; + } + it.reset(); + } + } catch(Throwable e) { + e.printStackTrace(); + } + if(count != size * repeat) { + new Exception("Invalid number of bytes read, was " + count + + " should have been " + (size * repeat)).printStackTrace(); + } + try { + // server.close(); + }catch(Exception e) { + e.printStackTrace(); + } + } + + private String explore(AlpahbetIterator it, InputStream source, int count) throws IOException { + StringBuffer buf = new StringBuffer(); + buf.append("expected ("); + for(int i = 0; i < count; i++) { + buf.append( (char)it.next() ); + } + buf.append(") is ("); + for(int i = 0; i < count; i++) { + buf.append( (char)source.read() ); + } + buf.append(")"); + return buf.toString(); + } + } + + + private static class TransportOutputStream extends OutputStream { + + private Transport transport; + + public TransportOutputStream(Transport transport) { + this.transport = transport; + } + + public void write(int octet) throws IOException { + byte[] data = new byte[] { (byte) octet }; + write(data); + } + + public void write(byte[] data, int off, int len) throws IOException { + try { + ByteBuffer buffer = ByteBuffer.wrap(data, off, len); + ByteBuffer safe = buffer.asReadOnlyBuffer(); + + if(len > 0) { + transport.write(safe); + } + } catch(Exception e) { + e.printStackTrace(); + throw new IOException("Write failed"); + } + } + + public void flush() throws IOException { + try { + transport.flush(); + } catch(Exception e) { + e.printStackTrace(); + throw new IOException("Flush failed"); + } + } + + public void close() throws IOException { + try { + transport.close(); + } catch(Exception e) { + e.printStackTrace(); + throw new IOException("Close failed"); + } + } + + } + + private static class Window { + + private final LinkedList<String> window; + private final int size; + + public Window(int size) { + this.window = new LinkedList<String>(); + this.size = size; + } + + public synchronized void recieved(int sequence) { + window.addLast(String.valueOf(sequence)); + + if(window.size() > size) { + window.removeFirst(); + } + } + + public synchronized String toString() { + StringBuilder builder = new StringBuilder("["); + String delim = ""; + for(String b : window) { + builder.append(delim).append(b); + delim=", "; + } + builder.append("]"); + return builder.toString(); + } + } + +} diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java new file mode 100644 index 0000000..3032692 --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java @@ -0,0 +1,269 @@ +package org.simpleframework.transport.reactor; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; + +import org.simpleframework.transport.trace.MockTrace; +import org.simpleframework.transport.trace.Trace; + + +public class DistributorTest extends TestCase { + + private static final String PAYLOAD = + "POST /index.html HTTP/1.0\r\n"+ + "Content-Type: multipart/form-data; boundary=AaB03x\r\n"+ + "Accept: image/gif;q=1.0,\r\n image/jpeg;q=0.8,\r\n"+ + " \t\t image/png;\t\r\n\t"+ + " q=1.0,*;q=0.1\r\n"+ + "Accept-Language: fr;q=0.1, en-us;q=0.4, en-gb; q=0.8, en;q=0.7\r\n"+ + "Host: some.host.com \r\n"+ + "Cookie: $Version=1; UID=1234-5678; $Path=/; $Domain=.host.com\r\n"+ + "Cookie: $Version=1; NAME=\"Niall Gallagher\"; $path=\"/\"\r\n"+ + "\r\n" + + "--AaB03x\r\n"+ + "Content-Disposition: form-data; name='pics'; filename='file1.txt'\r\n"+ + "Content-Type: text/plain\r\n\r\n"+ + "example contents of file1.txt\r\n"+ + "--AaB03x\r\n"+ + "Content-Type: multipart/mixed; boundary=BbC04y\r\n\r\n"+ + "--BbC04y\r\n"+ + "Content-Disposition: form-data; name='pics'; filename='file2.txt'\r\n"+ + "Content-Type: text/plain\r\n\r\n"+ + "example contents of file3.txt ...\r\n"+ + "--BbC04y\r\n"+ + "Content-Disposition: form-data; name='pics'; filename='file3.txt'\r\n"+ + "Content-Type: text/plain\r\n\r\n"+ + "example contents of file4.txt ...\r\n"+ + "--BbC04y\r\n"+ + "Content-Disposition: form-data; name='pics'; filename='file4.txt'\r\n"+ + "Content-Type: text/plain\r\n\r\n"+ + "example contents of file4.txt ...\r\n"+ + "--BbC04y--\r\n"+ + "--AaB03x--\r\n"; + + public class Client extends Thread { + + private CountDownLatch latch; + private String message; + private int requests; + private int port; + + public Client(CountDownLatch latch, String message, int port, int requests) throws Exception { + this.message = message; + this.requests = requests; + this.port = port; + this.latch = latch; + this.start(); + } + + public void run() { + try { + latch.await(); + + Socket socket = new Socket("localhost", port); + OutputStream out = socket.getOutputStream(); + byte[] payload = message.getBytes(); + + for(int i = 0; i < requests; i++){ + out.write(payload); + } + out.close(); + } catch(Exception e) { + e.printStackTrace(); + } + } + } + + public class Worker implements Operation { + + private BlockingQueue<Worker> done; + private Reactor reactor; + private SocketChannel channel; + private ByteBuffer buffer; + private String payload; + private int accumulate; + private long finish; + private long start; + private int id; + + public Worker(BlockingQueue<Worker> done, Reactor reactor, SocketChannel channel, String payload, int id) throws Exception { + this.buffer = ByteBuffer.allocate(8192); + this.start = System.currentTimeMillis(); + this.finish = start + 60000; + this.payload = payload; + this.channel = channel; + this.reactor = reactor; + this.done = done; + this.id = id; + } + + public Trace getTrace() { + return new MockTrace(); + } + + public long getExpiry(TimeUnit unit) { + return unit.convert(finish - System.currentTimeMillis(), MILLISECONDS); + } + + public int getAccumulate() { + return accumulate; + } + + // XXX should this be executed in a thread!!!!???? yes... + public void cancel() { + System.err.println("############################# Worker has been canceled"); + } + + public void run() { + try { + // N.B Fundamental to performance + buffer.clear(); + + if(channel.isOpen()) { + int count = channel.read(buffer); + accumulate += count; + + System.err.println("Worker-"+id+" read ["+count +"] of payload sized ["+payload.length()+"] took ["+(System.currentTimeMillis() -start)+"]"); + + if(count != -1) { + reactor.process(this, SelectionKey.OP_READ); + } else { + channel.close(); + done.offer(this); + System.err.println("Worker-"+id+" Channel is closed after time ["+(System.currentTimeMillis() - start)+"] and read ["+accumulate+"]"); + } + } else { + System.err.println("Worker-"+id+" Channel is closed after time ["+(System.currentTimeMillis() - start)+"] and read ["+accumulate+"]"); + done.offer(this); + } + }catch(Exception e) { + e.printStackTrace(); + } + } + + public SocketChannel getChannel() { + return channel; + } + + } + + public class Server extends Thread { + + private BlockingQueue<SocketChannel> ready; + private CountDownLatch latch; + private ServerSocketChannel server; + private Selector selector; + private int port; + + public Server(CountDownLatch latch, BlockingQueue<SocketChannel> ready, int port) throws Exception { + this.server = ServerSocketChannel.open(); + this.selector = Selector.open(); + this.latch = latch; + this.port = port; + this.ready = ready; + this.start(); + } + + private void configure() throws Exception { + server.socket().bind(new InetSocketAddress(port)); + server.configureBlocking(false); + } + + public void run() { + try { + configure(); + execute(); + } catch(Exception e) { + e.printStackTrace(); + } + } + + private void execute() throws Exception { + SelectionKey serverKey = server.register(selector, SelectionKey.OP_ACCEPT); + + latch.countDown(); + + while(true){ + selector.select(); + Set keys = selector.selectedKeys(); + + for(Iterator i = keys.iterator(); i.hasNext();){ + SelectionKey key = (SelectionKey) i.next(); + i.remove(); + + if(key != serverKey) { + return; + } + if(key.isAcceptable()) { + SocketChannel channel = server.accept(); + channel.configureBlocking(false); + ready.offer(channel); + } + } + } + } + } + + public static void main(String[] list) throws Exception { + new DistributorTest().testReactor(); + } + + public void testReactor() throws Exception { + testReactor(PAYLOAD, 200, 100, 10, 8123); + } + + private void testReactor(String payload, int clients, int requests, int threads, int port) throws Exception { + BlockingQueue<Worker> done = new LinkedBlockingQueue<Worker>(); + BlockingQueue<SocketChannel> ready = new LinkedBlockingQueue<SocketChannel>(); + CountDownLatch latch = new CountDownLatch(1); + Server server = new Server(latch, ready, port); + Executor executor = Executors.newFixedThreadPool(10); + Reactor reactor = new ExecutorReactor(executor, 1); + + long start = System.currentTimeMillis(); + + for(int i = 0; i < clients; i++) { + new Client(latch, payload, port, requests); + } + for(int i = 0; i < clients; i++) { + SocketChannel channel = ready.take(); + Worker worker = new Worker(done, reactor, channel, payload, i); + + reactor.process(worker); + } + int total = 0; + + for(int i = 0; i < clients; i++) { + Worker worker = done.take(); + int accumulate = worker.getAccumulate(); + total += accumulate; + System.err.println("Accumulated ["+accumulate+"] of ["+(requests*payload.length())+"] closed ["+worker.getChannel().socket().isClosed()+"]"); + } + System.err.println("Accumulated ["+total+"] of ["+(clients*requests*payload.length())+"]"); + System.err.println("Total time to process ["+(clients*requests)+"] payloads from ["+clients+"] clients took ["+(System.currentTimeMillis() - start)+"]"); + } + + + + +} + + diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/CompareQueueTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/CompareQueueTest.java new file mode 100644 index 0000000..ca2f5db --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/CompareQueueTest.java @@ -0,0 +1,174 @@ +package org.simpleframework.transport.trace; + +import java.text.DecimalFormat; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; + +import junit.framework.TestCase; + +import org.simpleframework.common.thread.ConcurrentExecutor; + +public class CompareQueueTest extends TestCase { + + private static final int TEST_DURATION = 10000; + private static final int THREAD_COUNT = 100; + + private final Executor blockingReadExecutor = new ConcurrentExecutor(BlockingConsumer.class, THREAD_COUNT); + private final Executor concurrentReadExecutor = new ConcurrentExecutor(ConcurrentConsumer.class, THREAD_COUNT); + private final Executor writeExecutor = new ConcurrentExecutor(Producer.class, THREAD_COUNT); + + public void testLinkedBlockingQueue() throws Exception { + BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(); + AtomicBoolean active = new AtomicBoolean(true); + AtomicLong writeCount = new AtomicLong(); + AtomicLong readCount = new AtomicLong(); + CountDownLatch startLatch = new CountDownLatch(THREAD_COUNT); + CountDownLatch stopLatch = new CountDownLatch(THREAD_COUNT); + DecimalFormat format = new DecimalFormat("###,###,###"); + + for(int i = 0; i < THREAD_COUNT; i++) { + BlockingConsumer consumer = new BlockingConsumer(queue, stopLatch, active, readCount); + blockingReadExecutor.execute(consumer); + } + Thread.sleep(1000); + + for(int i = 0; i < THREAD_COUNT; i++) { + Producer producer = new Producer(queue, startLatch, active, writeCount); + writeExecutor.execute(producer); + } + Thread.sleep(TEST_DURATION); + active.set(false); + stopLatch.await(); + + System.err.printf("read=%s write=%s%n", format.format(readCount.get()), format.format(writeCount.get())); + } + + public void testConcurrentQueue() throws Exception { + Queue<Object> queue = new ConcurrentLinkedQueue<Object>(); + AtomicBoolean active = new AtomicBoolean(true); + AtomicLong writeCount = new AtomicLong(); + AtomicLong readCount = new AtomicLong(); + CountDownLatch startLatch = new CountDownLatch(THREAD_COUNT); + CountDownLatch stopLatch = new CountDownLatch(THREAD_COUNT); + DecimalFormat format = new DecimalFormat("###,###,###"); + + for(int i = 0; i < THREAD_COUNT; i++) { + ConcurrentConsumer consumer = new ConcurrentConsumer(queue, stopLatch, active, readCount); + concurrentReadExecutor.execute(consumer); + } + Thread.sleep(1000); + + for(int i = 0; i < THREAD_COUNT; i++) { + Producer producer = new Producer(queue, startLatch, active, writeCount); + writeExecutor.execute(producer); + } + Thread.sleep(TEST_DURATION); + active.set(false); + stopLatch.await(); + + System.err.printf("read=%s write=%s%n", format.format(readCount.get()), format.format(writeCount.get())); + } + + private static class Producer implements Runnable { + + private final Queue<Object> queue; + private final AtomicBoolean active; + private final AtomicLong count; + private final CountDownLatch latch; + + public Producer(Queue<Object> queue, CountDownLatch latch, AtomicBoolean active, AtomicLong count) { + this.queue = queue; + this.active = active; + this.count = count; + this.latch = latch; + } + + public void run() { + try { + latch.countDown(); + latch.await(); + + while(active.get()) { + Long value = count.getAndIncrement(); + queue.offer(value); + } + } catch(Exception e) { + e.printStackTrace(); + } + } + + } + + private static class ConcurrentConsumer implements Runnable { + + private final Queue<Object> queue; + private final AtomicBoolean active; + private final AtomicLong count; + private final CountDownLatch latch; + + public ConcurrentConsumer(Queue<Object> queue, CountDownLatch latch, AtomicBoolean active, AtomicLong count) { + this.queue = queue; + this.active = active; + this.count = count; + this.latch = latch; + } + + public void run() { + try { + while(active.get()) { + Object value = queue.poll(); + if(value != null) { + count.getAndIncrement(); + } else { + LockSupport.parkNanos(100); + } + } + latch.countDown(); + latch.await(); + }catch(Exception e) { + e.printStackTrace(); + } + } + } + + private static class BlockingConsumer implements Runnable { + + private final BlockingQueue<Object> queue; + private final AtomicBoolean active; + private final AtomicLong count; + private final CountDownLatch latch; + + public BlockingConsumer(BlockingQueue<Object> queue, CountDownLatch latch, AtomicBoolean active, AtomicLong count) { + this.queue = queue; + this.active = active; + this.count = count; + this.latch = latch; + } + + public void run() { + try { + while(active.get()) { + try { + Object value = queue.take(); + if(value != null) { + count.getAndIncrement(); + } + }catch(Exception e) { + e.printStackTrace(); + } + } + latch.countDown(); + latch.await(); + }catch(Exception e) { + e.printStackTrace(); + } + } + } +} diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/MockTrace.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/MockTrace.java new file mode 100644 index 0000000..583326e --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/MockTrace.java @@ -0,0 +1,6 @@ +package org.simpleframework.transport.trace; + +public class MockTrace implements Trace{ + public void trace(Object event) {} + public void trace(Object event, Object value) {} +} |