summaryrefslogtreecommitdiffstats
path: root/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionDistributor.java
diff options
context:
space:
mode:
Diffstat (limited to 'simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionDistributor.java')
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionDistributor.java741
1 files changed, 741 insertions, 0 deletions
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionDistributor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionDistributor.java
new file mode 100644
index 0000000..65dc8d2
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionDistributor.java
@@ -0,0 +1,741 @@
+/*
+ * ActionDistributor.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+import static java.nio.channels.SelectionKey.OP_READ;
+import static java.nio.channels.SelectionKey.OP_WRITE;
+import static org.simpleframework.transport.reactor.ReactorEvent.CHANNEL_CLOSED;
+import static org.simpleframework.transport.reactor.ReactorEvent.CLOSE_SELECTOR;
+import static org.simpleframework.transport.reactor.ReactorEvent.ERROR;
+import static org.simpleframework.transport.reactor.ReactorEvent.EXECUTE_ACTION;
+import static org.simpleframework.transport.reactor.ReactorEvent.INVALID_KEY;
+import static org.simpleframework.transport.reactor.ReactorEvent.READ_INTEREST_READY;
+import static org.simpleframework.transport.reactor.ReactorEvent.REGISTER_INTEREST;
+import static org.simpleframework.transport.reactor.ReactorEvent.REGISTER_READ_INTEREST;
+import static org.simpleframework.transport.reactor.ReactorEvent.REGISTER_WRITE_INTEREST;
+import static org.simpleframework.transport.reactor.ReactorEvent.SELECT;
+import static org.simpleframework.transport.reactor.ReactorEvent.SELECT_CANCEL;
+import static org.simpleframework.transport.reactor.ReactorEvent.SELECT_EXPIRED;
+import static org.simpleframework.transport.reactor.ReactorEvent.UPDATE_INTEREST;
+import static org.simpleframework.transport.reactor.ReactorEvent.UPDATE_READ_INTEREST;
+import static org.simpleframework.transport.reactor.ReactorEvent.UPDATE_WRITE_INTEREST;
+import static org.simpleframework.transport.reactor.ReactorEvent.WRITE_INTEREST_READY;
+
+import java.io.IOException;
+import java.nio.channels.Channel;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+
+import org.simpleframework.common.thread.Daemon;
+import org.simpleframework.transport.trace.Trace;
+
+ /**
+ * The <code>ActionDistributor</code> is used to execute operations
+ * that have an interested I/O event ready. This acts much like a
+ * scheduler would in that it delays the execution of the operations
+ * until such time as the associated <code>SelectableChannel</code>
+ * has an interested I/O event ready.
+ * <p>
+ * This distributor has two modes, one mode is used to cancel the
+ * channel once an I/O event has occurred. This means that the channel
+ * is removed from the <code>Selector</code> so that the selector
+ * does not break when asked to select again. cancelling the channel
+ * is useful when the operation execution may not fully read the
+ * payload or when the operation takes a significant amount of time.
+ *
+ * @see org.simpleframework.transport.reactor.ExecutorReactor
+ */
+class ActionDistributor extends Daemon implements OperationDistributor {
+
+ /**
+ * This is used to determine the operations that need cancelling.
+ */
+ private Map<Channel, ActionSet> executing;
+
+ /**
+ * This is used to keep track of actions currently in selection.
+ */
+ private Map<Channel, ActionSet> selecting;
+
+ /**
+ * This is the queue that is used to invalidate channels.
+ */
+ private Queue<Channel> invalid;
+
+ /**
+ * This is the queue that is used to provide the operations.
+ */
+ private Queue<Action> pending;
+
+ /**
+ * This is the selector used to select for interested events.
+ */
+ private ActionSelector selector;
+
+ /**
+ * This is used to execute the operations that are ready.
+ */
+ private Executor executor;
+
+ /**
+ * This is used to signal when the distributor has closed.
+ */
+ private Latch latch;
+
+ /**
+ * This is the duration in milliseconds the operation expires in.
+ */
+ private long expiry;
+
+ /**
+ * This is time in milliseconds when the next expiry will occur.
+ */
+ private long update;
+
+ /**
+ * This is used to determine the mode the distributor uses.
+ */
+ private boolean cancel;
+
+ /**
+ * Constructor for the <code>ActionDistributor</code> object. This
+ * will create a distributor that distributes operations when those
+ * operations show that they are ready for a given I/O event. The
+ * interested I/O events are provided as a bitmask taken from the
+ * actions of the <code>SelectionKey</code>. Distribution of the
+ * operations is passed to the provided executor object.
+ *
+ * @param executor this is the executor used to execute operations
+ */
+ public ActionDistributor(Executor executor) throws IOException {
+ this(executor, true);
+ }
+
+ /**
+ * Constructor for the <code>ActionDistributor</code> object. This
+ * will create a distributor that distributes operations when those
+ * operations show that they are ready for a given I/O event. The
+ * interested I/O events are provided as a bitmask taken from the
+ * actions of the <code>SelectionKey</code>. Distribution of the
+ * operations is passed to the provided executor object.
+ *
+ * @param executor this is the executor used to execute operations
+ * @param cancel should the channel be removed from selection
+ */
+ public ActionDistributor(Executor executor, boolean cancel) throws IOException {
+ this(executor, cancel, 120000);
+ }
+
+ /**
+ * Constructor for the <code>ActionDistributor</code> object. This
+ * will create a distributor that distributes operations when those
+ * operations show that they are ready for a given I/O event. The
+ * interested I/O events are provided as a bitmask taken from the
+ * actions of the <code>SelectionKey</code>. Distribution of the
+ * operations is passed to the provided executor object.
+ *
+ * @param executor this is the executor used to execute operations
+ * @param cancel should the channel be removed from selection
+ * @param expiry this the maximum idle time for an operation
+ */
+ public ActionDistributor(Executor executor, boolean cancel, long expiry) throws IOException {
+ this.selecting = new LinkedHashMap<Channel, ActionSet>();
+ this.executing = new LinkedHashMap<Channel, ActionSet>();
+ this.pending = new ConcurrentLinkedQueue<Action>();
+ this.invalid = new ConcurrentLinkedQueue<Channel>();
+ this.selector = new ActionSelector();
+ this.latch = new Latch();
+ this.executor = executor;
+ this.cancel = cancel;
+ this.expiry = expiry;
+ this.start();
+ }
+
+ /**
+ * This is used to process the <code>Operation</code> object. This
+ * will wake up the selector if it is currently blocked selecting
+ * and register the operations associated channel. Once the
+ * selector is awake it will acquire the operation from the queue
+ * and register the associated <code>SelectableChannel</code> for
+ * selection. The operation will then be executed when the channel
+ * is ready for the interested I/O events.
+ *
+ * @param task this is the task that is scheduled for distribution
+ * @param require this is the bit-mask value for interested events
+ */
+ public void process(Operation task, int require) throws IOException {
+ Action action = new ExecuteAction(task, require, expiry);
+
+ if(!isActive()) {
+ throw new IOException("Distributor is closed");
+ }
+ pending.offer(action);
+ selector.wake();
+ }
+
+ /**
+ * This is used to close the distributor such that it cancels all
+ * of the registered channels and closes down the selector. This
+ * is used when the distributor is no longer required, after the
+ * close further attempts to process operations will fail.
+ */
+ public void close() throws IOException {
+ stop();
+ selector.wake();
+ latch.close();
+ }
+
+ /**
+ * This returns the number of channels that are currently selecting
+ * with this distributor. When busy this can get quite high, however
+ * it must return to zero as soon as all tasks have completed.
+ *
+ * @return return the number of channels currently selecting
+ */
+ public int size() {
+ return selecting.size();
+ }
+
+ /**
+ * Performs the execution of the distributor. Each distributor runs
+ * on an asynchronous thread to the <code>Reactor</code> which is
+ * used to perform the selection on a set of channels. Each time
+ * there is a new operation to be processed this will take the
+ * operation from the ready queue, cancel all outstanding channels,
+ * and register the operations associated channel for selection.
+ */
+ public void run() {
+ try {
+ execute();
+ } finally {
+ purge();
+ }
+ }
+
+ /**
+ * Performs the execution of the distributor. Each distributor runs
+ * on an asynchronous thread to the <code>Reactor</code> which is
+ * used to perform the selection on a set of channels. Each time
+ * there is a new operation to be processed this will take the
+ * operation from the ready queue, cancel all outstanding channels,
+ * and register the operations associated channel for selection.
+ */
+ private void execute() {
+ while(isActive()) {
+ try {
+ register();
+ cancel();
+ expire();
+ distribute();
+ validate();
+ } catch(Exception cause) {
+ report(cause);
+ }
+ }
+ }
+
+ /**
+ * This will purge all the actions from the distributor when the
+ * distributor ends. If there are any threads waiting on the close
+ * to finish they are signalled when all operations are purged.
+ * This will allow them to return ensuring no operations linger.
+ */
+ private void purge() {
+ try {
+ register();
+ cancel();
+ clear();
+ } catch(Exception cause) {
+ report(cause);
+ }
+ }
+
+ /**
+ * This method is called to ensure that if there is a global
+ * error that each action will know about it. Such an issue could
+ * be file handle exhaustion or an out of memory error. It is
+ * also possible that a poorly behaving action could cause an
+ * issue which should be know the the entire system.
+ *
+ * @param cause this is the exception to report
+ */
+ private void report(Exception cause) {
+ Set<Channel> channels = selecting.keySet();
+
+ for(Channel channel : channels) {
+ ActionSet set = selecting.get(channel);
+ Action[] list = set.list();
+
+ for(Action action : list) {
+ Operation operation = action.getOperation();
+ Trace trace = operation.getTrace();
+
+ try {
+ trace.trace(ERROR, cause);
+ } catch(Exception e) {
+ invalid.offer(channel);
+ }
+ }
+ }
+ invalid.clear();
+ }
+
+ /**
+ * Here we perform an expire which will take all of the registered
+ * sockets and expire it. This ensures that the operations can be
+ * executed within the executor and the cancellation of the sockets
+ * can be performed. Once this method has finished then all of
+ * the operations will have been scheduled for execution.
+ */
+ private void clear() throws IOException {
+ List<ActionSet> sets = selector.registeredSets();
+
+ for(ActionSet set : sets) {
+ Action[] list = set.list();
+
+ for(Action action : list) {
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+
+ try {
+ trace.trace(CLOSE_SELECTOR);
+ expire(set, Long.MAX_VALUE);
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+ }
+ selector.close();
+ latch.signal();
+ }
+
+ /**
+ * This method is used to expire registered operations that remain
+ * idle within the selector. Operations specify a time at which
+ * point they wish to be cancelled if the I/O event they wait on
+ * has not arisen. This will enables the cancelled operation to be
+ * cancelled so that the resources it occupies can be released.
+ */
+ private void expire() throws IOException {
+ List<ActionSet> sets = selector.registeredSets();
+
+ if(cancel) {
+ long time = System.currentTimeMillis();
+
+ if(update <= time) {
+ for(ActionSet set : sets) {
+ expire(set, time);
+ }
+ update = time +10000;
+ }
+ }
+ }
+
+ /**
+ * This method is used to expire registered operations that remain
+ * idle within the selector. Operations specify a time at which
+ * point they wish to be if the I/O event they wait on
+ * has not arisen. This will enables the cancelled operation to be
+ * cancelled so that the resources it occupies can be released.
+ *
+ * @param set this is the selection set check for expired actions
+ * @param time this is the time to check the expiry against
+ */
+ private void expire(ActionSet set, long time) throws IOException {
+ Action[] actions = set.list();
+ SelectionKey key = set.key();
+
+ if(key.isValid()) {
+ int mask = key.interestOps();
+
+ for(Action action : actions) {
+ int interest = action.getInterest();
+ long expiry = action.getExpiry();
+
+ if(expiry < time) {
+ expire(set, action);
+ mask &= ~interest;
+ }
+ }
+ update(set, mask);
+ }
+ }
+
+ /**
+ * This is used to update the interested operations of a set of
+ * actions. If there are no interested operations the set will be
+ * cancelled, otherwise the selection key will be updated with the
+ * new operations provided by the bitmask.
+ *
+ * @param set this is the action set that is to be updated
+ * @param interest this is the bitmask containing the operations
+ */
+ private void update(ActionSet set, int interest) throws IOException {
+ SelectionKey key = set.key();
+
+ if(interest == 0) {
+ Channel channel = key.channel();
+
+ selecting.remove(channel);
+ key.cancel();
+ } else {
+ key.interestOps(interest);
+ }
+ }
+
+ /**
+ * This method is used to expire registered operations that remain
+ * idle within the selector. Operations specify a time at which
+ * point they wish to be cancelled if the I/O event they wait on
+ * has not arisen. This will enables the cancelled operation to be
+ * cancelled so that the resources it occupies can be released.
+ *
+ * @param set this is the action set containing the actions
+ * @param action this is the actual action to be cancelled
+ */
+ private void expire(ActionSet set, Action action) throws IOException {
+ Action cancel = new CancelAction(action);
+
+ if(set != null) {
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+ int interest = action.getInterest();
+
+ try {
+ trace.trace(SELECT_EXPIRED, interest);
+ set.remove(interest);
+ execute(cancel);
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+ }
+
+ /**
+ * This method is used to perform simple validation. It ensures
+ * that directly after the processing loop any channels that
+ * are registered that have been cancelled or are closed will
+ * be removed from the selecting map and rejected.
+ */
+ private void validate() throws IOException {
+ Set<Channel> channels = selecting.keySet();
+
+ for(Channel channel : channels) {
+ ActionSet set = selecting.get(channel);
+ SelectionKey key = set.key();
+
+ if(!key.isValid()) {
+ invalid.offer(channel);
+ }
+ }
+ for(Channel channel : invalid) {
+ invalidate(channel);
+ }
+ invalid.clear();
+ }
+
+ /**
+ * This method is used to remove the channel from the selecting
+ * registry. It is rare that this will every happen, however it
+ * is important that tasks are cleared out in this manner as it
+ * could lead to a memory leak if left for a long time.
+ *
+ * @param channel this is the channel being validated
+ */
+ private void invalidate(Channel channel) throws IOException {
+ ActionSet set = selecting.remove(channel);
+ Action[] list = set.list();
+
+ for(Action action : list) {
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+
+ try {
+ trace.trace(INVALID_KEY);
+ execute(action); // reject
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+ }
+
+ /**
+ * This is used to cancel any selection keys that have previously
+ * been selected with an interested I/O event. Performing a cancel
+ * here ensures that on a the next select the associated channel
+ * is not considered, this ensures the select does not break.
+ */
+ private void cancel() throws IOException {
+ Collection<ActionSet> list = executing.values();
+
+ for(ActionSet set : list) {
+ Action[] actions = set.list();
+
+ for(Action action : actions) {
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+
+ trace.trace(SELECT_CANCEL);
+ }
+ set.cancel();
+ set.clear();
+ }
+ executing.clear();
+ }
+
+ /**
+ * Here all the enqueued <code>Operation</code> objects will be
+ * registered for selection. Each operations channel is used for
+ * selection on the interested I/O events. Once the I/O event
+ * occurs for the channel the operation is scheduled for execution.
+ */
+ private void register() throws IOException {
+ while(!pending.isEmpty()) {
+ Action action = pending.poll();
+
+ if(action != null) {
+ SelectableChannel channel = action.getChannel();
+ ActionSet set = executing.remove(channel);
+
+ if(set == null) {
+ set = selecting.get(channel);
+ }
+ if(set != null) {
+ update(action, set);
+ } else {
+ register(action);
+ }
+ }
+ }
+ }
+
+ /**
+ * Here the specified <code>Operation</code> object is registered
+ * with the selector. If the associated channel had previously
+ * been cancelled it is removed from the cancel map to ensure it
+ * is not removed from the selector when cancellation is done.
+ *
+ * @param action this is the operation that is to be registered
+ */
+ private void register(Action action) throws IOException {
+ SelectableChannel channel = action.getChannel();
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+
+ try {
+ if(channel.isOpen()) {
+ trace.trace(SELECT);
+ select(action);
+ } else {
+ trace.trace(CHANNEL_CLOSED);
+ selecting.remove(channel);
+ execute(action); // reject
+ }
+ }catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+
+ /**
+ * Here the specified <code>Operation</code> object is registered
+ * with the selector. If the associated channel had previously
+ * been cancelled it is removed from the cancel map to ensure it
+ * is not removed from the selector when cancellation is done.
+ *
+ * @param action this is the operation that is to be registered
+ * @param set this is the action set to register the action with
+ */
+ private void update(Action action, ActionSet set) throws IOException {
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+ SelectionKey key = set.key();
+ int interest = action.getInterest();
+ int current = key.interestOps();
+ int updated = current | interest;
+
+ try {
+ if(OP_READ == (interest & OP_READ)) {
+ trace.trace(UPDATE_READ_INTEREST);
+ }
+ if(OP_WRITE == (interest & OP_WRITE)) {
+ trace.trace(UPDATE_WRITE_INTEREST);
+ }
+ trace.trace(UPDATE_INTEREST, updated);
+ key.interestOps(updated);
+ set.attach(action);
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+
+ /**
+ * This method is used to perform an actual select on a channel. It
+ * will register the channel with the internal selector using the
+ * required I/O event bit mask. In order to ensure that selection
+ * is performed correctly the provided channel must be connected.
+ *
+ * @param action this is the operation that is to be registered
+ *
+ * @return this returns the selection key used for selection
+ */
+ private void select(Action action) throws IOException {
+ SelectableChannel channel = action.getChannel();
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+ int interest = action.getInterest();
+
+ if(interest > 0) {
+ ActionSet set = selector.register(channel, interest);
+
+ if(OP_READ == (interest & OP_READ)) {
+ trace.trace(REGISTER_READ_INTEREST);
+ }
+ if(OP_WRITE == (interest & OP_WRITE)) {
+ trace.trace(REGISTER_WRITE_INTEREST);
+ }
+ trace.trace(REGISTER_INTEREST, interest);
+ set.attach(action);
+ selecting.put(channel, set);
+ }
+ }
+
+ /**
+ * This method is used to perform the select and if required queue
+ * the operations that are ready for execution. If the selector
+ * is woken up without any ready channels then this will return
+ * quietly. If however there are a number of channels ready to be
+ * processed then they are handed to the executor object and
+ * marked as ready for cancellation.
+ */
+ private void distribute() throws IOException {
+ if(selector.select(5000) > 0) {
+ if(isActive()) {
+ process();
+ }
+ }
+ }
+
+ /**
+ * This will iterate over the set of selection keys and process each
+ * of them. The <code>Operation</code> associated with the selection
+ * key is handed to the executor to perform the channel operation.
+ * Also, if configured to cancel, this method will add the channel
+ * and the associated selection key to the cancellation map.
+ */
+ private void process() throws IOException{
+ List<ActionSet> ready = selector.selectedSets();
+
+ for(ActionSet set : ready) {
+ process(set);
+ remove(set);
+ }
+ }
+
+ /**
+ * This will use the specified action set to acquire the channel
+ * and <code>Operation</code> associated with it to hand to the
+ * executor to perform the channel operation.
+ *
+ * @param set this is the set of actions that are to be processed
+ */
+ private void process(ActionSet set) throws IOException {
+ Action[] actions = set.ready();
+
+ for(Action action : actions) {
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+ int interest = action.getInterest();
+
+ try {
+ if(OP_READ == (interest & OP_READ)) {
+ trace.trace(READ_INTEREST_READY, interest);
+ }
+ if(OP_WRITE == (interest & OP_WRITE)) {
+ trace.trace(WRITE_INTEREST_READY, interest);
+ }
+ execute(action);
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+ }
+
+ /**
+ * This method ensures that references to the actions and channel
+ * are cleared from this instance. To ensure there are no memory
+ * leaks it is important to clear out all actions and channels.
+ * Also, if configured to cancel executing actions this will
+ * register the channel and actions to cancel on the next loop.
+ *
+ * @param set this is the set of actions that are to be removed
+ */
+ private void remove(ActionSet set) throws IOException {
+ Channel channel = set.channel();
+ SelectionKey key = set.key();
+
+ if(key.isValid()) {
+ int interest = set.interest();
+ int ready = key.readyOps();
+
+ if(cancel) {
+ int remaining = interest & ~ready;
+
+ if(remaining == 0) {
+ executing.put(channel, set);
+ } else {
+ key.interestOps(remaining);
+ }
+ set.remove(ready);
+ }
+ } else {
+ selecting.remove(channel);
+ }
+ }
+
+ /**
+ * This is where the action is handed off to the executor. Before
+ * the action is executed a trace event is generated, this will
+ * ensure that the entry and exit points can be tracked. It is
+ * also useful in debugging performance issues and memory leaks.
+ *
+ * @param action this is the action to execute
+ */
+ private void execute(Action action) {
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+ int interest = action.getInterest();
+
+ try {
+ trace.trace(EXECUTE_ACTION, interest);
+ executor.execute(action);
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+}
+
+ \ No newline at end of file