KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > sun > enterprise > web > connector > grizzly > SelectorThread


1 /*
2  * The contents of this file are subject to the terms
3  * of the Common Development and Distribution License
4  * (the License). You may not use this file except in
5  * compliance with the License.
6  *
7  * You can obtain a copy of the license at
8  * https://glassfish.dev.java.net/public/CDDLv1.0.html or
9  * glassfish/bootstrap/legal/CDDLv1.0.txt.
10  * See the License for the specific language governing
11  * permissions and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL
14  * Header Notice in each file and include the License file
15  * at glassfish/bootstrap/legal/CDDLv1.0.txt.
16  * If applicable, add the following below the CDDL Header,
17  * with the fields enclosed by brackets [] replaced by
18  * you own identifying information:
19  * "Portions Copyrighted [year] [name of copyright owner]"
20  *
21  * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
22  */

23 package com.sun.enterprise.web.connector.grizzly;
24
25 import java.io.IOException JavaDoc;
26 import java.io.InterruptedIOException JavaDoc;
27 import java.net.BindException JavaDoc;
28 import java.net.InetAddress JavaDoc;
29 import java.net.InetSocketAddress JavaDoc;
30 import java.net.ServerSocket JavaDoc;
31 import java.net.Socket JavaDoc;
32 import java.net.SocketException JavaDoc;
33 import java.util.Iterator JavaDoc;
34 import java.util.Set JavaDoc;
35 import java.util.logging.Logger JavaDoc;
36 import java.util.logging.Level JavaDoc;
37 import java.util.concurrent.ConcurrentLinkedQueue JavaDoc;
38 import java.nio.channels.CancelledKeyException JavaDoc;
39 import java.nio.channels.Selector JavaDoc;
40 import java.nio.channels.SelectionKey JavaDoc;
41 import java.nio.channels.ServerSocketChannel JavaDoc;
42 import java.nio.channels.SocketChannel JavaDoc;
43
44 import java.security.AccessControlException JavaDoc;
45
46 import org.apache.coyote.Adapter;
47 import org.apache.coyote.RequestGroupInfo;
48 import org.apache.tomcat.util.net.ServerSocketFactory;
49 import org.apache.tomcat.util.net.SSLImplementation;
50
51 import com.sun.org.apache.commons.modeler.Registry;
52 import javax.management.ObjectName JavaDoc;
53 import javax.management.MBeanServer JavaDoc;
54 import javax.management.MBeanRegistration JavaDoc;
55
56 import com.sun.enterprise.web.connector.grizzly.algorithms.NoParsingAlgorithm;
57
58 /**
59  * This class implement an NIO socket HTTP Listener. This class
60  * supports three stagegy:
61  *
62  * Mode Blocking: This mode uses NIO blocking mode, and doesn't uses any of the
63  * java.nio.* classes.
64  *
65  *
66  * Mode Non-Blocking: This mode uses NIO non blocking mode and read the entire
67  * request stream before processing the request. The stragegy used is
68  * to find the content-lenght header and buffer bytes until the end of
69  * the stream is read.
70  *
71  * @author Jean-Francois Arcand
72  */

