KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > sun > enterprise > web > connector > grizzly > ReadTask


1 /*
2  * The contents of this file are subject to the terms
3  * of the Common Development and Distribution License
4  * (the License). You may not use this file except in
5  * compliance with the License.
6  *
7  * You can obtain a copy of the license at
8  * https://glassfish.dev.java.net/public/CDDLv1.0.html or
9  * glassfish/bootstrap/legal/CDDLv1.0.txt.
10  * See the License for the specific language governing
11  * permissions and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL
14  * Header Notice in each file and include the License file
15  * at glassfish/bootstrap/legal/CDDLv1.0.txt.
16  * If applicable, add the following below the CDDL Header,
17  * with the fields enclosed by brackets [] replaced by
18  * you own identifying information:
19  * "Portions Copyrighted [year] [name of copyright owner]"
20  *
21  * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
22  */

23 package com.sun.enterprise.web.connector.grizzly;
24
25 import java.io.IOException JavaDoc;
26
27 import java.net.Socket JavaDoc;
28
29 import java.nio.ByteBuffer JavaDoc;
30 import java.nio.channels.SocketChannel JavaDoc;
31
32 import java.util.logging.Level JavaDoc;
33
34 /**
35  * Read available data on a non blocking <code>SocketChannel</code>.
36  * <code>StreamAlgorithm</code> stategy will decide if more bytes are required
37  * or not. Once the <code>StreamAlgorithm</code> is ready, the
38  * <code>ProcessorTask</code> attached to this class will be executed.
39  *
40  * @author Scott Oaks
41  * @author Jean-Francois Arcand
42  */

