summaryrefslogtreecommitdiffstats
path: root/simple/simple-http/src/main/java/org/simpleframework/http/message/FileUploadConsumer.java
blob: 831801382d10b7df75292deed8af24400d9f836a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
/*
 * FileUploadConsumer.java February 2013
 *
 * Copyright (C) 2013, Niall Gallagher <niallg@users.sf.net>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
 * implied. See the License for the specific language governing 
 * permissions and limitations under the License.
 */

package org.simpleframework.http.message;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.simpleframework.common.buffer.Allocator;
import org.simpleframework.transport.ByteCursor;

/**
 * The <code>FileUploadConsumer</code> object is used to consume a
 * list of parts encoded in the multipart format. This is can consume 
 * any number of parts from a cursor. Each part consumed is added to an
 * internal part list which can be used to acquire the contents of the
 * upload and inspect the headers provided for each uploaded part. To
 * ensure that only a fixed number of bytes are consumed this wraps
 * the provided cursor with a counter to ensure reads a limited amount.
 * 
 * @author Niall Gallagher
 */
public class FileUploadConsumer implements BodyConsumer {

   /**
    * This is used to read and parse the contents of the part series.
    */
   private final BodyConsumer consumer;
   
   /**
    * This counts the number of bytes remaining the the part series. 
    */
   private final AtomicLong count;
   
   /**
    * Constructor for the <code>FileUploadConsumer</code> object.
    * This is used to create an object that read a series of parts 
    * from a fixed length body. When consuming the body this will not
    * read any more than the content length from the cursor.
    * 
    * @param allocator this is the allocator used to allocate buffers
    * @param boundary this is the boundary that is used by this
    * @param length this is the number of bytes for this part series
    */
   public FileUploadConsumer(Allocator allocator, byte[] boundary, long length) {
      this.consumer = new PartSeriesConsumer(allocator, boundary, length);
      this.count = new AtomicLong(length);
   }   
   
   /**
    * This is used to acquire the body that has been consumed. This
    * will return a body which can be used to read the content of
    * the message, also if the request is multipart upload then all
    * of the parts are provided as <code>Part</code> objects. 
    * Each part can then be read as an individual message.
    *  
    * @return the body that has been consumed by this instance
    */
   public Body getBody() {
      return consumer.getBody();
   }
   
   /**
    * This method is used to consume bytes from the provided cursor.
    * Consuming of bytes from the cursor should be done in such a
    * way that it does not block. So typically only the number of
    * ready bytes in the <code>ByteCursor</code> object should be 
    * read. If there are no ready bytes then this will return.
    *
    * @param cursor used to consume the bytes from the HTTP pipeline
    */ 
   public void consume(ByteCursor cursor) throws IOException {
      ByteCounter counter = new ByteCounter(cursor);
      
      while(counter.isReady()) {
         if(consumer.isFinished()) {
            break;
         }
         consumer.consume(counter);
      }
   }

   /**
    * This is used to determine whether the consumer has finished 
    * reading. The consumer is considered finished if it has read a
    * terminal token or if it has exhausted the stream and can not
    * read any more. Once finished the consumed bytes can be parsed.
    *
    * @return true if the consumer has finished reading its content
    */ 
   public boolean isFinished() {
      long remaining = count.get();
      
      if(consumer.isFinished()) {
         return true;
      }
      return remaining <= 0;
   }
   
   /**
    * The <code>ByteCounter</code> is a wrapper for a cursor that can
    * be used to restrict the number of bytes consumed. This will
    * count the bytes consumed and ensure that any requested data is
    * restricted to a chunk less than or equal to the remaining bytes.
    */
   private class ByteCounter implements ByteCursor {
      
      /**
       * This is the cursor that this counter will delegate to.
       */
      private final ByteCursor cursor;
      
      /**
       * Constructor for the <code>Counter</code> object. This is used
       * to create a special cursor that counts the bytes read and 
       * limits reads to the remaining bytes left in the part series. 
       * 
       * @param cursor this is the cursor that is delegated to
       */
      public ByteCounter(ByteCursor cursor) {
         this.cursor = cursor;
      }