73 public class SelectorThread extends Thread JavaDoc implements MBeanRegistration JavaDoc{
74     
75     private int serverTimeout = Constants.DEFAULT_SERVER_SOCKET_TIMEOUT;
76
77     private InetAddress JavaDoc inet;
78     protected int port;
79
80     // SSL required stuff.
81
private ServerSocketFactory factory;
82     private ServerSocket JavaDoc serverSocket;
83     protected SSLImplementation sslImplementation = null;
84
85     
86     /**
87      * The <code>ServerSocketChannel</code> used in blocking mode.
88      */

89     private ServerSocketChannel JavaDoc serverSocketChannel;
90     
91     protected volatile boolean running = false;
92     private volatile boolean paused = false;
93     private boolean initialized = false;
94     private boolean reinitializing = false;
95     // ----------------------------------------------------- JMX Support ---/
96
protected String JavaDoc domain;
97     protected ObjectName JavaDoc oname;
98     protected ObjectName JavaDoc globalRequestProcessorName;
99     private ObjectName JavaDoc keepAliveMbeanName;
100     private ObjectName JavaDoc pwcConnectionQueueMbeanName;
101     private ObjectName JavaDoc pwcFileCacheMbeanName;
102     protected MBeanServer JavaDoc mserver;
103     protected ObjectName JavaDoc processorWorkerThreadName;
104
105
106     // ------------------------------------------------------Socket setting --/
107

108     protected boolean tcpNoDelay=false;
109     
110     
111     protected int linger=100;
112     
113     
114     protected int socketTimeout=-1;
115     
116     
117     protected int maxKeepAliveRequests = Constants.DEFAULT_MAX_KEEP_ALIVE;
118         
119     // ------------------------------------------------------ Properties----/
120

121     /**
122      * Default HTTP header buffer size.
123      */

124     protected int maxHttpHeaderSize = Constants.DEFAULT_HEADER_SIZE;
125
126
127     /**
128      * Number of polled <code>Read*Task</code> instance.
129      */

130     protected int minReadQueueLength = 10;
131
132
133     /**
134      * Number of polled <code>ProcessorTask</code> instance.
135      */

136     protected int minProcessorQueueLength = 10;
137
138
139     /**
140      * The <code>Selector</code> used by the connector.
141      */

142     protected Selector JavaDoc selector;
143
144
145     /**
146      * Associated adapter.
147      */

148     protected Adapter adapter = null;
149     
150     
151     /**
152      * Is SSL enabled
153      */

154     private boolean secure = false;
155
156     
157     /**
158      * The queue shared by this thread and code>ReadTask</code>
159      */

160     protected Pipeline readPipeline;
161     
162
163     /**
164      * The queue shared by this thread and the code>ProcessorTask</code>.
165      */

166     protected Pipeline processorPipeline;
167     
168   
169     /**
170      * Placeholder for <code>Pipeline</code> statistic.
171      */

172     protected PipelineStatistic pipelineStat;
173     
174     /**
175      * The default <code>Pipeline</code> used.
176      */

177     protected String JavaDoc pipelineClassName =
178         com.sun.enterprise.web.connector.grizzly.
179
            LinkedListPipeline.class.getName();
180     
181     /**
182      * Maximum number of <code>WorkerThread</code>
183      */

184     protected int maxProcessorWorkerThreads = 20; // By default
185

186     
187     /**
188      * Maximum number of <code>ReadWorkerThread</code>
189      */

190     protected int maxReadWorkerThreads = -1; // By default
191

192     
193     /**
194      * Minimum numbers of <code>WorkerThread</code> created
195      */

196     protected int minWorkerThreads = 5;
197     
198
199     /**
200      * Minimum numbers of <code>WorkerThread</code>
201      * before creating new thread.
202      * <implementation-note>
203      * Not used in 9.x
204      * </implementation-note>
205      */

206     protected int minSpareThreads = 2;
207
208     
209     /**
210      * The number used when increamenting the <code>Pipeline</code>
211      * thread pool.
212      */

213     protected int threadsIncrement = 1;
214     
215     
216     /**
217      * The timeout used by the thread when processing a request.
218      */

219     protected int threadsTimeout = Constants.DEFAULT_TIMEOUT;
220
221
222     /**
223      * Are we using NIO non bloking mode
224      */

225     protected boolean useNioNonBlocking = true;
226
227     
228     /**
229      * Is the <code>ByteBuffer</code> used by the <code>ReadTask</code> use
230      * direct <code>ByteBuffer</code> or not.
231      */

232     protected boolean useDirectByteBuffer = false;
233     
234   
235     /**
236      * Monitoring object used to store information.
237      */

238     private RequestGroupInfo globalRequestProcessor= new RequestGroupInfo();
239     
240     
241     /**
242      * Keep-alive stats
243      */

244     private KeepAliveStats keepAliveStats = new KeepAliveStats();
245
246
247     /**
248      * If <code>true</code>, display the NIO configuration information.
249      */

250     protected boolean displayConfiguration = false;
251     
252     
253     /**
254      * Is monitoring already started.
255      */

256     private boolean isMonitoringEnabled = false;
257     
258
259     /**
260      * The current number of simulatenous connection.
261      */

262     protected int currentConnectionNumber;
263
264
265     /**
266      * Is this Selector currently in Wating mode?
267      */

268     protected volatile boolean isWaiting = false;
269     
270
271     /**
272      * The input request buffer size.
273      */

274     protected int requestBufferSize = Constants.DEFAULT_REQUEST_BUFFER_SIZE;
275     
276     
277     /**
278      * Create view <code>ByteBuffer</code> from another <code>ByteBuffer</code>
279      */

280     protected boolean useByteBufferView = false;
281     
282
283     /*
284      * Number of seconds before idle keep-alive connections expire
285      */

286     private int keepAliveTimeoutInSeconds = Constants.DEFAULT_TIMEOUT;
287
288     
289     /**
290      * Number of seconds before idle keep-alive connections expire
291      */

292     private int kaTimeout = Constants.DEFAULT_TIMEOUT * 1000;
293     
294     
295     /**
296      * Recycle the <code>Task</code> after running them
297      */

298     protected boolean recycleTasks = Constants.DEFAULT_RECYCLE;
299     
300     
301     /**
302      * The <code>Selector</code> timeout value. By default, it is set to 60000
303      * miliseconds (as in the j2se 1.5 ORB).
304      */

305     protected static int selectorTimeout = 1000;
306
307
308     /**
309      * Maximum pending connection before refusing requests.
310      */

311     protected int maxQueueSizeInBytes = Constants.DEFAULT_QUEUE_SIZE;
312
313
314     /**
315      * The <code>Algorithm</code> used to predict the end of the NIO stream
316      */

317     protected Class JavaDoc algorithmClass;
318     
319     
320     /**
321      * The <code>Algorithm</code> used to parse the NIO stream.
322      */

323     protected String JavaDoc algorithmClassName = DEFAULT_ALGORITHM;
324     
325     
326     /**
327      * The default NIO stream algorithm.
328      */

329     public final static String JavaDoc DEFAULT_ALGORITHM =
330         com.sun.enterprise.web.connector.grizzly.algorithms.
331
            NoParsingAlgorithm.class.getName();
332
333     
334     /**
335      * Server socket backlog.
336      */

337     protected int ssBackLog = 4096;
338     
339     
340     /**
341      * Next time the exprireKeys() will delete keys.
342      */

343     private long nextKeysExpiration = 0;
344     
345     
346     /**
347      * The default response-type
348      */

349     protected String JavaDoc defaultResponseType =
350             org.apache.coyote.tomcat5.Constants.DEFAULT_RESPONSE_TYPE;
351
352
353     /**
354      * The forced response-type
355      */

356     protected String JavaDoc forcedResponseType =
357             org.apache.coyote.tomcat5.Constants.DEFAULT_RESPONSE_TYPE;
358     
359     
360     /**
361      * The root folder where application are deployed
362      */

363     protected static String JavaDoc rootFolder = "";
364     
365     // ----------------------------------------------------- Collections --//
366

367     
368     /**
369      * List of <code>SelectionKey</code> event to register next time the
370      * <code>Selector</code> wakeups. This is needed since there a bug
371      * in j2se 1.4.x that prevent registering selector event if the call
372      * is done on another thread.
373      */

374     private ConcurrentLinkedQueue JavaDoc<SelectionKey JavaDoc> keysToEnable =
375         new ConcurrentLinkedQueue JavaDoc<SelectionKey JavaDoc>();
376          
377     
378     // ---------------------------------------------------- Object pools --//
379

380
381     /**
382      * <code>ConcurrentLinkedQueue</code> used as an object pool.
383      * If the list becomes empty, new <code>ProcessorTask</code> will be
384      * automatically added to the list.
385      */

386     protected ConcurrentLinkedQueue JavaDoc<ProcessorTask> processorTasks =
387         new ConcurrentLinkedQueue JavaDoc<ProcessorTask>();
388                                                                              
389     /**
390      * <code>ConcurrentLinkedQueue</code> used as an object pool.
391      * If the list becomes empty, new <code>ReadTask</code> will be
392      * automatically added to the list.
393      */

394     protected ConcurrentLinkedQueue JavaDoc<ReadTask> readTasks =
395         new ConcurrentLinkedQueue JavaDoc<ReadTask>();
396
397     
398     /**
399      * List of active <code>ProcessorTask</code>.
400      */

401     protected ConcurrentLinkedQueue JavaDoc<ProcessorTask> activeProcessorTasks =
402         new ConcurrentLinkedQueue JavaDoc<ProcessorTask>();
403     
404     // ----------------------------------------- Multi-Selector supports --//
405

406     /**
407      * The number of <code>SelectorReadThread</code>
408      */

409     protected int selectorReadThreadsCount = 0;
410
411     
412     /**
413      * The <code>Selector</code> used to register OP_READ
414      */

415     protected SelectorReadThread[] readThreads;
416     
417     
418     /**
419      * The current <code>readThreads</code> used to process OP_READ.
420      */

421     int curReadThread;
422
423     
424     /**
425      * The logger used by the grizzly classes.
426      */

427     protected static Logger JavaDoc logger = Logger.getLogger("GRIZZLY");
428     
429     
430     // ----------------------------------------- Keep-Alive subsystems --//
431

432      
433     /**
434      * Keep-Alive subsystem. If a client opens a socket but never close it,
435      * the <code>SelectionKey</code> will stay forever in the
436      * <code>Selector</code> keys, and this will eventualy produce a
437      * memory leak.
438      */

439     protected KeepAlivePipeline keepAlivePipeline;
440     
441     
442     // ------------------------------------------------- FileCache support --//
443

444     
445     /**
446      * The FileCacheFactory associated with this Selector
447      */

448     protected FileCacheFactory fileCacheFactory;
449     
450         /**
451      * Timeout before remove the static resource from the cache.
452      */

453     protected int secondsMaxAge = -1;
454     
455     
456     /**
457      * The maximum entries in the <code>fileCache</code>
458      */

459     protected int maxCacheEntries = 1024;
460     
461  
462     /**
463      * The maximum size of a cached resources.
464      */

465     protected long minEntrySize = 2048;
466             
467                
468     /**
469      * The maximum size of a cached resources.
470      */

471     protected long maxEntrySize = 537600;
472     
473     
474     /**
475      * The maximum cached bytes
476      */

477     protected long maxLargeFileCacheSize = 10485760;
478  
479     
480     /**
481      * The maximum cached bytes
482      */

483     protected long maxSmallFileCacheSize = 1048576;
484     
485     
486     /**
487      * Is the FileCache enabled.
488      */

489     protected boolean isFileCacheEnabled = true;
490     
491     
492     /**
493      * Is the large FileCache enabled.
494      */

495     protected boolean isLargeFileCacheEnabled = true;
496     
497     // --------------------------------------------- Asynch supports -----//
498

499     /**
500      * Is asynchronous mode enabled?
501      */

502     protected boolean asyncExecution = false;
503     
504     
505     /**
506      * When the asynchronous mode is enabled, the execution of this object
507      * will be delegated to the <code>AsyncHandler</code>
508      */

509     protected AsyncHandler asyncHandler;
510     
511     
512     // ---------------------------------------------------- Constructor --//
513

514     
515     /**
516      * Create the <code>Selector</code> object. Each instance of this class
517      * will listen to a specific port.
518      */

519     public SelectorThread(){
520     }
521     
522     // ------------------------------------------------------ Selector hook --/
523

524     
525    /**
526      * Enable all registered interestOps. Due a a NIO bug, all interestOps
527      * invokation needs to occurs on the same thread as the selector thread.
528      */

529     public void enableSelectionKeys(){
530         SelectionKey JavaDoc selectionKey;
531         int size = keysToEnable.size();
532         long currentTime = (Long JavaDoc)System.currentTimeMillis();
533         for (int i=0; i < size; i++) {
534             selectionKey = keysToEnable.poll();
535
536             selectionKey.interestOps(
537                     selectionKey.interestOps() | SelectionKey.OP_READ);
538
539             selectionKey.attach(currentTime);
540             keepAlivePipeline.trap(selectionKey);
541         }
542     }
543     
544     
545     /**
546      * Register a <code>SelectionKey</code> to this <code>Selector</code>
547      * running of this thread.
548      */

549     public void registerKey(SelectionKey JavaDoc key){
550         if ( key == null ) return;
551         
552         if ( keepAlivePipeline.dropConnection() ) {
553             cancelKey(key);
554             return;
555         }
556         
557         // add SelectionKey & Op to list of Ops to enable
558
keysToEnable.add(key);
559         // tell the Selector Thread there's some ops to enable
560
selector.wakeup();
561         // wakeup() will force the SelectorThread to bail out
562
// of select() to process your registered request
563
}
564
565    // -------------------------------------------------------------- Init //
566

567
568     /**
569      * initialized the endpoint by creating the <code>ServerScoketChannel</code>
570      * and by initializing the server socket.
571      */

572     public void initEndpoint() throws IOException JavaDoc, InstantiationException JavaDoc {
573         SelectorThreadConfig.configure(this);
574         
575         initFileCacheFactory();
576         initPipeline();
577         initAlgorithm();
578         initMonitoringLevel();
579         
580         setName("SelectorThread-" + port);
581         
582         try{
583                     
584             // SSL is not yet supported by the VM
585
if (secure){
586                 useNioNonBlocking = false;
587             }
588             
589             if ( useNioNonBlocking ){
590                 // Create the socket listener
591
serverSocketChannel = ServerSocketChannel.open();
592                 selector = Selector.open();
593
594                 serverSocket = serverSocketChannel.socket();
595                 serverSocket.setReuseAddress(true);
596                 if ( inet == null)
597                     serverSocket.bind(new InetSocketAddress JavaDoc(port),ssBackLog);
598                 else
599                     serverSocket.bind(new InetSocketAddress JavaDoc(inet,port),ssBackLog);
600
601                 serverSocketChannel.configureBlocking(false);
602                 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
603             } else {
604                 if (inet == null) {
605                     serverSocket = factory.createSocket(port,ssBackLog);
606                 } else {
607                     serverSocket = factory.createSocket(port,ssBackLog,inet);
608                 }
609                 serverSocket.setReuseAddress(true);
610             }
611         } catch (SocketException JavaDoc ex){
612             throw new BindException JavaDoc(ex.getMessage() + ": " + port);
613         }
614         
615         serverSocket.setSoTimeout(serverTimeout);
616         
617         if (useNioNonBlocking){
618             if ( selectorReadThreadsCount > 1 ){
619                 readThreads = new SelectorReadThread[selectorReadThreadsCount];
620                 initSelectorReadThread();
621             } else {
622                 initProcessorTask(maxProcessorWorkerThreads);
623                 initReadTask(minReadQueueLength);
624             }
625             SelectorFactory.maxSelectors = maxProcessorWorkerThreads;
626         } else {
627             initReadBlockingTask(maxProcessorWorkerThreads);
628         }
629         
630         initialized = true;
631         
632         if ( useNioNonBlocking){
633             logger.log(Level.FINE,"Initializing Grizzly Non-Blocking Mode");
634         } else if ( !useNioNonBlocking ){
635             logger.log(Level.FINE,"Initializing Grizzly Blocking Mode");
636         }
637         
638     }
639      
640     
641     /**
642      * Create a new <code>Pipeline</code> instance using the
643      * <code>pipelineClassName</code> value.
644      */

645     protected Pipeline newPipeline(int maxThreads,
646                                    int minThreads,
647                                    String JavaDoc name,
648                                    int port,
649                                    int priority){
650         
651         Class JavaDoc className = null;
652         Pipeline pipeline = null;
653         try{
654             className = Class.forName(pipelineClassName);
655             pipeline = (Pipeline)className.newInstance();
656         } catch (ClassNotFoundException JavaDoc ex){
657             logger.log(Level.WARNING,
658                        "Unable to load Pipeline: " + pipelineClassName);
659             pipeline = new LinkedListPipeline();
660         } catch (InstantiationException JavaDoc ex){
661             logger.log(Level.WARNING,
662                        "Unable to instantiate Pipeline: "
663                        + pipelineClassName);
664             pipeline = new LinkedListPipeline();
665         } catch (IllegalAccessException JavaDoc ex){
666             logger.log(Level.WARNING,
667                        "Unable to instantiate Pipeline: "
668                        + pipelineClassName);
669             pipeline = new LinkedListPipeline();
670         }
671         
672         if (logger.isLoggable(Level.FINE)){
673             logger.log(Level.FINE,
674                        "http-listener " + port + " uses pipeline: "
675                        + pipeline.getClass().getName());
676         }
677         
678         pipeline.setMaxThreads(maxThreads);
679         pipeline.setMinThreads(minThreads);
680         pipeline.setName(name);
681         pipeline.setPort(port);
682         pipeline.setPriority(priority);
683         pipeline.setQueueSizeInBytes(maxQueueSizeInBytes);
684         pipeline.setThreadsIncrement(threadsIncrement);
685         pipeline.setThreadsTimeout(threadsTimeout);
686         
687         return pipeline;
688     }
689     
690     
691     /**
692      * Initialize the fileCacheFactory associated with this instance
693      */

694     private void initFileCacheFactory(){
695         fileCacheFactory = FileCacheFactory.getFactory(port);
696         fileCacheFactory.setIsEnabled(isFileCacheEnabled);
697         fileCacheFactory.setLargeFileCacheEnabled(isLargeFileCacheEnabled);
698         fileCacheFactory.setSecondsMaxAge(secondsMaxAge);
699         fileCacheFactory.setMaxCacheEntries(maxCacheEntries);
700         fileCacheFactory.setMinEntrySize(minEntrySize);
701         fileCacheFactory.setMaxEntrySize(maxEntrySize);
702         fileCacheFactory.setMaxLargeCacheSize(maxLargeFileCacheSize);
703         fileCacheFactory.setMaxSmallCacheSize(maxSmallFileCacheSize);
704         fileCacheFactory.setIsMonitoringEnabled(isMonitoringEnabled);
705     }
706        
707     
708     /**
709      * Injects <code>PipelineStatistic</code> into every
710      * <code>Pipeline</code>, for monitoring purposes.
711      */

712     private void enablePipelineStats(){
713         pipelineStat.start();
714
715         processorPipeline.setPipelineStatistic(pipelineStat);
716         pipelineStat.setProcessorPipeline(processorPipeline);
717
718         if (keepAlivePipeline != null){
719             keepAlivePipeline.setKeepAliveStats(keepAliveStats);
720         }
721     }
722     
723
724     /**
725      * Removes <code>PipelineStatistic</code> from every
726      * <code>Pipeline</code>, when monitoring has been turned off.
727      */

728     private void disablePipelineStats(){
729         pipelineStat.stop();
730         
731         processorPipeline.setPipelineStatistic(null);
732         pipelineStat.setProcessorPipeline(null);
733
734         if (keepAlivePipeline != null){
735             keepAlivePipeline.setKeepAliveStats(null);
736         }
737
738     }
739
740     
741     /**
742      * Load using reflection the <code>Algorithm</code> class.
743      */

744     protected void initAlgorithm(){
745         try{
746             algorithmClass = Class.forName(algorithmClassName);
747             logger.log(Level.FINE,
748                        "Using Algorithm: " + algorithmClassName);
749         } catch (ClassNotFoundException JavaDoc ex){
750             logger.log(Level.FINE,
751                        "Unable to load Algorithm: " + algorithmClassName);
752         } finally {
753             if ( algorithmClass == null ){
754                 algorithmClass = NoParsingAlgorithm.class;
755             }
756         }
757     }
758     
759     
760     /**
761      * Initialize the keep-alive mechanism.
762      */

763     protected void initKeepAlivePipeline(){
764         keepAlivePipeline = new KeepAlivePipeline();
765         keepAlivePipeline.setMaxKeepAliveRequests(maxKeepAliveRequests);
766         keepAlivePipeline
767             .setKeepAliveTimeoutInSeconds(keepAliveTimeoutInSeconds);
768         keepAlivePipeline.setPort(port);
769         keepAlivePipeline.setThreadsTimeout(threadsTimeout);
770
771         keepAliveStats.setMaxConnections(maxKeepAliveRequests);
772         keepAliveStats.setSecondsTimeouts(keepAliveTimeoutInSeconds);
773     }
774     
775     
776     /**
777      * Init the <code>Pipeline</code>s used by the <code>WorkerThread</code>s.
778      */

779     protected void initPipeline(){
780         initKeepAlivePipeline();
781         
782         processorPipeline = newPipeline(maxProcessorWorkerThreads,
783                                         minWorkerThreads, "http",
784                                         port,Thread.MAX_PRIORITY);
785         processorPipeline.initPipeline();
786
787         // SSL needs at least two threads, and so re-use the default behaviour.
788
if ( secure && maxReadWorkerThreads == 0){
789             maxReadWorkerThreads = -1;
790             logger.log(Level.WARNING,
791                        "http-listener " + port +
792                        " is security-enabled and needs at least 2 threads");
793         }
794         
795         // Only creates the pipeline if the max > 0
796
if ( maxReadWorkerThreads > 0 ){
797             readPipeline = newPipeline(maxReadWorkerThreads,
798                                        minWorkerThreads, "read",
799                                        port,Thread.NORM_PRIORITY);
800             readPipeline.initPipeline();
801         } else {
802             readPipeline = (maxReadWorkerThreads == 0 ? null:processorPipeline);
803         }
804     }
805     
806     
807     /**
808      * Create a pool of <code>ReadTask</code>
809      */

810     protected void initReadTask(int size){
811         ReadTask task;
812         for (int i=0; i < size; i++){
813             task = newReadTask();
814             readTasks.offer(task);
815         }
816     }
817     
818     
819     /**
820      * Return a new <code>ReadTask</code> instance
821      */

822     private ReadTask newReadTask(){
823         StreamAlgorithm streamAlgorithm = null;
824         
825         try{
826             streamAlgorithm = (StreamAlgorithm)algorithmClass.newInstance();
827         } catch (InstantiationException JavaDoc ex){
828             logger.log(Level.WARNING,
829                        "Unable to instantiate Algorithm: "+ algorithmClassName);
830         } catch (IllegalAccessException JavaDoc ex){
831             logger.log(Level.WARNING,
832                        "Unable to instantiate Algorithm: " + algorithmClassName);
833         } finally {
834             if ( streamAlgorithm == null)
835                 streamAlgorithm = new NoParsingAlgorithm();
836         }
837         streamAlgorithm.setPort(port);
838         
839         ReadTask task;
840         if ( maxReadWorkerThreads <= 0) {
841             task = new ReadTask(streamAlgorithm, useDirectByteBuffer,
842                                 useByteBufferView);
843         } else {
844             task = new AsyncReadTask(streamAlgorithm, useDirectByteBuffer,
845                                       useByteBufferView);
846         }
847
848         task.setPipeline(readPipeline);
849         task.setSelectorThread(this);
850         task.setRecycle(recycleTasks);
851         
852         return task;
853     }
854     
855     
856     /**
857      * Create a pool of <code>ReadBlockingTask</code>
858      */

859     private void initReadBlockingTask(int size){
860         for (int i=0; i < size; i++){
861             readTasks.offer(newReadBlockingTask(false));
862         }
863     }
864
865     
866     /**
867      * Create a <code>ReadBlockingTask</code> instance used with blocking
868      * socket.
869      */

870     private ReadBlockingTask newReadBlockingTask(boolean initialize){
871                                                         
872         ReadBlockingTask task = new ReadBlockingTask();
873         task.setSelectorThread(this);
874         
875         if (maxReadWorkerThreads > 0){
876             task.setPipeline(readPipeline);
877         }
878  
879         task.setRecycle(recycleTasks);
880         task.attachProcessor(newProcessorTask(false,initialize));
881         task.setPipelineStatistic(pipelineStat);
882  
883         return task;
884     }
885     
886
887     /**
888      * Initialize <code>SelectorReadThread</code> used to process
889      * OP_READ operations.
890      */

891     private void initSelectorReadThread() throws IOException JavaDoc,
892                                                  InstantiationException JavaDoc {
893         for (int i = 0; i < readThreads.length; i++) {
894             readThreads[i] = new SelectorReadThread();
895             readThreads[i].countName = i;
896             readThreads[i].setMaxThreads(maxProcessorWorkerThreads);
897             readThreads[i].setBufferSize(requestBufferSize);
898             readThreads[i].setMaxKeepAliveRequests(maxKeepAliveRequests);
899             readThreads[i]
900                     .setKeepAliveTimeoutInSeconds(keepAliveTimeoutInSeconds);
901             readThreads[i].maxQueueSizeInBytes = maxQueueSizeInBytes;
902             readThreads[i].fileCacheFactory = fileCacheFactory;
903             readThreads[i].maxReadWorkerThreads = maxReadWorkerThreads;
904             readThreads[i].defaultResponseType = defaultResponseType;
905             readThreads[i].forcedResponseType = forcedResponseType;
906             readThreads[i].minReadQueueLength = minReadQueueLength;
907             readThreads[i].maxHttpHeaderSize = maxHttpHeaderSize;
908             
909             if ( asyncExecution ) {
910                 readThreads[i].asyncExecution = asyncExecution;
911                 readThreads[i].asyncHandler = asyncHandler;
912             }
913             
914             readThreads[i].threadsIncrement = threadsIncrement;
915             readThreads[i].setPort(port);
916             readThreads[i].setAdapter(adapter);
917             readThreads[i].initEndpoint();
918             readThreads[i].start();
919         }
920         curReadThread = 0;
921     }
922
923
924     /**
925      * Return an instance of <code>SelectorReadThread</code> to use
926      * for registering OP_READ
927      */

928     private synchronized SelectorReadThread getSelectorReadThread() {
929         if (curReadThread == readThreads.length)
930             curReadThread = 0;
931         return readThreads[curReadThread++];
932     }
933     
934     
935     /**
936      * Create a pool of <code>ProcessorTask</code>
937      */

938     protected void initProcessorTask(int size){
939         for (int i=0; i < size; i++){
940             processorTasks.offer(newProcessorTask(useNioNonBlocking,false));
941         }
942     }
943
944
945     /**
946      * Initialize <code>ProcessorTask</code>
947      */

948     protected void rampUpProcessorTask(){
949         Iterator JavaDoc<ProcessorTask> iterator = processorTasks.iterator();
950         while (iterator.hasNext()) {
951             iterator.next().initialize();
952         }
953     }
954     
955
956     /**
957      * Create <code>ProcessorTask</code> objects and configure it to be ready
958      * to proceed request.
959      */

960     protected ProcessorTask newProcessorTask(boolean useNioNonBlocking,
961                                              boolean initialize){
962                                                  
963         ProcessorTask task = new ProcessorTask(useNioNonBlocking, initialize);
964         task.setAdapter(adapter);
965         task.setMaxHttpHeaderSize(maxHttpHeaderSize);
966         task.setBufferSize(requestBufferSize);
967         task.setSelectorThread(this);
968         task.setRecycle(recycleTasks);
969         task.setDefaultResponseType(defaultResponseType);
970         task.setForcedResponseType(forcedResponseType);
971         
972         // Asynch extentions
973
if ( asyncExecution ) {
974             task.setEnableAsyncExecution(asyncExecution);
975             task.setAsyncHandler(asyncHandler);
976         }
977               
978         
979         if (!useNioNonBlocking){
980             task.setMaxKeepAliveRequests(maxKeepAliveRequests);
981         }
982                         
983         if (secure){
984             task.setSSLImplementation(sslImplementation);
985         }
986          
987         if ( keepAlivePipeline.dropConnection() ) {
988             task.setDropConnection(true);
989         }
990         
991         task.setPipeline(processorPipeline);
992         return task;
993     }
994  
995  
996     /**
997      * Return a <code>ProcessorTask</code> from the pool. If the pool is empty,
998      * create a new instance.
999      */

1000     protected ProcessorTask getProcessorTask(){
1001        ProcessorTask processorTask = null;
1002        if (recycleTasks) {
1003            processorTask = processorTasks.poll();
1004        }
1005        
1006        if (processorTask == null){
1007            processorTask = newProcessorTask(true, false);
1008        }
1009        
1010        if ( isMonitoringEnabled() ){
1011           activeProcessorTasks.offer(processorTask);
1012        }
1013
1014        
1015        return processorTask;
1016    }
1017        
1018    
1019    /**
1020     * Return a <code>ReadTask</code> from the pool. If the pool is empty,
1021     * create a new instance.
1022     */

1023    protected ReadTask getReadTask(SelectionKey JavaDoc key) throws IOException JavaDoc{
1024        ReadTask task = null;
1025        if ( recycleTasks ) {
1026            task = readTasks.poll();
1027        }
1028        
1029        if (task == null){
1030            task = newReadTask();
1031        }
1032
1033        task.setSelectionKey(key);
1034        return task;
1035    }
1036    
1037    
1038    /**
1039     * Return a <code>ReadBlockingTask</code> from the pool.
1040     * If the pool is empty,
1041     * create a new instance.
1042     */

1043    protected ReadBlockingTask getReadBlockingTask(Socket JavaDoc socket){
1044        ReadBlockingTask task = null;
1045        if (recycleTasks) {
1046            task = (ReadBlockingTask)readTasks.poll();
1047        }
1048               
1049        if (task == null){
1050            task = newReadBlockingTask(false);
1051        }
1052        
1053        ProcessorTask processorTask = task.getProcessorTask();
1054        processorTask.setSocket(socket);
1055        
1056        return task;
1057    }
1058
1059    
1060    // --------------------------------------------------------- Thread run --/
1061

1062    
1063    /**
1064     * Start the endpoint (this)
1065     */

1066    public void run(){
1067        try{
1068            startEndpoint();
1069        } catch (Exception JavaDoc ex){
1070            logger.log(Level.SEVERE,"selectorThread.errorOnRequest", ex);
1071        }
1072    }
1073
1074    
1075    // ------------------------------------------------------------Start ----/
1076

1077    
1078    /**
1079     * Start the Acceptor Thread and wait for incoming connection, in a non
1080     * blocking mode.
1081     */

1082    public void startEndpoint() throws IOException JavaDoc, InstantiationException JavaDoc {
1083        running = true;
1084        
1085        kaTimeout = keepAliveTimeoutInSeconds * 1000;
1086        rampUpProcessorTask();
1087        registerComponents();
1088 
1089        displayConfiguration();
1090
1091        startPipelines();
1092        
1093        // (1) acceptSocket only if secure or blocking
1094
if (secure || !useNioNonBlocking){
1095            startBlockingMode();
1096        } else if (useNioNonBlocking){
1097            startNonBlockingMode();
1098        }
1099    }
1100    
1101    
1102    /**
1103     * Starts the <code>Pipeline</code> used by this <code>Selector</code>
1104     */

1105    protected void startPipelines(){
1106        if (readPipeline != null){
1107            readPipeline.startPipeline();
1108        }
1109
1110        processorPipeline.startPipeline();
1111    }
1112
1113    
1114    /**
1115     * Stop the <code>Pipeline</code> used by this <code>Selector</code>
1116     */

1117    protected void stopPipelines(){
1118        if ( keepAlivePipeline != null )
1119            keepAlivePipeline.stopPipeline();
1120
1121        if (readPipeline != null){
1122            readPipeline.stopPipeline();
1123        }
1124        
1125        processorPipeline.stopPipeline();
1126
1127    }
1128    
1129    
1130    
1131    /**
1132     * Start a blocking server <code>Socket</code>
1133     */

1134    protected void startBlockingMode(){
1135        Socket JavaDoc socket = null;
1136        while (running){
1137            socket = acceptSocket();
1138            if (socket == null) {
1139                continue;
1140            }
1141            if (secure) {
1142                try {
1143                    factory.handshake(socket);
1144                } catch (Throwable JavaDoc ex) {
1145                    logger.log(Level.FINE,
1146                               "selectorThread.sslHandshakeException", ex);
1147                    try {
1148                        socket.close();
1149                    } catch (IOException JavaDoc ioe){
1150                        // Do nothing
1151
}
1152                    continue;
1153                }
1154            }
1155
1156            try {
1157                handleConnection(socket);
1158            } catch (Throwable JavaDoc ex) {
1159                logger.log(Level.FINE,
1160                           "selectorThread.handleConnectionException",
1161                           ex);
1162                try {
1163                    socket.close();
1164                } catch (IOException JavaDoc ioe){
1165                    // Do nothing
1166
}
1167                continue;
1168            }
1169        }
1170    }
1171    
1172    
1173    /**
1174     * Start a non blocking <code>Selector</code> object.
1175     */

1176    protected void startNonBlockingMode(){
1177        while (running) {
1178            doSelect();
1179        }
1180    }
1181    
1182    
1183    /**
1184     * Execute a <code>Selector.select()</code> operation.
1185     */

1186    protected void doSelect(){
1187        SelectionKey JavaDoc key = null;
1188        Set JavaDoc readyKeys;
1189        Iterator JavaDoc<SelectionKey JavaDoc> iterator;
1190        int selectorState;
1191
1192        try{
1193            selectorState = 0;
1194            enableSelectionKeys();
1195
1196            try{
1197                selectorState = selector.select(selectorTimeout);
1198            } catch (CancelledKeyException JavaDoc ex){
1199                ;
1200            }
1201
1202            readyKeys = selector.selectedKeys();
1203            iterator = readyKeys.iterator();
1204            while (iterator.hasNext()) {
1205                key = iterator.next();
1206                iterator.remove();
1207                key.attach(null);
1208                if (key.isValid()) {
1209                    handleConnection(key);
1210                } else {
1211                    cancelKey(key);
1212                }
1213            }
1214            
1215            expireIdleKeys();
1216            
1217            if (selectorState <= 0){
1218                selector.selectedKeys().clear();
1219                return;
1220            }
1221        } catch (Throwable JavaDoc t){
1222            if ( key != null ){
1223                key.attach(null);
1224                key.cancel();
1225            }
1226            logger.log(Level.FINE,"selectorThread.errorOnRequest",t);
1227        }
1228    }
1229    
1230    
1231    /**
1232     * Cancel keep-alive connections.
1233     */

1234    private void expireIdleKeys(){
1235        if ( keepAliveTimeoutInSeconds <= 0 || !selector.isOpen()) return;
1236        long current = System.currentTimeMillis();
1237
1238        if (current < nextKeysExpiration) {
1239            return;
1240        }
1241        nextKeysExpiration = current + kaTimeout;
1242        
1243        Set JavaDoc<SelectionKey JavaDoc> readyKeys = selector.keys();
1244        if (readyKeys.isEmpty()){
1245            return;
1246        }
1247        Iterator JavaDoc<SelectionKey JavaDoc> iterator = readyKeys.iterator();
1248        SelectionKey JavaDoc key;
1249        while (iterator.hasNext()) {
1250            key = iterator.next();
1251            if ( !key.isValid() ) {
1252                keepAlivePipeline.untrap(key);
1253                continue;
1254            }
1255            // Keep-alive expired
1256
if ( key.attachment() != null) {
1257                long expire = (Long JavaDoc) key.attachment();
1258                if (current - expire >= kaTimeout) {
1259                    cancelKey(key);
1260                } else if (expire + kaTimeout < nextKeysExpiration){
1261                    nextKeysExpiration = expire + kaTimeout;
1262                }
1263            }
1264        }
1265    }
1266    
1267    
1268    /**
1269     * Handle a blocking operation on the socket.
1270     */

1271    private void handleConnection(Socket JavaDoc socket) throws IOException JavaDoc{
1272                
1273        if (isMonitoringEnabled()) {
1274            globalRequestProcessor.increaseCountOpenConnections();
1275            pipelineStat.incrementTotalAcceptCount();
1276        }
1277
1278        setSocketOptions(socket);
1279        getReadBlockingTask(socket).execute();
1280    }
1281   
1282    
1283    /**
1284     * Handle an incoming operation on the channel. It is always an ACCEPT or
1285     * a READ.
1286     */

1287    protected void handleConnection(SelectionKey JavaDoc key) throws IOException JavaDoc,
1288                                                           InterruptedException JavaDoc{
1289                                                    
1290        Task task = null;
1291        if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT){
1292            handleAccept(key);
1293            return;
1294        } else if ((key.readyOps() & SelectionKey.OP_READ)
1295                                                    == SelectionKey.OP_READ) {
1296            task = handleRead(key);
1297        }
1298 
1299        task.execute();
1300    }
1301      
1302    
1303    /**
1304     * Handle OP_ACCEPT
1305     */

