diff options
Diffstat (limited to 'simple/simple-transport/src')
70 files changed, 10452 insertions, 0 deletions
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteCursor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteCursor.java new file mode 100644 index 0000000..a5335ed --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteCursor.java @@ -0,0 +1,131 @@ +/* + * ByteCursor.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; + +/** + * The <code>ByteCursor</code> object is used to acquire bytes from a + * given source. This provides a cursor style reading of bytes from + * a stream in that it will allow the reader to move the cursor back + * if the amount of bytes read is too much. Allowing the cursor to + * move ensures that excess bytes back be placed back in the stream. + * <p> + * This is used when parsing input from a stream as it ensures that + * on arrival at a terminal token any excess bytes can be placed + * back in to the stream. This allows data to be read efficiently + * in large chunks from blocking streams such as sockets. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.TransportCursor + */ +public interface ByteCursor { + + /** + * Determines whether the cursor is still open. The cursor is + * considered open if there are still bytes to read. If there is + * still bytes buffered and the underlying transport is closed + * then the cursor is still considered open. + * + * @return true if the read method does not return a -1 value + */ + boolean isOpen() throws IOException; + + /** + * Determines whether the cursor is ready for reading. When the + * cursor is ready then it guarantees that some amount of bytes + * can be read from the underlying stream without blocking. + * + * @return true if some data can be read without blocking + */ + boolean isReady() throws IOException; + + /** + * Provides the number of bytes that can be read from the stream + * without blocking. This is typically the number of buffered or + * available bytes within the stream. When this reaches zero then + * the cursor may perform a blocking read. + * + * @return the number of bytes that can be read without blocking + */ + int ready() throws IOException; + + /** + * Reads a block of bytes from the underlying stream. This will + * read up to the requested number of bytes from the underlying + * stream. If there are no ready bytes on the stream this can + * return zero, representing the fact that nothing was read. + * + * @param data this is the array to read the bytes in to + * + * @return this returns the number of bytes read from the stream + */ + int read(byte[] data) throws IOException; + + /** + * Reads a block of bytes from the underlying stream. This will + * read up to the requested number of bytes from the underlying + * stream. If there are no ready bytes on the stream this can + * return zero, representing the fact that nothing was read. + * + * @param data this is the array to read the bytes in to + * @param off this is the offset to begin writing the bytes to + * @param len this is the number of bytes that are requested + * + * @return this returns the number of bytes read from the stream + */ + int read(byte[] data, int off, int len) throws IOException; + + /** + * Pushes the provided data on to the cursor. Data pushed on to + * the cursor will be the next data read from the cursor. This + * complements the <code>reset</code> method which will reset + * the cursors position on a stream. Allowing data to be pushed + * on to the cursor allows more flexibility. + * + * @param data this is the data to be pushed on to the cursor + */ + void push(byte[] data) throws IOException; + + /** + * Pushes the provided data on to the cursor. Data pushed on to + * the cursor will be the next data read from the cursor. This + * complements the <code>reset</code> method which will reset + * the cursors position on a stream. Allowing data to be pushed + * on to the cursor allows more flexibility. + * + * @param data this is the data to be pushed on to the cursor + * @param off this is the offset to begin reading the bytes + * @param len this is the number of bytes that are to be used + */ + void push(byte[] data, int off, int len) throws IOException; + + /** + * Moves the cursor backward within the stream. This ensures + * that any bytes read from the last read can be pushed back + * in to the stream so that they can be read again. This will + * throw an exception if the reset can not be performed. + * + * @param len this is the number of bytes to reset back + * + * @return this is the number of bytes that have been reset + */ + int reset(int len) throws IOException; +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteReader.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteReader.java new file mode 100644 index 0000000..3e26e76 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteReader.java @@ -0,0 +1,107 @@ +/* + * ByteReader.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; + +/** + * The <code>ByteReader</code> object is used to acquire bytes from + * a given source. This provides a cursor style reading of bytes from + * a stream in that it will allow the reader to move the cursor back + * if the amount of bytes read is too much. Allowing the cursor to + * move ensures that excess bytes can be placed back in the stream. + * <p> + * This is used when parsing input from a stream as it ensures that + * on arrival at a terminal token any excess bytes can be placed + * back in to the stream. This allows data to be read efficiently + * in large chunks from blocking streams such as sockets. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.ByteCursor + */ +interface ByteReader { + + /** + * Determines whether the source is still open. The source is + * considered open if there are still bytes to read. If there is + * still bytes buffered and the underlying transport is closed + * then the source is still considered open. + * + * @return true if the read method does not return a -1 value + */ + boolean isOpen() throws IOException; + + /** + * Determines whether the source is ready for reading. When the + * source is ready then it guarantees that some amount of bytes + * can be read from the underlying stream without blocking. + * + * @return true if some data can be read without blocking + */ + boolean isReady() throws IOException; + + /** + * Provides the number of bytes that can be read from the stream + * without blocking. This is typically the number of buffered or + * available bytes within the stream. When this reaches zero then + * the source may perform a blocking read. + * + * @return the number of bytes that can be read without blocking + */ + int ready() throws IOException; + + /** + * Reads a block of bytes from the underlying stream. This will + * read up to the requested number of bytes from the underlying + * stream. If there are no ready bytes on the stream this can + * return zero, representing the fact that nothing was read. + * + * @param data this is the array to read the bytes in to + * + * @return this returns the number of bytes read from the source + */ + int read(byte[] data) throws IOException; + + /** + * Reads a block of bytes from the underlying stream. This will + * read up to the requested number of bytes from the underlying + * stream. If there are no ready bytes on the stream this can + * return zero, representing the fact that nothing was read. + * + * @param data this is the array to read the bytes in to + * @param off this is the offset to begin writing the bytes to + * @param len this is the number of bytes that are requested + * + * @return this returns the number of bytes read from the source + */ + int read(byte[] data, int off, int len) throws IOException; + + /** + * Moves the source backward within the stream. This ensures + * that any bytes read from the last read can be pushed back + * in to the stream so that they can be read again. This will + * throw an exception if the reset can not be performed. + * + * @param len this is the number of bytes to reset back + * + * @return this is the number of bytes that have been reset + */ + int reset(int len) throws IOException; +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteWriter.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteWriter.java new file mode 100644 index 0000000..dc647cf --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteWriter.java @@ -0,0 +1,98 @@ +/* + * ByteWriter.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * The <code>ByteWriter</code> object is used to send data over the TCP + * transport. This provides direct contact with the connected socket. + * Delivery over a sender implementation can be either SSL based or + * direct. It is the responsibility of the implementation to provide + * such behavior as required. + * + * @author Niall Gallagher + */ +public interface ByteWriter { + + /** + * This method is used to deliver the provided array of bytes to + * the underlying transport. Depending on the connection type the + * array may be encoded for SSL transport or send directly. Any + * implementation may choose to buffer the bytes for performance. + * + * @param array this is the array of bytes to send to the client + */ + void write(byte[] array) throws IOException; + + /** + * This method is used to deliver the provided array of bytes to + * the underlying transport. Depending on the connection type the + * array may be encoded for SSL transport or send directly. Any + * implementation may choose to buffer the bytes for performance. + * + * @param array this is the array of bytes to send to the client + * @param off this is the offset within the array to send from + * @param len this is the number of bytes that are to be sent + */ + void write(byte[] array, int off, int len) throws IOException; + + /** + * This method is used to deliver the provided buffer of bytes to + * the underlying transport. Depending on the connection type the + * array may be encoded for SSL transport or send directly. Any + * implementation may choose to buffer the bytes for performance. + * + * @param buffer this is the buffer of bytes to send to the client + */ + void write(ByteBuffer buffer) throws IOException; + + /** + * This method is used to deliver the provided buffer of bytes to + * the underlying transport. Depending on the connection type the + * array may be encoded for SSL transport or send directly. Any + * implementation may choose to buffer the bytes for performance. + * + * @param buffer this is the buffer of bytes to send to the client + * @param off this is the offset within the buffer to send from + * @param len this is the number of bytes that are to be sent + */ + void write(ByteBuffer buffer, int off, int len) throws IOException; + + /** + * This method is used to flush the contents of the buffer to + * the client. This method will block until such time as all of + * the data has been sent to the client. If at any point there + * is an error sending the content an exception is thrown. + */ + void flush() throws IOException; + + /** + * This is used to close the sender and the underlying transport. + * If a close is performed on the sender then no more bytes can + * be read from or written to the transport and the client will + * received a connection close on their side. + */ + void close() throws IOException; +} + + + + diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/Certificate.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/Certificate.java new file mode 100644 index 0000000..05f9c91 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/Certificate.java @@ -0,0 +1,65 @@ +/* + * Certificate.java June 2013 + * + * Copyright (C) 2013, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import javax.security.cert.X509Certificate; + +/** + * The <code>Certificate</code> interface represents the certificate + * that is sent by a client during a secure HTTPS conversation. This + * may or may not contain an X509 certificate chain from the client. + * If it does not a <code>CertificateChallenge</code> may be used to + * issue a renegotiation of the connection. One completion of the + * renegotiation the challenge executes a completion operation. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.CertificateChallenge + */ +public interface Certificate { + + /** + * This will return the X509 certificate chain, if any, that + * has been sent by the client. A certificate chain is typically + * only send when the server explicitly requests the certificate + * on the initial connection or when it is challenged for. + * + * @return this returns the clients X509 certificate chain + */ + X509Certificate[] getChain() throws Exception; + + /** + * This returns a challenge for the certificate. A challenge is + * issued by providing a <code>Runnable</code> task which is to + * be executed when the challenge has completed. Typically this + * task should be used to drive completion of an HTTPS request. + * + * @return this returns a challenge for the client certificate + */ + CertificateChallenge getChallenge() throws Exception; + + /** + * This is used to determine if the X509 certificate chain is + * present for the request. If it is not present then a challenge + * can be used to request the certificate. + * + * @return true if the certificate chain is present + */ + boolean isChainPresent() throws Exception; +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/CertificateChallenge.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/CertificateChallenge.java new file mode 100644 index 0000000..5ed2743 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/CertificateChallenge.java @@ -0,0 +1,73 @@ +/* + * CertificateChallenge.java June 2013 + * + * Copyright (C) 2013, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.util.concurrent.Future; + +/** + * The <code>CertificateChallenge</code> object is used to challenge + * a client for their x509 certificate. Notification of a successful + * challenge for the certificate is done using a completion task. + * The task is executed when the SSL renegotiation completes with + * a client certificate. + * <p> + * For HTTPS the SSL renegotiation workflow used to challenge the + * client for their X509 certificate is rather bizzare. It starts + * with an initial challenge, where an SSL handshake is performed. + * This initial handshake typically completes but results in the + * TCP connection being closed by the client. Then a second + * handshake is performed by the client on a new TCP connection, + * this second handshake does not contain the certificate either. + * When the handshake is finished on this new connection the client + * will resubmit the original HTTP request. Again the server will + * have to challenge for the certificate, which should succeed and + * result in execution of the task provided. + * <p> + * An important point to note here, is that if the client closes + * the TCP connection on the first challenge, the completion task + * will not be executed, it will be ignored. Only a successful + * completion of a HTTPS renegotiation will result in execution + * of the provided task. + * + * @author Niall Gallagher + */ +public interface CertificateChallenge { + + /** + * This method will challenge the client for their certificate. + * It does so by performing an SSL renegotiation. Successful + * completion of the SSL renegotiation results in the client + * providing their certificate, and execution of the task. + * + * @return this future containing the original certificate + */ + Future<Certificate> challenge() throws Exception; + + /** + * This method will challenge the client for their certificate. + * It does so by performing an SSL renegotiation. Successful + * completion of the SSL renegotiation results in the client + * providing their certificate, and execution of the task. + * + * @param completion task to be run on successful challenge + * + * @return this future containing the original certificate + */ + Future<Certificate> challenge(Runnable completion) throws Exception; +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/Channel.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/Channel.java new file mode 100644 index 0000000..02e6cbd --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/Channel.java @@ -0,0 +1,128 @@ +/* + * Channel.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.nio.channels.SocketChannel; +import java.util.Map; + +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>Channel</code> interface represents a connected channel + * through which data can be sent and received. Typically a channel + * will have a connected TCP socket, which can be used to determine + * when the channel is read ready, and write ready. A channel can + * also contain a bag of attributes used to describe the connection. + * <p> + * Reading and writing to a channel is performed using two special + * interfaces. The first is the <code>ByteCursor</code> object which + * is used to read data from the channel in a non-blocking manner. + * This can also be used to reset data if it has read too much. To + * write the <code>ByteWriter</code> can be used, this provides a + * blocking interface much like a conventional output stream. + * + * @author Niall Gallagher + */ +public interface Channel { + + /** + * This is used to determine if the channel is secure and that + * data read from and data written to the request is encrypted. + * Channels transferred over SSL are considered secure and will + * have this return true, otherwise it will return false. + * + * @return true if this is secure for reading and writing + */ + boolean isSecure(); + + /** + * This is the connected socket channel associated with this. In + * order to determine if content can be read or written to or + * from the channel this socket can be used with a selector. This + * provides a means to react to I/O events as they occur rather + * than polling the channel which is generally less performant. + * + * @return this returns the connected socket channel + */ + SocketChannel getSocket(); + + /** + * This is used to acquire the SSL certificate used for security. + * If the socket is connected to an SSL transport this returns an + * SSL certificate which was provided during the secure handshake + * between the client and server. If not certificates are present + * in the provided instance, a challenge can be issued. + * + * @return the SSL certificate provided by a secure transport + */ + Certificate getCertificate(); + + /** + * This gets the <code>Trace</code> object associated with the + * channel. The trace is used to log various events for the life + * of the transaction such as low level read and write events + * as well as milestone events and errors. + * + * @return this returns the trace associated with the socket + */ + Trace getTrace(); + + /** + * This provides a <code>ByteCursor</code> for this channel. The + * cursor provides a seekable view of the input buffer and will + * allow the server kernel to peek into the input buffer without + * having to take the data from the input. This allows overflow + * to be pushed back on to the cursor for subsequent reads. + * + * @return this returns the input cursor for the channel + */ + ByteCursor getCursor(); + + /** + * This provides a <code>ByteWriter</code> for the channel. This + * is used to provide a blocking output mechanism for the channel. + * Enabling blocking reads ensures that output buffering can be + * limited to an extent, which ensures that memory remains low at + * high load periods. Writes to the sender may result in the data + * being copied and queued until the socket is write ready. + * + * @return this returns the output sender for this channel + */ + ByteWriter getWriter(); + + /** + * This returns the <code>Map</code> of attributes used to hold + * connection information for the channel. The attributes here + * are taken from the pipeline attributes and may contain details + * such as SSL certificates or other such useful information. + * + * @return returns the attributes associated with the channel + */ + Map getAttributes(); + + /** + * Because the channel represents a duplex means of communication + * there needs to be a means to close it down. This provides such + * a means. By closing the channel the cursor and sender will no + * longer send or recieve data to or from the network. The client + * will also be signaled that the connection has been severed. + */ + void close(); + +}
\ No newline at end of file diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/FlushScheduler.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/FlushScheduler.java new file mode 100644 index 0000000..43a64ed --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/FlushScheduler.java @@ -0,0 +1,197 @@ +/* + * FlushScheduler.java February 2008 + * + * Copyright (C) 2008, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import static java.nio.channels.SelectionKey.OP_WRITE; +import static org.simpleframework.transport.TransportEvent.WRITE_BLOCKING; +import static org.simpleframework.transport.TransportEvent.WRITE_WAIT; + +import java.io.IOException; + +import org.simpleframework.transport.reactor.Operation; +import org.simpleframework.transport.reactor.Reactor; +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>FlushScheduler</code> object is used to schedule a task + * for execution when it is write ready. This is used by the socket + * flusher to ensure that the writing thread can be blocked until + * such time as all the bytes required to be written are written. + * <p> + * All methods are invoked by a <code>SocketFlusher</code> object + * which is synchronized. This ensures that the methods of the + * scheduler are thread safe in that only one thread will access + * them at any given time. The lock used by the socket flusher can + * thus be safely as it will be synchronized on by the flusher. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.SocketFlusher + */ +class FlushScheduler { + + /** + * This is the operation that is scheduled for execution. + */ + private Operation task; + + /** + * This is the reactor to used to execute the operation. + */ + private Reactor reactor; + + /** + * This is the trace that listens to all transport events. + */ + private Trace trace; + + /** + * This is the lock that is used to signal a blocked thread. + */ + private Object lock; + + /** + * This is used to determine if the scheduler is running. + */ + private volatile boolean running; + + /** + * This is used to determine if the scheduler is interrupted. + */ + private volatile boolean closed; + + /** + * This is used to determine if there is currently a flush. + */ + private volatile boolean flushing; + + /** + * Constructor for the <code>FlushScheduler</code> object. This + * is* used to create a scheduler that will execute the provided + * task when the associated socket is write ready. + * + * @param socket this is the associated socket for the scheduler + * @param reactor this is the rector used to schedule execution + * @param task this is the task that is executed when writable + * @param lock this is the lock used to signal blocking threads + */ + public FlushScheduler(Socket socket, Reactor reactor, Operation task, Object lock) { + this.trace = socket.getTrace(); + this.reactor = reactor; + this.task = task; + this.lock = lock; + } + + /** + * This is used to repeat schedule the operation for execution. + * This is executed if the operation has not fully completed + * its task. If the scheduler is not in a running state then + * this will not schedule the task for a repeat execution. + */ + public void repeat() throws IOException { + if(closed) { + throw new TransportException("Socket closed"); + } + if(running) { + trace.trace(WRITE_WAIT); + reactor.process(task, OP_WRITE); + } + } + + /** + * This is used to schedule the task for execution. If this is + * given a boolean true to indicate that it wishes to block + * then this will block the calling thread until such time as + * the <code>ready</code> method is invoked. + * + * @param block indicates whether the thread should block + */ + public void schedule(boolean block) throws IOException { + if(closed) { + throw new TransportException("Socket closed"); + } + if(!running) { + trace.trace(WRITE_WAIT); + reactor.process(task, OP_WRITE); + running = true; + } + if(block) { + listen(); + } + } + + /** + * This is used to listen for a notification from the reactor to + * tell the thread that the write operation has completed. If + * the thread is interrupted upon this call then this will throw + * an <code>IOException</code> with the root cause. + */ + private void listen() throws IOException { + if(flushing) { + throw new TransportException("Socket already flushing"); + } + try { + if(!closed) { + try { + flushing = true; + trace.trace(WRITE_BLOCKING); + lock.wait(120000); + } finally { + flushing = false; + } + } + } catch(Exception e) { + throw new TransportException("Could not schedule for flush", e); + } + if(closed) { + throw new TransportException("Socket closed"); + } + } + + /** + * This is used to notify any waiting threads that they no longer + * need to wait. This is used when the flusher no longer needs + * the waiting thread to block. Such an occurrence happens when + * all shared data has been written or has been duplicated. + */ + public void release() { + lock.notifyAll(); + } + + /** + * This is used to signal any blocking threads to wake up. When + * this is invoked blocking threads are signaled and they can + * return. This is typically done when the task has finished. + */ + public void ready() { + lock.notifyAll(); + running = false; + } + + /** + * This is used to close the scheduler when the reactor is + * closed by the server. An close will happen when the server + * has been shutdown, it ensures there are no threads lingering + * waiting for a notification when the reactor has closed. + */ + public void close() { + lock.notifyAll(); + closed = true; + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/FlushSignaller.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/FlushSignaller.java new file mode 100644 index 0000000..e7f2c4f --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/FlushSignaller.java @@ -0,0 +1,120 @@ +/* + * FlushSignaller.java February 2008 + * + * Copyright (C) 2008, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import static org.simpleframework.transport.TransportEvent.ERROR; + +import java.nio.channels.SocketChannel; + +import org.simpleframework.transport.reactor.Operation; +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>FlushSignaller</code> is an operation that performs + * writes operation asynchronously. This will basically determine + * if the socket is write ready and drain each queued buffer to + * the socket until there are no more pending buffers. + * + * @author Niall Gallagher + */ +class FlushSignaller implements Operation { + + /** + * This is the writer that is used to write the data. + */ + private final SocketFlusher writer; + + /** + * This is the socket that this will be flushing. + */ + private final Socket socket; + + /** + * This is used to trace the activity for the operation. + */ + private final Trace trace; + + /** + * Constructor for the <code>FlushSignaller</code> object. This + * will create an operation that is used to flush the buffer + * queue to the underlying socket. This ensures that the data + * is written to the socket in the queued order. + * + * @param writer this is the writer to flush the data to + * @param socket this is the socket to be flushed + */ + public FlushSignaller(SocketFlusher writer, Socket socket) { + this.trace = socket.getTrace(); + this.socket = socket; + this.writer = writer; + } + + /** + * This is used to acquire the trace object that is associated + * with the operation. A trace object is used to collection details + * on what operations are being performed. For instance it may + * contain information relating to I/O events or errors. + * + * @return this returns the trace associated with this operation + */ + public Trace getTrace() { + return trace; + } + + /** + * This returns the socket channel for the connected pipeline. It + * is this channel that is used to determine if there are bytes + * that can be written. When closed this is no longer selectable. + * + * @return this returns the connected channel for the pipeline + */ + public SocketChannel getChannel() { + return socket.getChannel(); + } + + /** + * This is used to perform the drain of the pending buffer + * queue. This will drain each pending queue if the socket is + * write ready. If the socket is not write ready the operation + * is enqueued for selection and this returns. This ensures + * that all the data will eventually be delivered. + */ + public void run() { + try { + writer.execute(); + } catch(Exception cause) { + trace.trace(ERROR, cause); + cancel(); + } + } + + /** + * This is used to cancel the operation if it has timed out. + * If the delegate is waiting too long to flush the contents + * of the buffers to the underlying transport then the socket + * is closed and the flusher times out to avoid deadlock. + */ + public void cancel() { + try { + writer.abort(); + }catch(Exception cause){ + trace.trace(ERROR, cause); + } + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/Handshake.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/Handshake.java new file mode 100644 index 0000000..2a4b9a2 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/Handshake.java @@ -0,0 +1,652 @@ +/* + * Handshake.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import static java.nio.channels.SelectionKey.OP_READ; +import static java.nio.channels.SelectionKey.OP_WRITE; +import static org.simpleframework.transport.PhaseType.COMMIT; +import static org.simpleframework.transport.PhaseType.CONSUME; +import static org.simpleframework.transport.PhaseType.PRODUCE; +import static org.simpleframework.transport.TransportEvent.ERROR; +import static org.simpleframework.transport.TransportEvent.HANDSHAKE_BEGIN; +import static org.simpleframework.transport.TransportEvent.HANDSHAKE_DONE; +import static org.simpleframework.transport.TransportEvent.HANDSHAKE_FAILED; +import static org.simpleframework.transport.TransportEvent.READ; +import static org.simpleframework.transport.TransportEvent.WRITE; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SocketChannel; +import java.util.concurrent.Future; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; + +import org.simpleframework.transport.reactor.Reactor; +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>Handshake</code> object is used to perform secure SSL + * negotiations on a pipeline or <code>Transport</code>. This can + * be used to perform an SSL handshake. To perform the negotiation + * this uses an SSL engine provided with the transport to direct + * the conversation. The SSL engine tells the negotiation what is + * expected next, whether this is a response to the client or a + * message from it. During the negotiation this may need to wait + * for either a write ready event or a read ready event. Event + * notification is done using the processor provided. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.TransportProcessor + */ +class Handshake implements Negotiation { + + /** + * This is the processor used to process the secure transport. + */ + private final TransportProcessor processor; + + /** + * This is the certificate associated with this negotiation. + */ + private final NegotiationState state; + + /** + * This is the socket channel used to read and write data to. + */ + private final SocketChannel channel; + + /** + * This is the transport dispatched when the negotiation ends. + */ + private final Transport transport; + + /** + * This is the reactor used to register for I/O notifications. + */ + private final Reactor reactor; + + /** + * This is the output buffer used to generate data to. + */ + private final ByteBuffer output; + + /** + * This is the input buffer used to read data from the socket. + */ + private final ByteBuffer input; + + /** + * This is an empty byte buffer used to generate a response. + */ + private final ByteBuffer empty; + + /** + * This is the SSL engine used to direct the conversation. + */ + private final SSLEngine engine; + + /** + * This is the trace that is used to monitor handshake events. + */ + private final Trace trace; + + /** + * This determines if the handshake is from the client side. + */ + private final boolean client; + + /** + * Constructor for the <code>Handshake</code> object. This is + * used to create an operation capable of performing negotiations + * for SSL connections. Typically this is used to perform request + * response negotiations, such as a handshake or termination. + * + * @param processor the processor used to dispatch the transport + * @param transport the transport to perform the negotiation for + * @param reactor this is the reactor used for I/O notifications + */ + public Handshake(TransportProcessor processor, Transport transport, Reactor reactor) { + this(processor, transport, reactor, 20480); + } + + /** + * Constructor for the <code>Handshake</code> object. This is + * used to create an operation capable of performing negotiations + * for SSL connections. Typically this is used to perform request + * response negotiations, such as a handshake or termination. + * + * @param transport the transport to perform the negotiation for + * @param processor the processor used to dispatch the transport + * @param reactor this is the reactor used for I/O notifications + * @param size the size of the buffers used for the negotiation + */ + public Handshake(TransportProcessor processor, Transport transport, Reactor reactor, int size) { + this(processor, transport, reactor, size, false); + } + + /** + * Constructor for the <code>Handshake</code> object. This is + * used to create an operation capable of performing negotiations + * for SSL connections. Typically this is used to perform request + * response negotiations, such as a handshake or termination. + * + * @param transport the transport to perform the negotiation for + * @param processor the processor used to dispatch the transport + * @param reactor this is the reactor used for I/O notifications + * @param client determines the side of the SSL handshake + */ + public Handshake(TransportProcessor processor, Transport transport, Reactor reactor, boolean client) { + this(processor, transport, reactor, 20480, client); + } + + /** + * Constructor for the <code>Handshake</code> object. This is + * used to create an operation capable of performing negotiations + * for SSL connections. Typically this is used to perform request + * response negotiations, such as a handshake or termination. + * + * @param transport the transport to perform the negotiation for + * @param processor the processor used to dispatch the transport + * @param reactor this is the reactor used for I/O notifications + * @param size the size of the buffers used for the negotiation + * @param client determines the side of the SSL handshake + */ + public Handshake(TransportProcessor processor, Transport transport, Reactor reactor, int size, boolean client) { + this.state = new NegotiationState(this, transport); + this.output = ByteBuffer.allocate(size); + this.input = ByteBuffer.allocate(size); + this.channel = transport.getChannel(); + this.engine = transport.getEngine(); + this.trace = transport.getTrace(); + this.empty = ByteBuffer.allocate(0); + this.processor = processor; + this.transport = transport; + this.reactor = reactor; + this.client = client; + } + + /** + * This is used to acquire the trace object that is associated + * with the operation. A trace object is used to collection details + * on what operations are being performed. For instance it may + * contain information relating to I/O events or errors. + * + * @return this returns the trace associated with this operation + */ + public Trace getTrace() { + return trace; + } + + /** + * This returns the socket channel for the connected pipeline. It + * is this channel that is used to determine if there are bytes + * that can be read. When closed this is no longer selectable. + * + * @return this returns the connected channel for the pipeline + */ + public SelectableChannel getChannel() { + return channel; + } + + /** + * This is used to start the negotiation. Once started this will + * send a message to the other side, once sent the negotiation + * reads the response. However if the response is not yet ready + * this will schedule the negotiation for a selectable operation + * ensuring that it can resume execution when ready. + */ + public void run() { + if(engine != null) { + trace.trace(HANDSHAKE_BEGIN); + engine.setUseClientMode(client); + input.flip(); + } + begin(); + } + + /** + * This is used to terminate the negotiation. This is excecuted + * when the negotiation times out. When the negotiation expires it + * is rejected by the processor and must be canceled. Canceling + * is basically termination of the connection to free resources. + */ + public void cancel() { + try { + terminate(); + } catch(Exception cause) { + trace.trace(ERROR, cause); + } + } + + /** + * This is used to start the negotation. Once started this will + * send a message to the other side, once sent the negotiation + * reads the response. However if the response is not yet ready + * this will schedule the negotiation for a selectable operation + * ensuring that it can resume execution when ready. + */ + private void begin() { + try { + resume(); + } catch(Exception cause) { + trace.trace(ERROR, cause); + cancel(); + } + } + + /** + * This is the main point of execution within the negotiation. It + * is where the negotiation is performed. Negotiations are done + * by performing a request response flow, governed by the SSL + * engine associated with the pipeline. Typically the client is + * the one to initiate the handshake and the server initiates the + * termination sequence. This may be executed several times + * depending on whether reading or writing blocks. + */ + public void resume() throws IOException { + Runnable task = process(); + + if(task != null) { + task.run(); + } + } + + /** + * This is the main point of execution within the negotiation. It + * is where the negotiation is performed. Negotiations are done + * by performing a request response flow, governed by the SSL + * engine associated with the transport. Typically the client is + * the one to initiate the handshake and the server initiates the + * termination sequence. This may be executed several times + * depending on whether reading or writing blocks. + * + * @return this returns a task used to execute the next phase + */ + private Runnable process() throws IOException { + PhaseType require = exchange(); + + if(require == CONSUME) { + return new Consumer(this, reactor, trace); + } + if(require == PRODUCE) { + return new Producer(this, reactor, trace); + } + return new Committer(this, reactor, trace); + } + + /** + * This is the main point of execution within the negotiation. It + * is where the negotiation is performed. Negotiations are done + * by performing a request response flow, governed by the SSL + * engine associated with the transport. Typically the client is + * the one to initiate the handshake and the server initiates the + * termination sequence. This may be executed several times + * depending on whether reading or writing blocks. + * + * @return this returns what is expected next in the negotiation + */ + private PhaseType exchange() throws IOException { + HandshakeStatus status = engine.getHandshakeStatus(); + + switch(status){ + case NEED_WRAP: + return write(); + case NOT_HANDSHAKING: + case NEED_UNWRAP: + return read(); + } + return COMMIT; + } + + /** + * This is used to perform the read part of the negotiation. The + * read part is where the other side sends information where it + * is consumed and is used to determine what action to take. + * Typically it is the SSL engine that determines what action is + * to be taken depending on the data send from the other side. + * + * @return the next action that should be taken by the handshake + */ + private PhaseType read() throws IOException { + return read(5); + } + + /** + * This is used to perform the read part of the negotiation. The + * read part is where the other side sends information where it + * is consumed and is used to determine what action to take. + * Typically it is the SSL engine that determines what action is + * to be taken depending on the data send from the other side. + * + * @param count this is the number of times a read can repeat + * + * @return the next action that should be taken by the handshake + */ + private PhaseType read(int count) throws IOException { + while(count > 0) { + SSLEngineResult result = engine.unwrap(input, output); + HandshakeStatus status = result.getHandshakeStatus(); + + switch(status) { + case NOT_HANDSHAKING: + return COMMIT; + case NEED_WRAP: + return PRODUCE; + case FINISHED: + case NEED_UNWRAP: + return read(count-1); + case NEED_TASK: + execute(); + } + } + return CONSUME; + } + + /** + * This is used to perform the write part of the negotiation. The + * read part is where the this sends information to the other side + * and the other side interprets the data and determines what action + * to take. After a write the negotiation typically completes or + * waits for the next response from the other side. + * + * @return the next action that should be taken by the handshake + */ + private PhaseType write() throws IOException { + return write(5); + } + + /** + * This is used to perform the write part of the negotiation. The + * read part is where the this sends information to the other side + * and the other side interprets the data and determines what action + * to take. After a write the negotiation typically completes or + * waits for the next response from the other side. + * + * @param count this is the number of times a read can repeat + * + * @return the next action that should be taken by the handshake + */ + private PhaseType write(int count) throws IOException { + while(count > 0) { + SSLEngineResult result = engine.wrap(empty, output); + HandshakeStatus status = result.getHandshakeStatus(); + + switch(status) { + case NOT_HANDSHAKING: + case FINISHED: + case NEED_UNWRAP: + return PRODUCE; + case NEED_WRAP: + return write(count-1); + case NEED_TASK: + execute(); + } + } + return PRODUCE; + } + + /** + * This is used to execute the delegated tasks. These tasks are + * used to digest the information received from the client in + * order to generate a response. This may need to execute several + * tasks from the associated SSL engine. + */ + private void execute() throws IOException { + while(true) { + Runnable task = engine.getDelegatedTask(); + + if(task == null) { + break; + } + task.run(); + } + } + + /** + * This is used to receive data from the client. If at any + * point during the negotiation a message is required that + * can not be read immediately this is used to asynchronously + * read the data when a select operation is signalled. + * + * @return this returns true when the message has been read + */ + public boolean receive() throws IOException { + int count = input.capacity(); + + if(count > 0) { + input.compact(); + } + int size = channel.read(input); + + if(trace != null) { + trace.trace(READ, size); + } + if(size < 0) { + throw new TransportException("Client closed connection"); + } + if(count > 0) { + input.flip(); + } + return size > 0; + } + + /** + * Here we attempt to send all data within the output buffer. If + * all of the data is delivered to the other side then this will + * return true. If however there is content yet to be sent to + * the other side then this returns false, telling the negotiation + * that in order to resume it must attempt to send the content + * again after a write ready operation on the underlying socket. + * + * @return this returns true if all of the content is delivered + */ + public boolean send() throws IOException { + int require = output.position(); + int count = 0; + + if(require > 0) { + output.flip(); + } + while(count < require) { + int size = channel.write(output); + + if(trace != null) { + trace.trace(WRITE, size); + } + if(size <= 0) { + break; + } + count += size; + } + if(require > 0) { + output.compact(); + } + return count == require; + } + + /** + * This method is invoked when the negotiation is done and the + * next phase of the connection is to take place. This will + * be invoked when the SSL handshake has completed and the new + * secure transport is to be handed to the processor. + */ + private void dispatch() throws IOException { + Transport secure = new SecureTransport(transport, state, output, input); + + if(processor != null) { + trace.trace(HANDSHAKE_DONE); + processor.process(secure); + } + } + + /** + * This method is used to terminate the handshake. Termination + * typically occurs when there has been some error in the handshake + * or when there is a timeout on some event, such as waiting for + * for a read or write operation to occur. As a result the TCP + * channel is closed and any challenge future is cancelled. + */ + private void terminate() throws IOException { + Future<Certificate> future = state.getFuture(); + + trace.trace(HANDSHAKE_FAILED); + transport.close(); + future.cancel(true); + } + + /** + * This is used to execute the completion task after a challenge + * for the clients X509 certificate. Execution of the completion + * task in this way allows any challanger to be notified that + * the handshake has complete. + */ + private void complete() throws IOException { + Runnable task = state.getFuture(); + + if(task != null) { + task.run(); + } + } + + /** + * This method is invoked when the negotiation is done and the + * next phase of the connection is to take place. If a certificate + * challenge was issued then the completion task is executed, if + * this was the handshake for the initial connection a transport + * is created and handed to the processor. + */ + public void commit() throws IOException { + if(!state.isChallenge()) { + dispatch(); + } else { + complete(); + } + } + + /** + * The <code>Committer</code> task is used to transfer the transport + * created to the processor. This is executed when the SSL + * handshake is completed. It allows the transporter to use the + * newly created transport to read and write in plain text and + * to have the SSL transport encrypt and decrypt transparently. + */ + private class Committer extends Phase { + + /** + * Constructor for the <code>Committer</code> task. This is used to + * pass the transport object object to the processor when the + * SSL handshake has completed. + * + * @param state this is the underlying negotiation to use + * @param reactor this is the reactor used for I/O notifications + * @param trace the trace that is used to monitor the handshake + */ + public Committer(Negotiation state, Reactor reactor, Trace trace) { + super(state, reactor, trace, OP_READ); + } + + /** + * This is used to execute the task. It is up to the specific + * task implementation to decide what to do when executed. If + * the task needs to read or write data then it can attempt + * to perform the read or write, if it incomplete the it can + * be scheduled for execution with the reactor. + */ + @Override + public void execute() throws IOException{ + state.commit(); + } + } + + /** + * The <code>Consumer</code> task is used to schedule the negotiation + * for a read operation. This allows the negotiation to receive any + * messages generated by the client asynchronously. Once this has + * completed then it will resume the negotiation. + */ + private class Consumer extends Phase { + + /** + * Constructor for the <code>Consumer</code> task. This is used + * to create a task which will schedule a read operation for + * the negotiation. When the operation completes this will + * resume the negotiation. + * + * @param state this is the negotiation object that is used + * @param reactor this is the reactor used for I/O notifications + * @param trace the trace that is used to monitor the handshake + */ + public Consumer(Negotiation state, Reactor reactor, Trace trace) { + super(state, reactor, trace, OP_READ); + } + + /** + * This method is used to determine if the task is ready. This + * is executed when the select operation is signalled. When this + * is true the the task completes. If not then this will + * schedule the task again for the specified select operation. + * + * @return this returns true when the task has completed + */ + @Override + protected boolean ready() throws IOException { + return state.receive(); + } + } + + /** + * The <code>Producer</code> is used to schedule the negotiation + * for a write operation. This allows the negotiation to send any + * messages generated during the negotiation asynchronously. Once + * this has completed then it will resume the negotiation. + */ + private class Producer extends Phase { + + /** + * Constructor for the <code>Producer</code> task. This is used + * to create a task which will schedule a write operation for + * the negotiation. When the operation completes this will + * resume the negotiation. + * + * @param state this is the negotiation object that is used + * @param reactor this is the reactor used for I/O notifications + * @param trace the trace that is used to monitor the handshake + */ + public Producer(Negotiation state, Reactor reactor, Trace trace) { + super(state, reactor, trace, OP_WRITE); + } + + /** + * This method is used to determine if the task is ready. This + * is executed when the select operation is signalled. When this + * is true the the task completes. If not then this will + * schedule the task again for the specified select operation. + * + * @return this returns true when the task has completed + */ + @Override + protected boolean ready() throws IOException { + return state.send(); + } + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/Negotiation.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/Negotiation.java new file mode 100644 index 0000000..4140b34 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/Negotiation.java @@ -0,0 +1,69 @@ +/* + * Negotiation.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; + +import org.simpleframework.transport.reactor.Operation; + +/** + * The <code>Negotiation</code> interface is used to represent an + * SSL negotiation. When an operation can not be completed this + * will allow a task to perform asynchronous operations and resume + * the negotiation when those operations can be fulfilled. + * + * @author Niall Gallagher + */ +interface Negotiation extends Operation { + + /** + * This is used to resume the negotiation when an operation + * has completed. This will continue the decrypt and encrypt + * sequence of messages required to fulfil the negotiation. + */ + void resume() throws IOException; + + /** + * This method is invoked when the negotiation is done and + * the next phase of the connection is to take place. This + * will typically be invoked when an SSL handshake or + * termination exchange has completed successfully. + */ + void commit() throws IOException; + + /** + * This is used to send any messages the negotiation may have. + * If the negotiation can not send the information during its + * execution then this method will be executed when a select + * operation is signaled. + * + * @return this returns true when the message has been sent + */ + boolean send() throws IOException; + + /** + * This is used to receive data from the other side. If at any + * point during the negotiation a message is required that + * can not be read immediately this is used to asynchronously + * read the data when a select operation is signaled. + * + * @return this returns true when the message has been read + */ + boolean receive() throws IOException; +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/NegotiationState.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/NegotiationState.java new file mode 100644 index 0000000..160b5f9 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/NegotiationState.java @@ -0,0 +1,337 @@ +/* + * NegotiationCertificate.java June 2013 + * + * Copyright (C) 2013, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import static org.simpleframework.transport.TransportEvent.CERTIFICATE_CHALLENGE; +import static org.simpleframework.transport.TransportEvent.ERROR; + +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLSession; +import javax.security.cert.X509Certificate; + +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>NegotiationState</code> represents the certificate + * that is sent by a client during a secure HTTPS conversation. This + * may or may not contain an X509 certificate chain from the client. + * If it does not a <code>CertificateChallenge</code> may be used to + * issue a renegotiation of the connection. One completion of the + * renegotiation the challenge executes a completion operation. + * + * @author Niall Gallagher + */ +class NegotiationState implements Certificate { + + /** + * This is used to hold the completion task for the challenge. + */ + private final RunnableFuture<Certificate> future; + + /** + * This is the handshake used to acquire the certificate details. + */ + private final Negotiation negotiation; + + /** + * This is the challenge used to request the client certificate. + */ + private final Challenge challenge; + + /** + * This is the runnable task that is executed on task completion. + */ + private final Delegate delegate; + + /** + * This is the socket representing the underlying TCP connection. + */ + private final Socket socket; + + /** + * Constructor for the <code>NegotiationCertificate</code> object. + * This creates an object used to provide certificate details and + * a means to challenge for certificate details for the connected + * client if required. + * + * @param negotiation the negotiation associated with this + * @param socket the underlying TCP connection to the client + */ + public NegotiationState(Negotiation negotiation, Socket socket) { + this.delegate = new Delegate(socket); + this.future = new FutureTask<Certificate>(delegate, this); + this.challenge = new Challenge(socket); + this.negotiation = negotiation; + this.socket = socket; + } + + /** + * This is used to determine if the state is in challenge mode. + * In challenge mode a challenge future will be executed on + * completion of the challenge. This will the completion task. + * + * @return this returns true if the state is in challenge mode + */ + public boolean isChallenge() { + return delegate.isSet(); + } + + /** + * This returns the completion task associated with any challenge + * made for the client certificate. If this returns null then no + * challenge has been made for the client certificate. + * + * @return this returns the challenge completion task if any + */ + public RunnableFuture<Certificate> getFuture() { + return future; + } + + /** + * This returns a challenge for the certificate. A challenge is + * issued by providing a <code>Runnable</code> task which is to + * be executed when the challenge has completed. Typically this + * task should be used to drive completion of an HTTPS request. + * + * @return this returns a challenge for the client certificate + */ + public CertificateChallenge getChallenge() throws Exception { + return challenge; + } + + /** + * This will return the X509 certificate chain, if any, that + * has been sent by the client. A certificate chain is typically + * only send when the server explicitly requests the certificate + * on the initial connection or when it is challenged for. + * + * @return this returns the clients X509 certificate chain + */ + public X509Certificate[] getChain() throws Exception { + SSLSession session = getSession(); + + if(session != null) { + return session.getPeerCertificateChain(); + } + return null; + } + + /** + * This is used to acquire the SSL session associated with the + * handshake. The session makes all of the details associated + * with the handshake available, including the cipher suites + * used and the SSL context used to create the session. + * + * @return the SSL session associated with the connection + */ + public SSLSession getSession() throws Exception{ + SSLEngine engine = socket.getEngine(); + + if(engine != null) { + return engine.getSession(); + } + return null; + } + + /** + * This is used to determine if the X509 certificate chain is + * present for the request. If it is not present then a challenge + * can be used to request the certificate. + * + * @return true if the certificate chain is present + */ + public boolean isChainPresent() { + try { + return getChain() != null; + } catch(Exception e) { + return false; + } + } + + /** + * The <code>Challenge</code> object is used to enable the server + * to challenge for the client X509 certificate if desired. It + * performs the challenge by performing an SSL renegotiation to + * request that the client sends the + */ + private class Challenge implements CertificateChallenge { + + /** + * This is the SSL engine that is used to begin the handshake. + */ + private final SSLEngine engine; + + /** + * This is used to trace the certificate challenge request. + */ + private final Trace trace; + + /** + * Constructor for the <code>Challenge</code> object. This can + * be used to challenge the client for their X509 certificate. + * It does this by performing an SSL renegotiation on the + * existing TCP connection. + * + * @param socket this is the TCP connection to the client + */ + public Challenge(Socket socket) { + this.engine = socket.getEngine(); + this.trace = socket.getTrace(); + } + + /** + * This method will challenge the client for their certificate. + * It does so by performing an SSL renegotiation. Successful + * completion of the SSL renegotiation results in the client + * providing their certificate, and execution of the task. + */ + public Future<Certificate> challenge() { + return challenge(null); + } + + /** + * This method will challenge the client for their certificate. + * It does so by performing an SSL renegotiation. Successful + * completion of the SSL renegotiation results in the client + * providing their certificate, and execution of the task. + * + * @param completion task to be run on successful challenge + */ + public Future<Certificate> challenge(Runnable task) { + try { + if(!isChainPresent()) { + resume(task); + } else { + future.run(); + } + } catch(Exception cause) { + trace.trace(ERROR, cause); + } + return future; + } + + /** + * This method will challenge the client for their certificate. + * It does so by performing an SSL renegotiation. Successful + * completion of the SSL renegotiation results in the client + * providing their certificate, and execution of the task. + * + * @param completion task to be run on successful challenge + */ + private void resume(Runnable task) { + try { + trace.trace(CERTIFICATE_CHALLENGE); + delegate.set(task); + engine.setNeedClientAuth(true); + engine.beginHandshake(); + negotiation.resume(); + } catch(Exception cause) { + trace.trace(ERROR, cause); + negotiation.cancel(); + } + } + } + + /** + * The <code>Delegate</code> is basically a settable runnable object. + * It enables the challenge to set an optional runnable that will + * be executed when the challenge has completed. If the challenge + * has not been given a completion task this runs straight through + * without any state change or action on the certificate. + */ + private class Delegate implements Runnable { + + /** + * This is the reference to the runnable that is to be executed. + */ + private final AtomicReference<Runnable> task; + + /** + * This is used to determine if the challenge is ready to run. + */ + private final AtomicBoolean ready; + + /** + * This is used to trace any errors when running the task. + */ + private final Trace trace; + + /** + * Constructor for the <code>Delegate</code> object. This is + * used to create a wrapper for the completion task so that it + * can be executed safely and have any errors traced. + * + * @param socket this socket the handshake is associated with + */ + public Delegate(Socket socket) { + this.task = new AtomicReference<Runnable>(); + this.ready = new AtomicBoolean(); + this.trace = socket.getTrace(); + } + + /** + * This is used to determine if the delegate is ready to be + * used. It is ready only after the completion task has been + * set. When ready a challenge can be executed. + * + * @return this returns true if a completion task is set + */ + public boolean isSet() { + return ready.get(); + } + + /** + * This is used to set the completion task that is to be executed + * when the challenge has finished. This can be set to null if + * no task is to be executed on completion. + * + * @param runnable the task to run when the challenge finishes + */ + public void set(Runnable runnable) { + ready.set(true); + task.set(runnable); + } + + /** + * This is used to run the completion task. If no completion + * task has been set this will run through without any change to + * the state of the certificate. All errors thrown by the task + * will be caught and traced. + */ + public void run() { + try { + Runnable runnable = task.get(); + + if(runnable != null) { + runnable.run(); + } + } catch(Exception cause) { + trace.trace(ERROR, cause); + } finally { + task.set(null); + } + } + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/OperationFactory.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/OperationFactory.java new file mode 100644 index 0000000..343a12b --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/OperationFactory.java @@ -0,0 +1,150 @@ +/* + * OperationFactory.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; + +import javax.net.ssl.SSLEngine; + +import org.simpleframework.transport.reactor.Operation; +import org.simpleframework.transport.reactor.Reactor; + +/** + * The <code>OperationFactory</code> is used to create operations + * for the transport processor. Depending on the configuration of the + * pipeline object this will create different operations. Typically + * this will create an SSL handshake operation if the pipeline has + * an <code>SSLEngine</code> instance. This allows the transport + * processor to complete the handshake before handing the transport + * to the transporter for processing. + * + * @author Niall Gallagher + */ +class OperationFactory { + + /** + * This is the processor used to process the created transport. + */ + private final TransportProcessor processor; + + /** + * This is the reactor used to register for I/O notifications. + */ + private final Reactor reactor; + + /** + * This is the threshold for the asynchronous buffers to use. + */ + private final int threshold; + + /** + * This is the size of the buffers to be used by the transport. + */ + private final int buffer; + + /** + * This determines if the SSL handshake is for the client side. + */ + private final boolean client; + + /** + * Constructor for the <code>OperationFactory</code> object. This + * uses the processor provided to hand off the created transport + * when it has been created. All operations created typically + * execute in an asynchronous thread. + * + * @param processor the processor used to dispatch the transport + * @param reactor this is the reactor used for I/O notifications + * @param buffer this is the initial size of the buffer to use + */ + public OperationFactory(TransportProcessor processor, Reactor reactor, int buffer) { + this(processor, reactor, buffer, 20480); + } + + /** + * Constructor for the <code>OperationFactory</code> object. This + * uses the processor provided to hand off the created transport + * when it has been created. All operations created typically + * execute in an asynchronous thread. + * + * @param processor the processor used to dispatch the transport + * @param reactor this is the reactor used for I/O notifications + * @param buffer this is the initial size of the buffer to use + * @param threshold maximum size of the output buffer to use + */ + public OperationFactory(TransportProcessor processor, Reactor reactor, int buffer, int threshold) { + this(processor, reactor, buffer, threshold, false); + } + + /** + * Constructor for the <code>OperationFactory</code> object. This + * uses the processor provided to hand off the created transport + * when it has been created. All operations created typically + * execute in an asynchronous thread. + * + * @param processor the processor used to dispatch the transport + * @param reactor this is the reactor used for I/O notifications + * @param buffer this is the initial size of the buffer to use + * @param threshold maximum size of the output buffer to use + * @param client determines if the SSL handshake is for a client + */ + public OperationFactory(TransportProcessor processor, Reactor reactor, int buffer, int threshold, boolean client) { + this.processor = processor; + this.threshold = threshold; + this.reactor = reactor; + this.buffer = buffer; + this.client = client; + } + + /** + * This method is used to create <code>Operation</code> object to + * process the next phase of the negotiation. The operations that + * are created using this factory ensure the processing can be + * done asynchronously, which reduces the overhead the connection + * thread has when handing the pipelines over for processing. + * + * @param socket this is the pipeline that is to be processed + * + * @return this returns the operation used for processing + */ + public Operation getInstance(Socket socket) throws IOException { + return getInstance(socket, socket.getEngine()); + } + + /** + * This method is used to create <code>Operation</code> object to + * process the next phase of the negotiation. The operations that + * are created using this factory ensure the processing can be + * done asynchronously, which reduces the overhead the connection + * thread has when handing the pipelines over for processing. + * + * @param socket this is the pipeline that is to be processed + * @param engine this is the engine used for SSL negotiations + * + * @return this returns the operation used for processing + */ + private Operation getInstance(Socket socket, SSLEngine engine) throws IOException { + Transport transport = new SocketTransport(socket, reactor, buffer, threshold); + + if(engine != null) { + return new Handshake(processor, transport, reactor, client); + } + return new TransportDispatcher(processor, transport); + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/Phase.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/Phase.java new file mode 100644 index 0000000..a2bb2cd --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/Phase.java @@ -0,0 +1,165 @@ +/* + * Phase.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import static org.simpleframework.transport.TransportEvent.ERROR; + +import java.io.IOException; +import java.nio.channels.SelectableChannel; + +import org.simpleframework.transport.reactor.Operation; +import org.simpleframework.transport.reactor.Reactor; +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>Phase</code> object represents an asynchronous phase + * within the negotiation. This is typically used to either schedule + * an asynchronous read or write when it can not be performed + * directly. It ensures that the negotiation does not block the + * thread so that execution can be optimized of high concurrency. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.Handshake + */ +abstract class Phase implements Operation { + + /** + * This is the negotiation that this task will operate on. + */ + protected final Negotiation state; + + /** + * This is the reactor that is used to schedule execution. + */ + protected final Reactor reactor; + + /** + * This is the trace used to monitor the handshake socket. + */ + protected final Trace trace; + + /** + * This is the required operation for the task to complete. + */ + protected final int require; + + /** + * Constructor for the <code>Phase</code> object. This is used to + * create an operation that performs some phase of a negotiation. + * It allows the negotiation to schedule the read and write + * operations asynchronously. + * + * @param state this is the negotiation this task works on + * @param reactor this is the reactor used to schedule the task + * @param trace the trace that is used to monitor the handshake + * @param require this is the required operation for the task + */ + public Phase(Negotiation state, Reactor reactor, Trace trace, int require) { + this.reactor = reactor; + this.require = require; + this.state = state; + this.trace = trace; + } + + /** + * This is used to acquire the trace object that is associated + * with the operation. A trace object is used to collection details + * on what operations are being performed. For instance it may + * contain information relating to I/O events or errors. + * + * @return this returns the trace associated with this operation + */ + public Trace getTrace() { + return trace; + } + + /** + * This is the <code>SelectableChannel</code> which is used to + * determine if the operation should be executed. If the channel + * is ready for a given I/O event it can be run. For instance if + * the operation is used to perform some form of read operation + * it can be executed when ready to read data from the channel. + * + * @return this returns the channel used to govern execution + */ + public SelectableChannel getChannel() { + return state.getChannel(); + } + + /** + * This is used to execute the task. It is up to the specific + * task implementation to decide what to do when executed. If + * the task needs to read or write data then it can attempt + * to perform the read or write, if it incomplete the it can + * be scheduled for execution with the reactor. + */ + public void run() { + try { + execute(); + }catch(Exception cause) { + trace.trace(ERROR, cause); + cancel(); + } + } + + /** + * This is used to cancel the operation if it has timed out. This + * is typically invoked when it has been waiting in a selector for + * an extended duration of time without any active operations on + * it. In such a case the reactor must purge the operation to free + * the memory and open channels associated with the operation. + */ + public void cancel() { + try { + state.cancel(); + }catch(Exception cause) { + trace.trace(ERROR, cause); + } + } + + /** + * This is used to execute the task. It is up to the specific + * task implementation to decide what to do when executed. If + * the task needs to read or write data then it can attempt + * to perform the read or write, if it incomplete the it can + * be scheduled for execution with the reactor. + */ + protected void execute() throws IOException { + boolean done = ready(); + + if(!done) { + reactor.process(this, require); + } else { + state.resume(); + } + } + + /** + * This method is used to determine if the task is ready. This is + * executed when the select operation is signaled. When this is + * true the the task completes. If not then this will schedule + * the task again for the specified select operation. + * + * @return this returns true when the task has completed + */ + protected boolean ready() throws IOException { + return true; + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/PhaseType.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/PhaseType.java new file mode 100644 index 0000000..dd202d6 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/PhaseType.java @@ -0,0 +1,45 @@ +/* + * PhaseType.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +/** + * The <code>PhaseType</code> enumeration is used to determine what + * phase of the negotiation the handshake is in. This allows the + * negotiation to control the selection for read and write ready + * operations. Also, status signals completion of the handshake. + * + * @author Niall Gallagher + */ +enum PhaseType { + + /** + * Tells the negotiation that a read operations is needed. + */ + CONSUME, + + /** + * Tells the negotiation that a write operation is required. + */ + PRODUCE, + + /** + * Tells the negotiation that the the handshake is complete. + */ + COMMIT +}
\ No newline at end of file diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SecureTransport.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SecureTransport.java new file mode 100644 index 0000000..2083873 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SecureTransport.java @@ -0,0 +1,428 @@ +/* + * SecureTransport.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.Map; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLEngineResult.Status; + +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>SecureTransport</code> object provides an implementation + * of a transport used to send and receive data over SSL. Data read + * from this transport is decrypted using an <code>SSLEngine</code>. + * Also, all data is written is encrypted with the same engine. This + * ensures that data can be send and received in a transparent way. + * + * @author Niall Gallagher + */ +class SecureTransport implements Transport { + + /** + * This is the certificate associated with this SSL connection. + */ + private Certificate certificate; + + /** + * This is the transport used to send data over the socket. + */ + private Transport transport; + + /** + * This buffer is used to output the data for the SSL sent. + */ + private ByteBuffer output; + + /** + * This is the internal buffer used to exchange the SSL data. + */ + private ByteBuffer input; + + /** + * This is the internal buffer used to exchange the SSL data. + */ + private ByteBuffer swap; + + /** + * This is the SSL engine used to encrypt and decrypt data. + */ + private SSLEngine engine; + + /** + * This is the trace that is used to monitor socket activity. + */ + private Trace trace; + + /** + * This is used to determine if the transport was closed. + */ + private boolean closed; + + /** + * This is used to determine if the end of stream was reached. + */ + private boolean finished; + + /** + * Constructor for the <code>SecureTransport</code> object. This + * is used to create a transport for sending and receiving data + * over SSL. This must be created with a pipeline that has already + * performed the SSL handshake and is read to used. + * + * @param transport this is the transport to delegate operations to + * @param certificate this is the certificate for the connection + * @param input this is the input buffer used to read the data + * @param swap this is the swap buffer to be used for reading + */ + public SecureTransport(Transport transport, Certificate certificate, ByteBuffer input, ByteBuffer swap) { + this(transport, certificate, input, swap, 20480); + } + + /** + * Constructor for the <code>SecureTransport</code> object. This + * is used to create a transport for sending and receiving data + * over SSL. This must be created with a pipeline that has already + * performed the SSL handshake and is read to used. + * + * @param transport this is the transport to delegate operations to + * @param certificate this is the certificate for the connection + * @param input this is the input buffer used to read the data + * @param swap this is the swap buffer to be used for reading + * @param size this is the size of the buffers to be allocated + */ + public SecureTransport(Transport transport, Certificate certificate, ByteBuffer input, ByteBuffer swap, int size) { + this.output = ByteBuffer.allocate(size); + this.engine = transport.getEngine(); + this.trace = transport.getTrace(); + this.certificate = certificate; + this.transport = transport; + this.input = input; + this.swap = swap; + } + + /** + * This is used to acquire the SSL certificate used when the + * server is using a HTTPS connection. For plain text connections + * or connections that use a security mechanism other than SSL + * this will be null. This is only available when the connection + * makes specific use of an SSL engine to secure the connection. + * + * @return this returns the associated SSL certificate if any + */ + public Certificate getCertificate() { + return certificate; + } + + /** + * This is used to acquire the trace object that is associated + * with the socket. A trace object is used to collection details + * on what operations are being performed on the socket. For + * instance it may contain information relating to I/O events + * or more application specific events such as errors. + * + * @return this returns the trace associated with this socket + */ + public Trace getTrace() { + return trace; + } + + /** + * This is used to acquire the SSL engine used for HTTPS. If the + * pipeline is connected to an SSL transport this returns an SSL + * engine which can be used to establish the secure connection + * and send and receive content over that connection. If this is + * null then the pipeline represents a normal transport. + * + * @return the SSL engine used to establish a secure transport + */ + public SSLEngine getEngine() { + return engine; + } + + /** + * This method is used to get the <code>Map</code> of attributes + * by this pipeline. The attributes map is used to maintain details + * about the connection. Information such as security credentials + * to client details can be placed within the attribute map. + * + * @return this returns the map of attributes for this pipeline + */ + public Map getAttributes() { + return transport.getAttributes(); + } + + /** + * This method is used to acquire the <code>SocketChannel</code> + * for the connection. This allows the server to acquire the input + * and output streams with which to communicate. It can also be + * used to configure the connection and perform various network + * operations that could otherwise not be performed. + * + * @return this returns the socket used by this HTTP pipeline + */ + public SocketChannel getChannel() { + return transport.getChannel(); + } + + /** + * This is used to perform a non-blocking read on the transport. + * If there are no bytes available on the input buffers then + * this method will return zero and the buffer will remain the + * same. If there is data and the buffer can be filled then this + * will return the number of bytes read. Finally if the socket + * is closed this will return a -1 value. + * + * @param buffer this is the buffer to append the bytes to + * + * @return this returns the number of bytes that have been read + */ + public int read(ByteBuffer buffer) throws IOException { + if(closed) { + throw new TransportException("Transport is closed"); + } + if(finished) { + return -1; + } + int count = fill(buffer); + + if(count <= 0) { + return process(buffer); + } + return count; + } + + /** + * This is used to perform a non-blocking read on the transport. + * If there are no bytes available on the input buffers then + * this method will return zero and the buffer will remain the + * same. If there is data and the buffer can be filled then this + * will return the number of bytes read. + * + * @param buffer this is the buffer to append the bytes to + * + * @return this returns the number of bytes that have been read + */ + private int process(ByteBuffer buffer) throws IOException { + int size = swap.position(); + + if(size >= 0) { + swap.compact(); + } + int space = swap.remaining(); + + if(space > 0) { + size = transport.read(swap); + + if(size < 0) { + finished = true; + } + } + if(size > 0 || space > 0) { + swap.flip(); + receive(); + } + return fill(buffer); + } + + /** + * This is used to fill the provided buffer with data that has + * been read from the secure socket channel. This enables reading + * of the decrypted data in chunks that are smaller than the + * size of the input buffer used to contain the plain text data. + * + * @param buffer this is the buffer to append the bytes to + * + * @return this returns the number of bytes that have been read + */ + private int fill(ByteBuffer buffer) throws IOException { + int space = buffer.remaining(); + int count = input.position(); + + if(count > 0) { + if(count > space) { + count = space; + } + } + return fill(buffer, count); + + } + + /** + * This is used to fill the provided buffer with data that has + * been read from the secure socket channel. This enables reading + * of the decrypted data in chunks that are smaller than the + * size of the input buffer used to contain the plain text data. + * + * @param buffer this is the buffer to append the bytes to + * @param count this is the number of bytes that are to be read + * + * @return this returns the number of bytes that have been read + */ + private int fill(ByteBuffer buffer, int count) throws IOException { + input.flip(); + + if(count > 0) { + count = append(buffer, count); + } + input.compact(); + return count; + } + + /** + * This will append bytes within the transport to the given buffer. + * Once invoked the buffer will contain the transport bytes, which + * will have been drained from the buffer. This effectively moves + * the bytes in the buffer to the end of the packet instance. + * + * @param buffer this is the buffer containing the bytes + * @param count this is the number of bytes that should be used + * + * @return returns the number of bytes that have been moved + */ + private int append(ByteBuffer buffer, int count) throws IOException { + ByteBuffer segment = input.slice(); + + if(closed) { + throw new TransportException("Transport is closed"); + } + int mark = input.position(); + int size = mark + count; + + if(count > 0) { + input.position(size); + segment.limit(count); + buffer.put(segment); + } + return count; + } + + /** + * This is used to perform a non-blocking read on the transport. + * If there are no bytes available on the input buffers then + * this method will return zero and the buffer will remain the + * same. If there is data and the buffer can be filled then this + * will return the number of bytes read. Finally if the socket + * is closed this will return a -1 value. + */ + private void receive() throws IOException { + int count = swap.remaining(); + + while(count > 0) { + SSLEngineResult result = engine.unwrap(swap, input); + Status status = result.getStatus(); + + switch(status) { + case BUFFER_OVERFLOW: + case BUFFER_UNDERFLOW: + return; + case CLOSED: + throw new TransportException("Transport error " + result); + } + count = swap.remaining(); + + if(count <= 0) { + break; + } + } + } + + /** + * This method is used to deliver the provided buffer of bytes to + * the underlying transport. Depending on the connection type the + * array may be encoded for SSL transport or send directly. Any + * implementation may choose to buffer the bytes for performance. + * + * @param buffer this is the array of bytes to send to the client + */ + public void write(ByteBuffer buffer) throws IOException { + if(closed) { + throw new TransportException("Transport is closed"); + } + int capacity = output.capacity(); + int ready = buffer.remaining(); + int length = ready; + + while(ready > 0) { + int size = Math.min(ready, capacity / 2); + int mark = buffer.position(); + + if(length * 2 > capacity) { + buffer.limit(mark + size); + } + send(buffer); + output.clear(); + ready -= size; + } + } + + /** + * This method is used to deliver the provided buffer of bytes to + * the underlying transport. Depending on the connection type the + * array may be encoded for SSL transport or send directly. Any + * implementation may choose to buffer the bytes for performance. + * + * @param buffer this is the array of bytes to send to the client + */ + private void send(ByteBuffer buffer) throws IOException { + SSLEngineResult result = engine.wrap(buffer, output); + Status status = result.getStatus(); + + switch(status){ + case BUFFER_OVERFLOW: + case BUFFER_UNDERFLOW: + case CLOSED: + throw new TransportException("Transport error " + status); + default: + output.flip(); + } + transport.write(output); + } + + /** + * This method is used to flush the contents of the buffer to + * the client. This method will block until such time as all of + * the data has been sent to the client. If at any point there + * is an error sending the content an exception is thrown. + */ + public void flush() throws IOException { + if(closed) { + throw new TransportException("Transport is closed"); + } + transport.flush(); + } + + /** + * This is used to close the sender and the underlying transport. + * If a close is performed on the sender then no more bytes can + * be read from or written to the transport and the client will + * received a connection close on their side. + */ + public void close() throws IOException { + if(!closed) { + transport.close(); + closed = true; + } + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/ServerCleaner.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/ServerCleaner.java new file mode 100644 index 0000000..27aaf79 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/ServerCleaner.java @@ -0,0 +1,86 @@ +/* + * ServerCleaner.java February 2009 + * + * Copyright (C) 2009, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import org.simpleframework.common.thread.ConcurrentExecutor; +import org.simpleframework.common.thread.Daemon; +import org.simpleframework.transport.reactor.Reactor; + +/** + * The <code>ServerCleaner</code> object allows for the termination + * and resource recovery to be done asynchronously. This ensures that + * should a HTTP request be used to terminate the processor that it + * does not block waiting for the servicing thread pool to terminate + * causing a deadlock. + * + * @author Niall Gallagher + */ +class ServerCleaner extends Daemon { + + /** + * This is the internal processor that is to be terminated. + */ + private final TransportProcessor processor; + + /** + * This is the thread pool implementation used by the server. + */ + private final ConcurrentExecutor executor; + + /** + * This is the internal write reactor that is terminated. + */ + private final Reactor reactor; + + /** + * Constructor for the <code>ServerCleaner</code> object. For an + * orderly termination of the processor, the processor and reactor + * provided to the constructor will be stopped asynchronously. + * + * @param processor this is the processor that is to be stopped + * @param executor this is the executor used by the server + * @param reactor this is the reactor that is to be closed + */ + public ServerCleaner(TransportProcessor processor, ConcurrentExecutor executor, Reactor reactor) { + this.processor = processor; + this.executor = executor; + this.reactor = reactor; + } + + /** + * When this method runs it will firstly stop the processor in + * a synchronous fashion. Once the <code>TransportProcessor</code> + * has stopped it will stop the <code>Reactor</code> ensuring that + * all threads will be released. + * <p> + * It is important to note that stopping the processor before + * stopping the reactor is required. This ensures that if there + * are any threads executing within the processor that require + * the reactor threads, they can complete without a problem. + */ + public void run() { + try { + processor.stop(); + executor.stop(); + reactor.stop(); + } catch(Exception e) { + return; + } + } +}
\ No newline at end of file diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/Socket.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/Socket.java new file mode 100644 index 0000000..b59cb0f --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/Socket.java @@ -0,0 +1,89 @@ +/* + * Socket.java February 2001 + * + * Copyright (C) 2001, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.nio.channels.SocketChannel; +import java.util.Map; + +import javax.net.ssl.SSLEngine; + +import org.simpleframework.transport.trace.Trace; + +/** + * This is a <code>Socket</code> interface that is used to represent + * a socket. This has a map that allows attributes to be associated + * with the client connection. Attributes such as security details + * or other transport related details can be exposed by placing them + * in the socket map. The <code>Processor</code> can then use these + * attributes as required. + * <p> + * This provides the connected <code>SocketChannel</code> that can + * be used to read and write data asynchronously. The socket channel + * must be selectable and in non-blocking mode. If the socket is not + * in a non-blocking state the connection will not be processed. + * + * @author Niall Gallagher + */ +public interface Socket { + + /** + * This is used to acquire the trace object that is associated + * with the socket. A trace object is used to collection details + * on what operations are being performed on the socket. For + * instance it may contain information relating to I/O events + * or more application specific events such as errors. + * + * @return this returns the trace associated with this socket + */ + Trace getTrace(); + + /** + * This is used to acquire the SSL engine used for security. If + * the socket is connected to an SSL transport this returns an + * SSL engine which can be used to establish the secure connection + * and send and receive content over that connection. If this is + * null then the socket represents a normal transport. + * + * @return the SSL engine used to establish a secure transport + */ + SSLEngine getEngine(); + + /** + * This method is used to acquire the <code>SocketChannel</code> + * for the connection. This allows the server to acquire the input + * and output streams with which to communicate. It can also be + * used to configure the connection and perform various network + * operations that could otherwise not be performed. + * + * @return this returns the socket used by this socket + */ + SocketChannel getChannel(); + + /** + * This method is used to get the <code>Map</code> of attributes + * for this socket. The attributes map is used to maintain details + * about the connection. Information such as security credentials + * to client details can be placed within the attribute map. + * + * @return this returns the map of attributes for this socket + */ + Map getAttributes(); +} + + diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBuffer.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBuffer.java new file mode 100644 index 0000000..d3cdc22 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBuffer.java @@ -0,0 +1,308 @@ +/* + * SocketBuffer.java February 2014 + * + * Copyright (C) 2008, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import static org.simpleframework.transport.TransportEvent.CLOSE; +import static org.simpleframework.transport.TransportEvent.ERROR; +import static org.simpleframework.transport.TransportEvent.WRITE; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>SocketBuffer</code> represents a buffer that aggregates + * small fragments in to a single buffer before sending them. This + * is primarily used as a means to avoid sending many small packets + * rather than reasonable size ones for performance. This also + * enables a higher level of concurrency, as it will allow data + * that can't be sent over the socket to be buffered until it gets + * the signal that says it can be sent on. + * + * @author Niall Gallagher + */ +class SocketBuffer { + + /** + * This is a small internal buffer to collect fragments. + */ + private SocketBufferAppender appender; + + /** + * This is the underlying socket to sent to the data over. + */ + private SocketChannel channel; + + /** + * This is a reference to the last buffer to be sent. + */ + private ByteBuffer reference; + + /** + * This is used to trace various events that occur. + */ + private Trace trace; + + /** + * This is the recommended minimum packet size to send. + */ + private int chunk; + + /** + * This is used to determine if the buffer was closed. + */ + private boolean closed; + + /** + * Constructor for the <code>SocketBuffer</code> object. This is + * used to create a buffer that will collect small fragments sent + * in to a more reasonably sized packet. + * + * @param socket this is the socket to write the data to + * @param chunk this is the minimum packet size to used + * @param limit this is the maximum size of the output buffer + */ + public SocketBuffer(Socket socket, int chunk, int limit) { + this.appender = new SocketBufferAppender(socket, chunk, limit); + this.channel = socket.getChannel(); + this.trace = socket.getTrace(); + this.chunk = chunk; + } + + /** + * This is used to determine if the buffer is ready to be written + * to. A buffer is ready when it does not hold a reference to + * any other buffer internally. The the <code>flush</code> method + * must return true for a buffer to be considered ready. + * + * @return returns true if the buffer is ready to write to + */ + public synchronized boolean ready() throws IOException { + if(closed) { + throw new TransportException("Buffer has been closed"); + } + if(reference != null) { + int remaining = reference.remaining(); + + if(remaining <= 0) { + reference = null; + return true; + } + return false; + } + return true; + } + + /** + * This will write the bytes to underlying channel if the data is + * greater than the minimum buffer size. If it is less than the + * minimum size then it will be appended to the internal buffer. + * If it is larger than the maximum size of the internal buffer + * a reference is kept to it. This reference can only be cleared + * with the <code>flush</code> method, which will attempt to + * write the data to the channel, and buffer any remaining data + * if the underly connection is busy. + * + * @param data this is the data to write the the channel. + * + * @return this returns true if no reference was held + */ + public synchronized boolean write(ByteBuffer duplicate) throws IOException { + if(closed) { + throw new TransportException("Buffer has been closed"); + } + if(reference != null) { + throw new IOException("Buffer already pending write"); + } + int count = appender.length(); + + if(count > 0) { + return merge(duplicate); + } + int remaining = duplicate.remaining(); + + if(remaining < chunk) { + appender.append(duplicate);// just save it.. + return true; + } + if(!flush(duplicate)) { // attempt a write + int space = appender.space(); + + if(remaining < space) { + appender.append(duplicate); + return true; + } + reference = duplicate; + return false; + } + return true; + } + + /** + * This method is used to perform a merge of the buffer to be sent + * with the current buffer. If the internal buffer is large enough + * to send after the merge then it will be sent. Also, if the + * remaining bytes in the buffer are large enough for a packet + * then that too will be sent over the socket. + * + * @param duplicate this is the buffer to be merged + * + * @return this returns true if no reference was held + */ + private synchronized boolean merge(ByteBuffer duplicate) throws IOException { + if(closed) { + throw new TransportException("Buffer has been closed"); + } + int count = appender.length(); + int merged = appender.append(duplicate); + int payload = merged + count; + + if(payload >= chunk) { // viable packet size + int written = appender.write(channel); + + if(written < payload) {// count not fully flush buffer + reference = duplicate; + return false; + } + return write(duplicate); // we are back at zero + } + return true; // everything was buffered as chunk >= capacity + } + + /** + * This method is used to fully flush the contents of the buffer to + * the underlying output stream. This will only ever return true + * if there are no references held and no data internally buffered. + * If before this method is invoked a reference to a byte buffer + * is held then this will attempt to merge it with the internal + * buffer so that the <code>ready</code> method can return true. + * This ensures that the writing thread does not need to block. + * + * @return this returns true if all of the bytes are sent + */ + public synchronized boolean flush() throws IOException { + if(closed) { + throw new TransportException("Buffer has been closed"); + } + int count = appender.length(); + + if(count > 0) { + int written = appender.write(channel); + + if(written < count) { + compact(); + return false; // we are still buffering + } + } + if(reference != null) { + if(!flush(reference)) { + compact(); + return false; + } + reference = null; + } + return true; // no more data buffered + } + + /** + * This write method will write the contents of the buffer to the + * provided byte channel. If the whole buffer can be be written + * then this will simply return the number of bytes that have. + * The number of bytes remaining within the packet after a write + * can be acquired from the <code>length</code> method. Once all + * of the bytes are written the packet must be closed. + * + * @param channel this is the channel to write the packet to + * @param segment this is the segment that is to be written + * + * @return this returns the number of bytes that were written + */ + private synchronized boolean flush(ByteBuffer segment) throws IOException { + if(closed) { + throw new TransportException("Buffer has been closed"); + } + int require = segment.remaining(); + int count = 0; + + while(count < require) { + int size = channel.write(segment); + + if(size <= 0) { + break; + } + if(trace != null) { + trace.trace(WRITE, size); + } + count += size; + } + if(count == require) { + return true; + } + return false; + } + + /** + * To ensure that we can release any references and thus avoid a + * blocking thread this method will attempt to merge references + * in to the internal buffer. Compacting in this manner is done + * only if the full reference can fit in to the available space. + */ + private synchronized void compact() throws IOException { + if(closed) { + throw new TransportException("Buffer has been closed"); + } + if(reference != null) { + int remaining = reference.remaining(); + int space = appender.space(); + + if(remaining < space) { + appender.append(reference); // try to release the buffer + reference = null; + } + } + } + + /** + * This is used to close the writer and the underlying socket. + * If a close is performed on the writer then no more bytes + * can be read from or written to the writer and the client + * will receive a connection close on their side. This also + * ensures that the TCP FIN ACK is sent before the actual + * channel is closed. This is required for a clean shutdown. + */ + public synchronized void close() throws IOException { + if(closed) { + throw new TransportException("Buffer has been closed"); + } + if(!closed) { + try{ + closed = true; + trace.trace(CLOSE); + channel.socket().shutdownOutput(); + }catch(Throwable cause){ + trace.trace(ERROR, cause); + } + channel.close(); + } + } +} + + diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBufferAppender.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBufferAppender.java new file mode 100644 index 0000000..1b9c279 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBufferAppender.java @@ -0,0 +1,289 @@ +/* + * SocketBufferAppender.java February 2008 + * + * Copyright (C) 2008, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import static org.simpleframework.transport.TransportEvent.WRITE; +import static org.simpleframework.transport.TransportEvent.WRITE_BUFFER; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.channels.ByteChannel; +import java.nio.charset.Charset; + +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>SocketBufferAppender</code> represents a buffer fragment + * collector. This provides write access to a direct byte buffer which + * is used to collect fragments. Once a sufficient amount of data + * has been collected by this then can be written out to a channel. + * + * @author Niall Gallagher + */ +class SocketBufferAppender { + + /** + * This is the buffer used to store the contents of the buffer. + */ + private ByteBuffer buffer; + + /** + * This is the trace used to watch the buffering events. + */ + private Trace trace; + + /** + * This represents the the initial size of the buffer to use. + */ + private int chunk; + + /** + * This represents the largest this appender can grow to. + */ + private int limit; + + /** + * Constructor for the <code>SocketBufferAppender</code> object. This + * is used to create an appender that can collect smaller fragments + * in to a larger buffer so that it can be delivered more efficiently. + * + * @param socket this is the socket to append data for + * @param chunk this is the initial size of the buffer + * @param limit this is the maximum size of the buffer + */ + public SocketBufferAppender(Socket socket, int chunk, int limit) { + this.buffer = ByteBuffer.allocateDirect(chunk); + this.trace = socket.getTrace(); + this.chunk = chunk; + this.limit = limit; + } + + /** + * This is used to determine how much space is left to append + * data to this buffer. This is typically equivalent to capacity + * minus the length. However in the event that the buffer uses + * a private memory store that can not be written to then this + * can return zero regardless of the capacity and length. + * + * @return the space left within the buffer to append data to + */ + public int space() { + return buffer.remaining(); + } + + /** + * This represents the capacity of the backing store. The buffer + * is full when length is equal to capacity and it can typically + * be appended to when the length is less than the capacity. The + * only exception is when <code>space</code> returns zero, which + * means that the buffer can not have bytes appended to it. + * + * @return this is the capacity of other backing byte storage + */ + public int capacity() { + return buffer.capacity(); + } + + /** + * This is used to determine how mnay bytes remain within this + * buffer. It represents the number of write ready bytes, so if + * the length is greater than zero the buffer can be written to + * a byte channel. When length is zero the buffer can be closed. + * + * @return this is the number of bytes remaining in this buffer + */ + public int length() { + return capacity() - space(); + } + + /** + * This is used to encode the underlying byte sequence to text. + * Converting the byte sequence to text can be useful when either + * debugging what exactly is being sent. Also, for transports + * that require string delivery of buffers this can be used. + * + * @return this returns the bytes sequence as a string object + */ + public String encode() throws IOException { + return encode("UTF-8"); + } + + /** + * This is used to encode the underlying byte sequence to text. + * Converting the byte sequence to text can be useful when either + * debugging what exactly is being sent. Also, for transports + * that require string delivery of buffers this can be used. + * + * @param encoding this is the character set to use for encoding + * + * @return this returns the bytes sequence as a string object + */ + public String encode(String encoding) throws IOException { + ByteBuffer segment = buffer.duplicate(); + + if(segment != null) { + segment.flip(); + } + return encode(encoding, segment); + } + + /** + * This is used to encode the underlying byte sequence to text. + * Converting the byte sequence to text can be useful when either + * debugging what exactly is being sent. Also, for transports + * that require string delivery of buffers this can be used. + * + * @param encoding this is the character set to use for encoding + * @param segment this is the buffer that is to be encoded + * + * @return this returns the bytes sequence as a string object + */ + private String encode(String encoding, ByteBuffer segment) throws IOException { + Charset charset = Charset.forName(encoding); + CharBuffer text = charset.decode(segment); + + return text.toString(); + } + + /** + * This will append bytes within the given buffer to the buffer. + * Once invoked the buffer will contain the buffer bytes, which + * will have been drained from the buffer. This effectively moves + * the bytes in the buffer to the end of the buffer instance. + * + * @param data this is the buffer containing the bytes + * + * @return returns the number of bytes that have been moved + */ + public int append(ByteBuffer data) throws IOException { + int require = data.remaining(); + int space = space(); + + if(require > space) { + require = space; + } + return append(data, require); + } + + /** + * This will append bytes within the given buffer to the buffer. + * Once invoked the buffer will contain the buffer bytes, which + * will have been drained from the buffer. This effectively moves + * the bytes in the buffer to the end of the buffer instance. + * + * @param data this is the buffer containing the bytes + * @param count this is the number of bytes that should be used + * + * @return returns the number of bytes that have been moved + */ + public int append(ByteBuffer data, int count) throws IOException { + ByteBuffer segment = data.slice(); + int mark = data.position(); + int size = mark + count; + + if(count > 0) { + if(trace != null) { + trace.trace(WRITE_BUFFER, count); + } + data.position(size); + segment.limit(count); + buffer.put(segment); + } + return count; + } + + /** + * This write method will write the contents of the buffer to the + * provided byte channel. If the whole buffer can be be written + * then this will simply return the number of bytes that have. + * The number of bytes remaining within the buffer after a write + * can be acquired from the <code>length</code> method. Once all + * of the bytes are written the buffer must be closed. + * + * @param channel this is the channel to write the buffer to + * + * @return this returns the number of bytes that were written + */ + public int write(ByteChannel channel) throws IOException { + int size = length(); + + if(size <= 0) { + return 0; + } + return write(channel, size); + } + + /** + * This write method will write the contents of the buffer to the + * provided byte channel. If the whole buffer can be be written + * then this will simply return the number of bytes that have. + * The number of bytes remaining within the buffer after a write + * can be acquired from the <code>length</code> method. Once all + * of the bytes are written the buffer must be closed. + * + * @param channel this is the channel to write the buffer to + * @param count the number of bytes to write to the channel + * + * @return this returns the number of bytes that were written + */ + public int write(ByteChannel channel, int count) throws IOException { + if(count > 0) { + buffer.flip(); + } else { + return 0; + } + return write(channel, buffer); + } + + /** + * This write method will write the contents of the buffer to the + * provided byte channel. If the whole buffer can be be written + * then this will simply return the number of bytes that have. + * The number of bytes remaining within the buffer after a write + * can be acquired from the <code>length</code> method. Once all + * of the bytes are written the buffer must be closed. + * + * @param channel this is the channel to write the buffer to + * @param segment this is the buffer that is to be written + * + * @return this returns the number of bytes that were written + */ + private int write(ByteChannel channel, ByteBuffer segment) throws IOException { + int require = segment.remaining(); + int count = 0; + + while(count < require) { + int size = channel.write(segment); + + if(size <= 0) { + break; + } + if(trace != null) { + trace.trace(WRITE, size); + } + count += size; + } + if(count >= 0) { + segment.compact(); + } + return count; + } +} + diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBufferWriter.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBufferWriter.java new file mode 100644 index 0000000..346aef3 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBufferWriter.java @@ -0,0 +1,103 @@ +/* + * SocketBufferWriter.java February 2008 + * + * Copyright (C) 2008, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.simpleframework.transport.reactor.Reactor; + +/** + * The <code>SocketBufferWriter</code> is used to represent the means + * to write buffers to an underlying transport. This manages all of + * the selection required to determine if the socket is write ready. + * If the buffer to be written is to block then this will wait + * until all queue buffers are fully written. + * + * @author Niall Gallagher + */ +class SocketBufferWriter { + + /** + * This is the flusher that is used to asynchronously flush. + */ + private final SocketFlusher flusher; + + /** + * This is the writer that is used to queue the buffers. + */ + private final SocketBuffer writer; + + /** + * Constructor for the <code>SocketBufferWriter</code> object. This + * is used to create a writer that can write buffers to the socket + * in such a way that it write either asynchronously or block + * the calling thread until such time as the buffers are written. + * + * @param socket this is the pipeline that this writes to + * @param reactor this is the writer used to scheduler writes + * @param buffer this is the initial size of the output buffer + * @param threshold this is the maximum size of the buffer + */ + public SocketBufferWriter(Socket socket, Reactor reactor, int buffer, int threshold) throws IOException { + this.writer = new SocketBuffer(socket, buffer, threshold); + this.flusher = new SocketFlusher(writer, socket, reactor); + } + + /** + * This method is used to deliver the provided buffer of bytes to + * the underlying transport. This will not modify the data that + * is to be written, this will simply queue the buffers in the + * order that they are provided. + * + * @param buffer this is the array of bytes to send to the client + */ + public void write(ByteBuffer buffer) throws IOException { + boolean done = writer.write(buffer); // returns true if we can buffer + + if(!done) { + flusher.flush(); // we could not fully write or buffer the data so we must flush + } + } + + /** + * This method is used to flush all of the queued buffers to + * the client. This method will not block but will simply flush + * any data to the underlying transport. Internally the data + * will be queued for delivery to the connected entity. + */ + public void flush() throws IOException { + boolean done = writer.flush(); // returns true only if everything is delivered + + if(!done) { + flusher.flush(); // here we will block for an op write event if the buffer contains a reference + } + } + + /** + * This is used to close the writer and the underlying socket. + * If a close is performed on the writer then no more bytes + * can be read from or written to the writer and the client + * will receive a connection close on their side. + */ + public void close() throws IOException { + flusher.close(); + writer.close(); + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketFlusher.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketFlusher.java new file mode 100644 index 0000000..a10cee7 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketFlusher.java @@ -0,0 +1,142 @@ +/* + * SocketFlusher.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; + +import org.simpleframework.transport.reactor.Reactor; + +/** + * The <code>SocketFlusher</code> flushes bytes to the underlying + * socket channel. This allows asynchronous writes to the socket + * to be managed in such a way that there is order to the way data + * is delivered over the socket. This uses a selector to dispatch + * flush invocations to the underlying socket when the socket is + * write ready. This allows the writing thread to continue without + * having to wait for all the data to be written to the socket. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.SocketBufferWriter + */ +class SocketFlusher { + + /** + * This is the signaller used to determine when to flush. + */ + private FlushSignaller signaller; + + /** + * This is the scheduler used to block and signal the writer. + */ + private FlushScheduler scheduler; + + /** + * This is the writer used to queue the buffers written. + */ + private SocketBuffer buffer; + + /** + * This is used to determine if the socket flusher is closed. + */ + private boolean closed; + + /** + * Constructor for the <code>SocketFlusher</code> object. This is + * used to flush buffers to the underlying socket asynchronously. + * When finished flushing all of the buffered data this signals + * any threads that are blocking waiting for the write to finish. + * + * @param buffer this is used to write the buffered buffers + * @param reactor this is used to perform asynchronous writes + * @param socket this is the socket used to select with + */ + public SocketFlusher(SocketBuffer buffer, Socket socket, Reactor reactor) throws IOException { + this.signaller = new FlushSignaller(this, socket); + this.scheduler = new FlushScheduler(socket, reactor, signaller, this); + this.buffer = buffer; + } + + /** + * Here in this method we schedule a flush when the underlying + * writer is write ready. This allows the writer thread to return + * without having to fully flush the content to the underlying + * transport. If there are references queued this will block. + */ + public synchronized void flush() throws IOException { + if(closed) { + throw new TransportException("Flusher is closed"); + } + boolean block = !buffer.ready(); + + if(!closed) { + scheduler.schedule(block); + } + } + + /** + * This is executed when the flusher is to write all of the data to + * the underlying socket. In this situation the writes are attempted + * in a non blocking way, if the task does not complete then this + * will simply enqueue the writing task for OP_WRITE and leave the + * method. This returns true if all the buffers are written. + */ + public synchronized void execute() throws IOException { + boolean ready = buffer.flush(); + + if(!ready) { + boolean block = !buffer.ready(); + + if(!block && !closed) { + scheduler.release(); + } + scheduler.repeat(); + } else{ + scheduler.ready(); + } + } + + /** + * This is used to abort the flushing process when the reactor has + * been stopped. An abort to the flusher typically happens when the + * server has been shutdown. It prevents threads lingering waiting + * for a I/O operation which prevents the server from shutting down. + */ + public synchronized void abort() throws IOException { + scheduler.close(); + buffer.close(); + } + + /** + * This is used to close the flusher ensuring that all of the + * data within the writer will be flushed regardless of the + * amount of data within the writer that needs to be written. If + * the writer does not block then this waits to be finished. + */ + public synchronized void close() throws IOException { + boolean ready = buffer.flush(); + + if(!closed) { + closed = true; + } + if(!ready) { + scheduler.schedule(true); + } + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketProcessor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketProcessor.java new file mode 100644 index 0000000..a5c52b3 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketProcessor.java @@ -0,0 +1,61 @@ +/* + * SocketProcessor.java February 2001 + * + * Copyright (C) 2001, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; + +/** + * The <code>SocketProcessor</code> interface represents a processor that + * is used to accept <code>Socket</code> objects. Implementations of + * this object will typically hand the socket over for processing either + * by some form of protocol handler or message processor. If the socket + * contains an <code>SSLEngine</code> an SSL hand shake may be performed + * before any messages on the socket are interpreted. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.connect.SocketConnection + */ +public interface SocketProcessor { + + /** + * Used to process the <code>Socket</code> which is a full duplex + * TCP connection to a higher layer the application. It is this + * layer that is responsible for interpreting a protocol or handling + * messages in some manner. In the case of HTTP this will initiate + * the consumption of a HTTP request after any SSL handshake is + * finished if the connection is secure. + * + * @param socket this is the connected HTTP socket to process + */ + void process(Socket socket) throws IOException; + + /** + * This method is used to stop the <code>SocketProcessor</code> such + * that it will accept no more sockets. Stopping the server ensures + * that all resources occupied will be released. This is required + * so that all threads are stopped, and all memory is released. + * <p> + * Typically this method is called once all connections to the + * server have been stopped. As a final act of shutting down the + * entire server all threads must be stopped, this allows collection + * of unused memory and the closing of file and socket resources. + */ + void stop() throws IOException; +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketTransport.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketTransport.java new file mode 100644 index 0000000..b0ba04a --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketTransport.java @@ -0,0 +1,262 @@ +/* + * SocketTransport.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import static org.simpleframework.transport.TransportEvent.READ; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.Map; + +import javax.net.ssl.SSLEngine; + +import org.simpleframework.transport.reactor.Reactor; +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>SocketTransport</code> object offers a transport that can + * send and receive bytes in a non-blocking manner. The contract of + * the <code>Transport</code> is that it must either write the data + * it is asked to write or it must queue that data for delivery. For + * the vast majority of cases data is written directly to the socket + * without any need for queuing or selection for write ready events. + * <p> + * In the event that the client TCP window is full and writing would + * block this makes use of a queue of buffers which can be used to + * append data to. The buffers are lazily instantiated so the memory + * required is created only in the rare case that they are needed. + * Once a buffer is full it is queued to an asynchronous thread where + * the buffer queue is drained and sent to the client when the TCP + * window of the client is capable of accepting it. + * <p> + * In order to improve the network performance of this transport the + * default packet size sent to the TCP stack is four kilobytes. This + * ensures that the fragments of response delivered to the TCP layer + * are sufficiently large for optimum network performance. + * + * @author Niall Gallagher + */ +public class SocketTransport implements Transport { + + /** + * This is the writer that is used to flush the buffer queue. + */ + private SocketBufferWriter writer; + + /** + * This is the underlying byte channel used to send the data. + */ + private SocketChannel channel; + + /** + * This is the socket that this transport is representing. + */ + private Socket socket; + + /** + * This is the trace used to monitor all transport events. + */ + private Trace trace; + + /** + * This is used to determine if the transport has been closed. + */ + private boolean closed; + + /** + * Constructor for the <code>SocketTransport</code> object. This + * requires a reactor to perform asynchronous writes and also the + * pipeline which is used to read and write data. This transport + * will use a queue of buffers which are lazily initialized so as + * to only allocate the memory on demand. + * + * @param socket this is used to read and write the data + * @param reactor this is used to perform asynchronous writes + */ + public SocketTransport(Socket socket, Reactor reactor) throws IOException { + this(socket, reactor, 4096); + } + + /** + * Constructor for the <code>SocketTransport</code> object. This + * requires a reactor to perform asynchronous writes and also the + * pipeline which is used to read and write data. This transport + * will use a queue of buffers which are lazily initialized so as + * to only allocate the memory on demand. + * + * @param socket this is used to read and write the data + * @param reactor this is used to perform asynchronous writes + * @param buffer this is the size of the output buffer to use + */ + public SocketTransport(Socket socket, Reactor reactor, int buffer) throws IOException { + this(socket, reactor, buffer, 20480); + } + + /** + * Constructor for the <code>SocketTransport</code> object. This + * requires a reactor to perform asynchronous writes and also the + * pipeline which is used to read and write data. This transport + * will use a queue of buffers which are lazily initialized so as + * to only allocate the memory on demand. + * + * @param socket this is used to read and write the data + * @param reactor this is used to perform asynchronous writes + * @param buffer this is the size of the output buffer to use + * @param threshold this is the maximum size of the output buffer + */ + public SocketTransport(Socket socket, Reactor reactor, int buffer, int threshold) throws IOException { + this.writer = new SocketBufferWriter(socket, reactor, buffer, threshold); + this.channel = socket.getChannel(); + this.trace = socket.getTrace(); + this.socket = socket; + } + + /** + * This is used to acquire the SSL certificate used when the + * server is using a HTTPS connection. For plain text connections + * or connections that use a security mechanism other than SSL + * this will be null. This is only available when the connection + * makes specific use of an SSL engine to secure the connection. + * + * @return this returns the associated SSL certificate if any + */ + public Certificate getCertificate() { + return null; + } + + /** + * This is used to acquire the trace object that is associated + * with the socket. A trace object is used to collection details + * on what operations are being performed on the socket. For + * instance it may contain information relating to I/O events + * or more application specific events such as errors. + * + * @return this returns the trace associated with this socket + */ + public Trace getTrace() { + return trace; + } + + /** + * This method is used to get the <code>Map</code> of attributes + * by this pipeline. The attributes map is used to maintain details + * about the connection. Information such as security credentials + * to client details can be placed within the attribute map. + * + * @return this returns the map of attributes for this pipeline + */ + public Map getAttributes() { + return socket.getAttributes(); + } + + /** + * This is used to acquire the SSL engine used for https. If the + * pipeline is connected to an SSL transport this returns an SSL + * engine which can be used to establish the secure connection + * and send and receive content over that connection. If this is + * null then the pipeline represents a normal transport. + * + * @return the SSL engine used to establish a secure transport + */ + public SSLEngine getEngine() { + return socket.getEngine(); + } + + /** + * This method is used to acquire the <code>SocketChannel</code> + * for the connection. This allows the server to acquire the input + * and output streams with which to communicate. It can also be + * used to configure the connection and perform various network + * operations that could otherwise not be performed. + * + * @return this returns the socket used by this HTTP pipeline + */ + public SocketChannel getChannel() { + return socket.getChannel(); + } + + /** + * This is used to perform a non-blocking read on the transport. + * If there are no bytes available on the input buffers then + * this method will return zero and the buffer will remain the + * same. If there is data and the buffer can be filled then this + * will return the number of bytes read. Finally if the socket + * is closed this will return a -1 value. + * + * @param data this is the buffer to append the bytes to + * + * @return this returns the number of bytes that were read + */ + public int read(ByteBuffer data) throws IOException { + if(closed) { + throw new TransportException("Transport is closed"); + } + int count = channel.read(data); + + if(trace != null) { + trace.trace(READ, count); + } + return count; + } + + /** + * This method is used to deliver the provided buffer of bytes to + * the underlying transport. Depending on the connection type the + * array may be encoded for SSL transport or send directly. This + * will buffer the bytes within the internal buffer to ensure + * that the response fragments are sufficiently large for the + * network. Smaller packets result poorer performance. + * + * @param data this is the array of bytes to send to the client + */ + public void write(ByteBuffer data) throws IOException{ + if(closed) { + throw new TransportException("Transport is closed"); + } + writer.write(data); + } + + /** + * This is used to flush the internal buffer to the underlying + * socket. Flushing with this method is always non-blocking, so + * if the socket is not write ready and the buffer can be queued + * it will be queued and the calling thread will return. + */ + public void flush() throws IOException { + if(closed) { + throw new TransportException("Transport is closed"); + } + writer.flush(); + } + + /** + * This method is used to flush the internal buffer and close + * the underlying socket. This method will not complete until + * all buffered data is written and the underlying socket is + * closed at which point this can be disposed of. + */ + public void close() throws IOException { + if(!closed) { + writer.flush(); + writer.close(); + closed = true; + } + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketWrapper.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketWrapper.java new file mode 100644 index 0000000..805ab91 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketWrapper.java @@ -0,0 +1,144 @@ +/* + * SocketWrapper.java February 2001 + * + * Copyright (C) 2001, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.Map; + +import javax.net.ssl.SSLEngine; + +import org.simpleframework.transport.trace.Trace; + +/** + * This is a <code>SocketWrapper</code> objects that represents a TCP + * socket connections. This contains a map that allows attributes to be + * associated with the client connection. Attributes such as security + * certificates or other transport related details can be exposed to + * the <code>Request</code> using the socket attribute map. + * <p> + * This provides the connected <code>SocketChannel</code> that can be + * used to receive and response to HTTP requests. The socket channel + * must be selectable and in non-blocking mode. If the socket is not + * in a non-blocking state the connection will not be processed. + * + * @author Niall Gallagher + */ +public class SocketWrapper implements Socket { + + /** + * This is the socket that provides the input and output. + */ + private final SocketChannel channel; + + /** + * This is used to encrypt content for secure connections. + */ + private final SSLEngine engine; + + /** + * This can be used to trace specific events for the socket. + */ + private final Trace trace; + + /** + * This is used to store the attributes for the socket. + */ + private final Map map; + + /** + * This creates a <code>SocketWrapper</code> from a socket channel. + * Any implementations of the object may use this constructor to + * ensure that all the data is initialized. + * + * @param channel the socket channel that is used as the transport + * @param trace used to trace specific events for the socket + */ + public SocketWrapper(SocketChannel channel, Trace trace) { + this(channel, trace, null); + } + + /** + * This creates a <code>SecureSocket</code> from a socket channel. + * Any implementations of the object may use this constructor to + * ensure that all the data is initialized. + * + * @param channel the socket channel that is used as the transport + * @param trace used to trace specific events for the socket + * @param engine this is the SSL engine used for secure transport + */ + public SocketWrapper(SocketChannel channel, Trace trace, SSLEngine engine) { + this.map = new HashMap(); + this.channel = channel; + this.engine = engine; + this.trace = trace; + } + + /** + * This is used to acquire the trace object that is associated + * with the socket. A trace object is used to collection details + * on what operations are being performed on the socket. For + * instance it may contain information relating to I/O events + * or more application specific events such as errors. + * + * @return this returns the trace associated with this socket + */ + public Trace getTrace() { + return trace; + } + + /** + * This is used to acquire the SSL engine used for HTTPS. If the + * socket is connected to an SSL transport this returns an SSL + * engine which can be used to establish the secure connection + * and send and receive content over that connection. If this is + * null then the socket represents a normal transport. + * + * @return the SSL engine used to establish a secure transport + */ + public SSLEngine getEngine() { + return engine; + } + + /** + * This method is used to acquire the <code>SocketChannel</code> + * for the connection. This allows the server to acquire the input + * and output streams with which to communicate. It can also be + * used to configure the connection and perform various network + * operations that could otherwise not be performed. + * + * @return this returns the socket used by this HTTP socket + */ + public SocketChannel getChannel() { + return channel; + } + + /** + * This method is used to get the <code>Map</code> of attributes + * by this socket. The attributes map is used to maintain details + * about the connection. Information such as security credentials + * to client details can be placed within the attribute map. + * + * @return this returns the map of attributes for this socket + */ + public Map getAttributes() { + return map; + } +} + diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/Transport.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/Transport.java new file mode 100644 index 0000000..cc499f3 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/Transport.java @@ -0,0 +1,91 @@ +/* + * Transport.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * The <code>Transport</code> interface represents a low level means + * to deliver content to the connected client. Typically this will + * be a connected, non-blocking, TCP connection. However, for tests + * and other purposes this may be adapted. The general contract of + * the transport is that it provides non-blocking reads and blocking + * writes. Blocking writes are required to ensure that memory does + * not build up in output buffers during high load periods. + * + * @author Niall Gallagher + */ +public interface Transport extends Socket { + + /** + * This is used to acquire the SSL certificate used when the + * server is using a HTTPS connection. For plain text connections + * or connections that use a security mechanism other than SSL + * this will be null. This is only available when the connection + * makes specific use of an SSL engine to secure the connection. + * + * @return this returns the associated SSL certificate if any + */ + Certificate getCertificate() throws IOException; + + /** + * This is used to perform a non-blocking read on the transport. + * If there are no bytes available on the input buffers then + * this method will return zero and the buffer will remain the + * same. If there is data and the buffer can be filled then this + * will return the number of bytes read. Finally if the socket + * is closed this will return a -1 value. + * + * @param buffer this is the buffer to append the bytes to + * + * @return this returns the number of bytes that have been read + */ + int read(ByteBuffer buffer) throws IOException; + + /** + * This method is used to deliver the provided buffer of bytes to + * the underlying transport. Depending on the connection type the + * array may be encoded for SSL transport or send directly. Any + * implementation may choose to buffer the bytes for performance. + * + * @param buffer this is the buffer of bytes to send to the client + */ + void write(ByteBuffer buffer) throws IOException; + + /** + * This method is used to flush the contents of the buffer to + * the client. This method will block not block but will simply + * flush any data to the underlying transport. Internally the + * data will be queued for delivery to the connected entity. + */ + void flush() throws IOException; + + /** + * This is used to close the transport and the underlying socket. + * If a close is performed on the transport then no more bytes + * can be read from or written to the transport and the client + * will receive a connection close on their side. + */ + void close() throws IOException; +} + + + + diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportChannel.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportChannel.java new file mode 100644 index 0000000..2d5ff1a --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportChannel.java @@ -0,0 +1,195 @@ +/* + * TransportChannel.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import static org.simpleframework.transport.TransportEvent.ERROR; + +import java.io.IOException; +import java.nio.channels.SocketChannel; +import java.util.Map; + +import javax.net.ssl.SSLEngine; + +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>TransportChannel</code> provides a means to deliver and + * receive content over a transport. This essentially provides two + * adapters which enable simpler communication with the underlying + * transport. They hide the complexities involved with buffering and + * resetting data written to and read from the socket. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.TransportProcessor + */ +public class TransportChannel implements Channel { + + /** + * This is the certificate associated with this SSL channel. + */ + private final Certificate certificate; + + /** + * This represents the underlying transport that is to be used. + */ + private final Transport transport; + + /** + * This is the engine that is used to secure the transport. + */ + private final SSLEngine engine; + + /** + * This is used to provide a cursor view on the input. + */ + private final ByteCursor cursor; + + /** + * This is used to provide a blocking means for sending data. + */ + private final ByteWriter writer; + + /** + * This is the trace used to monitor events on the channel. + */ + private final Trace trace; + + /** + * Constructor for the <code>TransportChannel</code> object. The + * transport channel basically wraps a channel and provides a + * means to send and receive data using specialized adapters. + * These adapters provide a simpler means for communicating over + * the network to the connected client. + * + * @param transport this is the underlying transport to be used + */ + public TransportChannel(Transport transport) throws IOException { + this.cursor = new TransportCursor(transport); + this.writer = new TransportWriter(transport); + this.certificate = transport.getCertificate(); + this.engine = transport.getEngine(); + this.trace = transport.getTrace(); + this.transport = transport; + } + + /** + * This is used to determine if the channel is secure and that + * data read from and data written to the request is encrypted. + * Channels transferred over SSL are considered secure and will + * have this return true, otherwise it will return false. + * + * @return true if this is secure for reading and writing + */ + public boolean isSecure() { + return engine != null; + } + + /** + * This is used to acquire the SSL certificate used for security. + * If the socket is connected to an SSL transport this returns an + * SSL certificate which was provided during the secure handshake + * between the client and server. If not certificates are present + * in the provided instance, a challenge can be issued. + * + * @return the SSL certificate provided by a secure transport + */ + public Certificate getCertificate() { + return certificate; + } + + /** + * This gets the <code>Trace</code> object associated with the + * channel. The trace is used to log various events for the life + * of the transaction such as low level read and write events + * as well as milestone events and errors. + * + * @return this returns the trace associated with the socket + */ + public Trace getTrace() { + return trace; + } + + /** + * This is the connected socket channel associated with this. In + * order to determine if content can be read or written to or + * from the channel this socket can be used with a selector. This + * provides a means to react to I/O events as they occur rather + * than polling the channel which is generally less performant. + * + * @return this returns the connected socket channel + */ + public SocketChannel getSocket() { + return transport.getChannel(); + } + + /** + * This returns the <code>Map</code> of attributes used to hold + * connection information for the channel. The attributes here + * are taken from the pipeline attributes and may contain details + * such as SSL certificates or other such useful information. + * + * @return returns the attributes associated with the channel + */ + public Map getAttributes() { + return transport.getAttributes(); + } + + /** + * This provides a <code>ByteCursor</code> for this channel. The + * cursor provides a seekable view of the input buffer and will + * allow the server kernel to peek into the input buffer without + * having to take the data from the input. This allows overflow + * to be pushed back on to the cursor for subsequent reads. + * + * @return this returns the input cursor for the channel + */ + public ByteCursor getCursor() { + return cursor; + } + + /** + * This provides a <code>ByteWriter</code> for the channel. This + * is used to provide a blocking output mechanism for the channel. + * Enabling blocking reads ensures that output buffering can be + * limited to an extent, which ensures that memory remains low at + * high load periods. Writes to the sender may result in the data + * being copied and queued until the socket is write ready. + * + * @return this returns the output sender for this channel + */ + public ByteWriter getWriter() { + return writer; + } + + /** + * Because the channel represents a duplex means of communication + * there needs to be a means to close it down. This provides such + * a means. By closing the channel the cursor and sender will no + * longer send or receive data to or from the network. The client + * will also be signalled that the connection has been severed. + */ + public void close() { + try { + transport.close(); + }catch(Exception cause) { + trace.trace(ERROR, cause); + } + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportCursor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportCursor.java new file mode 100644 index 0000000..d25cb90 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportCursor.java @@ -0,0 +1,260 @@ +/* + * TransportCursor.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; + +/** + * The <code>TransportCursor</code> object represents a cursor that + * can read and buffer data from an underlying transport. If the + * number of bytes read from the cursor is more than required for + * the HTTP request then those bytes can be pushed back in to the + * cursor using the <code>reset</code> method. This will only allow + * the last read to be reset within the cursor safely. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.Transport + */ +public class TransportCursor implements ByteCursor { + + /** + * This is the stream for the bytes read by this cursor object. + */ + private ByteReader reader; + + /** + * This is the buffer used to collect the bytes pushed back. + */ + private byte[] buffer; + + /** + * This is the number of bytes that have been pushed back. + */ + private int count; + + /** + * This is the mark from the last read from this cursor object. + */ + private int mark; + + /** + * This is the position to read data from the internal buffer. + */ + private int pos; + + /** + * This is the maximum number of bytes that can be pushed back. + */ + private int limit; + + /** + * Constructor for the <code>TransportCursor</code> object. This + * requires a transport to read the bytes from. By default this + * will create a buffer of of the specified size to read the + * input in to which enabled bytes to be buffered internally. + * + * @param transport this is the underlying transport to use + */ + public TransportCursor(Transport transport) { + this(transport, 2048); + } + + /** + * Constructor for the <code>TransportCursor</code> object. This + * requires a transport to read the bytes from. By default this + * will create a buffer of of the specified size to read the + * input in to which enabled bytes to be buffered internally. + * + * @param transport this is the underlying transport to use + * @param size this is the size of the internal buffer to use + */ + public TransportCursor(Transport transport, int size) { + this.reader = new TransportReader(transport, size); + this.buffer = new byte[0]; + this.limit = size; + } + + /** + * Determines whether the cursor is still open. The cursor is + * considered open if there are still bytes to read. If there is + * still bytes buffered and the underlying transport is closed + * then the cursor is still considered open. + * + * @return true if there is nothing more to be read from this + */ + public boolean isOpen() throws IOException { + return reader.isOpen(); + } + + /** + * Determines whether the cursor is ready for reading. When the + * cursor is ready then it guarantees that some amount of bytes + * can be read from the underlying stream without blocking. + * + * @return true if some data can be read without blocking + */ + public boolean isReady() throws IOException { + return ready() > 0; + } + + /** + * Provides the number of bytes that can be read from the stream + * without blocking. This is typically the number of buffered or + * available bytes within the stream. When this reaches zero then + * the cursor may perform a blocking read. + * + * @return the number of bytes that can be read without blocking + */ + public int ready() throws IOException { + if(count > 0) { + return count; + } + return reader.ready(); + } + + /** + * Reads a block of bytes from the underlying stream. This will + * read up to the requested number of bytes from the underlying + * stream. If there are no ready bytes on the stream this can + * return zero, representing the fact that nothing was read. + * + * @param data this is the array to read the bytes in to + * + * @return this returns the number of bytes read from the stream + */ + public int read(byte[] data) throws IOException { + return read(data, 0, data.length); + } + + /** + * Reads a block of bytes from the underlying stream. This will + * read up to the requested number of bytes from the underlying + * stream. If there are no ready bytes on the stream this can + * return zero, representing the fact that nothing was read. + * + * @param data this is the array to read the bytes in to + * @param off this is the offset to begin writing the bytes to + * @param len this is the number of bytes that are requested + * + * @return this returns the number of bytes read from the stream + */ + public int read(byte[] data, int off, int len) throws IOException { + if(count <= 0) { + mark = pos; + return reader.read(data, off, len); + } + int size = Math.min(count, len); + + if(size > 0) { + System.arraycopy(buffer, pos, data, off, size); + mark = pos; + pos += size; + count -= size; + } + return size; + } + + /** + * Pushes the provided data on to the cursor. Data pushed on to + * the cursor will be the next data read from the cursor. This + * complements the <code>reset</code> method which will reset + * the cursors position on a stream. Allowing data to be pushed + * on to the cursor allows more flexibility. + * + * @param data this is the data to be pushed on to the cursor + */ + public void push(byte[] data) throws IOException { + push(data, 0, data.length); + } + + /** + * Pushes the provided data on to the cursor. Data pushed on to + * the cursor will be the next data read from the cursor. This + * complements the <code>reset</code> method which will reset + * the cursors position on a stream. Allowing data to be pushed + * on to the cursor allows more flexibility. + * + * @param data this is the data to be pushed on to the cursor + * @param off this is the offset to begin reading the bytes + * @param len this is the number of bytes that are to be used + */ + public void push(byte[] data, int off, int len) throws IOException { + int size = buffer.length; + + if(size < len + count) { + expand(len + count); + } + int start = pos - len; + + if(len > 0) { + System.arraycopy(data, off, buffer, start, len); + mark = start; + pos = start; + count += len; + } + } + + /** + * This is used to ensure that there is enough space in the buffer + * to allow for more bytes to be added. If the buffer is already + * larger than the required capacity the this will do nothing. + * + * @param capacity the minimum size needed for the buffer + */ + private void expand(int capacity) throws IOException { + if(capacity > limit) { + throw new TransportException("Capacity limit exceeded"); + } + byte[] temp = new byte[capacity]; + int start = capacity - count; + int shift = pos - mark + ; + if(count > 0) { + System.arraycopy(buffer, pos, temp, start, count); + } + pos = capacity - count; + mark = pos - shift; + buffer = temp; + } + + /** + * Moves the cursor backward within the stream. This ensures + * that any bytes read from the last read can be pushed back + * in to the stream so that they can be read again. This will + * throw an exception if the reset can not be performed. + * + * @param size this is the number of bytes to reset back + * + * @return this is the number of bytes that have been reset + */ + public int reset(int size) throws IOException { + if(mark == pos) { + return reader.reset(size); + } + if(pos - size < mark) { + size = pos - mark; + } + if(size > 0) { + count += size; + pos -= size; + } + return size; + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportDispatcher.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportDispatcher.java new file mode 100644 index 0000000..ec48140 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportDispatcher.java @@ -0,0 +1,114 @@ +/* + * TransportDispatcher.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.nio.channels.SocketChannel; + +import org.simpleframework.transport.reactor.Operation; +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>TransportDispatcher</code> operation is used transfer a + * transport to the processor so it can be processed. This is used so + * that when a transport is given to the processor it can be dispatched + * in another thread to the processor. This is needed so that the + * connection thread is occupied only briefly. + * + * @author Niall Gallagher + */ +class TransportDispatcher implements Operation { + + /** + * This is the processor used to transfer the transport to. + */ + private final TransportProcessor processor; + + /** + * This is the transport to be passed to the processor. + */ + private final Transport transport; + + /** + * Constructor for the <code>TransportDispatcher</code> object. This + * is used to transfer a transport to a processor. Transferring the + * transport using an operation ensures that the thread that is + * used to process the transport is not occupied for long. + * + * @param transport this is the transport this exchange uses + * @param processor this is the negotiation to dispatch to + */ + public TransportDispatcher(TransportProcessor processor, Transport transport) { + this.transport = transport; + this.processor = processor; + } + + /** + * This is the <code>SelectableChannel</code> which is used to + * determine if the operation should be executed. If the channel + * is ready for a given I/O event it can be run. For instance if + * the operation is used to perform some form of read operation + * it can be executed when ready to read data from the channel. + * + * @return this returns the channel used to govern execution + */ + public SocketChannel getChannel() { + return transport.getChannel(); + } + + /** + * This is used to acquire the trace object that is associated + * with the operation. A trace object is used to collection details + * on what operations are being performed. For instance it may + * contain information relating to I/O events or errors. + * + * @return this returns the trace associated with this operation + */ + public Trace getTrace() { + return transport.getTrace(); + } + + /** + * This is used to transfer the transport to the processor. This + * will typically be executed asynchronously so that it does not + * delay the thread that passes the <code>Transport</code> to the + * transport processor, ensuring quicker processing. + */ + public void run() { + try { + processor.process(transport); + }catch(Exception e) { + cancel(); + } + } + + /** + * This is used to cancel the operation if it has timed out. This + * is typically invoked when it has been waiting in a selector for + * an extended duration of time without any active operations on + * it. In such a case the reactor must purge the operation to free + * the memory and open channels associated with the operation. + */ + public void cancel() { + try { + transport.close(); + }catch(Exception e) { + return; + } + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportEvent.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportEvent.java new file mode 100644 index 0000000..97be771 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportEvent.java @@ -0,0 +1,91 @@ +/* + * TransportEvent.java October 2012 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +/** + * The <code>TransportEvent</code> enum represents various events that + * can occur with the transport. Events that are available here are + * typically those that refer to low level I/O operations within the + * server. If a <code>Trace</code> has been associated with the socket + * connection then it will receive these events as they occur. + * + * @author Niall Gallagher + */ +public enum TransportEvent { + + /** + * This event represents a read operation on the underlying socket. + */ + READ, + + /** + * This event occurs when there is no more data available to read. + */ + READ_WAIT, + + /** + * This event represents a write operation on the underlying socket. + */ + WRITE, + + /** + * This event represents a write buffer operation on the underlying socket. + */ + WRITE_BUFFER, + + /** + * This event occurs when no more data can be sent over the socket. + */ + WRITE_WAIT, + + /** + * This event occurs when a thread must wait for a write to finish. + */ + WRITE_BLOCKING, + + /** + * This event occurs with HTTPS when a new SSL handshake starts. + */ + HANDSHAKE_BEGIN, + + /** + * This event occurs with HTTPS when a SSL handshake has finished. + */ + HANDSHAKE_DONE, + + /** + * This event occurs when a server challenges for an X509 certificate. + */ + CERTIFICATE_CHALLENGE, + + /** + * This event indicates that the handshake failed in some way. + */ + HANDSHAKE_FAILED, + + /** + * This event occurs when the underlying connection is terminated. + */ + CLOSE, + + /** + * This event occurs when there is an error with the transport. + */ + ERROR +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportException.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportException.java new file mode 100644 index 0000000..c6394ee --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportException.java @@ -0,0 +1,55 @@ +/* + * TransportException.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; + +/** + * The <code>TransportException</code> object is thrown when there + * is a problem with the transport. Typically this is done thrown if + * there is a problem reading or writing to the transport. + * + * @author Niall Gallagher + */ +public class TransportException extends IOException { + + /** + * Constructor for the <code>TransportException</code> object. If + * there is a problem sending or reading from a transport then it + * will throw a transport exception to report the error. + * + * @param message this is the message associated with the error + */ + public TransportException(String message) { + super(message); + } + + /** + * Constructor for the <code>TransportException</code> object. If + * there is a problem sending or reading from a transport then it + * will throw a transport exception to report the error. + * + * @param message this is the message associated with the error + * @param cause this is the cause of the producer exception + */ + public TransportException(String message, Throwable cause) { + super(message); + initCause(cause); + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportProcessor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportProcessor.java new file mode 100644 index 0000000..13f505b --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportProcessor.java @@ -0,0 +1,63 @@ +/* + * TransportProcessor.java February 2007 + * + * Copyright (C) 2001, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; + +/** + * This is the <code>TransportProcessor</code> used to process the + * provided transport in a higher layer. It is the responsibility of + * the delegate to handle protocols and message processing. In the + * case of HTTP this will process requests for a container. The + * transport provided can be either a direct transport or provide + * some form of secure encoding such as SSL. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.Transport + */ +public interface TransportProcessor { + + /** + * This is used to process a <code>Transport</code> instance in + * a higher layer that can handle a protocol. A transport can be + * a direct transport or a secure transport providing SSL. At this + * point any SSL handshake will have already completed. + * <p> + * Typical usage of this method is to accept multiple transport + * objects, each representing a unique TCP channel to the client, + * and process requests from those transports concurrently. + * + * @param transport the transport to process requests from + */ + void process(Transport transport) throws IOException; + + /** + * This method is used to stop the <code>TransportProcessor</code> + * such that it will accept no more pipelines. Stopping the connector + * ensures that all resources occupied will be released. This is + * required so that all threads are stopped and released. + * <p> + * Typically this method is called once all connections to the + * server have been stopped. As a final act of shutting down the + * entire server all threads must be stopped, this allows collection + * of unused memory and the closing of file and socket resources. + */ + void stop() throws IOException; +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportReader.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportReader.java new file mode 100644 index 0000000..713c162 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportReader.java @@ -0,0 +1,229 @@ +/* + * TransportReader.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * The <code>TransportReader</code> object represents a reader that + * can read and buffer data from an underlying transport. If the + * number of bytes read from the reader is more than required for + * the HTTP request then those bytes can be pushed back in to the + * cursor using the <code>reset</code> method. This will only allow + * the last read to be reset within the cursor safely. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.Transport + */ +class TransportReader implements ByteReader { + + /** + * This is the underlying transport to read the bytes from. + */ + private Transport transport; + + /** + * This is used to store the bytes read from the transport. + */ + private ByteBuffer buffer; + + /** + * This is used to determine if the transport has been closed. + */ + private boolean closed; + + /** + * This represents the number of bytes that are ready to read. + */ + private int count; + + /** + * Constructor for the <code>TransportReader</code> object. This + * requires a transport to read the bytes from. By default this + * will create a buffer of two kilobytes to read the input in to + * which ensures several requests can be read at once. + * + * @param transport this is the underlying transport to use + */ + public TransportReader(Transport transport) { + this(transport, 2048); + } + + /** + * Constructor for the <code>TransportReader</code> object. This + * requires a transport to read the bytes from. By default this + * will create a buffer of of the specified size to read the + * input in to which enabled bytes to be buffered internally. + * + * @param transport this is the underlying transport to use + * @param size this is the size of the internal buffer to use + */ + public TransportReader(Transport transport, int size) { + this.buffer = ByteBuffer.allocate(size); + this.transport = transport; + } + + /** + * Determines whether the source is still open. The source is + * considered open if there are still bytes to read. If there is + * still bytes buffered and the underlying transport is closed + * then the source is still considered open. + * + * @return true if there is nothing more to be read from this + */ + public boolean isOpen() throws IOException { + return count != -1; + } + + /** + * Determines whether the source is ready for reading. When the + * source is ready then it guarantees that some amount of bytes + * can be read from the underlying stream without blocking. + * + * @return true if some data can be read without blocking + */ + public boolean isReady() throws IOException { + return ready() > 0; + } + + /** + * Reads a block of bytes from the underlying stream. This will + * read up to the requested number of bytes from the underlying + * stream. If there are no ready bytes on the stream this can + * return zero, representing the fact that nothing was read. + * + * @param data this is the array to read the bytes in to + * + * @return this returns the number of bytes read from the stream + */ + public int read(byte[] data) throws IOException { + return read(data, 0, data.length); + } + + /** + * Reads a block of bytes from the underlying stream. This will + * read up to the requested number of bytes from the underlying + * stream. If there are no ready bytes on the stream this can + * return zero, representing the fact that nothing was read. + * + * @param data this is the array to read the bytes in to + * @param off this is the offset to begin writing the bytes to + * @param len this is the number of bytes that are requested + * + * @return this returns the number of bytes read from the stream + */ + public int read(byte[] data, int off, int len) throws IOException { + if(count <= 0) { // has the channel ended + return count; + } + int size = Math.min(len, count); // get the minimum + + if(size > 0) { + buffer.get(data, off, size); // get the bytes + count -= size; + } + return Math.max(0, size); + } + + /** + * Provides the number of bytes that can be read from the stream + * without blocking. This is typically the number of buffered or + * available bytes within the stream. When this reaches zero then + * the source may perform a blocking read. + * + * @return the number of bytes that can be read without blocking + */ + public int ready() throws IOException { + if(count < 0) { + return count; + } + if(count > 0) { // if the are ready bytes don't read + return count; + } + return peek(); + } + + /** + * Provides the number of bytes that can be read from the stream + * without blocking. This is typically the number of buffered or + * available bytes within the stream. When this reaches zero then + * the source may perform a blocking read. + * + * @return the number of bytes that can be read without blocking + */ + private int peek() throws IOException { + if(count <= 0) { // reset the buffer for filling + buffer.clear(); + } + if(count > 0) { + buffer.compact(); // compact the buffer + } + count += transport.read(buffer); // how many were read + + if(count > 0) { + buffer.flip(); // if there is something then flip + } + if(count < 0) { // close when stream is fully read + close(); + } + return count; + } + + /** + * Moves the source backward within the stream. This ensures + * that any bytes read from the last read can be pushed back + * in to the stream so that they can be read again. This will + * throw an exception if the reset can not be performed. + * + * @param size this is the number of bytes to reset back + * + * @return this is the number of bytes that have been reset + */ + public int reset(int size) throws IOException { + int mark = buffer.position(); + + if(size > mark) { + size = mark; + } + if(mark > 0) { + buffer.position(mark - size); + count += size; + } + return size; + } + + /** + * This is used to close the underlying transport. This is used + * when the transport returns a negative value, indicating that + * the client has closed the connection on the other side. If + * this is invoked the read method returns -1 and the reader + * is no longer open, further bytes can no longer be read. + */ + public void close() throws IOException { + if(!closed) { + transport.close(); + closed = true; + count = -1; + } + } +} + + diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportSocketProcessor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportSocketProcessor.java new file mode 100644 index 0000000..04e3c9b --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportSocketProcessor.java @@ -0,0 +1,163 @@ +/* + * TransportSocketProcessor.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; + +import org.simpleframework.common.thread.ConcurrentExecutor; +import org.simpleframework.common.thread.Daemon; +import org.simpleframework.transport.reactor.ExecutorReactor; +import org.simpleframework.transport.reactor.Operation; +import org.simpleframework.transport.reactor.Reactor; + +/** + * The <code>TransportSocketProcessor</code> is used to convert sockets + * to transports. This acts as an adapter to a transport processor + * which converts a connected socket to a <code>Transport</code> that + * can be used to read and write data. Depending on whether there is + * an <code>SSLEngine</code> associated with the socket or not, there + * could be an SSL handshake performed. + * + * @author Niall Gallagher + */ +public class TransportSocketProcessor implements SocketProcessor { + + /** + * This is the executor used to execute the I/O operations. + */ + private final ConcurrentExecutor executor; + + /** + * This is the factory used to create the required operations. + */ + private final OperationFactory factory; + + /** + * This is the processor used to process transport objects. + */ + private final Reactor reactor; + + /** + * This is used to clean the internals of the processor. + */ + private final Daemon cleaner; + + /** + * Constructor for the <code>TransportSocketProcessor</code> object. + * The transport processor is used to process plain connections + * and wrap those connections in a <code>Transport</code> that + * can be used to send and receive data to and from. + * + * @param processor this is used to process transports + */ + public TransportSocketProcessor(TransportProcessor processor) throws IOException { + this(processor, 8); + } + + /** + * Constructor for the <code>TransportSocketProcessor</code> object. + * The transport processor is used to process plain connections + * and wrap those connections in a <code>Transport</code> that + * can be used to send and receive data to and from. + * + * @param processor this is used to process transports + * @param threads this is the number of threads this will use + */ + public TransportSocketProcessor(TransportProcessor processor, int threads) throws IOException { + this(processor, threads, 4096); + } + + /** + * Constructor for the <code>TransportSocketProcessor</code> object. + * The transport processor is used to process plain connections + * and wrap those connections in a <code>Transport</code> that + * can be used to send and receive data to and from. + * + * @param processor this is used to process transports + * @param threads this is the number of threads this will use + * @param buffer this is the initial size of the output buffer + */ + public TransportSocketProcessor(TransportProcessor processor, int threads, int buffer) throws IOException { + this(processor, threads, buffer, 20480); + } + + /** + * Constructor for the <code>TransportSocketProcessor</code> object. + * The transport processor is used to process plain connections + * and wrap those connections in a <code>Transport</code> that + * can be used to send and receive data to and from. + * + * @param processor this is used to process transports + * @param threads this is the number of threads this will use + * @param buffer this is the initial size of the output buffer + * @param threshold this is the maximum size of the output buffer + */ + public TransportSocketProcessor(TransportProcessor processor, int threads, int buffer, int threshold) throws IOException { + this(processor, threads, buffer, threshold, false); + } + + /** + * Constructor for the <code>TransportSocketProcessor</code> object. + * The transport processor is used to process plain connections + * and wrap those connections in a <code>Transport</code> that + * can be used to send and receive data to and from. + * + * @param processor this is used to process transports + * @param threads this is the number of threads this will use + * @param buffer this is the initial size of the output buffer + * @param threshold this is the maximum size of the output buffer + * @param client determines if the SSL handshake is for a client + */ + public TransportSocketProcessor(TransportProcessor processor, int threads, int buffer, int threshold, boolean client) throws IOException { + this.executor = new ConcurrentExecutor(Operation.class, threads); + this.reactor = new ExecutorReactor(executor); + this.factory = new OperationFactory(processor, reactor, buffer, threshold, client); + this.cleaner = new ServerCleaner(processor, executor, reactor); + } + + /** + * Used to connect the <code>Socket</code> which is a full duplex + * TCP connection to a higher layer the application. It is this + * layer that is responsible for interpreting a protocol or handling + * messages in some manner. In the case of HTTP this will initiate + * the consumption of a HTTP request after any SSL handshake is + * finished if the connection is secure. + * + * @param socket this is the connected HTTP pipeline to process + */ + public void process(Socket socket) throws IOException { + Operation task = factory.getInstance(socket); + + if(task != null) { + reactor.process(task); + } + } + + /** + * This is implemented to shut down the server asynchronously. It + * will start a process to perform the shutdown. Asynchronous + * shutdown allows a server resource executed via a HTTP request + * can stop the server without any danger of killing itself or + * even worse causing a deadlock. + */ + public void stop() throws IOException { + cleaner.start(); + executor.stop(); + } + } diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportWriter.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportWriter.java new file mode 100644 index 0000000..c6eb436 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportWriter.java @@ -0,0 +1,150 @@ +/* + * TransportWriter.java February 2007 + * + * Copyright (C) 2001, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The <code>TransportWriter</code> object is used to write bytes to + * and underlying transport. This is essentially an adapter between + * an <code>OutputStream</code> and the underlying transport. Each + * byte array segment written to the underlying transport is wrapped + * in a bytes buffer so that it can be sent by the transport layer. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.Transport + */ +public class TransportWriter implements ByteWriter { + + /** + * This is used to determine if the transport has been closed. + */ + private final AtomicBoolean closed; + + /** + * This is the underlying transport to write the bytes to. + */ + private final Transport transport; + + /** + * Constructor for the <code>TransportWriter</code> object. This + * is used to create an adapter for the transport such that a + * byte array can be used to write bytes to the array. + * + * @param transport the underlying transport to write bytes to + */ + public TransportWriter(Transport transport) { + this.closed = new AtomicBoolean(); + this.transport = transport; + } + + /** + * This method is used to deliver the provided array of bytes to + * the underlying transport. Depending on the connection type the + * array may be encoded for SSL transport or write directly. Any + * implementation may choose to buffer the bytes for performance. + * + * @param array this is the array of bytes to write to the client + */ + public void write(byte[] array) throws IOException { + write(array, 0, array.length); + } + + /** + * This method is used to deliver the provided array of bytes to + * the underlying transport. Depending on the connection type the + * array may be encoded for SSL transport or write directly. Any + * implementation may choose to buffer the bytes for performance. + * + * @param array this is the array of bytes to write to the client + * @param off this is the offset within the array to write from + * @param len this is the number of bytes that are to be sent + */ + public void write(byte[] array, int off, int len) throws IOException { + ByteBuffer buffer = ByteBuffer.wrap(array, off, len); + + if(len > 0) { + write(buffer); + } + } + + /** + * This method is used to deliver the provided buffer of bytes to + * the underlying transport. Depending on the connection type the + * array may be encoded for SSL transport or write directly. Any + * implementation may choose to buffer the bytes for performance. + * + * @param buffer this is the buffer of bytes to write to the client + */ + public void write(ByteBuffer buffer) throws IOException { + int mark = buffer.position(); + int size = buffer.limit(); + + if(mark > size) { + throw new IOException("Buffer position greater than limit"); + } + write(buffer, 0, size - mark); + } + + /** + * This method is used to deliver the provided buffer of bytes to + * the underlying transport. Depending on the connection type the + * array may be encoded for SSL transport or write directly. Any + * implementation may choose to buffer the bytes for performance. + * + * @param buffer this is the buffer of bytes to write to the client + * @param off this is the offset within the buffer to write from + * @param len this is the number of bytes that are to be sent + */ + public void write(ByteBuffer buffer, int off, int len) throws IOException { + int mark = buffer.position(); + int limit = buffer.limit(); + + if(limit - mark > len) { + buffer.limit(mark + len); // reduce usable size + } + transport.write(buffer); + buffer.limit(limit); + } + + /** + * This method is used to flush the contents of the buffer to + * the client. This method will block until such time as all of + * the data has been sent to the client. If at any point there + * is an error writing the content an exception is thrown. + */ + public void flush() throws IOException { + transport.flush(); + } + + /** + * This is used to close the writer and the underlying transport. + * If a close is performed on the writer then no more bytes can + * be read from or written to the transport and the client will + * received a connection close on their side. + */ + public void close() throws IOException { + if(!closed.getAndSet(true)) { + transport.close(); + } + } +}
\ No newline at end of file diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/Connection.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/Connection.java new file mode 100644 index 0000000..490ce6d --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/Connection.java @@ -0,0 +1,73 @@ +/* + * Connection.java October 2002 + * + * Copyright (C) 2002, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.connect; + +import java.io.Closeable; +import java.io.IOException; +import java.net.SocketAddress; + +import javax.net.ssl.SSLContext; + +/** + * The <code>Connection</code> object is used to manage connections + * from a server socket. In order to achieve this it spawns a task + * to listen for incoming connect requests. When a TCP connection + * request arrives it hands off the <code>SocketChannel</code> to + * the <code>SocketProcessor</code> which processes the request. + * <p> + * This handles connections from a <code>ServerSocketChannel</code> + * object so that features such as SSL can be used by a server that + * uses this package. The background acceptor process will terminate + * if the connection is closed. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.SocketProcessor + */ +public interface Connection extends Closeable { + + /** + * This creates a new background task that will listen to the + * specified <code>ServerAddress</code> for incoming TCP connect + * requests. When an connection is accepted it is handed to the + * internal <code>Server</code> implementation as a pipeline. The + * background task is a non daemon task to ensure the server is + * kept active, to terminate the connection this can be closed. + * + * @param address this is the address used to accept connections + * + * @return this returns the actual local address that is used + */ + SocketAddress connect(SocketAddress address) throws IOException; + + /** + * This creates a new background task that will listen to the + * specified <code>ServerAddress</code> for incoming TCP connect + * requests. When an connection is accepted it is handed to the + * internal <code>Server</code> implementation as a pipeline. The + * background task is a non daemon task to ensure the server is + * kept active, to terminate the connection this can be closed. + * + * @param address this is the address used to accept connections + * @param context this is used for secure SSL connections + * + * @return this returns the actual local address that is used + */ + SocketAddress connect(SocketAddress address, SSLContext context) throws IOException; +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/ConnectionEvent.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/ConnectionEvent.java new file mode 100644 index 0000000..0a29ba8 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/ConnectionEvent.java @@ -0,0 +1,42 @@ +/* + * ConnectionEvent.java October 2012 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.connect; + +/** + * The <code>ConnectionEvent</code> enum represents various events that + * can occur with a new connection. When a new connection is accepted + * then the accept event is dispatched to a <code>Trace</code> object + * if one has been associated with the connection. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.trace.Trace + */ +public enum ConnectionEvent { + + /** + * This event occurs when the server accepts a new connection. + */ + ACCEPT, + + /** + * This event occurs when there is an error with the connection. + */ + ERROR +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/ConnectionException.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/ConnectionException.java new file mode 100644 index 0000000..87e5196 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/ConnectionException.java @@ -0,0 +1,58 @@ +/* + * ConnectionException.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.connect; + +import java.io.IOException; + +/** + * The <code>ConnectionException</code> is thrown if there is a problem + * establishing a connection to the server. Such a problem can occur + * if the server has been stopped when a new connection arrives. This + * can also be thrown if some other connection related issue occurs. + * + * @author Niall Gallagher + */ +class ConnectionException extends IOException { + + /** + * Constructor for the <code>ConnectionException</code> object. This + * is used to represent an exception that is thrown when an error + * occurs during the connect process. Typically this is thrown if + * there is a problem connecting or accepting from a socket. + * + * @param message this is the message describing the exception + */ + public ConnectionException(String message) { + super(message); + } + + /** + * Constructor for the <code>ConnectionException</code> object. This + * is used to represent an exception that is thrown when an error + * occurs during the connect process. Typically this is thrown if + * there is a problem connecting or accepting from a socket. + * + * @param message this is the message describing the exception + * @param cause this is the cause of the producer exception + */ + public ConnectionException(String message, Throwable cause) { + super(message); + initCause(cause); + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketAcceptor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketAcceptor.java new file mode 100644 index 0000000..ca6fc92 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketAcceptor.java @@ -0,0 +1,315 @@ +/* + * Acceptor.java October 2002 + * + * Copyright (C) 2002, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.connect; + +import static org.simpleframework.transport.connect.ConnectionEvent.ACCEPT; +import static org.simpleframework.transport.connect.ConnectionEvent.ERROR; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.SocketAddress; +import java.nio.channels.SelectableChannel; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import org.simpleframework.transport.SocketProcessor; +import org.simpleframework.transport.Socket; +import org.simpleframework.transport.SocketWrapper; +import org.simpleframework.transport.reactor.Operation; +import org.simpleframework.transport.trace.Trace; +import org.simpleframework.transport.trace.TraceAnalyzer; + +/** + * The <code>SocketAcceptor</code> object is used to accept incoming + * TCP connections from a specified socket address. This is used by + * the <code>Connection</code> object as a background process to + * accept the connections and hand them to a socket connector. + * <p> + * This is capable of processing SSL connections created by the + * internal server socket. All SSL connections are forced to finish + * the SSL handshake before being dispatched to the server. This + * ensures that there are no problems with reading the request. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.connect.SocketConnection + */ +class SocketAcceptor implements Operation { + + /** + * This is the server socket channel used to accept connections. + */ + private final ServerSocketChannel listener; + + /** + * The handler that manages the incoming TCP connections. + */ + private final SocketProcessor processor; + + /** + * This is the server socket to bind the socket address to. + */ + private final ServerSocket socket; + + /** + * If provided the SSL context is used to create SSL engines. + */ + private final SSLContext context; + + /** + * This is the tracing analyzer used to trace accepted sockets. + */ + private final TraceAnalyzer analyzer; + + /** + * This is the local address to bind the listen socket to. + */ + private final SocketAddress address; + + /** + * This is used to collect trace events with the acceptor. + */ + private final Trace trace; + + /** + * Constructor for the <code>SocketAcceptor</code> object. This + * accepts new TCP connections from the specified server socket. + * Each of the connections that is accepted is configured for + * performance for the application. + * + * @param address this is the address to accept connections from + * @param processor this is used to initiate the HTTP processing + * @param analyzer this is the tracing analyzer to be used + */ + public SocketAcceptor(SocketAddress address, SocketProcessor processor, TraceAnalyzer analyzer) throws IOException { + this(address, processor, analyzer, null); + } + + /** + * Constructor for the <code>SocketAcceptor</code> object. This + * accepts new TCP connections from the specified server socket. + * Each of the connections that is accepted is configured for + * performance for the applications. + * + * @param address this is the address to accept connections from + * @param processor this is used to initiate the HTTP processing + * @param analyzer this is the tracing analyzer to be used + * @param context this is the SSL context used for secure HTTPS + */ + public SocketAcceptor(SocketAddress address, SocketProcessor processor, TraceAnalyzer analyzer, SSLContext context) throws IOException { + this.listener = ServerSocketChannel.open(); + this.trace = analyzer.attach(listener); + this.socket = listener.socket(); + this.context = context; + this.analyzer = analyzer; + this.processor = processor; + this.address = address; + } + + /** + * This is used to acquire the local socket address that this is + * listening to. This required in case the socket address that + * is specified is an emphemeral address, that is an address that + * is assigned dynamically when a port of 0 is specified. + * + * @return this returns the address for the listening address + */ + public SocketAddress getAddress() { + return socket.getLocalSocketAddress(); + } + + /** + * This is used to acquire the trace object that is associated + * with the operation. A trace object is used to collection details + * on what operations are being performed. For instance it may + * contain information relating to I/O events or errors. + * + * @return this returns the trace associated with this operation + */ + public Trace getTrace() { + return trace; + } + + /** + * This is the <code>SelectableChannel</code> which is used to + * determine if the operation should be executed. If the channel + * is ready for a given I/O event it can be run. For instance if + * the operation is used to perform some form of read operation + * it can be executed when ready to read data from the channel. + * + * @return this returns the channel used to govern execution + */ + public SelectableChannel getChannel() { + return listener; + } + + /** + * This is used to configure the server socket for non-blocking + * mode. It will also bind the server socket to the socket port + * specified in the <code>SocketAddress</code> object. Once done + * the acceptor is ready to accept newly arriving connections. + * + * @param address this is the server socket address to bind to + */ + public void bind() throws IOException { + listener.configureBlocking(false); + socket.setReuseAddress(true); + socket.bind(address, 100); + } + + /** + * This is used to accept a new TCP connections. When the socket + * is ready to accept a connection this method is invoked. It will + * then create a HTTP pipeline object using the accepted socket + * and if provided with an <code>SSLContext</code> it will also + * provide an <code>SSLEngine</code> which is handed to the + * processor to handle the HTTP requests. + */ + public void run() { + try { + accept(); + } catch(Exception cause) { + pause(); + } + } + + /** + * This is used to throttle the acceptor when there is an error + * such as exhaustion of file descriptors. This will prevent the + * CPU from being hogged by the acceptor on such occasions. If + * the thread can not be put to sleep then this will freeze. + */ + private void pause() { + try { + Thread.sleep(10); + } catch(Exception cause) { + trace.trace(ERROR, cause); + } + } + + /** + * This is used to cancel the operation if the reactor decides to + * reject it for some reason. Typically this method will never be + * invoked as this operation never times out. However, should the + * reactor cancel the operation this will close the socket. + */ + public void cancel() { + try { + close(); + } catch(Throwable cause) { + trace.trace(ERROR, cause); + } + } + + /** + * The main processing done by this object is done using a thread + * calling the <code>run</code> method. Here the TCP connections + * are accepted from the <code>ServerSocketChannel</code> which + * creates the socket objects. Each socket is then encapsulated in + * to a pipeline and dispatched to the processor for processing. + * + * @throws IOException if there is a problem accepting the socket + */ + private void accept() throws IOException { + SocketChannel channel = listener.accept(); + + while(channel != null) { + Trace trace = analyzer.attach(channel); + + configure(channel); + + if(context == null) { + process(channel, trace, null); + } else { + process(channel, trace); + } + channel = listener.accept(); + } + } + + /** + * This method is used to configure the accepted channel. This + * will disable Nagles algorithm to improve the performance of the + * channel, also this will ensure the accepted channel disables + * blocking to ensure that it works within the processor object. + * + * @param channel this is the channel that is to be configured + */ + private void configure(SocketChannel channel) throws IOException { + channel.socket().setTcpNoDelay(true); + channel.configureBlocking(false); + } + + /** + * This method is used to dispatch the socket for processing. The + * socket will be configured and connected to the client, this + * will hand processing to the <code>Server</code> which will + * create the pipeline instance used to wrap the socket object. + * + * @param channel this is the connected socket to be processed + * @param trace this is the trace to associate with the socket + */ + private void process(SocketChannel channel, Trace trace) throws IOException { + SSLEngine engine = context.createSSLEngine(); + + try { + process(channel, trace, engine); + } catch(Exception cause) { + trace.trace(ERROR, cause); + channel.close(); + } + } + + /** + * This method is used to dispatch the socket for processing. The + * socket will be configured and connected to the client, this + * will hand processing to the <code>Server</code> which will + * create the pipeline instance used to wrap the socket object. + * + * @param channel this is the connected socket to be processed + * @param trace this is the trace to associate with the socket + * @param engine this is the SSL engine used for secure HTTPS + */ + private void process(SocketChannel channel, Trace trace, SSLEngine engine) throws IOException { + Socket socket = new SocketWrapper(channel, trace, engine); + + try { + trace.trace(ACCEPT); + processor.process(socket); + } catch(Exception cause) { + trace.trace(ERROR, cause); + channel.close(); + } + } + + /** + * This is used to close the server socket channel so that the + * port that it is bound to is released. This allows the acceptor + * to close off the interface to the server. Ensuring the socket + * is closed allows it to be recreated at a later point. + * + * @throws IOException thrown if the socket can not be closed + */ + public void close() throws IOException { + listener.close(); + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketAnalyzer.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketAnalyzer.java new file mode 100644 index 0000000..be6b95c --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketAnalyzer.java @@ -0,0 +1,84 @@ +/* + * SocketAnalyzer.java February 2012 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.connect; + +import java.nio.channels.SelectableChannel; + +import org.simpleframework.transport.trace.TraceAnalyzer; +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>SocketAnalyzer</code> is used to wrap an analyzer object. + * Wrapping an analyzer in this way ensures that even if the analyzer + * is badly written there is little chance that it will affect the + * operation of the server. All <code>Trace</code> objects returned + * from this will catch all exceptions within the created trace. + * + * @author Niall Gallagher + */ +class SocketAnalyzer implements TraceAnalyzer { + + /** + * This is the analyzer that is used to create the trace objects. + */ + private final TraceAnalyzer analyzer; + + /** + * Constructor for the <code>SocketAnalyzer</code> object. This will + * be given the analyzer that is to be used to create traces. This + * can be a null value, in which case the trace provided will be + * a simple empty void that swallows all trace events. + * + * @param analyzer the analyzer that is to be wrapped by this + */ + public SocketAnalyzer(TraceAnalyzer analyzer) { + this.analyzer = analyzer; + } + + /** + * This method is used to attach a trace to the specified channel. + * Attaching a trace basically means associating events from that + * trace with the specified socket. It ensures that the events + * from a specific channel can be observed in isolation. + * + * @param channel this is the channel to associate with the trace + * + * @return this returns a trace associated with the channel + */ + public Trace attach(SelectableChannel channel) { + Trace trace = null; + + if(analyzer != null) { + trace = analyzer.attach(channel); + } + return new SocketTrace(trace); + } + + /** + * This is used to stop the analyzer and clear all trace information. + * Stopping the analyzer is typically done when the server is stopped + * and is used to free any resources associated with the analyzer. If + * an analyzer does not hold information this method can be ignored. + */ + public void stop() { + if(analyzer != null) { + analyzer.stop(); + } + } +}
\ No newline at end of file diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketConnection.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketConnection.java new file mode 100644 index 0000000..c462284 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketConnection.java @@ -0,0 +1,141 @@ +/* + * SocketConnection.java October 2002 + * + * Copyright (C) 2002, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.connect; + +import java.io.IOException; +import java.net.SocketAddress; + +import javax.net.ssl.SSLContext; + +import org.simpleframework.transport.SocketProcessor; +import org.simpleframework.transport.trace.TraceAnalyzer; + +/** + * The <code>SocketConnection</code>is used to manage connections + * from a server socket. In order to achieve this it spawns a task + * to listen for incoming connect requests. When a TCP connection + * request arrives it hands off the <code>SocketChannel</code> to + * the <code>SocketProcessor</code> which processes the request. + * <p> + * This handles connections from a <code>ServerSocketChannel</code> + * object so that features such as SSL can be used by a server that + * uses this package. The background acceptor process will terminate + * if the connection is closed. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.SocketProcessor + */ +public class SocketConnection implements Connection { + + /** + * This is used to maintain the active connection end points. + */ + private SocketListenerManager manager; + + /** + * The processor is used to process connected HTTP pipelines. + */ + private SocketProcessor processor; + + /** + * This is used to determine if the connection has been closed. + */ + private boolean closed; + + /** + * Constructor for the <code>SocketConnection</code> object. This + * will create a new connection that accepts incoming connections + * and hands these connections as <code>Socket</code> objects + * to the specified connector. This in turn will deliver request + * and response objects to the internal container. + * + * @param processor this is the connector that receives requests + */ + public SocketConnection(SocketProcessor processor) throws IOException { + this(processor, null); + } + + /** + * Constructor for the <code>SocketConnection</code> object. This + * will create a new connection that accepts incoming connections + * and hands these connections as <code>Socket</code> objects + * to the specified processor. This in turn will deliver request + * and response objects to the internal container. + * + * @param processor this is the connector that receives requests + * @param analyzer this is used to create a trace for the socket + */ + public SocketConnection(SocketProcessor processor, TraceAnalyzer analyzer) throws IOException { + this.manager = new SocketListenerManager(processor, analyzer); + this.processor = processor; + } + + /** + * This creates a new background task that will listen to the + * specified <code>ServerAddress</code> for incoming TCP connect + * requests. When an connection is accepted it is handed to the + * internal socket connector. + * + * @param address this is the address used to accept connections + * + * @return this returns the actual local address that is used + */ + public SocketAddress connect(SocketAddress address) throws IOException { + if(closed) { + throw new ConnectionException("Connection is closed"); + } + return manager.listen(address); + } + + /** + * This creates a new background task that will listen to the + * specified <code>ServerAddress</code> for incoming TCP connect + * requests. When an connection is accepted it is handed to the + * internal socket connector. + * + * @param address this is the address used to accept connections + * @param context this is used for secure SSL connections + * + * @return this returns the actual local address that is used + */ + public SocketAddress connect(SocketAddress address, SSLContext context) throws IOException { + if(closed) { + throw new ConnectionException("Connection is closed"); + } + return manager.listen(address, context); + } + + /** + * This is used to close the connection and the server socket + * used to accept connections. This will perform a close of all + * connected server sockets that have been created from using + * the <code>connect</code> method. The connection can be + * reused after the existing server sockets have been closed. + * + * @throws IOException thrown if there is a problem closing + */ + public void close() throws IOException { + if(!closed) { + manager.close(); + processor.stop(); + } + closed = true; + } +}
\ No newline at end of file diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketListener.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketListener.java new file mode 100644 index 0000000..5718273 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketListener.java @@ -0,0 +1,125 @@ +/* + * SocketListener.java October 2002 + * + * Copyright (C) 2002, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.connect; + +import static java.nio.channels.SelectionKey.OP_ACCEPT; + +import java.io.Closeable; +import java.io.IOException; +import java.net.SocketAddress; + +import javax.net.ssl.SSLContext; + +import org.simpleframework.transport.SocketProcessor; +import org.simpleframework.transport.reactor.SynchronousReactor; +import org.simpleframework.transport.reactor.Reactor; +import org.simpleframework.transport.trace.TraceAnalyzer; + +/** + * The <code>SocketListener</code> object is represents the interface + * to the server that the clients can connect to. This is responsible + * for making call backs to the <code>SocketAcceptor</code> when there + * is a new connection waiting to be accepted. When the connection + * is to be closed the interface object can be closed. + * + * @author Niall Gallagher + */ +class SocketListener implements Closeable { + + /** + * This is the acceptor that is used to accept the connections. + */ + private final SocketAcceptor acceptor; + + /** + * This is the reactor used to notify the acceptor of sockets. + */ + private final Reactor reactor; + + /** + * Constructor for the <code>SocketListener</code> object. This + * needs a socket address and a processor to hand created sockets + * to. This creates a <code>Reactor</code> which will notify the + * acceptor when there is a new connection waiting to be accepted. + * + * @param address this is the address to listen for new sockets + * @param processor this is the processor that sockets are handed to + * @param analyzer this is used to create a trace to monitor events + */ + public SocketListener(SocketAddress address, SocketProcessor processor, TraceAnalyzer analyzer) throws IOException { + this(address, processor, analyzer, null); + } + + /** + * Constructor for the <code>SocketListener</code> object. This + * needs a socket address and a processor to hand created sockets + * to. This creates a <code>Reactor</code> which will notify the + * acceptor when there is a new connection waiting to be accepted. + * + * @param address this is the address to listen for new sockets + * @param processor this is the processor that sockets are handed to + * @param analyzer this is used to create a trace to monitor events + * @param context this is the SSL context used for secure HTTPS + */ + public SocketListener(SocketAddress address, SocketProcessor processor, TraceAnalyzer analyzer, SSLContext context) throws IOException { + this.acceptor = new SocketAcceptor(address, processor, analyzer, context); + this.reactor = new SynchronousReactor(); + } + + /** + * This is used to acquire the local socket address that this is + * listening to. This required in case the socket address that + * is specified is an emphemeral address, that is an address that + * is assigned dynamically when a port of 0 is specified. + * + * @return this returns the address for the listening address + */ + public SocketAddress getAddress() { + return acceptor.getAddress(); + } + + /** + * This is used to register the socket acceptor to listen for + * new connections that are ready to be accepted. Once this is + * registered it will remain registered until the interface is + * closed, at which point the socket is closed. + */ + public void process() throws IOException { + try { + acceptor.bind(); + reactor.process(acceptor, OP_ACCEPT); + } catch(Exception cause) { + throw new ConnectionException("Listen error", cause); + } + } + + /** + * This is used to close the connection and the server socket + * used to accept connections. This will perform a close of the + * connected server socket and the dispatching thread. + */ + public void close() throws IOException { + try { + acceptor.close(); + reactor.stop(); + } catch(Exception cause) { + throw new ConnectionException("Close error", cause); + } + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketListenerManager.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketListenerManager.java new file mode 100644 index 0000000..b0330f1 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketListenerManager.java @@ -0,0 +1,127 @@ +/* + * SocketListenerManager.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.connect; + +import java.io.Closeable; +import java.io.IOException; +import java.net.SocketAddress; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import javax.net.ssl.SSLContext; + +import org.simpleframework.transport.SocketProcessor; +import org.simpleframework.transport.trace.TraceAnalyzer; + +/** + * The <code>SocketListenerManager</code> contains all the listeners + * that have been created for a connection. This set is used to hold + * and manage the listeners that have been created for a connection. + * All listeners will be closed if the listener manager is closed. + * This ensures all resources held by the manager can be released. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.connect.SocketConnection + */ +class SocketListenerManager implements Closeable { + + /** + * This is the set of active socket listeners for this manager. + */ + private final Set<SocketListener> listeners; + + /** + * This is the processor that listeners will dispatch sockets to. + */ + private final SocketProcessor processor; + + /** + * This is the analyzer used to create a trace for the sockets. + */ + private final TraceAnalyzer analyzer; + + /** + * Constructor for the <code>SocketListenerManager</code> object. + * This is used to create a manager that will enable listeners to + * be created to listen to specified sockets for incoming TCP + * connections, which will be converted to socket objects. + * + * @param processor this is the processor to hand sockets to + * @param analyzer this is the agent used to trace socket events + */ + public SocketListenerManager(SocketProcessor processor, TraceAnalyzer analyzer) { + this.listeners = new CopyOnWriteArraySet<SocketListener>(); + this.analyzer = new SocketAnalyzer(analyzer); + this.processor = processor; + } + + /** + * This creates a new background task that will listen to the + * specified <code>ServerAddress</code> for incoming TCP connect + * requests. When an connection is accepted it is handed to the + * internal socket connector. + * + * @param address this is the address used to accept connections + * + * @return this returns the actual local address that is used + */ + public SocketAddress listen(SocketAddress address) throws IOException { + return listen(address, null); + } + + /** + * This creates a new background task that will listen to the + * specified <code>ServerAddress</code> for incoming TCP connect + * requests. When an connection is accepted it is handed to the + * internal socket connector. + * + * @param address this is the address used to accept connections + * @param context this is used for secure SSL connections + * + * @return this returns the actual local address that is used + */ + public SocketAddress listen(SocketAddress address, SSLContext context) throws IOException { + SocketListener listener = new SocketListener(address, processor, analyzer, context); + + if(processor != null) { + listener.process(); + listeners.add(listener); + } + return listener.getAddress(); + } + + /** + * This is used to close all the listeners that have been + * added to the connection. Closing all the listeners in the + * set ensures that there are no lingering threads or sockets + * consumed by the connection after the connection is closed. + * + * @throws IOException thrown if there is an error closing + */ + public void close() throws IOException { + for(Closeable listener : listeners) { + listener.close(); + } + if(analyzer != null) { + analyzer.stop(); + } + listeners.clear(); + } +}
\ No newline at end of file diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketTrace.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketTrace.java new file mode 100644 index 0000000..1805533 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketTrace.java @@ -0,0 +1,75 @@ +/* + * SocketTrace.java February 2012 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.connect; + +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>SocketTrace</code> is used to wrap an trace for safety. + * Wrapping an trace in this way ensures that even if the trace is + * badly written there is little chance that it will affect the + * operation of the server. + * + * @author Niall Gallagher + */ +class SocketTrace implements Trace { + + /** + * This is the actual trace that is being wrapped by this. + */ + private final Trace trace; + + /** + * Constructor for the <code>SocketTrace</code> object. This will + * create a trace object that wraps the one provided. If the + * provided trace is null then this will simply ignore all events. + * + * @param trace this is the trace that is to be wrapped by this + */ + public SocketTrace(Trace trace) { + this.trace = trace; + } + + /** + * This method is used to accept an event that occurred on the socket + * associated with this trace. Typically the event is a symbolic + * description of the event such as an enum or a string. + * + * @param event this is the event that occurred on the socket + */ + public void trace(Object event) { + if(trace != null) { + trace.trace(event); + } + } + + /** + * This method is used to accept an event that occurred on the socket + * associated with this trace. Typically the event is a symbolic + * description of the event such as an enum or a string. + * + * @param event this is the event that occurred on the socket + * @param value provides additional information such as an exception + */ + public void trace(Object event, Object value) { + if(trace != null) { + trace.trace(event, value); + } + } +}
\ No newline at end of file diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Action.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Action.java new file mode 100644 index 0000000..4989eae --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Action.java @@ -0,0 +1,71 @@ +/* + * Action.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.reactor; + +import java.nio.channels.SelectableChannel; + +/** + * The <code>Action</code> object is used to represent an action that + * the distributor is to process. This contains the operation and + * the required I/O events as an integer bit mask. When an operation + * is considered ready it will be handed to an executor to execute. + * + * @author Niall Gallagher + */ +interface Action extends Runnable { + + /** + * This is used to get the expiry for the operation. The expiry + * represents some static time in the future when the action will + * expire if it does not become ready. This is used to cancel the + * operation so that it does not remain in the distributor. + * + * @return the remaining time this operation will wait for + */ + long getExpiry(); + + /** + * This returns the I/O operations that the action is interested + * in as an integer bit mask. When any of these operations are + * ready the distributor will execute the provided operation. + * + * @return the integer bit mask of interested I/O operations + */ + int getInterest(); + + /** + * This is the <code>SelectableChannel</code> which is used to + * determine if the operation should be executed. If the channel + * is ready for a given I/O event it can be run. For instance if + * the operation is used to perform some form of read operation + * it can be executed when ready to read data from the channel. + * + * @return this returns the channel used to govern execution + */ + SelectableChannel getChannel(); + + /** + * This is used to acquire the <code>Operation</code> that is to + * be executed when the required operations are ready. It is the + * responsibility of the distributor to invoke the operation. + * + * @return the operation to be executed when it is ready + */ + Operation getOperation(); +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionDistributor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionDistributor.java new file mode 100644 index 0000000..65dc8d2 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionDistributor.java @@ -0,0 +1,741 @@ +/* + * ActionDistributor.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.reactor; + +import static java.nio.channels.SelectionKey.OP_READ; +import static java.nio.channels.SelectionKey.OP_WRITE; +import static org.simpleframework.transport.reactor.ReactorEvent.CHANNEL_CLOSED; +import static org.simpleframework.transport.reactor.ReactorEvent.CLOSE_SELECTOR; +import static org.simpleframework.transport.reactor.ReactorEvent.ERROR; +import static org.simpleframework.transport.reactor.ReactorEvent.EXECUTE_ACTION; +import static org.simpleframework.transport.reactor.ReactorEvent.INVALID_KEY; +import static org.simpleframework.transport.reactor.ReactorEvent.READ_INTEREST_READY; +import static org.simpleframework.transport.reactor.ReactorEvent.REGISTER_INTEREST; +import static org.simpleframework.transport.reactor.ReactorEvent.REGISTER_READ_INTEREST; +import static org.simpleframework.transport.reactor.ReactorEvent.REGISTER_WRITE_INTEREST; +import static org.simpleframework.transport.reactor.ReactorEvent.SELECT; +import static org.simpleframework.transport.reactor.ReactorEvent.SELECT_CANCEL; +import static org.simpleframework.transport.reactor.ReactorEvent.SELECT_EXPIRED; +import static org.simpleframework.transport.reactor.ReactorEvent.UPDATE_INTEREST; +import static org.simpleframework.transport.reactor.ReactorEvent.UPDATE_READ_INTEREST; +import static org.simpleframework.transport.reactor.ReactorEvent.UPDATE_WRITE_INTEREST; +import static org.simpleframework.transport.reactor.ReactorEvent.WRITE_INTEREST_READY; + +import java.io.IOException; +import java.nio.channels.Channel; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; + +import org.simpleframework.common.thread.Daemon; +import org.simpleframework.transport.trace.Trace; + + /** + * The <code>ActionDistributor</code> is used to execute operations + * that have an interested I/O event ready. This acts much like a + * scheduler would in that it delays the execution of the operations + * until such time as the associated <code>SelectableChannel</code> + * has an interested I/O event ready. + * <p> + * This distributor has two modes, one mode is used to cancel the + * channel once an I/O event has occurred. This means that the channel + * is removed from the <code>Selector</code> so that the selector + * does not break when asked to select again. cancelling the channel + * is useful when the operation execution may not fully read the + * payload or when the operation takes a significant amount of time. + * + * @see org.simpleframework.transport.reactor.ExecutorReactor + */ +class ActionDistributor extends Daemon implements OperationDistributor { + + /** + * This is used to determine the operations that need cancelling. + */ + private Map<Channel, ActionSet> executing; + + /** + * This is used to keep track of actions currently in selection. + */ + private Map<Channel, ActionSet> selecting; + + /** + * This is the queue that is used to invalidate channels. + */ + private Queue<Channel> invalid; + + /** + * This is the queue that is used to provide the operations. + */ + private Queue<Action> pending; + + /** + * This is the selector used to select for interested events. + */ + private ActionSelector selector; + + /** + * This is used to execute the operations that are ready. + */ + private Executor executor; + + /** + * This is used to signal when the distributor has closed. + */ + private Latch latch; + + /** + * This is the duration in milliseconds the operation expires in. + */ + private long expiry; + + /** + * This is time in milliseconds when the next expiry will occur. + */ + private long update; + + /** + * This is used to determine the mode the distributor uses. + */ + private boolean cancel; + + /** + * Constructor for the <code>ActionDistributor</code> object. This + * will create a distributor that distributes operations when those + * operations show that they are ready for a given I/O event. The + * interested I/O events are provided as a bitmask taken from the + * actions of the <code>SelectionKey</code>. Distribution of the + * operations is passed to the provided executor object. + * + * @param executor this is the executor used to execute operations + */ + public ActionDistributor(Executor executor) throws IOException { + this(executor, true); + } + + /** + * Constructor for the <code>ActionDistributor</code> object. This + * will create a distributor that distributes operations when those + * operations show that they are ready for a given I/O event. The + * interested I/O events are provided as a bitmask taken from the + * actions of the <code>SelectionKey</code>. Distribution of the + * operations is passed to the provided executor object. + * + * @param executor this is the executor used to execute operations + * @param cancel should the channel be removed from selection + */ + public ActionDistributor(Executor executor, boolean cancel) throws IOException { + this(executor, cancel, 120000); + } + + /** + * Constructor for the <code>ActionDistributor</code> object. This + * will create a distributor that distributes operations when those + * operations show that they are ready for a given I/O event. The + * interested I/O events are provided as a bitmask taken from the + * actions of the <code>SelectionKey</code>. Distribution of the + * operations is passed to the provided executor object. + * + * @param executor this is the executor used to execute operations + * @param cancel should the channel be removed from selection + * @param expiry this the maximum idle time for an operation + */ + public ActionDistributor(Executor executor, boolean cancel, long expiry) throws IOException { + this.selecting = new LinkedHashMap<Channel, ActionSet>(); + this.executing = new LinkedHashMap<Channel, ActionSet>(); + this.pending = new ConcurrentLinkedQueue<Action>(); + this.invalid = new ConcurrentLinkedQueue<Channel>(); + this.selector = new ActionSelector(); + this.latch = new Latch(); + this.executor = executor; + this.cancel = cancel; + this.expiry = expiry; + this.start(); + } + + /** + * This is used to process the <code>Operation</code> object. This + * will wake up the selector if it is currently blocked selecting + * and register the operations associated channel. Once the + * selector is awake it will acquire the operation from the queue + * and register the associated <code>SelectableChannel</code> for + * selection. The operation will then be executed when the channel + * is ready for the interested I/O events. + * + * @param task this is the task that is scheduled for distribution + * @param require this is the bit-mask value for interested events + */ + public void process(Operation task, int require) throws IOException { + Action action = new ExecuteAction(task, require, expiry); + + if(!isActive()) { + throw new IOException("Distributor is closed"); + } + pending.offer(action); + selector.wake(); + } + + /** + * This is used to close the distributor such that it cancels all + * of the registered channels and closes down the selector. This + * is used when the distributor is no longer required, after the + * close further attempts to process operations will fail. + */ + public void close() throws IOException { + stop(); + selector.wake(); + latch.close(); + } + + /** + * This returns the number of channels that are currently selecting + * with this distributor. When busy this can get quite high, however + * it must return to zero as soon as all tasks have completed. + * + * @return return the number of channels currently selecting + */ + public int size() { + return selecting.size(); + } + + /** + * Performs the execution of the distributor. Each distributor runs + * on an asynchronous thread to the <code>Reactor</code> which is + * used to perform the selection on a set of channels. Each time + * there is a new operation to be processed this will take the + * operation from the ready queue, cancel all outstanding channels, + * and register the operations associated channel for selection. + */ + public void run() { + try { + execute(); + } finally { + purge(); + } + } + + /** + * Performs the execution of the distributor. Each distributor runs + * on an asynchronous thread to the <code>Reactor</code> which is + * used to perform the selection on a set of channels. Each time + * there is a new operation to be processed this will take the + * operation from the ready queue, cancel all outstanding channels, + * and register the operations associated channel for selection. + */ + private void execute() { + while(isActive()) { + try { + register(); + cancel(); + expire(); + distribute(); + validate(); + } catch(Exception cause) { + report(cause); + } + } + } + + /** + * This will purge all the actions from the distributor when the + * distributor ends. If there are any threads waiting on the close + * to finish they are signalled when all operations are purged. + * This will allow them to return ensuring no operations linger. + */ + private void purge() { + try { + register(); + cancel(); + clear(); + } catch(Exception cause) { + report(cause); + } + } + + /** + * This method is called to ensure that if there is a global + * error that each action will know about it. Such an issue could + * be file handle exhaustion or an out of memory error. It is + * also possible that a poorly behaving action could cause an + * issue which should be know the the entire system. + * + * @param cause this is the exception to report + */ + private void report(Exception cause) { + Set<Channel> channels = selecting.keySet(); + + for(Channel channel : channels) { + ActionSet set = selecting.get(channel); + Action[] list = set.list(); + + for(Action action : list) { + Operation operation = action.getOperation(); + Trace trace = operation.getTrace(); + + try { + trace.trace(ERROR, cause); + } catch(Exception e) { + invalid.offer(channel); + } + } + } + invalid.clear(); + } + + /** + * Here we perform an expire which will take all of the registered + * sockets and expire it. This ensures that the operations can be + * executed within the executor and the cancellation of the sockets + * can be performed. Once this method has finished then all of + * the operations will have been scheduled for execution. + */ + private void clear() throws IOException { + List<ActionSet> sets = selector.registeredSets(); + + for(ActionSet set : sets) { + Action[] list = set.list(); + + for(Action action : list) { + Operation task = action.getOperation(); + Trace trace = task.getTrace(); + + try { + trace.trace(CLOSE_SELECTOR); + expire(set, Long.MAX_VALUE); + } catch(Exception cause) { + trace.trace(ERROR, cause); + } + } + } + selector.close(); + latch.signal(); + } + + /** + * This method is used to expire registered operations that remain + * idle within the selector. Operations specify a time at which + * point they wish to be cancelled if the I/O event they wait on + * has not arisen. This will enables the cancelled operation to be + * cancelled so that the resources it occupies can be released. + */ + private void expire() throws IOException { + List<ActionSet> sets = selector.registeredSets(); + + if(cancel) { + long time = System.currentTimeMillis(); + + if(update <= time) { + for(ActionSet set : sets) { + expire(set, time); + } + update = time +10000; + } + } + } + + /** + * This method is used to expire registered operations that remain + * idle within the selector. Operations specify a time at which + * point they wish to be if the I/O event they wait on + * has not arisen. This will enables the cancelled operation to be + * cancelled so that the resources it occupies can be released. + * + * @param set this is the selection set check for expired actions + * @param time this is the time to check the expiry against + */ + private void expire(ActionSet set, long time) throws IOException { + Action[] actions = set.list(); + SelectionKey key = set.key(); + + if(key.isValid()) { + int mask = key.interestOps(); + + for(Action action : actions) { + int interest = action.getInterest(); + long expiry = action.getExpiry(); + + if(expiry < time) { + expire(set, action); + mask &= ~interest; + } + } + update(set, mask); + } + } + + /** + * This is used to update the interested operations of a set of + * actions. If there are no interested operations the set will be + * cancelled, otherwise the selection key will be updated with the + * new operations provided by the bitmask. + * + * @param set this is the action set that is to be updated + * @param interest this is the bitmask containing the operations + */ + private void update(ActionSet set, int interest) throws IOException { + SelectionKey key = set.key(); + + if(interest == 0) { + Channel channel = key.channel(); + + selecting.remove(channel); + key.cancel(); + } else { + key.interestOps(interest); + } + } + + /** + * This method is used to expire registered operations that remain + * idle within the selector. Operations specify a time at which + * point they wish to be cancelled if the I/O event they wait on + * has not arisen. This will enables the cancelled operation to be + * cancelled so that the resources it occupies can be released. + * + * @param set this is the action set containing the actions + * @param action this is the actual action to be cancelled + */ + private void expire(ActionSet set, Action action) throws IOException { + Action cancel = new CancelAction(action); + + if(set != null) { + Operation task = action.getOperation(); + Trace trace = task.getTrace(); + int interest = action.getInterest(); + + try { + trace.trace(SELECT_EXPIRED, interest); + set.remove(interest); + execute(cancel); + } catch(Exception cause) { + trace.trace(ERROR, cause); + } + } + } + + /** + * This method is used to perform simple validation. It ensures + * that directly after the processing loop any channels that + * are registered that have been cancelled or are closed will + * be removed from the selecting map and rejected. + */ + private void validate() throws IOException { + Set<Channel> channels = selecting.keySet(); + + for(Channel channel : channels) { + ActionSet set = selecting.get(channel); + SelectionKey key = set.key(); + + if(!key.isValid()) { + invalid.offer(channel); + } + } + for(Channel channel : invalid) { + invalidate(channel); + } + invalid.clear(); + } + + /** + * This method is used to remove the channel from the selecting + * registry. It is rare that this will every happen, however it + * is important that tasks are cleared out in this manner as it + * could lead to a memory leak if left for a long time. + * + * @param channel this is the channel being validated + */ + private void invalidate(Channel channel) throws IOException { + ActionSet set = selecting.remove(channel); + Action[] list = set.list(); + + for(Action action : list) { + Operation task = action.getOperation(); + Trace trace = task.getTrace(); + + try { + trace.trace(INVALID_KEY); + execute(action); // reject + } catch(Exception cause) { + trace.trace(ERROR, cause); + } + } + } + + /** + * This is used to cancel any selection keys that have previously + * been selected with an interested I/O event. Performing a cancel + * here ensures that on a the next select the associated channel + * is not considered, this ensures the select does not break. + */ + private void cancel() throws IOException { + Collection<ActionSet> list = executing.values(); + + for(ActionSet set : list) { + Action[] actions = set.list(); + + for(Action action : actions) { + Operation task = action.getOperation(); + Trace trace = task.getTrace(); + + trace.trace(SELECT_CANCEL); + } + set.cancel(); + set.clear(); + } + executing.clear(); + } + + /** + * Here all the enqueued <code>Operation</code> objects will be + * registered for selection. Each operations channel is used for + * selection on the interested I/O events. Once the I/O event + * occurs for the channel the operation is scheduled for execution. + */ + private void register() throws IOException { + while(!pending.isEmpty()) { + Action action = pending.poll(); + + if(action != null) { + SelectableChannel channel = action.getChannel(); + ActionSet set = executing.remove(channel); + + if(set == null) { + set = selecting.get(channel); + } + if(set != null) { + update(action, set); + } else { + register(action); + } + } + } + } + + /** + * Here the specified <code>Operation</code> object is registered + * with the selector. If the associated channel had previously + * been cancelled it is removed from the cancel map to ensure it + * is not removed from the selector when cancellation is done. + * + * @param action this is the operation that is to be registered + */ + private void register(Action action) throws IOException { + SelectableChannel channel = action.getChannel(); + Operation task = action.getOperation(); + Trace trace = task.getTrace(); + + try { + if(channel.isOpen()) { + trace.trace(SELECT); + select(action); + } else { + trace.trace(CHANNEL_CLOSED); + selecting.remove(channel); + execute(action); // reject + } + }catch(Exception cause) { + trace.trace(ERROR, cause); + } + } + + /** + * Here the specified <code>Operation</code> object is registered + * with the selector. If the associated channel had previously + * been cancelled it is removed from the cancel map to ensure it + * is not removed from the selector when cancellation is done. + * + * @param action this is the operation that is to be registered + * @param set this is the action set to register the action with + */ + private void update(Action action, ActionSet set) throws IOException { + Operation task = action.getOperation(); + Trace trace = task.getTrace(); + SelectionKey key = set.key(); + int interest = action.getInterest(); + int current = key.interestOps(); + int updated = current | interest; + + try { + if(OP_READ == (interest & OP_READ)) { + trace.trace(UPDATE_READ_INTEREST); + } + if(OP_WRITE == (interest & OP_WRITE)) { + trace.trace(UPDATE_WRITE_INTEREST); + } + trace.trace(UPDATE_INTEREST, updated); + key.interestOps(updated); + set.attach(action); + } catch(Exception cause) { + trace.trace(ERROR, cause); + } + } + + /** + * This method is used to perform an actual select on a channel. It + * will register the channel with the internal selector using the + * required I/O event bit mask. In order to ensure that selection + * is performed correctly the provided channel must be connected. + * + * @param action this is the operation that is to be registered + * + * @return this returns the selection key used for selection + */ + private void select(Action action) throws IOException { + SelectableChannel channel = action.getChannel(); + Operation task = action.getOperation(); + Trace trace = task.getTrace(); + int interest = action.getInterest(); + + if(interest > 0) { + ActionSet set = selector.register(channel, interest); + + if(OP_READ == (interest & OP_READ)) { + trace.trace(REGISTER_READ_INTEREST); + } + if(OP_WRITE == (interest & OP_WRITE)) { + trace.trace(REGISTER_WRITE_INTEREST); + } + trace.trace(REGISTER_INTEREST, interest); + set.attach(action); + selecting.put(channel, set); + } + } + + /** + * This method is used to perform the select and if required queue + * the operations that are ready for execution. If the selector + * is woken up without any ready channels then this will return + * quietly. If however there are a number of channels ready to be + * processed then they are handed to the executor object and + * marked as ready for cancellation. + */ + private void distribute() throws IOException { + if(selector.select(5000) > 0) { + if(isActive()) { + process(); + } + } + } + + /** + * This will iterate over the set of selection keys and process each + * of them. The <code>Operation</code> associated with the selection + * key is handed to the executor to perform the channel operation. + * Also, if configured to cancel, this method will add the channel + * and the associated selection key to the cancellation map. + */ + private void process() throws IOException{ + List<ActionSet> ready = selector.selectedSets(); + + for(ActionSet set : ready) { + process(set); + remove(set); + } + } + + /** + * This will use the specified action set to acquire the channel + * and <code>Operation</code> associated with it to hand to the + * executor to perform the channel operation. + * + * @param set this is the set of actions that are to be processed + */ + private void process(ActionSet set) throws IOException { + Action[] actions = set.ready(); + + for(Action action : actions) { + Operation task = action.getOperation(); + Trace trace = task.getTrace(); + int interest = action.getInterest(); + + try { + if(OP_READ == (interest & OP_READ)) { + trace.trace(READ_INTEREST_READY, interest); + } + if(OP_WRITE == (interest & OP_WRITE)) { + trace.trace(WRITE_INTEREST_READY, interest); + } + execute(action); + } catch(Exception cause) { + trace.trace(ERROR, cause); + } + } + } + + /** + * This method ensures that references to the actions and channel + * are cleared from this instance. To ensure there are no memory + * leaks it is important to clear out all actions and channels. + * Also, if configured to cancel executing actions this will + * register the channel and actions to cancel on the next loop. + * + * @param set this is the set of actions that are to be removed + */ + private void remove(ActionSet set) throws IOException { + Channel channel = set.channel(); + SelectionKey key = set.key(); + + if(key.isValid()) { + int interest = set.interest(); + int ready = key.readyOps(); + + if(cancel) { + int remaining = interest & ~ready; + + if(remaining == 0) { + executing.put(channel, set); + } else { + key.interestOps(remaining); + } + set.remove(ready); + } + } else { + selecting.remove(channel); + } + } + + /** + * This is where the action is handed off to the executor. Before + * the action is executed a trace event is generated, this will + * ensure that the entry and exit points can be tracked. It is + * also useful in debugging performance issues and memory leaks. + * + * @param action this is the action to execute + */ + private void execute(Action action) { + Operation task = action.getOperation(); + Trace trace = task.getTrace(); + int interest = action.getInterest(); + + try { + trace.trace(EXECUTE_ACTION, interest); + executor.execute(action); + } catch(Exception cause) { + trace.trace(ERROR, cause); + } + } +} + +
\ No newline at end of file diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionSelector.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionSelector.java new file mode 100644 index 0000000..a9e78e9 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionSelector.java @@ -0,0 +1,193 @@ +/* + * ActionSelector.java February 2013 + * + * Copyright (C) 2013, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.reactor; + +import java.io.IOException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +/** + * The <code>ActionSelector</code> object is used to perform socket + * based selection with the help of the <code>ActionSet</code> object. + * All registered channels have an associated action set. The action + * set contains a set of actions that should be executed when the + * selector selects the channel. + * + * @author Niall Gallagher + */ +class ActionSelector { + + /** + * This is the selector used to perform the select operations. + */ + private final Selector selector; + + /** + * Constructor for the <code>ActionSelector</code> object. This is + * used to create a selector that can register socket channels + * with an associated <code>ActionSet</code>. The set can then be + * used to store actions to be executed upon selection. + */ + public ActionSelector() throws IOException { + this.selector = Selector.open(); + } + + /** + * This is used to perform a select on the selector. This returns + * the number of channels that have been selected. If this returns + * a number greater than zero the <code>ready</code> method can + * be used to acquire the actions that are ready for execution. + * + * @param timeout this is the timeout associated with the select + * + * @return this returns the number of channels that are ready + */ + public int select(long timeout) throws IOException { + return selector.select(timeout); + } + + /** + * This performs the actual registration of the channel for selection + * based on the provided interest bitmask. When the channel has been + * registered for selection this returns an <code>ActionSet</code> + * for the selection key. The set can then be used to register the + * actions that should be executed when selection succeeds. + * + * @param channel this is the channel to register for selection + * @param interest this is the interested operations bitmask + * + * @return this is the action set associated with the registration + */ + public ActionSet register(SelectableChannel channel, int interest) throws IOException { + SelectionKey key = channel.register(selector, interest); + Object value = key.attachment(); + + if(value == null) { + value = new ActionSet(key); + key.attach(value); + } + return (ActionSet)value; + } + + /** + * This is used to acquire all the action sets that are associated + * with this selector. Only action sets that have a valid selection + * key are returned. Modification of the list will not affect the + * associated selector instance. + * + * @return this returns all the associated action sets for this + */ + public List<ActionSet> registeredSets() { + Set<SelectionKey> keys = selector.keys(); + Iterator<SelectionKey> ready = keys.iterator(); + + return registeredSets(ready); + } + + /** + * This is used to acquire all the action sets that are associated + * with this selector. Only action sets that have a valid selection + * key are returned. Modification of the list will not affect the + * associated selector instance. + * + * @param keys the selection keys to get the associated sets from + * + * @return this returns all the associated action sets for this + */ + private List<ActionSet> registeredSets(Iterator<SelectionKey> keys) { + List<ActionSet> sets = new LinkedList<ActionSet>(); + + while(keys.hasNext()) { + SelectionKey key = keys.next(); + ActionSet actions = (ActionSet)key.attachment(); + + if(!key.isValid()) { + key.cancel(); + } else { + sets.add(actions); + } + } + return sets; + } + + /** + * This is used to acquire all the action sets that are selected + * by this selector. All action sets returned are unregistered from + * the selector and must be registered again to hear about further + * I/O events that occur on the associated channel. + * + * @return this returns all the selected action sets for this + */ + public List<ActionSet> selectedSets() throws IOException { + Set<SelectionKey> keys = selector.selectedKeys(); + Iterator<SelectionKey> ready = keys.iterator(); + + return selectedSets(ready); + } + + /** + * This is used to acquire all the action sets that are selected + * by this selector. All action sets returned are unregistered from + * the selector and must be registered again to hear about further + * I/O events that occur on the associated channel. + * + * @param keys the selection keys to get the associated sets from + * + * @return this returns all the selected action sets for this + */ + private List<ActionSet> selectedSets(Iterator<SelectionKey> keys) { + List<ActionSet> ready = new LinkedList<ActionSet>(); + + while(keys.hasNext()) { + SelectionKey key = keys.next(); + ActionSet actions = (ActionSet)key.attachment(); + + if(key != null) { + keys.remove(); + } + if(key != null) { + ready.add(actions); + } + } + return ready; + } + + /** + * This is used to wake the selector if it is in the middle of a + * select operation. Waking up the selector in this manner is + * useful if further actions are to be registered with it. + */ + public void wake() throws IOException { + selector.wakeup(); + } + + /** + * This is used to close the associated selector. Further attempts + * to register a channel with the selector will fail. All actions + * should be cancelled before closing the selector in this way. + */ + public void close() throws IOException { + selector.close(); + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionSet.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionSet.java new file mode 100644 index 0000000..adc4ef7 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionSet.java @@ -0,0 +1,269 @@ +/* + * ActionSet.java February 2013 + * + * Copyright (C) 2013, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.reactor; + +import static java.nio.channels.SelectionKey.OP_ACCEPT; +import static java.nio.channels.SelectionKey.OP_CONNECT; +import static java.nio.channels.SelectionKey.OP_READ; +import static java.nio.channels.SelectionKey.OP_WRITE; + +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; + +/** + * The <code>ActionSet</code> object represents a set of actions that + * are associated with a particular selection key. Here the set + * stores an <code>Action</code> for each of the interested operation + * types. In some situations a single action may be interested in + * several operations which must be remembered by the set. + * + * @author Niall Gallagher + */ +class ActionSet { + + /** + * This is the selection key associated with the action set. + */ + private final SelectionKey key; + + /** + * This contains the the actions indexed by operation type. + */ + private final Action[] set; + + /** + * Constructor for the <code>ActionSet</code> object. This is + * used to create a set for storing actions keyed by operation + * type. Only one action is kept per operation type. + * + * @param key this is the associated selection key + */ + public ActionSet(SelectionKey key) { + this.set = new Action[4]; + this.key = key; + } + + /** + * This provides the selection key associated with the action set. + * For each ready operation on the selection key the set contains + * an action that can be executed. + * + * @return this provides the selection key for this action set + */ + public SelectionKey key() { + return key; + } + + /** + * This provides the channel associated with the action set. This + * is the channel that is registered for selection using the + * interested operations for the set. + * + * @return this returns the selectable channel for the action set + */ + public SelectableChannel channel() { + return key.channel(); + } + + /** + * This provides an iterator of the actions that exist within the + * action set. Regardless of whether a single action is interested + * is several operations this will return an iteration of unique + * actions. Modifications to the iterator do not affect the set. + * + * @return this returns an iterator of unique actions for the set + */ + public Action[] list() { + Action[] actions = new Action[4]; + int count = 0; + + for(Action action : set) { + if(action != null) { + actions[count++] = action; + } + } + return copyOf(actions, count); + } + + /** + * This is sued to acquire all actions that match the currently + * ready operations of the key. All actions returned by this will + * be executed and the interest will typically be removed. + * + * @return returns the array of ready operations for the set + */ + public Action[] ready() { + int ready = key.readyOps(); + + if(ready != 0) { + return get(ready); + } + return new Action[]{}; + } + + /** + * This is used to attach an action to the set for a specific + * interest bitmask. If the bitmask contains several operations + * then the action is registered for each individual operation. + * + * @param action this is the action that is to be attached + * @param interest this is the interest for the action + */ + public void attach(Action action) { + int interest = action.getInterest(); + + if((interest | OP_READ) == interest) { + set[0] = action; + } + if((interest | OP_WRITE) == interest) { + set[1] = action; + } + if((interest | OP_ACCEPT) == interest) { + set[2] = action; + } + if((interest | OP_CONNECT) == interest) { + set[3] = action; + } + } + + /** + * This is used to remove interest from the set. Removal of + * interest from the set is performed by registering a null for + * the interest operation. + * + * @param interest this is the interest to be removed + */ + public Action[] remove(int interest) { + Action[] actions = get(interest); + + if((interest | OP_READ) == interest) { + set[0] = null; + } + if((interest | OP_WRITE) == interest) { + set[1] = null; + } + if((interest | OP_ACCEPT) == interest) { + set[2] = null; + } + if((interest | OP_CONNECT) == interest) { + set[3] = null; + } + return actions; + } + + /** + * This is used to acquire the actions that match the bitmask of + * interest operations. If there are no actions representing the + * interest required an empty array will be returned. + * + * @param interest this is the interest to acquire actions for + * + * @return this will return an array of actions for the interest + */ + public Action[] get(int interest) { + Action[] actions = new Action[4]; + int count = 0; + + if((interest | OP_READ) == interest) { + if(set[0] != null) { + actions[count++] = set[0]; + } + } + if((interest | OP_WRITE) == interest) { + if(set[1] != null) { + actions[count++] = set[1]; + } + } + if((interest | OP_ACCEPT) == interest) { + if(set[2] != null) { + actions[count++] = set[2]; + } + } + if((interest | OP_CONNECT) == interest) { + if(set[3] != null) { + actions[count++] = set[3]; + } + } + return copyOf(actions, count); + } + + /** + * This is used to create a copy of the specified list with only + * the first few non null values. This ensures we can keep the + * internal array immutable and still use arrays. + * + * @param list this is the list that is to be copied to a new array + * @param count this is the number of entries to copy from the list + * + * @return a copy of the original list up to the specified count + */ + private Action[] copyOf(Action[] list, int count) { + Action[] copy = new Action[count]; + + for(int i = 0; i < count; i++) { + copy[i] = list[i]; + } + return copy; + } + + /** + * This is used to acquire the operations that this is interested + * in. If there are currently no registered actions then this will + * return zero. Interest is represented by non-null actions only. + * + * @return this returns the interested operations for this + */ + public int interest() { + int interest = 0; + + if(set[0] != null) { + interest |= OP_READ; + } + if(set[1] != null) { + interest |= OP_WRITE; + } + if(set[2] != null) { + interest |= OP_ACCEPT; + } + if(set[3] != null) { + interest |= OP_CONNECT; + } + return interest; + } + + /** + * This is used to clear all interest from the set. This will + * basically clear out any actions that have been registered with + * the set. After invocation the iterator will be empty. + */ + public void clear() { + set[0] = set[1] = + set[2] = set[3] = null; + } + + /** + * This is used to cancel the <code>SelectionKey</code> associated + * with the action set. Canceling the key in this manner ensures + * it is not returned in further selection operations. + */ + public void cancel() { + key.cancel(); + } +} +
\ No newline at end of file diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/CancelAction.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/CancelAction.java new file mode 100644 index 0000000..c4ad894 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/CancelAction.java @@ -0,0 +1,114 @@ +/* + * CancelAction.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.reactor; + +import java.nio.channels.SelectableChannel; + +/** + * The <code>CancelAction</code> object is used to represent a task + * that can be executed to cancel an operation. This is used in the + * place of a normal <code>Operation</code> to pass for execution + * when the operation has expired before the I/O event is was + * interested in occurred. Before this is executed the operation is + * removed from selection. + * + * @author Niall Gallagher + */ +class CancelAction implements Action { + + /** + * This is the operation that is to be canceled by this action. + */ + private final Operation task; + + /** + * This is the operation object that is to be canceled. + */ + private final Action action; + + /** + * Constructor for the <code>Cancellation</code> object. This is + * used to create a runnable task that delegates to the cancel + * method of the operation. This will be executed asynchronously + * by the executor after being removed from selection. + * + * @param action this is the task that is to be canceled by this + */ + public CancelAction(Action action) { + this.task = action.getOperation(); + this.action = action; + } + + /** + * This method is executed by the <code>Executor</code> object + * if the operation expires before the required I/O event(s) + * have occurred. It is typically used to shutdown the socket + * and release any resources associated with the operation. + */ + public void run() { + task.cancel(); + } + + /** + * This is used to get the expiry for the operation. The expiry + * represents some static time in the future when the action will + * expire if it does not become ready. This is used to cancel the + * operation so that it does not remain in the distributor. + * + * @return the remaining time this operation will wait for + */ + public long getExpiry() { + return 0; + } + + /** + * This returns the I/O operations that the action is interested + * in as an integer bit mask. When any of these operations are + * ready the distributor will execute the provided operation. + * + * @return the integer bit mask of interested I/O operations + */ + public int getInterest() { + return action.getInterest(); + } + + /** + * This is the <code>SelectableChannel</code> which is used to + * determine if the operation should be executed. If the channel + * is ready for a given I/O event it can be run. For instance if + * the operation is used to perform some form of read operation + * it can be executed when ready to read data from the channel. + * + * @return this returns the channel used to govern execution + */ + public SelectableChannel getChannel() { + return action.getChannel(); + } + + /** + * This is used to acquire the <code>Operation</code> that is to + * be executed when the required operations are ready. It is the + * responsibility of the distributor to invoke the operation. + * + * @return the operation to be executed when it is ready + */ + public Operation getOperation() { + return task; + } +}
\ No newline at end of file diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecuteAction.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecuteAction.java new file mode 100644 index 0000000..b92174c --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecuteAction.java @@ -0,0 +1,121 @@ +/* + * ExecuteAction.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.reactor; + +import java.nio.channels.SelectableChannel; + +/** + * The <code>ExecuteAction</code> object is represents an action + * that the distributor is to process. This contains the operation + * and the required I/O events as an integer bit mask as well as the + * selectable channel used to register for selection. In order to + * ensure that the action does not remain within the distributor for + * too long the action has an expiry time. + * + * @author Niall Gallagher + */ +class ExecuteAction implements Action { + + /** + * The task to execute when the required operations is ready. + */ + private final Operation task; + + /** + * This is the bit mask of required operations to be executed. + */ + private final int require; + + /** + * This is the time in the future that the event will expire in. + */ + private final long expiry; + + /** + * Constructor for the <code>Event</code> object. The actions are + * used to encapsulate the task to execute and the operations + * to listen to when some action is to be performed. + * + * @param task this is the task to be executed when it is ready + * @param require this is the required operations to listen to + */ + public ExecuteAction(Operation task, int require, long expiry) { + this.expiry = System.currentTimeMillis() + expiry; + this.require = require; + this.task = task; + } + + /** + * This is used to execute the operation for the action. This will + * be executed when the interested I/O event is ready for the + * associated <code>SelectableChannel</code> object. If the action + * expires before the interested I/O operation is ready this will + * not be executed, instead the operation is canceled. + */ + public void run() { + task.run(); + } + + /** + * This is used to get the expiry for the operation. The expiry + * represents some static time in the future when the action will + * expire if it does not become ready. This is used to cancel the + * operation so that it does not remain in the distributor. + * + * @return the remaining time this operation will wait for + */ + public long getExpiry() { + return expiry; + } + + /** + * This is the <code>SelectableChannel</code> which is used to + * determine if the operation should be executed. If the channel + * is ready for a given I/O event it can be run. For instance if + * the operation is used to perform some form of read operation + * it can be executed when ready to read data from the channel. + * + * @return this returns the channel used to govern execution + */ + public SelectableChannel getChannel() { + return task.getChannel(); + } + + /** + * This is used to acquire the <code>Operation</code> that is to + * be executed when the required operations are ready. It is the + * responsibility of the distributor to invoke the operation. + * + * @return the operation to be executed when it is ready + */ + public Operation getOperation() { + return task; + } + + /** + * This returns the I/O operations that the action is interested + * in as an integer bit mask. When any of these operations are + * ready the distributor will execute the provided operation. + * + * @return the integer bit mask of interested I/O operations + */ + public int getInterest() { + return require; + } +}
\ No newline at end of file diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecutorReactor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecutorReactor.java new file mode 100644 index 0000000..9d741d8 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecutorReactor.java @@ -0,0 +1,132 @@ +/* + * ExecutorReactor.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.reactor; + +import java.io.IOException; +import java.util.concurrent.Executor; + +/** + * The <code>ExecutorReactor</code> is used to schedule operation for + * execution using an <code>Executor</code> implementation. This can be + * useful when the operations performed are time intensive. For example + * if the operations performed a read of the underlying channel and + * then had to parse the contents of the payload. Such operations would + * reduce the performance of the reactor if it could not delegate to + * some other form of executor, as it would delay their execution. + * + * @author Niall Gallagher + */ +public class ExecutorReactor implements Reactor { + + /** + * This is used to distribute the ready operations for execution. + */ + private final OperationDistributor exchange; + + /** + * This is used to execute the operations that ready to run. + */ + private final Executor executor; + + /** + * Constructor for the <code>ExecutorReactor</code> object. This is + * used to create a reactor that can delegate to the executor. This + * also accepts the operations it is interested in, the value is + * taken from the <code>SelectionKey</code> object. A bit mask can + * be used to show interest in several operations at once. + * + * @param executor this is the executor used to run the operations + */ + public ExecutorReactor(Executor executor) throws IOException { + this(executor, 1); + } + + /** + * Constructor for the <code>ExecutorReactor</code> object. This is + * used to create a reactor that can delegate to the executor. This + * also accepts the operations it is interested in, the value is + * taken from the <code>SelectionKey</code> object. A bit mask can + * be used to show interest in several operations at once. + * + * @param executor this is the executor used to run the operations + * @param count this is the number of distributors to be used + */ + public ExecutorReactor(Executor executor, int count) throws IOException { + this(executor, count, 120000); + } + + /** + * Constructor for the <code>ExecutorReactor</code> object. This is + * used to create a reactor that can delegate to the executor. This + * also accepts the operations it is interested in, the value is + * taken from the <code>SelectionKey</code> object. A bit mask can + * be used to show interest in several operations at once. + * + * @param executor this is the executor used to run the operations + * @param count this is the number of distributors to be used + * @param expiry the length of time to maintain and idle operation + */ + public ExecutorReactor(Executor executor, int count, long expiry) throws IOException { + this.exchange = new PartitionDistributor(executor, count, expiry); + this.executor = executor; + } + + /** + * This method is used to execute the provided operation without + * the need to specifically check for I/O events. This is used if + * the operation knows that the <code>SelectableChannel</code> is + * ready, or if the I/O operation can be performed without knowing + * if the channel is ready. Typically this is an efficient means + * to perform a poll rather than a select on the channel. + * + * @param task this is the task to execute immediately + */ + public void process(Operation task) throws IOException { + executor.execute(task); + } + + /** + * This method is used to execute the provided operation when there + * is an I/O event that task is interested in. This will used the + * operations <code>SelectableChannel</code> object to determine + * the events that are ready on the channel. If this reactor is + * interested in any of the ready events then the task is executed. + * + * @param task this is the task to execute on interested events + * @param require this is the bit-mask value for interested events + */ + public void process(Operation task, int require) throws IOException { + exchange.process(task, require); + } + + /** + * This is used to stop the reactor so that further requests to + * execute operations does nothing. This will clean up all of + * the reactors resources and unregister any operations that are + * currently awaiting execution. This should be used to ensure + * any threads used by the reactor gracefully stop. + */ + public void stop() throws IOException { + exchange.close(); + } +} + + + + diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Latch.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Latch.java new file mode 100644 index 0000000..deaee98 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Latch.java @@ -0,0 +1,71 @@ +/* + * Latch.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.reactor; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +/** + * The <code>Latch</code> is used to provide a simple latch that will + * allow a thread to block until it is signaled that it is ready. + * The latch will block on the <code>close</code> method and when the + * latch is signaled the close method will release all threads. + * + * @author Niall Gallagher + */ +class Latch extends CountDownLatch { + + /** + * Constructor for the <code>Latch</code> object. This will + * create a count down latch that will block when it is + * closed. Any blocked threads will be released when the + * latch is signaled that it is ready. + */ + public Latch() { + super(1); + } + + /** + * This is used to signal that the latch is ready. Invoking + * this method will release all threads that are blocking on + * the close method. This method is used when the distributor + * is closed and all operations have been purged. + */ + public void signal() throws IOException { + try { + countDown(); + } catch(Exception e) { + throw new IOException("Thread interrupted"); + } + } + + /** + * This will block all threads attempting to close the latch. + * All threads will be release when the latch is signaled. This + * is used to ensure the distributor blocks until it has fully + * purged all registered operations that are registered. + */ + public void close() throws IOException { + try { + await(); + } catch(Exception e){ + throw new IOException("Thread interrupted"); + } + } +}
\ No newline at end of file diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Operation.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Operation.java new file mode 100644 index 0000000..e97be61 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Operation.java @@ -0,0 +1,69 @@ +/* + * Operation.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.reactor; + +import java.nio.channels.SelectableChannel; + +import org.simpleframework.transport.trace.Trace; + +/** + * The <code>Operation</code> interface is used to describe a task + * that can be executed when the associated channel is ready for some + * operation. Typically the <code>SelectableChannel</code> is used to + * register with a selector with a set of given interested operations + * when those operations can be performed this is executed. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.reactor.Reactor + */ +public interface Operation extends Runnable { + + /** + * This is used to acquire the trace object that is associated + * with the operation. A trace object is used to collection details + * on what operations are being performed. For instance it may + * contain information relating to I/O events or errors. + * + * @return this returns the trace associated with this operation + */ + Trace getTrace(); + + /** + * This is the <code>SelectableChannel</code> which is used to + * determine if the operation should be executed. If the channel + * is ready for a given I/O event it can be run. For instance if + * the operation is used to perform some form of read operation + * it can be executed when ready to read data from the channel. + * + * @return this returns the channel used to govern execution + */ + SelectableChannel getChannel(); + + /** + * This is used to cancel the operation if it has timed out. This + * is typically invoked when it has been waiting in a selector for + * an extended duration of time without any active operations on + * it. In such a case the reactor must purge the operation to free + * the memory and open channels associated with the operation. + */ + void cancel(); +} + + diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/OperationDistributor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/OperationDistributor.java new file mode 100644 index 0000000..3a83909 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/OperationDistributor.java @@ -0,0 +1,62 @@ +/* + * Distributor.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.reactor; + +import java.io.IOException; + +/** + * The <code>Distributor</code> object is used to execute operations + * that have an interested I/O event ready. This acts much like a + * scheduler would in that it delays the execution of the operations + * until such time as the associated <code>SelectableChannel</code> + * has an interested I/O event ready. + * <p> + * This distributor has two modes, one mode is used to cancel the + * channel once an I/O event has occurred. This means that the channel + * is removed from the <code>Selector</code> so that the selector + * does not break when asked to select again. Canceling the channel + * is useful when the operation execution may not fully read the + * payload or when the operation takes a significant amount of time. + * + * @see org.simpleframework.transport.reactor.ActionDistributor + */ +interface OperationDistributor { + + /** + * This is used to process the <code>Operation</code> object. This + * will wake up the selector if it is currently blocked selecting + * and register the operations associated channel. Once the + * selector is awake it will acquire the operation from the queue + * and register the associated <code>SelectableChannel</code> for + * selection. The operation will then be executed when the channel + * is ready for the interested I/O events. + * + * @param task this is the task that is scheduled for distribution + * @param require this is the bit-mask value for interested events + */ + void process(Operation task, int require) throws IOException; + + /** + * This is used to close the distributor such that it cancels all + * of the registered channels and closes down the selector. This + * is used when the distributor is no longer required, after the + * close further attempts to process operations will fail. + */ + void close() throws IOException; +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/PartitionDistributor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/PartitionDistributor.java new file mode 100644 index 0000000..b0d24ac --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/PartitionDistributor.java @@ -0,0 +1,136 @@ +/* + * PartitionDistributor.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.reactor; + +import java.io.IOException; +import java.nio.channels.SelectableChannel; +import java.util.concurrent.Executor; + +/** + * The <code>PartitionDistributor</code> object is a distributor that + * partitions the selection process in to several threads. Each of + * the threads has a single selector, and operations are distributed + * amongst the threads using the hash code of the socket. Partitions + * ensure that several selector threads can share a higher load and + * respond to a more I/O events. + * + * @author Niall Gallagher + */ +class PartitionDistributor implements OperationDistributor { + + /** + * This contains the distributors that represent a partition. + */ + private final OperationDistributor[] list; + + /** + * Constructor for the <code>PartitionDistributor</code> object. + * This will create a distributor that partitions the operations + * amongst a pool of selectors using the channels hash code. + * + * @param executor this is the executor used to run operations + * @param count this is the number of partitions to be used + */ + public PartitionDistributor(Executor executor, int count) throws IOException { + this(executor, count, 120000); + } + + /** + * Constructor for the <code>PartitionDistributor</code> object. + * This will create a distributor that partitions the operations + * amongst a pool of selectors using the channels hash code. + * + * @param executor this is the executor used to run operations + * @param count this is the number of partitions to be used + * @param expiry this is the expiry duration that is to be used + */ + public PartitionDistributor(Executor executor, int count, long expiry) throws IOException { + this.list = new OperationDistributor[count]; + this.start(executor, expiry); + } + + /** + * This is used to create the partitions that represent a thread + * used for selection. Operations will index to a particular one + * using the hash code of the operations channel. If there is only + * one partition all operations will index to the partition. + * + * @param executor the executor used to run the operations + * @param expiry this is the expiry duration that is to be used + */ + private void start(Executor executor, long expiry) throws IOException { + for(int i = 0; i < list.length; i++) { + list[i] = new ActionDistributor(executor, true, expiry); + } + } + + /** + * This is used to process the <code>Operation</code> object. This + * will wake up the selector if it is currently blocked selecting + * and register the operations associated channel. Once the + * selector is awake it will acquire the operation from the queue + * and register the associated <code>SelectableChannel</code> for + * selection. The operation will then be executed when the channel + * is ready for the interested I/O events. + * + * @param task this is the task that is scheduled for distribution + * @param require this is the bit-mask value for interested events + */ + public void process(Operation task, int require) throws IOException { + int length = list.length; + + if(length == 1) { + list[0].process(task, require); + } else { + process(task, require, length); + } + } + + /** + * This is used to process the <code>Operation</code> object. This + * will wake up the selector if it is currently blocked selecting + * and register the operations associated channel. Once the + * selector is awake it will acquire the operation from the queue + * and register the associated <code>SelectableChannel</code> for + * selection. The operation will then be executed when the channel + * is ready for the interested I/O events. + * + * @param task this is the task that is scheduled for distribution + * @param require this is the bit-mask value for interested events + * @param length this is the number of distributors to hash with + */ + private void process(Operation task, int require, int length) throws IOException { + SelectableChannel channel = task.getChannel(); + int hash = channel.hashCode(); + + list[hash % length].process(task, require); + } + + /** + * This is used to close the distributor such that it cancels all + * of the registered channels and closes down the selector. This + * is used when the distributor is no longer required, after the + * close further attempts to process operations will fail. + */ + public void close() throws IOException { + for(OperationDistributor entry : list) { + entry.close(); + } + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Reactor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Reactor.java new file mode 100644 index 0000000..a947cb5 --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Reactor.java @@ -0,0 +1,79 @@ +/* + * Reactor.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.reactor; + +import java.io.IOException; + +/** + * The <code>Reactor</code> interface is used to describe an object + * that is used to schedule asynchronous I/O operations. An operation + * is performed by handing it to the reactor, which will determine + * if an interested event has occurred. This allows the operation to + * perform the task in a manner that does not block. + * <p> + * Implementing an <code>Operation</code> object requires that the + * operation itself is aware of the I/O task it is performing. For + * example, if the operation is concerned with reading data from the + * underlying channel then the operation should perform the read, if + * there is more data required then that operation to register with + * the reactor again to receive further notifications. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.reactor.Operation + */ +public interface Reactor { + + /** + * This method is used to execute the provided operation without + * the need to specifically check for I/O events. This is used if + * the operation knows that the <code>SelectableChannel</code> is + * ready, or if the I/O operation can be performed without knowing + * if the channel is ready. Typically this is an efficient means + * to perform a poll rather than a select on the channel. + * + * @param task this is the task to execute immediately + */ + void process(Operation task) throws IOException; + + /** + * This method is used to execute the provided operation when there + * is an I/O event that task is interested in. This will used the + * operations <code>SelectableChannel</code> object to determine + * the events that are ready on the channel. If this reactor is + * interested in any of the ready events then the task is executed. + * + * @param task this is the task to execute on interested events + * @param require this is the bitmask value for interested events + */ + void process(Operation task, int require) throws IOException; + + /** + * This is used to stop the reactor so that further requests to + * execute operations does nothing. This will clean up all of + * the reactors resources and unregister any operations that are + * currently awaiting execution. This should be used to ensure + * any threads used by the reactor gracefully stop. + */ + void stop() throws IOException; +} + + + + diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ReactorEvent.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ReactorEvent.java new file mode 100644 index 0000000..529644e --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ReactorEvent.java @@ -0,0 +1,120 @@ +/* + * ReactorEvent.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.reactor; + +/** + * The <code>ReactorEvent</code> enumeration is used for tracing the + * operations that occur within the reactor. This is useful when the + * performance of the system needs to be monitored or when there is a + * resource or memory consumption issue that needs to be debugged. + * + * @author Niall Gallagher + */ +public enum ReactorEvent { + + /** + * This event indicates the registration of an I/O interest. + */ + SELECT, + + /** + * This indicates that the selected I/O interest has not occurred. + */ + SELECT_EXPIRED, + + /** + * This occurs when a selection key is cancelled for all interests. + */ + SELECT_CANCEL, + + /** + * This is used to indicate the channel is already selecting. + */ + ALREADY_SELECTING, + + /** + * This occurs rarely however it indicates an invalid registration. + */ + INVALID_KEY, + + /** + * This occurs upon the initial registration of an I/O interest. + */ + REGISTER_INTEREST, + + /** + * This occurs upon the initial registration of a read I/O interest. + */ + REGISTER_READ_INTEREST, + + /** + * This occurs upon the initial registration of a write I/O interest. + */ + REGISTER_WRITE_INTEREST, + + /** + * This is used to indicate the operation interest changed. + */ + UPDATE_INTEREST, + + /** + * This occurs upon the initial registration of a read I/O interest. + */ + UPDATE_READ_INTEREST, + + /** + * This occurs upon the initial registration of a write I/O interest. + */ + UPDATE_WRITE_INTEREST, + + /** + * This indicates that the I/O interest has been satisfied. + */ + INTEREST_READY, + + /** + * This indicates that the I/O read interest has been satisfied. + */ + READ_INTEREST_READY, + + /** + * This indicates that the I/O write interest has been satisfied. + */ + WRITE_INTEREST_READY, + + /** + * This is the final action of executing the action. + */ + EXECUTE_ACTION, + + /** + * This occurs on an attempt to register an closed channel. + */ + CHANNEL_CLOSED, + + /** + * This occurs when the selector has been shutdown globally. + */ + CLOSE_SELECTOR, + + /** + * This occurs if there is an error with the selection. + */ + ERROR, +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/SynchronousReactor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/SynchronousReactor.java new file mode 100644 index 0000000..102829d --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/SynchronousReactor.java @@ -0,0 +1,107 @@ +/* + * SynchronousReactor.java February 2007 + * + * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.reactor; + +import java.io.IOException; +import java.util.concurrent.Executor; + +import org.simpleframework.common.thread.SynchronousExecutor; + +/** + * The <code>SynchronousReactor</code> object is used to execute the + * ready operations of within a single synchronous thread. This is + * used when the I/O operations to be performed do not require much + * time to execute and so will not block the execution thread. + * + * @author Niall Gallagher + */ +public class SynchronousReactor implements Reactor { + + /** + * This is used to distribute the ready operations for execution. + */ + private final OperationDistributor exchange; + + /** + * This is used to execute the operations that ready to run. + */ + private final Executor executor; + + /** + * Constructor for the <code>SynchronousReactor</code> object. This + * is used to create a reactor that does not require thread pooling + * to execute the ready operations. All I/O operations are run + * in the selection thread and should complete quickly. + */ + public SynchronousReactor() throws IOException { + this(false); + } + + /** + * Constructor for the <code>SynchronousReactor</code> object. This + * is used to create a reactor that does not require thread pooling + * to execute the ready operations. All I/O operations are run + * in the selection thread and should complete quickly. + * + * @param cancel determines the selection key should be cancelled + */ + public SynchronousReactor(boolean cancel) throws IOException { + this.executor = new SynchronousExecutor(); + this.exchange = new ActionDistributor(executor, cancel); + } + + /** + * This method is used to execute the provided operation without + * the need to specifically check for I/O events. This is used if + * the operation knows that the <code>SelectableChannel</code> is + * ready, or if the I/O operation can be performed without knowing + * if the channel is ready. Typically this is an efficient means + * to perform a poll rather than a select on the channel. + * + * @param task this is the task to execute immediately + */ + public void process(Operation task) throws IOException { + executor.execute(task); + } + + /** + * This method is used to execute the provided operation when there + * is an I/O event that task is interested in. This will used the + * operations <code>SelectableChannel</code> object to determine + * the events that are ready on the channel. If this reactor is + * interested in any of the ready events then the task is executed. + * + * @param task this is the task to execute on interested events + * @param require this is the bit-mask value for interested events + */ + public void process(Operation task, int require) throws IOException { + exchange.process(task, require); + } + + /** + * This is used to stop the reactor so that further requests to + * execute operations does nothing. This will clean up all of + * the reactors resources and unregister any operations that are + * currently awaiting execution. This should be used to ensure + * any threads used by the reactor graceful stop. + */ + public void stop() throws IOException { + exchange.close(); + } +} diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/trace/Trace.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/trace/Trace.java new file mode 100644 index 0000000..7e3711d --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/trace/Trace.java @@ -0,0 +1,57 @@ +/* + * Trace.java October 2012 + * + * Copyright (C) 2002, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.trace; + +/** + * The <code>Trace</code> interface represents an trace log for various + * connection events. A trace is not limited to low level I/O events + * it can also gather event data that relates to protocol specific + * events. Using a trace in this manner enables problems to be solved + * with connections as they arise. + * <p> + * When implementing a <code>Trace</code> there should be special + * attention paid to its affect on the performance of the server. The + * trace is used deep within the core and any delays experienced in + * the trace will be reflected in the performance of the server. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.trace.TraceAnalyzer + */ +public interface Trace { + + /** + * This method is used to accept an event that occurred on the socket + * associated with this trace. Typically the event is a symbolic + * description of the event such as an enum or a string. + * + * @param event this is the event that occurred on the socket + */ + void trace(Object event); + + /** + * This method is used to accept an event that occurred on the socket + * associated with this trace. Typically the event is a symbolic + * description of the event such as an enum or a string. + * + * @param event this is the event that occurred on the socket + * @param value provides additional information such as an exception + */ + void trace(Object event, Object value); +}
\ No newline at end of file diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/trace/TraceAnalyzer.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/trace/TraceAnalyzer.java new file mode 100644 index 0000000..af3550b --- /dev/null +++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/trace/TraceAnalyzer.java @@ -0,0 +1,59 @@ +/* + * TraceAnalyzer.java October 2012 + * + * Copyright (C) 2002, Niall Gallagher <niallg@users.sf.net> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.simpleframework.transport.trace; + +import java.nio.channels.SelectableChannel; + +/** + * The <code>TraceAnalyzer</code> object represents a tracing analyzer + * used to monitor events on a socket. Its primary responsibilities + * are to create <code>Trace</code> objects that are attached to a + * specific socket channel. When any event occurs on that channel the + * trace is notified and can forward the details on for analysis. + * <p> + * An analyzer implementation must make sure that it does not affect + * the performance of the server. If there are delays creating a trace + * or within the trace itself it will have an impact on performance. + * + * @author Niall Gallagher + * + * @see org.simpleframework.transport.trace.Trace + */ +public interface TraceAnalyzer { + + /** + * This method is used to attach a trace to the specified channel. + * Attaching a trace basically means associating events from that + * trace with the specified socket. It ensures that the events + * from a specific channel can be observed in isolation. + * + * @param channel this is the channel to associate with the trace + * + * @return this returns a trace associated with the channel + */ + Trace attach(SelectableChannel channel); + + /** + * This is used to stop the agent and clear all trace information. + * Stopping the agent is typically done when the server is stopped + * and is used to free any resources associated with the agent. If + * an agent does not hold information this method can be ignored. + */ + void stop(); +}
\ No newline at end of file diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/MockSocket.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/MockSocket.java new file mode 100644 index 0000000..bd1b582 --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/MockSocket.java @@ -0,0 +1,45 @@ + +package org.simpleframework.transport; + +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.Map; + +import javax.net.ssl.SSLEngine; + +import org.simpleframework.transport.trace.MockTrace; +import org.simpleframework.transport.trace.Trace; + +public class MockSocket implements Socket { + + private SocketChannel socket; + private SSLEngine engine; + private Map map; + + public MockSocket(SocketChannel socket) { + this(socket, null); + } + + public MockSocket(SocketChannel socket, SSLEngine engine) { + this.map = new HashMap(); + this.engine = engine; + this.socket = socket; + } + + public SSLEngine getEngine() { + return engine; + } + + public SocketChannel getChannel() { + return socket; + } + + public Map getAttributes() { + return map; + } + + public Trace getTrace() { + return new MockTrace(); + } +} + diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/ServerBuffer.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/ServerBuffer.java new file mode 100644 index 0000000..34cdc0c --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/ServerBuffer.java @@ -0,0 +1,75 @@ +package org.simpleframework.transport; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.util.concurrent.CountDownLatch; + +public class ServerBuffer extends Thread { + + private ByteArrayOutputStream buffer; + private ServerSocket server; + private CountDownLatch latch; + + public ServerBuffer() throws Exception { + this.buffer = new ByteArrayOutputStream(); + this.latch = new CountDownLatch(1); + this.server = getSocket(); + this.start(); + } + + public ByteArrayOutputStream getBuffer(){ + return buffer; + } + + public void awaitClose() throws Exception { + latch.await(); + } + + public int getPort() { + return server.getLocalPort(); + } + + private ServerSocket getSocket() throws Exception { + // Scan the ephemeral port range + for(int i = 2000; i < 10000; i++) { // keep trying to grab the socket + try { + ServerSocket socket = new ServerSocket(i); + System.out.println("port=["+socket.getLocalPort()+"]"); + return socket; + } catch(Exception e) { + Thread.sleep(200); + } + } + // Scan a second time for good measure, maybe something got freed up + for(int i = 2000; i < 10000; i++) { // keep trying to grab the socket + try { + ServerSocket socket = new ServerSocket(i); + System.out.println("port=["+socket.getLocalPort()+"]"); + return socket; + } catch(Exception e) { + Thread.sleep(200); + } + } + throw new IOException("Could not create a client socket"); + } + + public void run() { + try { + java.net.Socket socket = server.accept(); + InputStream in = socket.getInputStream(); + int count = 0; + + while((count = in.read()) != -1) { + buffer.write(count); + System.err.write(count); + System.err.flush(); + } + } catch(Exception e) { + e.printStackTrace(); + } finally { + latch.countDown(); + } + } +}
\ No newline at end of file diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketBufferTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketBufferTest.java new file mode 100644 index 0000000..a893f04 --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketBufferTest.java @@ -0,0 +1,86 @@ +package org.simpleframework.transport; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import junit.framework.TestCase; + +import org.simpleframework.transport.trace.MockTrace; +import org.simpleframework.transport.trace.Trace; + +public class SocketBufferTest extends TestCase { + + + public void testBulkWrite() throws Exception { + ServerBuffer reader = new ServerBuffer(); + SocketAddress address = new InetSocketAddress("localhost", reader.getPort()); + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); // underlying socket must be non-blocking + channel.connect(address); + + while(!channel.finishConnect()) { // wait to finish connection + Thread.sleep(10); + }; + Trace trace = new MockTrace(); + SocketWrapper wrapper = new SocketWrapper(channel, trace); + SocketBuffer builder = new SocketBuffer(wrapper, 100, 4096); + + for(int i = 0; i < 10000; i++){ + ByteBuffer buf = ByteBuffer.wrap(("message-"+i+"\n").getBytes()); + + if(i > 18) { + System.err.println("FAIL......."+i); + } + if(!builder.write(buf)){ + while(!builder.flush()) { + System.err.println("FLUSHING!!!"); + Thread.sleep(1); + } + } + } + while(!builder.flush()) { + System.err.println("FLUSHING!!!"); + } + builder.close(); + reader.awaitClose(); + + String data = reader.getBuffer().toString(); + String[] list = data.split("\\n"); + + for(int i = 0; i < 10000; i++){ + String msg = list[i]; + if(!msg.equals("message-"+i)) { + System.err.println(list[i]); + } + assertEquals("At index " + i + " value="+list[i] +" expect message-"+i, list[i], "message-"+i); + } + } + + public void testSimpleWrite() throws Exception { + ServerBuffer reader = new ServerBuffer(); + SocketAddress address = new InetSocketAddress("localhost", reader.getPort()); + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); // underlying socket must be non-blocking + channel.connect(address); + + while(!channel.finishConnect()) { // wait to finish connection + Thread.sleep(10); + }; + Trace trace = new MockTrace(); + SocketWrapper wrapper = new SocketWrapper(channel, trace); + SocketBuffer builder = new SocketBuffer(wrapper, 100, 4096); + + builder.write(ByteBuffer.wrap("hello there ".getBytes())); + builder.write(ByteBuffer.wrap("this ".getBytes())); + builder.write(ByteBuffer.wrap("is ".getBytes())); + builder.write(ByteBuffer.wrap("a ".getBytes())); + builder.write(ByteBuffer.wrap("test".getBytes())); + builder.flush(); + builder.close(); + reader.awaitClose(); + + assertEquals(reader.getBuffer().toString(), "hello there this is a test"); + } +} diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java new file mode 100644 index 0000000..6654f31 --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java @@ -0,0 +1,92 @@ +package org.simpleframework.transport; + +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; + +import junit.framework.TestCase; + +import org.simpleframework.common.thread.ConcurrentExecutor; +import org.simpleframework.transport.reactor.ExecutorReactor; +import org.simpleframework.transport.reactor.Reactor; +import org.simpleframework.transport.trace.MockTrace; +import org.simpleframework.transport.trace.Trace; + +public class SocketTransportPipeTest extends TestCase { + + private static final int ITERATIONS = 100000; + + public void testPipe() throws Exception { + ServerSocket server = new ServerSocket(0); + SocketAddress address = new InetSocketAddress("localhost", server.getLocalPort()); + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); // underlying socket must be non-blocking + channel.connect(address); + while(!channel.finishConnect()) { // wait to finish connection + Thread.sleep(10); + }; + Trace trace = new MockTrace(); + SocketWrapper wrapper = new SocketWrapper(channel, trace); + Executor executor = new ConcurrentExecutor(Runnable.class); + Reactor reactor = new ExecutorReactor(executor); + SocketTransport transport = new SocketTransport(wrapper,reactor); + java.net.Socket socket = server.accept(); + final InputStream read = socket.getInputStream(); + final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + final LinkedBlockingQueue<String> sent = new LinkedBlockingQueue<String>(); + final LinkedBlockingQueue<String> received = new LinkedBlockingQueue<String>(); + Thread thread = new Thread(new Runnable() { + public void run(){ + try { + byte[] token = new byte[]{'\r','\n'}; + int pos = 0; + int count = 0; + while((count = read.read()) != -1){ + if(count != token[pos++]){ + pos = 0; + } + if(pos == token.length) { + String value = buffer.toString().trim(); + String expect = sent.take(); + + if(!value.equals(expect)) { + throw new Exception("Out of sequence expected " + expect + " but got " + value); + } + received.offer(value); + buffer.reset(); + pos = 0; + } else { + buffer.write(count); + System.err.write(count); + System.err.flush(); + } + + } + }catch(Exception e){ + e.printStackTrace(); + } + } + }); + thread.start(); + for(int i = 0; i < ITERATIONS; i++) { + String message = "message-"+i; + transport.write(ByteBuffer.wrap((message+"\r\n").getBytes())); + sent.offer(message); + } + transport.flush(); + transport.close(); + + for(int i = 0; i < ITERATIONS; i++) { + assertEquals(received.take(), "message-"+i); + } + assertTrue(sent.isEmpty()); + assertTrue(received.isEmpty()); + + } +} diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportTest.java new file mode 100644 index 0000000..1e09cd0 --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportTest.java @@ -0,0 +1,51 @@ +package org.simpleframework.transport; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.concurrent.Executor; + +import junit.framework.TestCase; + +import org.simpleframework.common.thread.ConcurrentExecutor; +import org.simpleframework.transport.reactor.ExecutorReactor; +import org.simpleframework.transport.reactor.Reactor; +import org.simpleframework.transport.trace.MockTrace; +import org.simpleframework.transport.trace.Trace; + +public class SocketTransportTest extends TestCase { + + + public void testBulkWrite() throws Exception { + ServerBuffer reader = new ServerBuffer(); + SocketAddress address = new InetSocketAddress("localhost", reader.getPort()); + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); // underlying socket must be non-blocking + channel.connect(address); + + while(!channel.finishConnect()) { // wait to finish connection + Thread.sleep(10); + }; + Trace trace = new MockTrace(); + SocketWrapper wrapper = new SocketWrapper(channel, trace); + Executor executor = new ConcurrentExecutor(Runnable.class); + Reactor reactor = new ExecutorReactor(executor); + SocketTransport transport = new SocketTransport(wrapper,reactor); + for(int i = 0; i < 10000; i++){ + transport.write(ByteBuffer.wrap(("message-"+i+"\n").getBytes())); + } + transport.close(); + reader.awaitClose(); + + String data = reader.getBuffer().toString(); + String[] list = data.split("\\n"); + + for(int i = 0; i < 10000; i++){ + if(!list[i].equals("message-"+i)) { + System.err.println(list[i]); + } + assertEquals("At index " + i + " value="+list[i] +" expect message-"+i, list[i], "message-"+i); + } + } +} diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/StreamTransport.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/StreamTransport.java new file mode 100644 index 0000000..75f999e --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/StreamTransport.java @@ -0,0 +1,66 @@ +package org.simpleframework.transport; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SocketChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Map; + +import javax.net.ssl.SSLEngine; + +import org.simpleframework.transport.trace.MockTrace; +import org.simpleframework.transport.trace.Trace; + +public class StreamTransport implements Transport { + + private final WritableByteChannel write; + private final ReadableByteChannel read; + private final OutputStream out; + + public StreamTransport(InputStream in, OutputStream out) { + this.write = Channels.newChannel(out); + this.read = Channels.newChannel(in); + this.out = out; + } + + public void close() throws IOException { + write.close(); + read.close(); + } + + public void flush() throws IOException { + out.flush(); + } + + public int read(ByteBuffer buffer) throws IOException { + return read.read(buffer); + } + + public void write(ByteBuffer buffer) throws IOException { + write.write(buffer); + } + + public Map getAttributes() { + return null; + } + + public SocketChannel getChannel() { + return null; + } + + public SSLEngine getEngine() { + return null; + } + + public Certificate getCertificate() { + return null; + } + + public Trace getTrace() { + return new MockTrace(); + } +} diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportCursorTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportCursorTest.java new file mode 100644 index 0000000..161115f --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportCursorTest.java @@ -0,0 +1,83 @@ +package org.simpleframework.transport; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +import junit.framework.TestCase; + +public class TransportCursorTest extends TestCase { + + private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz"; + private static final String SOURCE = ALPHABET + "\r\n"; + + public void testCursor() throws IOException { + byte[] data = SOURCE.getBytes("ISO-8859-1"); + InputStream source = new ByteArrayInputStream(data); + Transport transport = new StreamTransport(source, System.out); + ByteCursor cursor = new TransportCursor(transport); + byte[] buffer = new byte[1024]; + + assertEquals(cursor.ready(), data.length); + assertEquals(26, cursor.read(buffer, 0, 26)); + assertEquals(26, cursor.reset(26)); + assertEquals(new String(buffer, 0, 26), ALPHABET); + + assertEquals(cursor.ready(), data.length); + assertEquals(26, cursor.read(buffer, 0, 26)); + assertEquals(26, cursor.reset(26)); + assertEquals(new String(buffer, 0, 26), ALPHABET); + + assertEquals(cursor.ready(), data.length); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(4, cursor.reset(26)); + assertEquals(new String(buffer, 0, 4), "abcd"); + + assertEquals(cursor.ready(), data.length); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(4, cursor.reset(26)); + assertEquals(new String(buffer, 0, 4), "abcd"); + + assertEquals(cursor.ready(), data.length); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(new String(buffer, 0, 4), "abcd"); + + assertEquals(cursor.ready(), data.length - 4); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(new String(buffer, 0, 4), "efgh"); + + assertEquals(cursor.ready(), data.length - 8); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(new String(buffer, 0, 4), "ijkl"); + + assertEquals(cursor.ready(), data.length - 12); + assertEquals(12, cursor.reset(12)); + assertEquals(10, cursor.read(buffer, 0, 10)); + assertEquals(new String(buffer, 0, 10), "abcdefghij"); + + cursor.push("1234".getBytes("ISO-8859-1")); + cursor.push("5678".getBytes("ISO-8859-1")); + cursor.push("90".getBytes("ISO-8859-1")); + + assertEquals(cursor.ready(), 10); + assertEquals(2, cursor.read(buffer, 0, 2)); + assertEquals(new String(buffer, 0, 2), "90"); + + assertEquals(cursor.ready(), 8); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(new String(buffer, 0, 4), "5678"); + + assertEquals(cursor.ready(), 4); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(new String(buffer, 0, 4), "1234"); + + assertEquals(4, cursor.reset(4)); + assertEquals(cursor.ready(), 4); + assertEquals(4, cursor.read(buffer, 0, 4)); + assertEquals(new String(buffer, 0, 4), "1234"); + + assertEquals(8, cursor.read(buffer, 0, 8)); + assertEquals(new String(buffer, 0, 8), "klmnopqr"); + } + +} diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java new file mode 100644 index 0000000..1a14431 --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java @@ -0,0 +1,404 @@ +package org.simpleframework.transport; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.LinkedList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +import junit.framework.TestCase; + +import org.simpleframework.transport.reactor.ExecutorReactor; +import org.simpleframework.transport.reactor.Reactor; + +/** + * Measure the performance of the transports to ensure that the perform + * well and that they send the correct sequence of bytes and that the + * blocks sent are in the correct order. This also performs a comparison + * with direct socket output streams to ensure there is a reasonable + * performance difference. + * + * @author Niall Gallagher + */ +public class TransportTest extends TestCase { + + private static final int REPEAT = 1000; + + public void testTransport() throws Exception { + testTransport(REPEAT); + } + + public void testTransport(int repeat) throws Exception { + for(int i = 1; i < 7; i++) { // just do some random sizes + testTransport(i, 100); + } + for(int i = 4092; i < 4102; i++) { + testTransport(i, 100); + } + for(int i = 8190; i < 8200; i++) { + testTransport(i, 100); + } + for(int i = 11282; i < 11284; i++) { + testTransport(i, 1000); + } + for(int i = 204800; i < 204805; i++) { + testTransport(i, 1000); + } + testTransport(16, repeat); + testTransport(64, repeat); + testTransport(256, repeat); + testTransport(1024, repeat); + testTransport(2048, repeat); + testTransport(4096, repeat); + testTransport(4098, repeat); + testTransport(8192, repeat); + testTransport(8197, repeat); + } + + // Test blocking transport + private void testTransport(int size, int repeat) throws Exception { + // ThreadDumper dumper = new ThreadDumper(); + SocketConsumer consumer = new SocketConsumer(size, repeat); + SocketAddress address = new InetSocketAddress("localhost", consumer.getPort()); + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); // underlying socket must be non-blocking + channel.connect(address); + + while(!channel.finishConnect()) { // wait to finish connection + Thread.sleep(10); + }; + ExecutorService executor = Executors.newFixedThreadPool(20); + Reactor reactor = new ExecutorReactor(executor); + // Transport transport = new SocketTransport(channel, reactor, 2, 3);//XXX bug + MockSocket pipeline = new MockSocket(channel); + Transport transport = new SocketTransport(pipeline, reactor, 8192); + OutputStream out = new TransportOutputStream(transport); + + // dumper.start(); + testOutputStream(consumer, out, size, repeat); + + out.close(); + executor.shutdown(); + channel.close(); + reactor.stop(); + // dumper.kill(); + Thread.sleep(100); + } + + public void s_testSocket() throws Exception { + s_testSocket(REPEAT); + } + + public void s_testSocket(int repeat) throws Exception { + testSocket(16, repeat); + testSocket(64, repeat); + testSocket(256, repeat); + testSocket(1024, repeat); + testSocket(2048, repeat); + testSocket(4098, repeat); + testSocket(8192, repeat); + } + + // Test blocking socket + private void testSocket(int size, int repeat) throws Exception { + // ThreadDumper dumper = new ThreadDumper(); + SocketConsumer consumer = new SocketConsumer(size, repeat); + Socket socket = new Socket("localhost", consumer.getPort()); + OutputStream out = socket.getOutputStream(); + + //dumper.start(); + testOutputStream(consumer, out, size, repeat); + + out.close(); + socket.close(); + //dumper.kill(); + Thread.sleep(100); + } + + private class AlpahbetIterator { + + private byte[] alphabet = "abcdefghijklmnopqstuvwxyz".getBytes(); + + private int off; + + public byte next() { + if(off == alphabet.length) { + off = 0; + } + return alphabet[off++]; + } + + public void reset() { + off = 0; + } + } + + private void testOutputStream(SocketConsumer consumer, OutputStream out, int size, int repeat) throws Exception { + byte[] block = new byte[size]; // write size + AlpahbetIterator it = new AlpahbetIterator(); // write known data + + for(int i = 1; i < block.length; i++) { + block[i] = it.next(); + } + AtomicLong count = new AtomicLong(); + PerformanceMonitor monitor = new PerformanceMonitor(consumer, count, out.getClass().getSimpleName(), size); + + for(int i = 0; i < repeat; i++) { + block[0] = (byte) i; // mark the first byte in the block to be sure we get blocks in sequence + //System.err.println("["+i+"]"+new String(block,"ISO-8859-1")); + out.write(block); // manipulation of the underlying buffer is taking place when the compact is invoked, this is causing major problems as the next packet will be out of sequence + count.addAndGet(block.length); + } + Thread.sleep(2000); // wait for all bytes to flush through to consumer + monitor.kill(); + } + + private class PerformanceMonitor extends Thread { + private AtomicLong count; + + private volatile boolean dead; + + private SocketConsumer consumer; + + private String name; + + private int size; + + public PerformanceMonitor(SocketConsumer consumer, AtomicLong count, String name, int size) { + this.consumer = consumer; + this.count = count; + this.name = name; + this.size = size; + this.start(); + } + + public void run() { + int second = 0; + while(!dead) { + try { + long octets = count.longValue(); + System.out.printf("%s,%s,%s,%s,%s%n", name, size, second++, octets, consumer.getWindow()); + Thread.sleep(1000); + } catch(Exception e) { + e.printStackTrace(); + } + } + } + + public void kill() throws Exception { + dead = true; + } + } + + private class SocketConsumer extends Thread { + + private ServerSocket server; + + private Window window; + + private long repeat; + + private long size; + + public SocketConsumer(int size, int repeat) throws Exception { + this.window = new Window(20); + this.server = getSocket(); + this.repeat = repeat; + this.size = size; + this.start(); + } + + public int getPort() { + return server.getLocalPort(); + } + + public String getWindow() { + return window.toString(); + } + + private ServerSocket getSocket() throws Exception { + // Scan the ephemeral port range + for(int i = 2000; i < 10000; i++) { // keep trying to grab the socket + try { + ServerSocket socket = new ServerSocket(i); + System.out.println("port=["+socket.getLocalPort()+"]"); + return socket; + } catch(Exception e) { + Thread.sleep(200); + } + } + // Scan a second time for good measure, maybe something got freed up + for(int i = 2000; i < 10000; i++) { // keep trying to grab the socket + try { + ServerSocket socket = new ServerSocket(i); + System.out.println("port=["+socket.getLocalPort()+"]"); + return socket; + } catch(Exception e) { + Thread.sleep(200); + } + } + throw new IOException("Could not create a client socket"); + } + + public void run() { + long count = 0; + int windowOctet = 0; + int expectWindowOctet = 0; + + try { + Socket socket = server.accept(); + InputStream in = socket.getInputStream(); + InputStream source = new BufferedInputStream(in); + AlpahbetIterator it = new AlpahbetIterator(); + + scan: for(int i = 0; i < repeat; i++) { + int octet = source.read(); // check first byte in the block to make sure its correct in sequence + + if(octet == -1) { + break scan; + } + count++; // we have read another byte + windowOctet = octet & 0x000000ff; + expectWindowOctet = i & 0x000000ff; + + if((byte) octet != (byte) i) { + throw new Exception("Wrong sequence of blocks sent, was " + + (byte)octet + " should have been " + (byte)i + " count is "+count+" window is "+window+" compare "+explore(it, source, 5)); + } + window.recieved(octet); + + for(int j = 1, k = 0; j < size; j++, k++) { + octet = source.read(); + + if(octet == -1) { + break scan; + } + byte next = it.next(); + + if((byte) octet != next) { + throw new Exception("Invalid data received expected "+((byte)octet)+"("+((char)octet)+ + ") but was "+next+"("+((char)next)+") total count is "+count+" block count is "+k+" window is expected "+ + expectWindowOctet+"("+((char)expectWindowOctet)+")("+((byte)expectWindowOctet)+") got "+windowOctet+"("+ + ((char)windowOctet)+")("+((byte)windowOctet)+") "+window+" compare "+explore(it, source, 5)); + } + count++; + } + it.reset(); + } + } catch(Throwable e) { + e.printStackTrace(); + } + if(count != size * repeat) { + new Exception("Invalid number of bytes read, was " + count + + " should have been " + (size * repeat)).printStackTrace(); + } + try { + // server.close(); + }catch(Exception e) { + e.printStackTrace(); + } + } + + private String explore(AlpahbetIterator it, InputStream source, int count) throws IOException { + StringBuffer buf = new StringBuffer(); + buf.append("expected ("); + for(int i = 0; i < count; i++) { + buf.append( (char)it.next() ); + } + buf.append(") is ("); + for(int i = 0; i < count; i++) { + buf.append( (char)source.read() ); + } + buf.append(")"); + return buf.toString(); + } + } + + + private static class TransportOutputStream extends OutputStream { + + private Transport transport; + + public TransportOutputStream(Transport transport) { + this.transport = transport; + } + + public void write(int octet) throws IOException { + byte[] data = new byte[] { (byte) octet }; + write(data); + } + + public void write(byte[] data, int off, int len) throws IOException { + try { + ByteBuffer buffer = ByteBuffer.wrap(data, off, len); + ByteBuffer safe = buffer.asReadOnlyBuffer(); + + if(len > 0) { + transport.write(safe); + } + } catch(Exception e) { + e.printStackTrace(); + throw new IOException("Write failed"); + } + } + + public void flush() throws IOException { + try { + transport.flush(); + } catch(Exception e) { + e.printStackTrace(); + throw new IOException("Flush failed"); + } + } + + public void close() throws IOException { + try { + transport.close(); + } catch(Exception e) { + e.printStackTrace(); + throw new IOException("Close failed"); + } + } + + } + + private static class Window { + + private final LinkedList<String> window; + private final int size; + + public Window(int size) { + this.window = new LinkedList<String>(); + this.size = size; + } + + public synchronized void recieved(int sequence) { + window.addLast(String.valueOf(sequence)); + + if(window.size() > size) { + window.removeFirst(); + } + } + + public synchronized String toString() { + StringBuilder builder = new StringBuilder("["); + String delim = ""; + for(String b : window) { + builder.append(delim).append(b); + delim=", "; + } + builder.append("]"); + return builder.toString(); + } + } + +} diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java new file mode 100644 index 0000000..3032692 --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/reactor/DistributorTest.java @@ -0,0 +1,269 @@ +package org.simpleframework.transport.reactor; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; + +import org.simpleframework.transport.trace.MockTrace; +import org.simpleframework.transport.trace.Trace; + + +public class DistributorTest extends TestCase { + + private static final String PAYLOAD = + "POST /index.html HTTP/1.0\r\n"+ + "Content-Type: multipart/form-data; boundary=AaB03x\r\n"+ + "Accept: image/gif;q=1.0,\r\n image/jpeg;q=0.8,\r\n"+ + " \t\t image/png;\t\r\n\t"+ + " q=1.0,*;q=0.1\r\n"+ + "Accept-Language: fr;q=0.1, en-us;q=0.4, en-gb; q=0.8, en;q=0.7\r\n"+ + "Host: some.host.com \r\n"+ + "Cookie: $Version=1; UID=1234-5678; $Path=/; $Domain=.host.com\r\n"+ + "Cookie: $Version=1; NAME=\"Niall Gallagher\"; $path=\"/\"\r\n"+ + "\r\n" + + "--AaB03x\r\n"+ + "Content-Disposition: form-data; name='pics'; filename='file1.txt'\r\n"+ + "Content-Type: text/plain\r\n\r\n"+ + "example contents of file1.txt\r\n"+ + "--AaB03x\r\n"+ + "Content-Type: multipart/mixed; boundary=BbC04y\r\n\r\n"+ + "--BbC04y\r\n"+ + "Content-Disposition: form-data; name='pics'; filename='file2.txt'\r\n"+ + "Content-Type: text/plain\r\n\r\n"+ + "example contents of file3.txt ...\r\n"+ + "--BbC04y\r\n"+ + "Content-Disposition: form-data; name='pics'; filename='file3.txt'\r\n"+ + "Content-Type: text/plain\r\n\r\n"+ + "example contents of file4.txt ...\r\n"+ + "--BbC04y\r\n"+ + "Content-Disposition: form-data; name='pics'; filename='file4.txt'\r\n"+ + "Content-Type: text/plain\r\n\r\n"+ + "example contents of file4.txt ...\r\n"+ + "--BbC04y--\r\n"+ + "--AaB03x--\r\n"; + + public class Client extends Thread { + + private CountDownLatch latch; + private String message; + private int requests; + private int port; + + public Client(CountDownLatch latch, String message, int port, int requests) throws Exception { + this.message = message; + this.requests = requests; + this.port = port; + this.latch = latch; + this.start(); + } + + public void run() { + try { + latch.await(); + + Socket socket = new Socket("localhost", port); + OutputStream out = socket.getOutputStream(); + byte[] payload = message.getBytes(); + + for(int i = 0; i < requests; i++){ + out.write(payload); + } + out.close(); + } catch(Exception e) { + e.printStackTrace(); + } + } + } + + public class Worker implements Operation { + + private BlockingQueue<Worker> done; + private Reactor reactor; + private SocketChannel channel; + private ByteBuffer buffer; + private String payload; + private int accumulate; + private long finish; + private long start; + private int id; + + public Worker(BlockingQueue<Worker> done, Reactor reactor, SocketChannel channel, String payload, int id) throws Exception { + this.buffer = ByteBuffer.allocate(8192); + this.start = System.currentTimeMillis(); + this.finish = start + 60000; + this.payload = payload; + this.channel = channel; + this.reactor = reactor; + this.done = done; + this.id = id; + } + + public Trace getTrace() { + return new MockTrace(); + } + + public long getExpiry(TimeUnit unit) { + return unit.convert(finish - System.currentTimeMillis(), MILLISECONDS); + } + + public int getAccumulate() { + return accumulate; + } + + // XXX should this be executed in a thread!!!!???? yes... + public void cancel() { + System.err.println("############################# Worker has been canceled"); + } + + public void run() { + try { + // N.B Fundamental to performance + buffer.clear(); + + if(channel.isOpen()) { + int count = channel.read(buffer); + accumulate += count; + + System.err.println("Worker-"+id+" read ["+count +"] of payload sized ["+payload.length()+"] took ["+(System.currentTimeMillis() -start)+"]"); + + if(count != -1) { + reactor.process(this, SelectionKey.OP_READ); + } else { + channel.close(); + done.offer(this); + System.err.println("Worker-"+id+" Channel is closed after time ["+(System.currentTimeMillis() - start)+"] and read ["+accumulate+"]"); + } + } else { + System.err.println("Worker-"+id+" Channel is closed after time ["+(System.currentTimeMillis() - start)+"] and read ["+accumulate+"]"); + done.offer(this); + } + }catch(Exception e) { + e.printStackTrace(); + } + } + + public SocketChannel getChannel() { + return channel; + } + + } + + public class Server extends Thread { + + private BlockingQueue<SocketChannel> ready; + private CountDownLatch latch; + private ServerSocketChannel server; + private Selector selector; + private int port; + + public Server(CountDownLatch latch, BlockingQueue<SocketChannel> ready, int port) throws Exception { + this.server = ServerSocketChannel.open(); + this.selector = Selector.open(); + this.latch = latch; + this.port = port; + this.ready = ready; + this.start(); + } + + private void configure() throws Exception { + server.socket().bind(new InetSocketAddress(port)); + server.configureBlocking(false); + } + + public void run() { + try { + configure(); + execute(); + } catch(Exception e) { + e.printStackTrace(); + } + } + + private void execute() throws Exception { + SelectionKey serverKey = server.register(selector, SelectionKey.OP_ACCEPT); + + latch.countDown(); + + while(true){ + selector.select(); + Set keys = selector.selectedKeys(); + + for(Iterator i = keys.iterator(); i.hasNext();){ + SelectionKey key = (SelectionKey) i.next(); + i.remove(); + + if(key != serverKey) { + return; + } + if(key.isAcceptable()) { + SocketChannel channel = server.accept(); + channel.configureBlocking(false); + ready.offer(channel); + } + } + } + } + } + + public static void main(String[] list) throws Exception { + new DistributorTest().testReactor(); + } + + public void testReactor() throws Exception { + testReactor(PAYLOAD, 200, 100, 10, 8123); + } + + private void testReactor(String payload, int clients, int requests, int threads, int port) throws Exception { + BlockingQueue<Worker> done = new LinkedBlockingQueue<Worker>(); + BlockingQueue<SocketChannel> ready = new LinkedBlockingQueue<SocketChannel>(); + CountDownLatch latch = new CountDownLatch(1); + Server server = new Server(latch, ready, port); + Executor executor = Executors.newFixedThreadPool(10); + Reactor reactor = new ExecutorReactor(executor, 1); + + long start = System.currentTimeMillis(); + + for(int i = 0; i < clients; i++) { + new Client(latch, payload, port, requests); + } + for(int i = 0; i < clients; i++) { + SocketChannel channel = ready.take(); + Worker worker = new Worker(done, reactor, channel, payload, i); + + reactor.process(worker); + } + int total = 0; + + for(int i = 0; i < clients; i++) { + Worker worker = done.take(); + int accumulate = worker.getAccumulate(); + total += accumulate; + System.err.println("Accumulated ["+accumulate+"] of ["+(requests*payload.length())+"] closed ["+worker.getChannel().socket().isClosed()+"]"); + } + System.err.println("Accumulated ["+total+"] of ["+(clients*requests*payload.length())+"]"); + System.err.println("Total time to process ["+(clients*requests)+"] payloads from ["+clients+"] clients took ["+(System.currentTimeMillis() - start)+"]"); + } + + + + +} + + diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/CompareQueueTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/CompareQueueTest.java new file mode 100644 index 0000000..ca2f5db --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/CompareQueueTest.java @@ -0,0 +1,174 @@ +package org.simpleframework.transport.trace; + +import java.text.DecimalFormat; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; + +import junit.framework.TestCase; + +import org.simpleframework.common.thread.ConcurrentExecutor; + +public class CompareQueueTest extends TestCase { + + private static final int TEST_DURATION = 10000; + private static final int THREAD_COUNT = 100; + + private final Executor blockingReadExecutor = new ConcurrentExecutor(BlockingConsumer.class, THREAD_COUNT); + private final Executor concurrentReadExecutor = new ConcurrentExecutor(ConcurrentConsumer.class, THREAD_COUNT); + private final Executor writeExecutor = new ConcurrentExecutor(Producer.class, THREAD_COUNT); + + public void testLinkedBlockingQueue() throws Exception { + BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(); + AtomicBoolean active = new AtomicBoolean(true); + AtomicLong writeCount = new AtomicLong(); + AtomicLong readCount = new AtomicLong(); + CountDownLatch startLatch = new CountDownLatch(THREAD_COUNT); + CountDownLatch stopLatch = new CountDownLatch(THREAD_COUNT); + DecimalFormat format = new DecimalFormat("###,###,###"); + + for(int i = 0; i < THREAD_COUNT; i++) { + BlockingConsumer consumer = new BlockingConsumer(queue, stopLatch, active, readCount); + blockingReadExecutor.execute(consumer); + } + Thread.sleep(1000); + + for(int i = 0; i < THREAD_COUNT; i++) { + Producer producer = new Producer(queue, startLatch, active, writeCount); + writeExecutor.execute(producer); + } + Thread.sleep(TEST_DURATION); + active.set(false); + stopLatch.await(); + + System.err.printf("read=%s write=%s%n", format.format(readCount.get()), format.format(writeCount.get())); + } + + public void testConcurrentQueue() throws Exception { + Queue<Object> queue = new ConcurrentLinkedQueue<Object>(); + AtomicBoolean active = new AtomicBoolean(true); + AtomicLong writeCount = new AtomicLong(); + AtomicLong readCount = new AtomicLong(); + CountDownLatch startLatch = new CountDownLatch(THREAD_COUNT); + CountDownLatch stopLatch = new CountDownLatch(THREAD_COUNT); + DecimalFormat format = new DecimalFormat("###,###,###"); + + for(int i = 0; i < THREAD_COUNT; i++) { + ConcurrentConsumer consumer = new ConcurrentConsumer(queue, stopLatch, active, readCount); + concurrentReadExecutor.execute(consumer); + } + Thread.sleep(1000); + + for(int i = 0; i < THREAD_COUNT; i++) { + Producer producer = new Producer(queue, startLatch, active, writeCount); + writeExecutor.execute(producer); + } + Thread.sleep(TEST_DURATION); + active.set(false); + stopLatch.await(); + + System.err.printf("read=%s write=%s%n", format.format(readCount.get()), format.format(writeCount.get())); + } + + private static class Producer implements Runnable { + + private final Queue<Object> queue; + private final AtomicBoolean active; + private final AtomicLong count; + private final CountDownLatch latch; + + public Producer(Queue<Object> queue, CountDownLatch latch, AtomicBoolean active, AtomicLong count) { + this.queue = queue; + this.active = active; + this.count = count; + this.latch = latch; + } + + public void run() { + try { + latch.countDown(); + latch.await(); + + while(active.get()) { + Long value = count.getAndIncrement(); + queue.offer(value); + } + } catch(Exception e) { + e.printStackTrace(); + } + } + + } + + private static class ConcurrentConsumer implements Runnable { + + private final Queue<Object> queue; + private final AtomicBoolean active; + private final AtomicLong count; + private final CountDownLatch latch; + + public ConcurrentConsumer(Queue<Object> queue, CountDownLatch latch, AtomicBoolean active, AtomicLong count) { + this.queue = queue; + this.active = active; + this.count = count; + this.latch = latch; + } + + public void run() { + try { + while(active.get()) { + Object value = queue.poll(); + if(value != null) { + count.getAndIncrement(); + } else { + LockSupport.parkNanos(100); + } + } + latch.countDown(); + latch.await(); + }catch(Exception e) { + e.printStackTrace(); + } + } + } + + private static class BlockingConsumer implements Runnable { + + private final BlockingQueue<Object> queue; + private final AtomicBoolean active; + private final AtomicLong count; + private final CountDownLatch latch; + + public BlockingConsumer(BlockingQueue<Object> queue, CountDownLatch latch, AtomicBoolean active, AtomicLong count) { + this.queue = queue; + this.active = active; + this.count = count; + this.latch = latch; + } + + public void run() { + try { + while(active.get()) { + try { + Object value = queue.take(); + if(value != null) { + count.getAndIncrement(); + } + }catch(Exception e) { + e.printStackTrace(); + } + } + latch.countDown(); + latch.await(); + }catch(Exception e) { + e.printStackTrace(); + } + } + } +} diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/MockTrace.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/MockTrace.java new file mode 100644 index 0000000..583326e --- /dev/null +++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/trace/MockTrace.java @@ -0,0 +1,6 @@ +package org.simpleframework.transport.trace; + +public class MockTrace implements Trace{ + public void trace(Object event) {} + public void trace(Object event, Object value) {} +} |