      /**
       * Determines whether the cursor is still open. The cursor is
       * considered open if there are still bytes to read. If there is
       * still bytes buffered and the underlying transport is closed
       * then the cursor is still considered open. 
       * 
       * @return true if the read method does not return a -1 value
       */
      public boolean isOpen() throws IOException {
         return cursor.isOpen();
      }

      /**
       * Determines whether the cursor is ready for reading. When the
       * cursor is ready then it guarantees that some amount of bytes
       * can be read from the underlying stream without blocking.
       *
       * @return true if some data can be read without blocking
       */ 
      public boolean isReady() throws IOException {
         long limit = count.get();         
         
         if(limit > 0) {
            return cursor.isReady();
         }
         return false;
      }

      /**
       * Provides the number of bytes that can be read from the stream
       * without blocking. This is typically the number of buffered or
       * available bytes within the stream. When this reaches zero then
       * the cursor may perform a blocking read.
       *
       * @return the number of bytes that can be read without blocking
       */
      public int ready() throws IOException {
         int limit = (int)count.get();
         int ready = cursor.ready();
         
         if(ready > limit) {
            return limit;
         }
         return ready;
      }

      /**
       * Reads a block of bytes from the underlying stream. This will
       * read up to the requested number of bytes from the underlying
       * stream. If there are no ready bytes on the stream this can 
       * return zero, representing the fact that nothing was read.
       *
       * @param data this is the array to read the bytes in to 
       *
       * @return this returns the number of bytes read from the stream 
       */
      public int read(byte[] data) throws IOException {
         return read(data, 0, data.length);
      }

      /**
       * Reads a block of bytes from the underlying stream. This will
       * read up to the requested number of bytes from the underlying
       * stream. If there are no ready bytes on the stream this can 
       * return zero, representing the fact that nothing was read.
       *
       * @param data this is the array to read the bytes in to
       * @param off this is the offset to begin writing the bytes to
       * @param len this is the number of bytes that are requested 
       *
       * @return this returns the number of bytes read from the stream 
       */ 
      public int read(byte[] data, int off, int len) throws IOException {
         int limit = (int)count.get();
         int size = Math.min(limit, len);
         int chunk = cursor.read(data, off, size);
         
         if(chunk > 0) {
            count.addAndGet(-chunk);
         }
         return chunk;
      }

      /**
       * Pushes the provided data on to the cursor. Data pushed on to
       * the cursor will be the next data read from the cursor. This
       * complements the <code>reset</code> method which will reset
       * the cursors position on a stream. Allowing data to be pushed
       * on to the cursor allows more flexibility.
       * 
       * @param data this is the data to be pushed on to the cursor
       */
      public void push(byte[] data) throws IOException {
         push(data, 0, data.length);
      }

      /**
       * Pushes the provided data on to the cursor. Data pushed on to
       * the cursor will be the next data read from the cursor. This
       * complements the <code>reset</code> method which will reset
       * the cursors position on a stream. Allowing data to be pushed
       * on to the cursor allows more flexibility.
       * 
       * @param data this is the data to be pushed on to the cursor
       * @param off this is the offset to begin reading the bytes
       * @param len this is the number of bytes that are to be used 
       */
      public void push(byte[] data, int off, int len) throws IOException {
         if(len > 0) {
            count.addAndGet(len);
         }
         cursor.push(data, off, len);
      }

      /**
       * Moves the cursor backward within the stream. This ensures 
       * that any bytes read from the last read can be pushed back
       * in to the stream so that they can be read again. This will
       * throw an exception if the reset can not be performed.
       *
       * @param len this is the number of bytes to reset back
       *
       * @return this is the number of bytes that have been reset
       */
      public int reset(int len) throws IOException {
         int reset = cursor.reset(len);
         
         if(reset > 0) {
            count.addAndGet(reset);
         }
         return reset;
      }  
   }
}