1306    private void handleAccept(SelectionKey JavaDoc key) throws IOException JavaDoc{
1307        ServerSocketChannel JavaDoc server = (ServerSocketChannel JavaDoc) key.channel();
1308        SocketChannel JavaDoc channel = server.accept();
1309
1310        if (channel != null) {
1311            if ( selectorReadThreadsCount > 1 ) {
1312                SelectorReadThread srt = getSelectorReadThread();
1313                srt.addChannel(channel);
1314            } else {
1315                channel.configureBlocking(false);
1316                SelectionKey JavaDoc readKey =
1317                        channel.register(selector, SelectionKey.OP_READ);
1318                setSocketOptions(((SocketChannel JavaDoc)readKey.channel()).socket());
1319            }
1320
1321            if (isMonitoringEnabled()) {
1322                getRequestGroupInfo().increaseCountOpenConnections();
1323                pipelineStat.incrementTotalAcceptCount();
1324            }
1325        }
1326     }
1327    
1328    
1329    /**
1330     * Handle OP_READ
1331     */

1332    private ReadTask handleRead(SelectionKey JavaDoc key) throws IOException JavaDoc{
1333        // disable OP_READ on key before doing anything else
1334
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
1335        ReadTask task = getReadTask(key);
1336        
1337        return task;
1338    }
1339    
1340    
1341    /**
1342     * Cancel the current <code>SelectionKey</code>
1343     */