43 public class ReadTask extends TaskBase {
44     
45
46     /**
47      * The <code>TaskContext</code> instance associated with this object.
48      * The <code>TaskContext</code> is initialized at startup and then recycled.
49      */

50     protected TaskContext taskContext;
51     
52     
53     /**
54      * The <code>TaskEvent</code> instance used by this object to notify its
55      * listeners
56      */

57     protected TaskEvent taskEvent;
58
59     
60     /**
61      * The <code>ByteBuffer</code> used by this task to buffer the request
62      * stream.
63      */

64     protected ByteBuffer JavaDoc byteBuffer;
65     
66     
67     /**
68      * The <code>ProcessorTask</code> used by this class.
69      */

70     protected ProcessorTask processorTask;
71   
72     
73     /**
74      * Max post size.
75      */

76     protected int maxPostSize = 25 * 1024 * 1024;
77      
78     
79     /**
80      * The recycled <code>OutputStream</code> used by this buffer.
81      */

82     protected ByteBufferInputStream inputStream;
83
84
85     /**
86      * The <code>Algorithm</code> used to parse the request and determine
87      * of the bytes has been all read from the <code>SocketChannel</code>
88      */

89     protected StreamAlgorithm algorithm;
90     
91     
92     /**
93      * <code>true</code> only when another object has already read bytes
94      * from the channel.
95      */

96     protected boolean bytesAvailable = false;
97
98     
99     /**
100      * Has the task been returned to the pool
101      */

102     protected volatile boolean isReturned = false;
103
104
105     // ----------------------------------------------------- Constructor ----/
106

107     
108     public ReadTask(){
109         ;//
110
}
111     
112     
113     public ReadTask(StreamAlgorithm algorithm,
114                     boolean useDirectByteBuffer, boolean useByteBufferView){
115         type = READ_TASK;
116         this.algorithm = algorithm;
117         byteBuffer = algorithm.allocate(useDirectByteBuffer,useByteBufferView);
118         inputStream = new ByteBufferInputStream();
119     }
120     
121     
122     /**
123      * Force this task to always use the same <code>ProcessorTask</code> instance.
124      */

125     public void attachProcessor(ProcessorTask processorTask){
126         this.processorTask = processorTask;
127         configureProcessorTask();
128     }
129     
130     
131     /**
132      * Set appropriate attribute on the <code>ProcessorTask</code>.
133      */

134     public void configureProcessorTask(){
135         // Disable blocking keep-alive mechanism. Keep-Alive mechanism
136
// will be managed by this class instead.
137
processorTask.useAlternateKeepAlive(true);
138         processorTask.setSelectionKey(key);
139         processorTask.setSocket(((SocketChannel JavaDoc)key.channel()).socket());
140         processorTask.setHandler(algorithm.getHandler());
141
142     }
143     
144       
145     /**
146      * Return the <code>ProcessorTask</code> to the pool.
147      */

148     public void detachProcessor(){
149         if (processorTask != null){
150             processorTask.recycle();
151         }
152         
153         // Notify listeners
154
if ( listeners != null ) {
155             for (int i=listeners.size()-1; i > -1; i--){
156                 if ( taskEvent == null ) {
157                     taskEvent = new TaskEvent<ReadTask>();
158                 }
159                 taskEvent.attach(this);
160                 taskEvent.setStatus(TaskEvent.COMPLETED);
161                 listeners.get(i).taskEvent(taskEvent);
162             }
163             clearTaskListeners();
164         }
165         
166         if (recycle && processorTask != null){
167             selectorThread.returnTask(processorTask);
168             processorTask = null;
169         }
170     }
171     
172     
173     /**
174      * Read data from the socket and process it using this thread, and only if
175      * the <code>StreamAlgorith</code> stategy determine no more bytes are
176      * are needed.
177      */

178     public void doTask() throws IOException JavaDoc {
179         int count = 0;
180         Socket JavaDoc socket = null;
181         SocketChannel JavaDoc socketChannel = null;
182         boolean keepAlive = false;
183         Exception JavaDoc exception = null;
184         isReturned = false;
185         
186         try {
187             socketChannel = (SocketChannel JavaDoc)key.channel();
188             socket = socketChannel.socket();
189             algorithm.setSocketChannel(socketChannel);
190            
191             int loop = 0;
192             int bufferSize = 0;
193             while ( socketChannel.isOpen() && (bytesAvailable ||
194                     ((count = socketChannel.read(byteBuffer))> -1))){
195
196                 // Avoid calling the Selector.
197
if ( count == 0 && !bytesAvailable){
198                     loop++;
199                     if (loop > 2){
200                         break;
201                     }
202                     continue;
203                 }
204                 bytesAvailable = false;
205                 
206                 byteBuffer = algorithm.preParse(byteBuffer);
207                 inputStream.setByteBuffer(byteBuffer);
208                 inputStream.setSelectionKey(key);
209                 
210                 // try to predict which HTTP method we are processing
211
if ( algorithm.parse(byteBuffer) ){
212                     keepAlive = executeProcessorTask();
213                     if (!keepAlive) {
214                         break;
215                     }
216                 } else {
217                     // We must call the Selector since we don't have all the
218
// bytes
219
keepAlive = true;
220                 }
221             }
222         // Catch IO AND NIO exception
223
} catch (IOException JavaDoc ex) {
224             exception = ex;
225         } catch (RuntimeException JavaDoc ex) {
226             exception = ex;
227         } finally {
228             manageKeepAlive(keepAlive,count,exception);
229         }
230     }
231
232
233     /**
234      * Evaluate if the <code>SelectionKey</code> needs to be registered to
235      * the main <code>Selector</code>
236      */

237     protected void manageKeepAlive(boolean keepAlive,int count,
238             Exception JavaDoc exception){
239
240         // The key is invalid when the Task has been cancelled.
241
if ( count == -1 || !key.isValid() || exception != null ){
242             keepAlive = false;
243             
244             if ( exception != null){
245                 // Make sure we have detached the processorTask
246
detachProcessor();
247                 SelectorThread.logger().
248                   log(Level.FINEST, "SocketChannel Read Exception: ",exception);
249             }
250         }
251
252         if (keepAlive) {
253             registerKey();
254         }
255             
256         terminate(keepAlive);
257     }
258  
259     
260     /**
261      * Execute the <code>ProcessorTask</code> only if the request has
262      * been fully read. Guest the size of the request by using the
263      * content-type HTTP headers.
264      * @return false if the request wasn't fully read by the channel.
265      * so we need to respin the key on the Selector.
266      */

267     public boolean executeProcessorTask() throws IOException JavaDoc{
268         boolean registerKey = false;
269         
270         if (SelectorThread.logger().isLoggable(Level.FINEST))
271             SelectorThread.logger().log(Level.FINEST,"executeProcessorTask");
272         
273         if ( algorithm.getHandler()
274                 .handle(null, Handler.REQUEST_BUFFERED) == Handler.BREAK ){
275             return true;
276         }
277         
278         // Get a processor task. If the processorTask != null, that means we
279
// failed to load all the bytes in a single channel.read().
280
if (processorTask == null){
281             attachProcessor(selectorThread.getProcessorTask());
282         }
283         
284         try {
285             // The socket might not have been read entirely and the parsing
286
// will fail, so we need to respin another event.
287
registerKey = processorTask.process(inputStream,null);
288         } catch (Exception JavaDoc e) {
289             SelectorThread.logger()
290                 .log(Level.SEVERE,"readTask.processException", e);
291         }
292         detachProcessor();
293         return registerKey;
294     }
295
296     
297     /**
298      * Return this object to the pool
299      */

300     protected void returnTask(){
301         if ( recycle && !isReturned ) {
302             isReturned = true;
303             selectorThread.returnTask(this);
304         }
305     }
306     
307
308     public void taskEvent(TaskEvent event){
309         if (event.getStatus() == TaskEvent.COMPLETED){
310             terminate(false);
311         }
312     }
313     
314
315     /**
316      * Complete the processing.
317      */

318     public void terminate(boolean keepAlive){
319         // Safeguard to avoid returning the instance more than once.
320
if ( isReturned ){
321             return;
322         }
323         
324         if ( !keepAlive ){
325             finishConnection();
326         }
327         recycle();
328         returnTask();
329     }
330     
331     
332     /**
333      * Clear the current state and make this object ready for another request.
334      */

335     public void recycle(){
336         byteBuffer = algorithm.postParse(byteBuffer);
337
338         byteBuffer.clear();
339         inputStream.recycle();
340         algorithm.recycle();
341         key = null;
342         inputStream.setSelectionKey(null);
343     }
344     
345     // -------------------------------------------------------- TaskEvent ---//
346

347        
348     /**
349      * Cancel the <code>SelectionKey</code> and close its underlying
350      * <code>SocketChannel</code>. Add this <code>Task</code> to the Keep-Alive
351      * sub-system.
352      */

353     protected void finishConnection(){
354         
355         if (SelectorThread.logger().isLoggable(Level.FINEST))
356             SelectorThread.logger().log(Level.FINEST,"finishConnection");
357         
358         try{
359             if (taskContext != null){
360                 taskContext.recycle();
361             }
362         } catch (IOException JavaDoc ioe){
363             ;
364         }
365
366         selectorThread.cancelKey(key);
367     }
368
369     
370     /**
371      * Register the <code>SelectionKey</code> with the <code>Selector</code>
372      */

373     protected void registerKey(){
374         if (key.isValid()){
375             if (SelectorThread.logger().isLoggable(Level.FINEST))
376                 SelectorThread.logger().log(Level.FINEST,"registerKey");
377
378             selectorThread.registerKey(key);
379         } else {
380             terminate(false);
381         }
382     }
383     
384     
385     // -------------------------------------------------------- getter/setter--/
386

387
388     /**
389      * Return the associated <code>ProcessorTask</code>.
390      * @return the associated <code>ProcessorTask</code>, null if not used.
391      */

392     public ProcessorTask getProcessorTask(){
393         return processorTask;
394     }
395     
396     
397     /**
398      * Set the algorithm used by this instance to predict the end of the NIO
399      * Stream.
400      */

401     public void setStreamAlgorithm(StreamAlgorithm algorithm){
402         this.algorithm = algorithm;
403     }
404     
405     
406     /**
407      * Return the underlying <code>ByteBuffer</code> used by this class.
408      */

409     public ByteBuffer JavaDoc getByteBuffer(){
410         return byteBuffer;
411     }
412     
413     
414     /**
415      * If the attached byteBuffer was already filled, tell the
416      * Algorithm to re-use the bytes.
417      */

418     public void setBytesAvailable(boolean bytesAvailable){
419         this.bytesAvailable = bytesAvailable;
420     }
421
422 }
423
Popular Tags