summaryrefslogtreecommitdiffstats
path: root/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketFlusher.java
diff options
context:
space:
mode:
Diffstat (limited to 'simple/simple-transport/src/main/java/org/simpleframework/transport/SocketFlusher.java')
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/SocketFlusher.java142
1 files changed, 142 insertions, 0 deletions
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketFlusher.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketFlusher.java
new file mode 100644
index 0000000..a10cee7
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketFlusher.java
@@ -0,0 +1,142 @@
+/*
+ * SocketFlusher.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+
+import org.simpleframework.transport.reactor.Reactor;
+
+/**
+ * The <code>SocketFlusher</code> flushes bytes to the underlying
+ * socket channel. This allows asynchronous writes to the socket
+ * to be managed in such a way that there is order to the way data
+ * is delivered over the socket. This uses a selector to dispatch
+ * flush invocations to the underlying socket when the socket is
+ * write ready. This allows the writing thread to continue without
+ * having to wait for all the data to be written to the socket.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.SocketBufferWriter
+ */
+class SocketFlusher {
+
+ /**
+ * This is the signaller used to determine when to flush.
+ */
+ private FlushSignaller signaller;
+
+ /**
+ * This is the scheduler used to block and signal the writer.
+ */
+ private FlushScheduler scheduler;
+
+ /**
+ * This is the writer used to queue the buffers written.
+ */
+ private SocketBuffer buffer;
+
+ /**
+ * This is used to determine if the socket flusher is closed.
+ */
+ private boolean closed;
+
+ /**
+ * Constructor for the <code>SocketFlusher</code> object. This is
+ * used to flush buffers to the underlying socket asynchronously.
+ * When finished flushing all of the buffered data this signals
+ * any threads that are blocking waiting for the write to finish.
+ *
+ * @param buffer this is used to write the buffered buffers
+ * @param reactor this is used to perform asynchronous writes
+ * @param socket this is the socket used to select with
+ */
+ public SocketFlusher(SocketBuffer buffer, Socket socket, Reactor reactor) throws IOException {
+ this.signaller = new FlushSignaller(this, socket);
+ this.scheduler = new FlushScheduler(socket, reactor, signaller, this);
+ this.buffer = buffer;
+ }
+
+ /**
+ * Here in this method we schedule a flush when the underlying
+ * writer is write ready. This allows the writer thread to return
+ * without having to fully flush the content to the underlying
+ * transport. If there are references queued this will block.
+ */
+ public synchronized void flush() throws IOException {
+ if(closed) {
+ throw new TransportException("Flusher is closed");
+ }
+ boolean block = !buffer.ready();
+
+ if(!closed) {
+ scheduler.schedule(block);
+ }
+ }
+
+ /**
+ * This is executed when the flusher is to write all of the data to
+ * the underlying socket. In this situation the writes are attempted
+ * in a non blocking way, if the task does not complete then this
+ * will simply enqueue the writing task for OP_WRITE and leave the
+ * method. This returns true if all the buffers are written.
+ */
+ public synchronized void execute() throws IOException {
+ boolean ready = buffer.flush();
+
+ if(!ready) {
+ boolean block = !buffer.ready();
+
+ if(!block && !closed) {
+ scheduler.release();
+ }
+ scheduler.repeat();
+ } else{
+ scheduler.ready();
+ }
+ }
+
+ /**
+ * This is used to abort the flushing process when the reactor has
+ * been stopped. An abort to the flusher typically happens when the
+ * server has been shutdown. It prevents threads lingering waiting
+ * for a I/O operation which prevents the server from shutting down.
+ */
+ public synchronized void abort() throws IOException {
+ scheduler.close();
+ buffer.close();
+ }
+
+ /**
+ * This is used to close the flusher ensuring that all of the
+ * data within the writer will be flushed regardless of the
+ * amount of data within the writer that needs to be written. If
+ * the writer does not block then this waits to be finished.
+ */
+ public synchronized void close() throws IOException {
+ boolean ready = buffer.flush();
+
+ if(!closed) {
+ closed = true;
+ }
+ if(!ready) {
+ scheduler.schedule(true);
+ }
+ }
+}