1344    protected void cancelKey(SelectionKey JavaDoc key){
1345        if (key == null || !key.isValid()){
1346            return;
1347        }
1348        
1349        keepAlivePipeline.untrap(key);
1350 
1351        try{
1352            ((SocketChannel JavaDoc)key.channel()).socket().shutdownInput();
1353        } catch (IOException JavaDoc ex){
1354            ;
1355        }
1356        
1357        try{
1358            ((SocketChannel JavaDoc)key.channel()).socket().shutdownOutput();
1359        } catch (IOException JavaDoc ex){
1360            ;
1361        }
1362
1363        try{
1364            ((SocketChannel JavaDoc)key.channel()).socket().close();
1365        } catch (IOException JavaDoc ex){
1366            ;
1367        } finally {
1368            try{
1369                key.channel().close();
1370            } catch (IOException JavaDoc ex){
1371                logger.log(Level.FINEST,"selectorThread.unableToCloseKey", key);
1372            }
1373            if (isMonitoringEnabled()) {
1374                getRequestGroupInfo().decreaseCountOpenConnections();
1375            }
1376        }
1377               
1378        // Set the attachement to null so the Selector.java handleConnection
1379
// stop processing this key.
1380
key.attach(null);
1381        key.cancel();
1382        key = null;
1383    }
1384    
1385    
1386    /**
1387     * Returns the <code>Task</code> object to the pool.
1388     */

