summaryrefslogtreecommitdiffstats
path: root/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java
blob: 6654f31f0474f567c539ed9ea559dcbe64fb9151 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package org.simpleframework.transport;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

import junit.framework.TestCase;

import org.simpleframework.common.thread.ConcurrentExecutor;
import org.simpleframework.transport.reactor.ExecutorReactor;
import org.simpleframework.transport.reactor.Reactor;
import org.simpleframework.transport.trace.MockTrace;
import org.simpleframework.transport.trace.Trace;

public class SocketTransportPipeTest extends TestCase {
   
   private static final int ITERATIONS = 100000;

   public void testPipe() throws Exception {
      ServerSocket server = new ServerSocket(0);
      SocketAddress address = new InetSocketAddress("localhost", server.getLocalPort());
      SocketChannel channel = SocketChannel.open();
      channel.configureBlocking(false); // underlying socket must be non-blocking
      channel.connect(address);
      while(!channel.finishConnect()) { // wait to finish connection
         Thread.sleep(10);
      };
      Trace trace = new MockTrace();
      SocketWrapper wrapper = new SocketWrapper(channel,  trace);
      Executor executor = new ConcurrentExecutor(Runnable.class);
      Reactor reactor = new ExecutorReactor(executor);
      SocketTransport transport = new SocketTransport(wrapper,reactor);
      java.net.Socket socket = server.accept();
      final InputStream read = socket.getInputStream();
      final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      final LinkedBlockingQueue<String> sent = new LinkedBlockingQueue<String>();
      final LinkedBlockingQueue<String> received = new LinkedBlockingQueue<String>();
      Thread thread = new Thread(new Runnable() {
         public void run(){
            try {
               byte[] token = new byte[]{'\r','\n'};
               int pos = 0;
               int count = 0;
               while((count = read.read()) != -1){
                  if(count != token[pos++]){
                     pos = 0;
                  }
                  if(pos == token.length) {
                     String value = buffer.toString().trim();
                     String expect = sent.take();
                     
                     if(!value.equals(expect)) {
                        throw new Exception("Out of sequence expected " + expect + " but got " + value);
                     }
                     received.offer(value);
                     buffer.reset();
                     pos = 0;
                  } else {
                     buffer.write(count);
                     System.err.write(count);
                     System.err.flush();
                  }
                     
               }
            }catch(Exception e){
               e.printStackTrace();
            }
         }
      });
      thread.start();
      for(int i = 0; i < ITERATIONS; i++) {
         String message = "message-"+i;
         transport.write(ByteBuffer.wrap((message+"\r\n").getBytes()));
         sent.offer(message);         
      }
      transport.flush();
      transport.close();
      
      for(int i = 0; i < ITERATIONS; i++) {
         assertEquals(received.take(), "message-"+i);         
      }
      assertTrue(sent.isEmpty());
      assertTrue(received.isEmpty());

   }
}