summaryrefslogtreecommitdiffstats
path: root/simple/simple-transport/src/main/java/org/simpleframework/transport/FlushScheduler.java
blob: 43a64ed56ba9486112b39e486c1884d302d90abf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/*
 * FlushScheduler.java February 2008
 *
 * Copyright (C) 2008, Niall Gallagher <niallg@users.sf.net>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
 * implied. See the License for the specific language governing 
 * permissions and limitations under the License.
 */

package org.simpleframework.transport;

import static java.nio.channels.SelectionKey.OP_WRITE;
import static org.simpleframework.transport.TransportEvent.WRITE_BLOCKING;
import static org.simpleframework.transport.TransportEvent.WRITE_WAIT;

import java.io.IOException;

import org.simpleframework.transport.reactor.Operation;
import org.simpleframework.transport.reactor.Reactor;
import org.simpleframework.transport.trace.Trace;

/**
 * The <code>FlushScheduler</code> object is used to schedule a task 
 * for execution when it is write ready. This is used by the socket
 * flusher to ensure that the writing thread can be blocked until
 * such time as all the bytes required to be written are written.
 * <p>
 * All methods are invoked by a <code>SocketFlusher</code> object
 * which is synchronized. This ensures that the methods of the 
 * scheduler are thread safe in that only one thread will access
 * them at any given time. The lock used by the socket flusher can
 * thus be safely as it will be synchronized on by the flusher.
 * 
 * @author Niall Gallagher
 *
 * @see org.simpleframework.transport.SocketFlusher
 */
class FlushScheduler {
   
   /**
    * This is the operation that is scheduled for execution.
    */
   private Operation task;
   
   /**
    * This is the reactor to used to execute the operation.
    */
   private Reactor reactor;
   
   /**
    * This is the trace that listens to all transport events.
    */
   private Trace trace;
   
   /**
    * This is the lock that is used to signal a blocked thread.
    */
   private Object lock;
   
   /**
    * This is used to determine if the scheduler is running.
    */
   private volatile boolean running;
   
   /**
    * This is used to determine if the scheduler is interrupted.
    */
   private volatile boolean closed;
   
   /**
    * This is used to determine if there is currently a flush.
    */
   private volatile boolean flushing;
   
   /**
    * Constructor for the <code>FlushScheduler</code> object. This 
    * is* used to create a scheduler that will execute the provided
    * task when the associated socket is write ready. 
    * 
    * @param socket this is the associated socket for the scheduler
    * @param reactor this is the rector used to schedule execution
    * @param task this is the task that is executed when writable
    * @param lock this is the lock used to signal blocking threads
    */
   public FlushScheduler(Socket socket, Reactor reactor, Operation task, Object lock) {
      this.trace = socket.getTrace();
      this.reactor = reactor;
      this.task = task;
      this.lock = lock;
   }
   
   /**
    * This is used to repeat schedule the operation for execution.
    * This is executed if the operation has not fully completed
    * its task. If the scheduler is not in a running state then
    * this will not schedule the task for a repeat execution.
    */
   public void repeat() throws IOException {
      if(closed) {
         throw new TransportException("Socket closed");
      }
      if(running) {
         trace.trace(WRITE_WAIT);
         reactor.process(task, OP_WRITE);
      }
   }

   /**
    * This is used to schedule the task for execution. If this is
    * given a boolean true to indicate that it wishes to block
    * then this will block the calling thread until such time as
    * the <code>ready</code> method is invoked.
    * 
    * @param block indicates whether the thread should block
    */
   public void schedule(boolean block) throws IOException {
      if(closed) {
         throw new TransportException("Socket closed");
      }
      if(!running) {
         trace.trace(WRITE_WAIT);
         reactor.process(task, OP_WRITE);
         running = true;
      }
      if(block) {
         listen();
      }
   }
   
   /**
    * This is used to listen for a notification from the reactor to
    * tell the thread that the write operation has completed. If
    * the thread is interrupted upon this call then this will throw
    * an <code>IOException</code> with the root cause.
    */
   private void listen() throws IOException {
      if(flushing) {
         throw new TransportException("Socket already flushing");
      }
      try {
         if(!closed) {
            try {
               flushing = true;
               trace.trace(WRITE_BLOCKING);
               lock.wait(120000);
            } finally {
               flushing = false;
            }
         }
      } catch(Exception e) {
         throw new TransportException("Could not schedule for flush", e);
      }
      if(closed) {
         throw new TransportException("Socket closed");
      }
   }
      
   /**
    * This is used to notify any waiting threads that they no longer
    * need to wait. This is used when the flusher no longer needs
    * the waiting thread to block. Such an occurrence happens when
    * all shared data has been written or has been duplicated.    
    */
   public void release() {
      lock.notifyAll();
   }

   /**
    * This is used to signal any blocking threads to wake up. When
    * this is invoked blocking threads are signaled and they can
    * return. This is typically done when the task has finished.
    */
   public void ready() {
      lock.notifyAll();
      running = false;
   }
   
   /**
    * This is used to close the scheduler when the reactor is
    * closed by the server. An close will happen when the server
    * has been shutdown, it ensures there are no threads lingering
    * waiting for a notification when the reactor has closed.
    */
   public void close() {
      lock.notifyAll();
      closed = true;
   }
}