1389    public void returnTask(Task task){
1390        // Returns the object to the pool.
1391
if (task != null) {
1392            if (task.getType() == Task.PROCESSOR_TASK){
1393                                
1394                if ( isMonitoringEnabled() ){
1395                   activeProcessorTasks.remove(((ProcessorTask)task));
1396                }
1397                
1398                processorTasks.offer((ProcessorTask)task);
1399            } else if (task.getType() == Task.READ_TASK){
1400                readTasks.offer((ReadTask)task);
1401            }
1402        }
1403    }
1404    
1405
1406    /**
1407     * Wakes up the <code>Selector</code> associated with this thread.
1408     */

1409    public void wakeup(){
1410        selector.wakeup();
1411    }
1412
1413    
1414    /**
1415     * Clear all cached <code>Tasks</code>
1416     */

1417    protected void clearTasks(){
1418        processorTasks.clear();
1419        readTasks.clear();
1420    }
1421
1422    
1423    /**
1424     * Cancel the <code>threadID</code> execution. Return <code>true</code>
1425     * if it is successful.
1426     *
1427     * @param id the thread name to cancel
1428     */

1429    public boolean cancelThreadExecution(long cancelThreadID){
1430   
1431        if ( selectorReadThreadsCount > 1 ){
1432            boolean cancelled = false;
1433            for (SelectorReadThread readSelector : readThreads) {
1434                cancelled = readSelector.cancelThreadExecution(cancelThreadID);
1435                if (cancelled) return true;
1436            }
1437            return false;
1438        }
1439                       
1440        if (activeProcessorTasks.size() == 0) return false;
1441        
1442        Iterator JavaDoc<ProcessorTask> iterator = activeProcessorTasks.iterator();
1443        ProcessorTask processorTask;
1444        long threadID;
1445        while( iterator.hasNext() ){
1446            processorTask = iterator.next();
1447            threadID = processorTask.getRequest().getRequestProcessor()
1448                .getWorkerThreadID();
1449            if (threadID == cancelThreadID){
1450                processorTask.cancelTask("Request cancelled.","500");
1451                logger.log(Level.WARNING,
1452                        "Thread Request Cancelled: " + threadID);
1453                return processorTask.getPipeline().interruptThread(threadID);
1454            }
1455        }
1456        return false;
1457    }
1458
1459    
1460    // ---------------------------------------------------Endpoint Lifecycle --/
1461

