diff options
Diffstat (limited to 'simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java')
-rw-r--r-- | simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java | 269 |
1 files changed, 269 insertions, 0 deletions
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java new file mode 100644 index 0000000..3032692 --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java @@ -0,0 +1,269 @@ +package org.simpleframework.transport.reactor; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; + +import org.simpleframework.transport.trace.MockTrace; +import org.simpleframework.transport.trace.Trace; + + +public class DistributorTest extends TestCase { + + private static final String PAYLOAD = + "POST /index.html HTTP/1.0\r\n"+ + "Content-Type: multipart/form-data; boundary=AaB03x\r\n"+ + "Accept: image/gif;q=1.0,\r\n image/jpeg;q=0.8,\r\n"+ + " \t\t image/png;\t\r\n\t"+ + " q=1.0,*;q=0.1\r\n"+ + "Accept-Language: fr;q=0.1, en-us;q=0.4, en-gb; q=0.8, en;q=0.7\r\n"+ + "Host: some.host.com \r\n"+ + "Cookie: $Version=1; UID=1234-5678; $Path=/; $Domain=.host.com\r\n"+ + "Cookie: $Version=1; NAME=\"Niall Gallagher\"; $path=\"/\"\r\n"+ + "\r\n" + + "--AaB03x\r\n"+ + "Content-Disposition: form-data; name='pics'; filename='file1.txt'\r\n"+ + "Content-Type: text/plain\r\n\r\n"+ + "example contents of file1.txt\r\n"+ + "--AaB03x\r\n"+ + "Content-Type: multipart/mixed; boundary=BbC04y\r\n\r\n"+ + "--BbC04y\r\n"+ + "Content-Disposition: form-data; name='pics'; filename='file2.txt'\r\n"+ + "Content-Type: text/plain\r\n\r\n"+ + "example contents of file3.txt ...\r\n"+ + "--BbC04y\r\n"+ + "Content-Disposition: form-data; name='pics'; filename='file3.txt'\r\n"+ + "Content-Type: text/plain\r\n\r\n"+ + "example contents of file4.txt ...\r\n"+ + "--BbC04y\r\n"+ + "Content-Disposition: form-data; name='pics'; filename='file4.txt'\r\n"+ + "Content-Type: text/plain\r\n\r\n"+ + "example contents of file4.txt ...\r\n"+ + "--BbC04y--\r\n"+ + "--AaB03x--\r\n"; + + public class Client extends Thread { + + private CountDownLatch latch; + private String message; + private int requests; + private int port; + + public Client(CountDownLatch latch, String message, int port, int requests) throws Exception { + this.message = message; + this.requests = requests; + this.port = port; + this.latch = latch; + this.start(); + } + + public void run() { + try { + latch.await(); + + Socket socket = new Socket("localhost", port); + OutputStream out = socket.getOutputStream(); + byte[] payload = message.getBytes(); + + for(int i = 0; i < requests; i++){ + out.write(payload); + } + out.close(); + } catch(Exception e) { + e.printStackTrace(); + } + } + } + + public class Worker implements Operation { + + private BlockingQueue<Worker> done; + private Reactor reactor; + private SocketChannel channel; + private ByteBuffer buffer; + private String payload; + private int accumulate; + private long finish; + private long start; + private int id; + + public Worker(BlockingQueue<Worker> done, Reactor reactor, SocketChannel channel, String payload, int id) throws Exception { + this.buffer = ByteBuffer.allocate(8192); + this.start = System.currentTimeMillis(); + this.finish = start + 60000; + this.payload = payload; + this.channel = channel; + this.reactor = reactor; + this.done = done; + this.id = id; + } + + public Trace getTrace() { + return new MockTrace(); + } + + public long getExpiry(TimeUnit unit) { + return unit.convert(finish - System.currentTimeMillis(), MILLISECONDS); + } + + public int getAccumulate() { + return accumulate; + } + + // XXX should this be executed in a thread!!!!???? yes... + public void cancel() { + System.err.println("############################# Worker has been canceled"); + } + + public void run() { + try { + // N.B Fundamental to performance + buffer.clear(); + + if(channel.isOpen()) { + int count = channel.read(buffer); + accumulate += count; + + System.err.println("Worker-"+id+" read ["+count +"] of payload sized ["+payload.length()+"] took ["+(System.currentTimeMillis() -start)+"]"); + + if(count != -1) { + reactor.process(this, SelectionKey.OP_READ); + } else { + channel.close(); + done.offer(this); + System.err.println("Worker-"+id+" Channel is closed after time ["+(System.currentTimeMillis() - start)+"] and read ["+accumulate+"]"); + } + } else { + System.err.println("Worker-"+id+" Channel is closed after time ["+(System.currentTimeMillis() - start)+"] and read ["+accumulate+"]"); + done.offer(this); + } + }catch(Exception e) { + e.printStackTrace(); + } + } + + public SocketChannel getChannel() { + return channel; + } + + } + + public class Server extends Thread { + + private BlockingQueue<SocketChannel> ready; + private CountDownLatch latch; + private ServerSocketChannel server; + private Selector selector; + private int port; + + public Server(CountDownLatch latch, BlockingQueue<SocketChannel> ready, int port) throws Exception { + this.server = ServerSocketChannel.open(); + this.selector = Selector.open(); + this.latch = latch; + this.port = port; + this.ready = ready; + this.start(); + } + + private void configure() throws Exception { + server.socket().bind(new InetSocketAddress(port)); + server.configureBlocking(false); + } + + public void run() { + try { + configure(); + execute(); + } catch(Exception e) { + e.printStackTrace(); + } + } + + private void execute() throws Exception { + SelectionKey serverKey = server.register(selector, SelectionKey.OP_ACCEPT); + + latch.countDown(); + + while(true){ + selector.select(); + Set keys = selector.selectedKeys(); + + for(Iterator i = keys.iterator(); i.hasNext();){ + SelectionKey key = (SelectionKey) i.next(); + i.remove(); + + if(key != serverKey) { + return; + } + if(key.isAcceptable()) { + SocketChannel channel = server.accept(); + channel.configureBlocking(false); + ready.offer(channel); + } + } + } + } + } + + public static void main(String[] list) throws Exception { + new DistributorTest().testReactor(); + } + + public void testReactor() throws Exception { + testReactor(PAYLOAD, 200, 100, 10, 8123); + } + + private void testReactor(String payload, int clients, int requests, int threads, int port) throws Exception { + BlockingQueue<Worker> done = new LinkedBlockingQueue<Worker>(); + BlockingQueue<SocketChannel> ready = new LinkedBlockingQueue<SocketChannel>(); + CountDownLatch latch = new CountDownLatch(1); + Server server = new Server(latch, ready, port); + Executor executor = Executors.newFixedThreadPool(10); + Reactor reactor = new ExecutorReactor(executor, 1); + + long start = System.currentTimeMillis(); + + for(int i = 0; i < clients; i++) { + new Client(latch, payload, port, requests); + } + for(int i = 0; i < clients; i++) { + SocketChannel channel = ready.take(); + Worker worker = new Worker(done, reactor, channel, payload, i); + + reactor.process(worker); + } + int total = 0; + + for(int i = 0; i < clients; i++) { + Worker worker = done.take(); + int accumulate = worker.getAccumulate(); + total += accumulate; + System.err.println("Accumulated ["+accumulate+"] of ["+(requests*payload.length())+"] closed ["+worker.getChannel().socket().isClosed()+"]"); + } + System.err.println("Accumulated ["+total+"] of ["+(clients*requests*payload.length())+"]"); + System.err.println("Total time to process ["+(clients*requests)+"] payloads from ["+clients+"] clients took ["+(System.currentTimeMillis() - start)+"]"); + } + + + + +} + + |