1462    
1463    public void pauseEndpoint() {
1464        if (running && !paused) {
1465            paused = true;
1466            unlockAccept();
1467        }
1468        
1469        try{
1470            selector.close();
1471        } catch (IOException JavaDoc ex){
1472            ;
1473        }
1474               
1475    }
1476
1477    public void resumeEndpoint() {
1478        if (running) {
1479            paused = false;
1480        }
1481    }
1482
1483    public void stopEndpoint() {
1484        try{
1485            if (running) running = false;
1486
1487            stopPipelines();
1488
1489            try{
1490                if ( serverSocket != null )
1491                    serverSocket.close();
1492            } catch (Throwable JavaDoc ex){
1493                logger.log(Level.SEVERE,
1494                        "selectorThread.closeSocketException",ex);
1495            }
1496
1497            try{
1498                if ( serverSocketChannel != null)
1499                    serverSocketChannel.close();
1500            } catch (Throwable JavaDoc ex){
1501                logger.log(Level.SEVERE,
1502                        "selectorThread.closeSocketException",ex);
1503            }
1504
1505            try{
1506                if ( selector != null)
1507                    selector.close();
1508            } catch (Throwable JavaDoc ex){
1509                logger.log(Level.SEVERE,
1510                        "selectorThread.closeSocketException",ex);
1511            }
1512
1513            clearTasks();
1514
1515            unregisterComponents();
1516        } catch (Throwable JavaDoc t){
1517             logger.log(Level.SEVERE,"selectorThread.stopException",t);
1518        }
1519    }
1520    
1521    protected void unlockAccept() {
1522        
1523        // Not required with non blocking.
1524
if (secure) {
1525            Socket JavaDoc s = null;
1526            try {
1527                // Need to create a connection to unlock the accept();
1528
if (inet == null) {
1529                    s=new Socket JavaDoc("127.0.0.1", port );
1530                }else{
1531                    s=new Socket JavaDoc(inet, port );
1532                        // setting soLinger to a small value will help shutdown the
1533
// connection quicker
1534
s.setSoLinger(true, 0);
1535                }
1536            } catch(Exception JavaDoc e) {
1537                logger.log(Level.FINE,"selectorThread.unlockAcceptException" + port
1538                          + " " + e.toString());
1539            } finally {
1540                if (s != null) {
1541                    try {
1542                        s.close();
1543                    } catch (Exception JavaDoc e) {
1544                        // Ignore
1545
}
1546                }
1547            }
1548        }
1549    }
1550
1551    
1552    // ------------------------------------------------------Public methods--/
1553

1554    public boolean getUseNioNonBlocking(){
1555        return useNioNonBlocking;
1556    }
1557    
1558
1559    public void setMaxThreads(int maxThreads) {
1560        if ( maxThreads == 1 ) {
1561            maxProcessorWorkerThreads = 5;
1562        } else {
1563            maxProcessorWorkerThreads = maxThreads;
1564        }
1565    }
1566
1567    public int getMaxThreads() {
1568        return maxProcessorWorkerThreads;
1569    }
1570
1571    public void setMaxSpareThreads(int maxThreads) {
1572    }
1573
1574    public int getMaxSpareThreads() {
1575        return maxProcessorWorkerThreads;
1576    }
1577
1578    public void setMinSpareThreads(int minSpareThreads) {
1579        this.minSpareThreads = minSpareThreads;
1580    }
1581
1582    public int getMinSpareThreads() {
1583        return minSpareThreads;
1584    }
1585
1586
1587    public int getPort() {
1588        return port;
1589    }
1590
1591    public void setPort(int port ) {
1592        this.port=port;
1593    }
1594
1595    public InetAddress JavaDoc getAddress() {
1596        return inet;
1597    }
1598
1599    public void setAddress(InetAddress JavaDoc inet) {
1600        this.inet=inet;
1601    }
1602
1603    public void setServerSocketFactory( ServerSocketFactory factory ) {
1604        this.factory=factory;
1605    }
1606
1607    ServerSocketFactory getServerSocketFactory() {
1608        return factory;
1609    }
1610
1611
1612    public boolean isRunning() {
1613        return running;
1614    }
1615    
1616    public boolean isPaused() {
1617        return paused;
1618    }
1619
1620
1621    /**
1622     * Provides the count of request threads that are currently
1623     * being processed by the container
1624     *
1625     * @return The count of busy threads
1626     */

1627    public int getCurrentBusyProcessorThreads() {
1628        int busy = 0;
1629        
1630        // multi selector support
1631
if (selectorReadThreadsCount > 1) {
1632            for (SelectorReadThread readSelector : readThreads) {
1633                busy += readSelector.getCurrentBusyProcessorThreads();
1634            }
1635
1636        } else {
1637            busy = processorPipeline.getCurrentThreadsBusy();
1638        }
1639
1640        return busy;
1641    }
1642
1643
1644    /**
1645     * Sets the timeout in ms of the server sockets created by this
1646     * server. This method allows the developer to make servers
1647     * more or less responsive to having their server sockets
1648     * shut down.
1649     *
1650     * <p>By default this value is 1000ms.
1651     */

1652    public void setServerTimeout(int timeout) {
1653        this.serverTimeout = timeout;
1654    }
1655
1656    public boolean getTcpNoDelay() {
1657        return tcpNoDelay;
1658    }
1659    
1660    public void setTcpNoDelay( boolean b ) {
1661        tcpNoDelay=b;
1662    }
1663
1664    public int getSoLinger() {
1665        return linger;
1666    }
1667    
1668    public void setSoLinger( int i ) {
1669        linger=i;
1670    }
1671
1672    public int getSoTimeout() {
1673        return socketTimeout;
1674    }
1675    
1676    public void setSoTimeout( int i ) {
1677        socketTimeout=i;
1678    }
1679    
1680    public int getServerSoTimeout() {
1681        return serverTimeout;
1682    }
1683    
1684    public void setServerSoTimeout( int i ) {
1685        serverTimeout=i;
1686    }
1687    
1688    public void setSecure(boolean secure){
1689        this.secure = secure;
1690    }
1691    
1692    public boolean getSecure(){
1693        return secure;
1694    }
1695        
1696    // ------------------------------------------------------ Connector Methods
1697

1698
1699    /**
1700     * Get the maximum pending connection this <code>Pipeline</code>
1701     * can handle.
1702     */

1703    public int getQueueSizeInBytes(){
1704        return maxQueueSizeInBytes;
1705    }
1706    
1707    
1708    
1709    public int getMaxKeepAliveRequests() {
1710        return maxKeepAliveRequests;
1711    }
1712    
1713    
1714    /**
1715     * Set the maximum number of Keep-Alive requests that we will honor.
1716     */

1717    public void setMaxKeepAliveRequests(int mkar) {
1718        maxKeepAliveRequests = mkar;
1719    }
1720    
1721
1722    /**
1723     * Sets the number of seconds before a keep-alive connection that has
1724     * been idle times out and is closed.
1725     *
1726     * @param timeout Keep-alive timeout in number of seconds
1727     */

1728    public void setKeepAliveTimeoutInSeconds(int timeout) {
1729        keepAliveTimeoutInSeconds = timeout;
1730        keepAliveStats.setSecondsTimeouts(timeout);
1731    }
1732
1733
1734    /**
1735     * Gets the number of seconds before a keep-alive connection that has
1736     * been idle times out and is closed.
1737     *
1738     * @return Keep-alive timeout in number of seconds
1739     */

1740    public int getKeepAliveTimeoutInSeconds() {
1741        return keepAliveTimeoutInSeconds;
1742    }
1743
1744
1745    /**
1746     * Sets the number of keep-alive threads.
1747     *
1748     * @param threadCount Number of keep-alive threads
1749     */

1750    public void setKeepAliveThreadCount(int threadCount) {
1751        keepAlivePipeline.setMaxThreads(threadCount);
1752    }
1753
1754
1755    /**
1756     * Return the current <code>SSLImplementation</code> this Thread
1757     */

1758    public SSLImplementation getSSLImplementation() {
1759        return sslImplementation;
1760    }
1761    
1762    
1763    /**
1764     * Set the <code>SSLImplementation</code> used by this thread.It usually
1765     * means HTTPS will be used.
1766     */

1767    public void setSSLImplementation( SSLImplementation sslImplementation) {
1768        this.sslImplementation = sslImplementation;
1769    }
1770
1771
1772    /**
1773     * Set the associated adapter.
1774     *
1775     * @param adapter the new adapter
1776     */

1777    public void setAdapter(Adapter adapter) {
1778        this.adapter = adapter;
1779    }
1780
1781
1782    /**
1783     * Get the associated adapter.
1784     *
1785     * @return the associated adapter
1786     */

1787    public Adapter getAdapter() {
1788        return adapter;
1789    }
1790    
1791    // ------------------------------------------------------- SSL supports --/
1792

1793    protected Socket JavaDoc acceptSocket() {
1794        if( !running || serverSocket==null ) return null;
1795
1796        Socket JavaDoc socket = null;
1797
1798        try {
1799            if(factory==null) {
1800                socket = serverSocketChannel.accept().socket();
1801            } else {
1802                socket = factory.acceptSocket(serverSocket);
1803            }
1804            if (null == socket) {
1805                logger.log(Level.WARNING,"selectorThread.acceptSocket");
1806            } else {
1807                if (!running) {
1808                    socket.close(); // rude, but unlikely!
1809
socket = null;
1810                } else if (factory != null) {
1811                    factory.initSocket( socket );
1812                }
1813            }
1814        } catch(InterruptedIOException JavaDoc iioe) {
1815            // normal part -- should happen regularly so
1816
// that the endpoint can release if the server
1817
// is shutdown.
1818
} catch (AccessControlException JavaDoc ace) {
1819            // When using the Java SecurityManager this exception
1820
// can be thrown if you are restricting access to the
1821
// socket with SocketPermission's.
1822
// Log the unauthorized access and continue
1823
logger.log(Level.WARNING,"selectorThread.wrongPermission",
1824                       new Object JavaDoc[]{serverSocket,ace});
1825        } catch (IOException JavaDoc e) {
1826
1827            String JavaDoc msg = null;
1828
1829            if (running) {
1830                logger.log(Level.SEVERE,"selectorThread.shutdownException",
1831                           new Object JavaDoc[]{serverSocket, e});
1832            }
1833
1834            if (socket != null) {
1835                try {
1836                    socket.close();
1837                } catch(Throwable JavaDoc ex) {
1838                    logger.log(Level.SEVERE,"selectorThread.shutdownException",
1839                               new Object JavaDoc[]{serverSocket, ex});
1840                }
1841                socket = null;
1842            }
1843
1844            if( !running ) return null;
1845        } catch (Throwable JavaDoc t) {
1846            try{
1847                if (socket != null)
1848                    socket.close();
1849            } catch (IOException JavaDoc ex){
1850                ;
1851            }
1852            logger.log(Level.FINE,
1853                       "selectorThread.errorOnRequest",
1854                       t);
1855        }
1856
1857        return socket;
1858    }
1859    
1860   
1861    protected void setSocketOptions(Socket JavaDoc socket) throws SocketException JavaDoc {
1862        if(linger >= 0 )
1863            socket.setSoLinger( true, linger);
1864        if( tcpNoDelay )
1865            socket.setTcpNoDelay(tcpNoDelay);
1866        
1867        // NIO Non blocking doesn't support setSoTimeout, so avoid setting it.
1868
if( keepAliveTimeoutInSeconds > 0 && !useNioNonBlocking)
1869            socket.setSoTimeout( keepAliveTimeoutInSeconds * 1000 );
1870
1871        if ( maxReadWorkerThreads != 0)
1872            socket.setReuseAddress(true);
1873    }
1874    
1875    // ------------------------------- JMX and Monnitoring support --------//
1876

1877    public ObjectName JavaDoc getObjectName() {
1878        return oname;
1879    }
1880
1881    public String JavaDoc getDomain() {
1882        return domain;
1883    }
1884
1885    public ObjectName JavaDoc preRegister(MBeanServer JavaDoc server,
1886                                  ObjectName JavaDoc name) throws Exception JavaDoc {
1887        oname=name;
1888        mserver=server;
1889        domain=name.getDomain();
1890        return name;
1891    }
1892
1893    public void postRegister(Boolean JavaDoc registrationDone) {
1894        // Do nothing
1895
}
1896
1897    public void preDeregister() throws Exception JavaDoc {
1898        // Do nothing
1899
}
1900
1901    public void postDeregister() {
1902        // Do nothing
1903
}
1904
1905
1906    /**
1907     * Register JMX components.
1908     */

1909    private void registerComponents(){
1910
1911        if( this.domain != null) {
1912
1913            Registry reg = Registry.getRegistry();
1914
1915            try {
1916                globalRequestProcessorName = new ObjectName JavaDoc(
1917                    domain + ":type=GlobalRequestProcessor,name=http" + port);
1918                reg.registerComponent(globalRequestProcessor,
1919                                      globalRequestProcessorName,
1920                                      null);
1921 
1922                keepAliveMbeanName = new ObjectName JavaDoc(
1923                    domain + ":type=PWCKeepAlive,name=http" + port);
1924                reg.registerComponent(keepAliveStats,
1925                                      keepAliveMbeanName,
1926                                      null);
1927
1928                pwcConnectionQueueMbeanName = new ObjectName JavaDoc(
1929                    domain + ":type=PWCConnectionQueue,name=http" + port);
1930                reg.registerComponent(pipelineStat,
1931                                      pwcConnectionQueueMbeanName,
1932                                      null);
1933                
1934                pwcFileCacheMbeanName = new ObjectName JavaDoc(
1935                    domain + ":type=PWCFileCache,name=http" + port);
1936                reg.registerComponent(fileCacheFactory,
1937                                      pwcFileCacheMbeanName,
1938                                      null);
1939            } catch (Exception JavaDoc ex) {
1940                logger.log(Level.WARNING,
1941                           "selectorThread.mbeanRegistrationException",
1942                           new Object JavaDoc[]{new Integer JavaDoc(port),ex});
1943            }
1944        }
1945
1946    }
1947    
1948    
1949    /**
1950     * Unregister components.
1951     **/

1952    private void unregisterComponents(){
1953
1954        if (this.domain != null) {
1955
1956            Registry reg = Registry.getRegistry();
1957
1958            try {
1959                if (globalRequestProcessorName != null) {
1960                    reg.unregisterComponent(globalRequestProcessorName);
1961                }
1962                if (keepAliveMbeanName != null) {
1963                    reg.unregisterComponent(keepAliveMbeanName);
1964                }
1965                if (pwcConnectionQueueMbeanName != null) {
1966                    reg.unregisterComponent(pwcConnectionQueueMbeanName);
1967                }
1968                if (pwcFileCacheMbeanName != null) {
1969                    reg.unregisterComponent(pwcFileCacheMbeanName);
1970                }
1971            } catch (Exception JavaDoc ex) {
1972                logger.log(Level.WARNING,
1973                           "mbeanDeregistrationException",
1974                           new Object JavaDoc[]{new Integer JavaDoc(port),ex});
1975            }
1976        }
1977    }
1978
1979    
1980    /**
1981     * Enable gathering of monitoring datas.
1982     */

1983    public void enableMonitoring(){
1984        isMonitoringEnabled = true;
1985        enablePipelineStats();
1986        
1987       fileCacheFactory.setIsMonitoringEnabled(isMonitoringEnabled);
1988    }
1989    
1990    
1991    /**
1992     * Disable gathering of monitoring datas.
1993     */

1994    public void disableMonitoring(){
1995        disablePipelineStats();
1996        
1997       fileCacheFactory.setIsMonitoringEnabled(isMonitoringEnabled);
1998    }
1999
2000    
2001    /**
2002     * Returns <code>true</code> if monitoring has been enabled,
2003     * <code>false</code> otherwise.
2004     */

2005    public boolean isMonitoringEnabled() {
2006        return isMonitoringEnabled;
2007    }
2008
2009    
2010    public RequestGroupInfo getRequestGroupInfo() {
2011        return globalRequestProcessor;
2012    }
2013
2014
2015    public KeepAliveStats getKeepAliveStats() {
2016        return keepAliveStats;
2017    }
2018
2019
2020    /*
2021     * Initializes the web container monitoring level from the domain.xml.
2022     */

2023    private void initMonitoringLevel() {
2024        pipelineStat = new PipelineStatistic(port);
2025        pipelineStat.setQueueSizeInBytes(maxQueueSizeInBytes);
2026    }
2027 
2028    
2029    public int getMaxHttpHeaderSize() {
2030        return maxHttpHeaderSize;
2031    }
2032    
2033    public void setMaxHttpHeaderSize(int maxHttpHeaderSize) {
2034        this.maxHttpHeaderSize = maxHttpHeaderSize;
2035    }
2036    
2037        
2038    /**
2039     * The minimun threads created at startup.
2040     */

2041    public void setMinThreads(int minWorkerThreads){
2042        this.minWorkerThreads = minWorkerThreads;
2043    }
2044
2045
2046    /**
2047     * Set the request input buffer size
2048     */

2049    public void setBufferSize(int requestBufferSize){
2050        this.requestBufferSize = requestBufferSize;
2051    }
2052    
2053
2054    /**
2055     * Return the request input buffer size
2056     */

2057    public int getBufferSize(){
2058        return requestBufferSize;
2059    }
2060    
2061    
2062    public Selector JavaDoc getSelector(){
2063        return selector;
2064    }
2065
2066    /************************* PWCThreadPool Stats *************************/
2067
2068    public int getCountThreadsStats() {
2069
2070        int ret = processorPipeline.getCurrentThreadCount();
2071
2072        if (readPipeline != null
2073                && readPipeline != processorPipeline) {
2074            ret += readPipeline.getCurrentThreadCount();
2075        }
2076
2077        return ret;
2078    }
2079
2080
2081    public int getCountThreadsIdleStats() {
2082
2083        int ret = processorPipeline.getWaitingThread();
2084
2085        if (readPipeline != null
2086                && readPipeline != processorPipeline) {
2087            ret += readPipeline.getWaitingThread();
2088        }
2089
2090        return ret;
2091    }
2092
2093
2094    /************************* HTTPListener Stats *************************/
2095
2096    public int getCurrentThreadCountStats() {
2097
2098        int ret = processorPipeline.getCurrentThreadCount();
2099
2100        if (readPipeline != null
2101                && readPipeline != processorPipeline) {
2102            ret += readPipeline.getCurrentThreadCount();
2103        }
2104
2105        return ret;
2106    }
2107
2108
2109    public int getCurrentThreadsBusyStats() {
2110
2111        int ret = processorPipeline.getCurrentThreadsBusy();
2112
2113        if (readPipeline != null
2114                && readPipeline != processorPipeline) {
2115            ret += readPipeline.getCurrentThreadsBusy();
2116        }
2117 
2118        return ret;
2119    }
2120
2121    public int getMaxSpareThreadsStats() {
2122
2123        int ret = processorPipeline.getMaxSpareThreads();
2124 
2125        if (readPipeline != null
2126                && readPipeline != processorPipeline) {
2127            ret += readPipeline.getMaxSpareThreads();
2128        }
2129
2130        return ret;
2131    }
2132
2133
2134    public int getMinSpareThreadsStats() {
2135
2136        int ret = processorPipeline.getMinSpareThreads();
2137
2138        if (readPipeline != null
2139                && readPipeline != processorPipeline) {
2140            ret += readPipeline.getMinSpareThreads();
2141        }
2142
2143        return ret;
2144    }
2145
2146
2147    public int getMaxThreadsStats() {
2148
2149        int ret = processorPipeline.getMaxThreads();
2150
2151        if (readPipeline != null
2152                && readPipeline != processorPipeline) {
2153            ret += readPipeline.getMaxThreads();
2154        }
2155        
2156        return ret;
2157    }
2158
2159
2160    //------------------------------------------------- FileCache config -----/
2161

2162   
2163    /**
2164     * The timeout in seconds before remove a <code>FileCacheEntry</code>
2165     * from the <code>fileCache</code>
2166     */

2167    public void setSecondsMaxAge(int sMaxAges){
2168        secondsMaxAge = sMaxAges;
2169    }
2170    
2171    
2172    /**
2173     * Set the maximum entries this cache can contains.
2174     */

2175    public void setMaxCacheEntries(int mEntries){
2176        maxCacheEntries = mEntries;
2177    }
2178
2179    
2180    /**
2181     * Return the maximum entries this cache can contains.
2182     */

2183    public int getMaxCacheEntries(){
2184        return maxCacheEntries;
2185    }
2186    
2187    
2188    /**
2189     * Set the maximum size a <code>FileCacheEntry</code> can have.
2190     */

2191    public void setMinEntrySize(long mSize){
2192        minEntrySize = mSize;
2193    }
2194    
2195    
2196    /**
2197     * Get the maximum size a <code>FileCacheEntry</code> can have.
2198     */

2199    public long getMinEntrySize(){
2200        return minEntrySize;
2201    }
2202     
2203    
2204    /**
2205     * Set the maximum size a <code>FileCacheEntry</code> can have.
2206     */

2207    public void setMaxEntrySize(long mEntrySize){
2208        maxEntrySize = mEntrySize;
2209    }
2210    
2211    
2212    /**
2213     * Get the maximum size a <code>FileCacheEntry</code> can have.
2214     */

2215    public long getMaxEntrySize(){
2216        return maxEntrySize;
2217    }
2218    
2219    
2220    /**
2221     * Set the maximum cache size
2222     */

2223    public void setMaxLargeCacheSize(long mCacheSize){
2224        maxLargeFileCacheSize = mCacheSize;
2225    }
2226
2227    
2228    /**
2229     * Get the maximum cache size
2230     */

2231    public long getMaxLargeCacheSize(){
2232        return maxLargeFileCacheSize;
2233    }
2234    
2235    
2236    /**
2237     * Set the maximum cache size
2238     */

2239    public void setMaxSmallCacheSize(long mCacheSize){
2240        maxSmallFileCacheSize = mCacheSize;
2241    }
2242    
2243    
2244    /**
2245     * Get the maximum cache size
2246     */

2247    public long getMaxSmallCacheSize(){
2248        return maxSmallFileCacheSize;
2249    }
2250
2251    
2252    /**
2253     * Is the fileCache enabled.
2254     */

2255    public boolean isFileCacheEnabled(){
2256        return isFileCacheEnabled;
2257    }
2258
2259    
2260    /**
2261     * Is the file caching mechanism enabled.
2262     */

2263    public void setFileCacheIsEnabled(boolean isFileCacheEnabled){
2264        this.isFileCacheEnabled = isFileCacheEnabled;
2265    }
2266   
2267    
2268    /**
2269     * Is the large file cache support enabled.
2270     */

2271    public void setLargeFileCacheEnabled(boolean isLargeEnabled){
2272        this.isLargeFileCacheEnabled = isLargeEnabled;
2273    }
2274   
2275    
2276    /**
2277     * Is the large file cache support enabled.
2278     */

2279    public boolean getLargeFileCacheEnabled(){
2280        return isLargeFileCacheEnabled;
2281    }
2282
2283    // --------------------------------------------------------------------//
2284

2285    /**
2286     * Enable the <code>AsyncHandler</code> used when asynchronous
2287     */

2288    public void setEnableAsyncExecution(boolean asyncExecution){
2289        this.asyncExecution = asyncExecution;
2290    }
2291    
2292       
2293    /**
2294     * Return true when asynchronous execution is
2295     * enabled.
2296     */

2297    public boolean getEnableAsyncExecution(){
2298        return asyncExecution;
2299    }
2300    
2301    
2302    /**
2303     * Set the <code>AsyncHandler</code> used when asynchronous execution is
2304     * enabled.
2305     */

2306    public void setAsyncHandler(AsyncHandler asyncHandler){
2307        this.asyncHandler = asyncHandler;
2308    }
2309    
2310       
2311    /**
2312     * Return the <code>AsyncHandler</code> used when asynchronous execution is
2313     * enabled.
2314     */

2315    public AsyncHandler getAsyncHandler(){
2316        return asyncHandler;
2317    }
2318    
2319    
2320    /**
2321     * Set the logger used by this instance.
2322     */

2323    public static void setLogger(Logger JavaDoc l){
2324        if ( l != null )
2325            logger = l;
2326    }
2327
2328    
2329    /**
2330     * Return the logger used by the Grizzly classes.
2331     */

2332    public static Logger JavaDoc logger(){
2333        return logger;
2334    }
2335    
2336    
2337    /**
2338     * Set the document root folder
2339     */

2340    public static void setWebAppRootPath(String JavaDoc rf){
2341        rootFolder = rf;
2342    }
2343    
2344    
2345    /**
2346     * Return the folder's root where application are deployed.
2347     */

2348    public static String JavaDoc getWebAppRootPath(){
2349        return rootFolder;
2350    }
2351    
2352    
2353    /**
2354     * Display the Grizzly configuration parameters.
2355     */

2356    private void displayConfiguration(){
2357       if (displayConfiguration){
2358            logger.log(Level.INFO,
2359                    "\n Grizzly configuration for http-listener "
2360                    + port
2361                    + "\n\t useNioNonBlocking:"
2362                    + useNioNonBlocking + "\n\t minReadQueueLength:"
2363                    + minReadQueueLength + "\n\t minProcessorQueueLength:"
2364                    + maxReadWorkerThreads+"\n\t maxProcessorWorkerThreads: "
2365                    + maxProcessorWorkerThreads + "\n\t minWorkerThreads:"
2366                    + minWorkerThreads + "\n\t selectorTimeout:"
2367                    + selectorTimeout + "\n\t ByteBuffer size: "
2368                    + Constants.CHANNEL_BYTE_SIZE
2369                    + "\n\t maxHttpHeaderSize:"
2370                    + maxHttpHeaderSize
2371                    + "\n\t maxKeepAliveRequests: "
2372                    + maxKeepAliveRequests
2373                    + "\n\t keepAliveTimeoutInSeconds: "
2374                    + keepAliveTimeoutInSeconds
2375                    + "\n\t useDirectByteBuffer: "
2376                    + useDirectByteBuffer
2377                    + "\n\t socketSoTimeout: "
2378                    + Constants.DEFAULT_CONNECTION_TIMEOUT
2379                    + "\n\t useByteBufferView: "
2380                    + useByteBufferView
2381                    + "\n\t selectorReadThreadsCount: "
2382                    + selectorReadThreadsCount
2383                    + "\n\t recycleTasks: "
2384                    + recycleTasks);
2385        }
2386    }
2387}
2388
2389
2390
2391
2392
Popular Tags