KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > tomcat > util > net > NioEndpoint


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17
18 package org.apache.tomcat.util.net;
19
20 import java.io.FileInputStream JavaDoc;
21 import java.io.IOException JavaDoc;
22 import java.net.InetAddress JavaDoc;
23 import java.net.InetSocketAddress JavaDoc;
24 import java.net.Socket JavaDoc;
25 import java.nio.ByteBuffer JavaDoc;
26 import java.nio.channels.CancelledKeyException JavaDoc;
27 import java.nio.channels.SelectionKey JavaDoc;
28 import java.nio.channels.Selector JavaDoc;
29 import java.nio.channels.ServerSocketChannel JavaDoc;
30 import java.nio.channels.SocketChannel JavaDoc;
31 import java.security.KeyStore JavaDoc;
32 import java.util.HashMap JavaDoc;
33 import java.util.Iterator JavaDoc;
34 import java.util.Set JavaDoc;
35 import java.util.StringTokenizer JavaDoc;
36 import java.util.concurrent.ConcurrentLinkedQueue JavaDoc;
37 import java.util.concurrent.Executor JavaDoc;
38 import java.util.concurrent.atomic.AtomicLong JavaDoc;
39 import javax.net.ssl.KeyManagerFactory;
40 import javax.net.ssl.SSLContext;
41 import javax.net.ssl.SSLEngine;
42 import javax.net.ssl.TrustManagerFactory;
43
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.apache.tomcat.util.IntrospectionUtils;
47 import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
48 import org.apache.tomcat.util.res.StringManager;
49 import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
50 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
51
52 /**
53  * NIO tailored thread pool, providing the following services:
54  * <ul>
55  * <li>Socket acceptor thread</li>
56  * <li>Socket poller thread</li>
57  * <li>Worker threads pool</li>
58  * </ul>
59  *
60  * When switching to Java 5, there's an opportunity to use the virtual
61  * machine's thread pool.
62  *
63  * @author Mladen Turk
64  * @author Remy Maucherat
65  * @author Filip Hanik
66  */

67 public class NioEndpoint {
68
69
70     // -------------------------------------------------------------- Constants
71

72
73     protected static Log log = LogFactory.getLog(NioEndpoint.class);
74
75     protected static StringManager sm =
76         StringManager.getManager("org.apache.tomcat.util.net.res");
77
78
79     /**
80      * The Request attribute key for the cipher suite.
81      */

82     public static final String JavaDoc CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
83
84     /**
85      * The Request attribute key for the key size.
86      */

87     public static final String JavaDoc KEY_SIZE_KEY = "javax.servlet.request.key_size";
88
89     /**
90      * The Request attribute key for the client certificate chain.
91      */

92     public static final String JavaDoc CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
93
94     /**
95      * The Request attribute key for the session id.
96      * This one is a Tomcat extension to the Servlet spec.
97      */

98     public static final String JavaDoc SESSION_ID_KEY = "javax.servlet.request.ssl_session";
99
100     public static final int OP_REGISTER = -1; //register interest op
101
// ----------------------------------------------------------------- Fields
102

103
104     /**
105      * Available workers.
106      */

107     protected WorkerStack workers = null;
108
109
110     /**
111      * Running state of the endpoint.
112      */

113     protected volatile boolean running = false;
114
115
116     /**
117      * Will be set to true whenever the endpoint is paused.
118      */

119     protected volatile boolean paused = false;
120
121
122     /**
123      * Track the initialization state of the endpoint.
124      */

125     protected boolean initialized = false;
126
127
128     /**
129      * Current worker threads busy count.
130      */

131     protected int curThreadsBusy = 0;
132
133
134     /**
135      * Current worker threads count.
136      */

137     protected int curThreads = 0;
138
139
140     /**
141      * Sequence number used to generate thread names.
142      */

143     protected int sequence = 0;
144     
145     protected NioSelectorPool selectorPool = new NioSelectorPool();
146     
147     /**
148      * Server socket "pointer".
149      */

150     protected ServerSocketChannel JavaDoc serverSock = null;
151
152     /**
153      * Cache for key attachment objects
154      */

155     protected ConcurrentLinkedQueue JavaDoc<KeyAttachment> keyCache = new ConcurrentLinkedQueue JavaDoc<KeyAttachment>();
156     
157     /**
158      * Cache for poller events
159      */

160     protected ConcurrentLinkedQueue JavaDoc<PollerEvent> eventCache = new ConcurrentLinkedQueue JavaDoc<PollerEvent>();
161
162     /**
163      * Bytebuffer cache, each channel holds a set of buffers (two, except for SSL holds four)
164      */

165     protected ConcurrentLinkedQueue JavaDoc<NioChannel> nioChannels = new ConcurrentLinkedQueue JavaDoc<NioChannel>() {
166         protected AtomicInteger JavaDoc size = new AtomicInteger JavaDoc(0);
167         protected AtomicInteger JavaDoc bytes = new AtomicInteger JavaDoc(0);
168         public boolean offer(NioChannel socket, KeyAttachment att) {
169             boolean offer = socketProperties.getBufferPool()==-1?true:size.get()<socketProperties.getBufferPool();
170             offer = offer && (socketProperties.getBufferPoolSize()==-1?true:(bytes.get()+socket.getBufferSize())<socketProperties.getBufferPoolSize());
171             //avoid over growing our cache or add after we have stopped
172
if ( running && (!paused) && (offer) ) {
173                 boolean result = super.offer(socket);
174                 if ( result ) {
175                     size.incrementAndGet();
176                     bytes.addAndGet(socket.getBufferSize());
177                 }
178                 return result;
179             }
180             else return false;
181         }
182         
183         public NioChannel poll() {
184             NioChannel result = super.poll();
185             if ( result != null ) {
186                 size.decrementAndGet();
187                 bytes.addAndGet(-result.getBufferSize());
188             }
189             return result;
190         }
191         
192         public void clear() {
193             super.clear();
194             size.set(0);
195         }
196     };
197
198     
199
200     // ------------------------------------------------------------- Properties
201

202
203     /**
204      * External Executor based thread pool.
205      */

206     protected Executor JavaDoc executor = null;
207     public void setExecutor(Executor JavaDoc executor) { this.executor = executor; }
208     public Executor JavaDoc getExecutor() { return executor; }
209
210
211     /**
212      * Maximum amount of worker threads.
213      */

214     protected int maxThreads = 400;
215     public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }
216     public int getMaxThreads() { return maxThreads; }
217
218
219     /**
220      * Priority of the acceptor and poller threads.
221      */

222     protected int threadPriority = Thread.NORM_PRIORITY;
223     public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; }
224     public int getThreadPriority() { return threadPriority; }
225
226
227     /**
228      * Server socket port.
229      */

230     protected int port;
231     public int getPort() { return port; }
232     public void setPort(int port ) { this.port=port; }
233
234
235     /**
236      * Address for the server socket.
237      */

238     protected InetAddress JavaDoc address;
239     public InetAddress JavaDoc getAddress() { return address; }
240     public void setAddress(InetAddress JavaDoc address) { this.address = address; }
241
242
243     /**
244      * Handling of accepted sockets.
245      */

246     protected Handler handler = null;
247     public void setHandler(Handler handler ) { this.handler = handler; }
248     public Handler getHandler() { return handler; }
249
250
251     /**
252      * Allows the server developer to specify the backlog that
253      * should be used for server sockets. By default, this value
254      * is 100.
255      */

256     protected int backlog = 100;
257     public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; }
258     public int getBacklog() { return backlog; }
259
260     protected SocketProperties socketProperties = new SocketProperties();
261
262     /**
263      * Socket TCP no delay.
264      */

265     public boolean getTcpNoDelay() { return socketProperties.getTcpNoDelay();}
266     public void setTcpNoDelay(boolean tcpNoDelay) { socketProperties.setTcpNoDelay(tcpNoDelay); }
267
268
269     /**
270      * Socket linger.
271      */

272     public int getSoLinger() { return socketProperties.getSoLingerTime(); }
273     public void setSoLinger(int soLinger) {
274         socketProperties.setSoLingerTime(soLinger);
275         socketProperties.setSoLingerOn(soLinger>=0);
276     }
277
278
279     /**
280      * Socket timeout.
281      */

282     public int getSoTimeout() { return socketProperties.getSoTimeout(); }
283     public void setSoTimeout(int soTimeout) { socketProperties.setSoTimeout(soTimeout); }
284
285
286     /**
287      * Timeout on first request read before going to the poller, in ms.
288      */

289     protected int firstReadTimeout = 60000;
290     public int getFirstReadTimeout() { return firstReadTimeout; }
291     public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; }
292
293
294     /**
295      * The default is true - the created threads will be
296      * in daemon mode. If set to false, the control thread
297      * will not be daemon - and will keep the process alive.
298      */

299     protected boolean daemon = true;
300     public void setDaemon(boolean b) { daemon = b; }
301     public boolean getDaemon() { return daemon; }
302
303
304     /**
305      * Name of the thread pool, which will be used for naming child threads.
306      */

307     protected String JavaDoc name = "TP";
308     public void setName(String JavaDoc name) { this.name = name; }
309     public String JavaDoc getName() { return name; }
310
311
312
313     /**
314      * Allow comet request handling.
315      */

316     protected boolean useComet = true;
317     public void setUseComet(boolean useComet) { this.useComet = useComet; }
318     public boolean getUseComet() { return useComet; }
319
320
321     /**
322      * Acceptor thread count.
323      */

324     protected int acceptorThreadCount = 0;
325     public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; }
326     public int getAcceptorThreadCount() { return acceptorThreadCount; }
327
328
329
330     /**
331      * Poller thread count.
332      */

333     protected int pollerThreadCount = 0;
334     public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; }
335     public int getPollerThreadCount() { return pollerThreadCount; }
336
337     protected long selectorTimeout = 1000;
338     public void setSelectorTimeout(long timeout){ this.selectorTimeout = timeout;}
339     public long getSelectorTimeout(){ return this.selectorTimeout; }
340     /**
341      * The socket poller.
342      */

343     protected Poller[] pollers = null;
344     protected int pollerRoundRobin = 0;
345     public Poller getPoller0() {
346         pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
347         Poller poller = pollers[pollerRoundRobin];
348         return poller;
349     }
350
351
352     /**
353      * The socket poller used for Comet support.
354      */

355     public Poller getCometPoller0() {
356         Poller poller = getPoller0();
357         return poller;
358     }
359
360
361     /**
362      * Dummy maxSpareThreads property.
363      */

364     public int getMaxSpareThreads() { return Math.min(getMaxThreads(),5); }
365
366
367     /**
368      * Dummy minSpareThreads property.
369      */

370     public int getMinSpareThreads() { return Math.min(getMaxThreads(),5); }
371     
372     /**
373      * Generic properties, introspected
374      */

375     public void setProperty(String JavaDoc name, String JavaDoc value) {
376         final String JavaDoc selectorPoolName = "selectorPool.";
377         final String JavaDoc socketName = "socket.";
378         try {
379             if (name.startsWith(selectorPoolName)) {
380                 IntrospectionUtils.setProperty(selectorPool, name.substring(selectorPoolName.length()), value);
381             } else if (name.startsWith(socketName)) {
382                 IntrospectionUtils.setProperty(socketProperties, name.substring(socketName.length()), value);
383             }
384         }catch ( Exception JavaDoc x ) {
385             log.error("Unable to set attribute \""+name+"\" to \""+value+"\"",x);
386         }
387     }
388
389
390     // -------------------- SSL related properties --------------------
391
protected String JavaDoc keystoreFile = System.getProperty("user.home")+"/.keystore";
392     public String JavaDoc getKeystoreFile() { return keystoreFile;}
393     public void setKeystoreFile(String JavaDoc s ) { this.keystoreFile = s; }
394     public void setKeystore(String JavaDoc s ) { setKeystoreFile(s);}
395     public String JavaDoc getKeystore() { return getKeystoreFile();}
396     
397     protected String JavaDoc algorithm = "SunX509";
398     public String JavaDoc getAlgorithm() { return algorithm;}
399     public void setAlgorithm(String JavaDoc s ) { this.algorithm = s;}
400
401     protected boolean clientAuth = false;
402     public boolean getClientAuth() { return clientAuth;}
403     public void setClientAuth(boolean b ) { this.clientAuth = b;}
404     
405     protected String JavaDoc keystorePass = "changeit";
406     public String JavaDoc getKeystorePass() { return keystorePass;}
407     public void setKeystorePass(String JavaDoc s ) { this.keystorePass = s;}
408     
409     protected String JavaDoc keystoreType = "JKS";
410     public String JavaDoc getKeystoreType() { return keystoreType;}
411     public void setKeystoreType(String JavaDoc s ) { this.keystoreType = s;}
412
413     protected String JavaDoc sslProtocol = "TLS";
414     
415     public String JavaDoc getSslProtocol() { return sslProtocol;}
416     public void setSslProtocol(String JavaDoc s) { sslProtocol = s;}
417     
418     protected String JavaDoc sslEnabledProtocols=null; //"TLSv1,SSLv3,SSLv2Hello"
419
protected String JavaDoc[] sslEnabledProtocolsarr = new String JavaDoc[0];
420     public void setSslEnabledProtocols(String JavaDoc s) {
421         this.sslEnabledProtocols = s;
422         StringTokenizer JavaDoc t = new StringTokenizer JavaDoc(s,",");
423         sslEnabledProtocolsarr = new String JavaDoc[t.countTokens()];
424         for (int i=0; i<sslEnabledProtocolsarr.length; i++ ) sslEnabledProtocolsarr[i] = t.nextToken();
425     }
426     
427     
428     protected String JavaDoc ciphers = null;
429     protected String JavaDoc[] ciphersarr = new String JavaDoc[0];
430     public String JavaDoc getCiphers() { return ciphers;}
431     public void setCiphers(String JavaDoc s) {
432         ciphers = s;
433         if ( s == null ) ciphersarr = new String JavaDoc[0];
434         else {
435             StringTokenizer JavaDoc t = new StringTokenizer JavaDoc(s,",");
436             ciphersarr = new String JavaDoc[t.countTokens()];
437             for (int i=0; i<ciphersarr.length; i++ ) ciphersarr[i] = t.nextToken();
438         }
439     }
440     
441     /**
442      * SSL engine.
443      */

444     protected boolean SSLEnabled = false;
445     public boolean isSSLEnabled() { return SSLEnabled;}
446     public void setSSLEnabled(boolean SSLEnabled) {this.SSLEnabled = SSLEnabled;}
447
448     protected boolean secure = false;
449     public boolean getSecure() { return secure;}
450     public void setSecure(boolean b) { secure = b;}
451
452     public void setSelectorPool(NioSelectorPool selectorPool) {
453         this.selectorPool = selectorPool;
454     }
455
456     public void setSocketProperties(SocketProperties socketProperties) {
457         this.socketProperties = socketProperties;
458     }
459
460     protected SSLContext sslContext = null;
461     public SSLContext getSSLContext() { return sslContext;}
462     public void setSSLContext(SSLContext c) { sslContext = c;}
463     
464     // --------------------------------------------------------- Public Methods
465

466
467     /**
468      * Number of keepalive sockets.
469      */

470     public int getKeepAliveCount() {
471         if (pollers == null) {
472             return 0;
473         } else {
474             int keepAliveCount = 0;
475             for (int i = 0; i < pollers.length; i++) {
476                 keepAliveCount += pollers[i].getKeepAliveCount();
477             }
478             return keepAliveCount;
479         }
480     }
481
482
483
484     /**
485      * Return the amount of threads that are managed by the pool.
486      *
487      * @return the amount of threads that are managed by the pool
488      */

489     public int getCurrentThreadCount() {
490         return curThreads;
491     }
492
493
494     /**
495      * Return the amount of threads currently busy.
496      *
497      * @return the amount of threads currently busy
498      */

499     public int getCurrentThreadsBusy() {
500         return curThreadsBusy;
501     }
502
503
504     /**
505      * Return the state of the endpoint.
506      *
507      * @return true if the endpoint is running, false otherwise
508      */

509     public boolean isRunning() {
510         return running;
511     }
512
513
514     /**
515      * Return the state of the endpoint.
516      *
517      * @return true if the endpoint is paused, false otherwise
518      */

519     public boolean isPaused() {
520         return paused;
521     }
522
523
524     // ----------------------------------------------- Public Lifecycle Methods
525

526
527     /**
528      * Initialize the endpoint.
529      */

530     public void init()
531         throws Exception JavaDoc {
532
533         if (initialized)
534             return;
535
536         serverSock = ServerSocketChannel.open();
537         InetSocketAddress JavaDoc addr = (address!=null?new InetSocketAddress JavaDoc(address,port):new InetSocketAddress JavaDoc(port));
538         serverSock.socket().bind(addr,100); //todo, set backlog value
539
serverSock.configureBlocking(true); //mimic APR behavior
540

541         // Initialize thread count defaults for acceptor, poller and sendfile
542
if (acceptorThreadCount == 0) {
543             // FIXME: Doesn't seem to work that well with multiple accept threads
544
acceptorThreadCount = 1;
545         }
546         if (pollerThreadCount <= 0) {
547             //minimum one poller thread
548
pollerThreadCount = 1;
549         }
550
551         // Initialize SSL if needed
552
if (isSSLEnabled()) {
553             // Initialize SSL
554
char[] passphrase = getKeystorePass().toCharArray();
555
556             KeyStore JavaDoc ks = KeyStore.getInstance(getKeystoreType());
557             ks.load(new FileInputStream JavaDoc(getKeystoreFile()), passphrase);
558             KeyStore JavaDoc ts = KeyStore.getInstance(getKeystoreType());
559             ts.load(new FileInputStream JavaDoc(getKeystoreFile()), passphrase);
560
561             KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm());
562             kmf.init(ks, passphrase);
563
564             TrustManagerFactory tmf = TrustManagerFactory.getInstance(getAlgorithm());
565             tmf.init(ts);
566
567             sslContext = SSLContext.getInstance(getSslProtocol());
568             sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
569
570         }
571
572         initialized = true;
573
574     }
575
576
577     /**
578      * Start the APR endpoint, creating acceptor, poller threads.
579      */

580     public void start()
581         throws Exception JavaDoc {
582         // Initialize socket if not done before
583
if (!initialized) {
584             init();
585         }
586         if (!running) {
587             running = true;
588             paused = false;
589             
590             // Create worker collection
591
if (executor == null) {
592                 workers = new WorkerStack(maxThreads);
593                 //executor = new ThreadPoolExecutor(getMinSpareThreads(),getMaxThreads(),5000,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
594
}
595
596             // Start acceptor threads
597
for (int i = 0; i < acceptorThreadCount; i++) {
598                 Thread JavaDoc acceptorThread = new Thread JavaDoc(new Acceptor(), getName() + "-Acceptor-" + i);
599                 acceptorThread.setPriority(threadPriority);
600                 acceptorThread.setDaemon(daemon);
601                 acceptorThread.start();
602             }
603
604             // Start poller threads
605
pollers = new Poller[pollerThreadCount];
606             for (int i = 0; i < pollerThreadCount; i++) {
607                 pollers[i] = new Poller();
608                 pollers[i].init();
609                 Thread JavaDoc pollerThread = new Thread JavaDoc(pollers[i], getName() + "-Poller-" + i);
610                 pollerThread.setPriority(threadPriority);
611                 pollerThread.setDaemon(true);
612                 pollerThread.start();
613             }
614         }
615     }
616
617
618     /**
619      * Pause the endpoint, which will make it stop accepting new sockets.
620      */

621     public void pause() {
622         if (running && !paused) {
623             paused = true;
624             unlockAccept();
625         }
626     }
627
628
629     /**
630      * Resume the endpoint, which will make it start accepting new sockets
631      * again.
632      */

633     public void resume() {
634         if (running) {
635             paused = false;
636         }
637     }
638
639
640     /**
641      * Stop the endpoint. This will cause all processing threads to stop.
642      */

643     public void stop() {
644         if (running) {
645             running = false;
646             unlockAccept();
647             for (int i = 0; i < pollers.length; i++) {
648                 pollers[i].destroy();
649             }
650             pollers = null;
651         }
652         eventCache.clear();
653         keyCache.clear();
654         nioChannels.clear();
655     }
656
657
658     /**
659      * Deallocate APR memory pools, and close server socket.
660      */

661     public void destroy() throws Exception JavaDoc {
662         if (running) {
663             stop();
664         }
665         // Close server socket
666
serverSock.socket().close();
667         serverSock.close();
668         serverSock = null;
669         sslContext = null;
670         initialized = false;
671         nioChannels.clear();
672     }
673
674
675     // ------------------------------------------------------ Protected Methods
676

677
678     /**
679      * Get a sequence number used for thread naming.
680      */

681     protected int getSequence() {
682         return sequence++;
683     }
684
685     public int getWriteBufSize() {
686         return socketProperties.getTxBufSize();
687     }
688
689     public int getReadBufSize() {
690         return socketProperties.getRxBufSize();
691     }
692
693     public NioSelectorPool getSelectorPool() {
694         return selectorPool;
695     }
696
697     public SocketProperties getSocketProperties() {
698         return socketProperties;
699     }
700
701     /**
702      * Unlock the server socket accept using a bogus connection.
703      */

704     protected void unlockAccept() {
705         java.net.Socket JavaDoc s = null;
706         try {
707             // Need to create a connection to unlock the accept();
708
if (address == null) {
709                 s = new java.net.Socket JavaDoc("127.0.0.1", port);
710             } else {
711                 s = new java.net.Socket JavaDoc(address, port);
712                 // setting soLinger to a small value will help shutdown the
713
// connection quicker
714
s.setSoLinger(true, 0);
715             }
716         } catch(Exception JavaDoc e) {
717             if (log.isDebugEnabled()) {
718                 log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
719             }
720         } finally {
721             if (s != null) {
722                 try {
723                     s.close();
724                 } catch (Exception JavaDoc e) {
725                     // Ignore
726
}
727             }
728         }
729     }
730
731
732     /**
733      * Process the specified connection.
734      */

735     protected boolean setSocketOptions(SocketChannel JavaDoc socket) {
736         // Process the connection
737
int step = 1;
738         try {
739             //disable blocking, APR style, we are gonna be polling it
740
socket.configureBlocking(false);
741             Socket JavaDoc sock = socket.socket();
742             socketProperties.setProperties(sock);
743
744             NioChannel channel = nioChannels.poll();
745             if ( channel == null ) {
746                 // 2: SSL setup
747
step = 2;
748
749                 if (sslContext != null) {
750                     SSLEngine engine = createSSLEngine();
751                     int appbufsize = engine.getSession().getApplicationBufferSize();
752                     NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,getReadBufSize()),
753                                                                        Math.max(appbufsize,getWriteBufSize()),
754                                                                        socketProperties.getDirectBuffer());
755                     channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
756                 } else {
757                     NioBufferHandler bufhandler = new NioBufferHandler(getReadBufSize(),
758                                                                        getWriteBufSize(),
759                                                                        socketProperties.getDirectBuffer());
760
761                     channel = new NioChannel(socket, bufhandler);
762                 }
763             } else {
764                 
765                 channel.setIOChannel(socket);
766                 if ( channel instanceof SecureNioChannel ) {
767                     SSLEngine engine = createSSLEngine();
768                     ((SecureNioChannel)channel).reset(engine);
769                 } else {
770                     channel.reset();
771                 }
772             }
773             getPoller0().register(channel);
774
775         } catch (Throwable JavaDoc t) {
776             try {
777                 log.error("",t);
778             }catch ( Throwable JavaDoc tt){}
779             // Tell to close the socket
780
return false;
781         }
782         return true;
783     }
784
785     protected SSLEngine createSSLEngine() {
786         SSLEngine engine = sslContext.createSSLEngine();
787         engine.setNeedClientAuth(getClientAuth());
788         engine.setUseClientMode(false);
789         if ( ciphersarr.length > 0 ) engine.setEnabledCipherSuites(ciphersarr);
790         if ( sslEnabledProtocolsarr.length > 0 ) engine.setEnabledProtocols(sslEnabledProtocolsarr);
791         
792         return engine;
793     }
794
795
796     /**
797      * Create (or allocate) and return an available processor for use in
798      * processing a specific HTTP request, if possible. If the maximum
799      * allowed processors have already been created and are in use, return
800      * <code>null</code> instead.
801      */

802     protected Worker createWorkerThread() {
803
804         synchronized (workers) {
805             if (workers.size() > 0) {
806                 curThreadsBusy++;
807                 return (workers.pop());
808             }
809             if ((maxThreads > 0) && (curThreads < maxThreads)) {
810                 curThreadsBusy++;
811                 return (newWorkerThread());
812             } else {
813                 if (maxThreads < 0) {
814                     curThreadsBusy++;
815                     return (newWorkerThread());
816                 } else {
817                     return (null);
818                 }
819             }
820         }
821
822     }
823
824
825     /**
826      * Create and return a new processor suitable for processing HTTP
827      * requests and returning the corresponding responses.
828      */

829     protected Worker newWorkerThread() {
830
831         Worker workerThread = new Worker();
832         workerThread.start();
833         return (workerThread);
834
835     }
836
837
838     /**
839      * Return a new worker thread, and block while to worker is available.
840      */

841     protected Worker getWorkerThread() {
842         // Allocate a new worker thread
843
Worker workerThread = createWorkerThread();
844         while (workerThread == null) {
845             try {
846                 synchronized (workers) {
847                     workerThread = createWorkerThread();
848                     if ( workerThread == null ) workers.wait();
849                 }
850             } catch (InterruptedException JavaDoc e) {
851                 // Ignore
852
}
853             if ( workerThread == null ) workerThread = createWorkerThread();
854         }
855         return workerThread;
856     }
857
858
859     /**
860      * Recycle the specified Processor so that it can be used again.
861      *
862      * @param workerThread The processor to be recycled
863      */

864     protected void recycleWorkerThread(Worker workerThread) {
865         synchronized (workers) {
866             workers.push(workerThread);
867             curThreadsBusy--;
868             workers.notify();
869         }
870     }
871
872
873     protected boolean processSocket(SocketChannel JavaDoc socket) {
874         try {
875             if (executor == null) {
876                 getWorkerThread().assign(socket);
877             } else {
878                 executor.execute(new SocketOptionsProcessor(socket));
879             }
880         } catch (Throwable JavaDoc t) {
881             // This means we got an OOM or similar creating a thread, or that
882
// the pool and its queue are full
883
log.error(sm.getString("endpoint.process.fail"), t);
884             return false;
885         }
886         return true;
887     }
888     /**
889      * Process given socket.
890      */

891     protected boolean processSocket(NioChannel socket) {
892         try {
893             if (executor == null) {
894                 getWorkerThread().assign(socket);
895             } else {
896                 executor.execute(new SocketProcessor(socket));
897             }
898         } catch (Throwable JavaDoc t) {
899             // This means we got an OOM or similar creating a thread, or that
900
// the pool and its queue are full
901
log.error(sm.getString("endpoint.process.fail"), t);
902             return false;
903         }
904         return true;
905     }
906
907
908     /**
909      * Process given socket for an event.
910      */

911     protected boolean processSocket(NioChannel socket, SocketStatus status) {
912         try {
913             if (executor == null) {
914                 getWorkerThread().assign(socket, status);
915             } else {
916                 executor.execute(new SocketEventProcessor(socket, status));
917             }
918         } catch (Throwable JavaDoc t) {
919             // This means we got an OOM or similar creating a thread, or that
920
// the pool and its queue are full
921
log.error(sm.getString("endpoint.process.fail"), t);
922             return false;
923         }
924         return true;
925     }
926
927
928     // --------------------------------------------------- Acceptor Inner Class
929

930
931     /**
932      * Server socket acceptor thread.
933      */

934     protected class Acceptor implements Runnable JavaDoc {
935
936
937         /**
938          * The background thread that listens for incoming TCP/IP connections and
939          * hands them off to an appropriate processor.
940          */

941         public void run() {
942
943             // Loop until we receive a shutdown command
944
while (running) {
945
946                 // Loop if endpoint is paused
947
while (paused) {
948                     try {
949                         Thread.sleep(1000);
950                     } catch (InterruptedException JavaDoc e) {
951                         // Ignore
952
}
953                 }
954
955                 try {
956                     // Accept the next incoming connection from the server socket
957
SocketChannel JavaDoc socket = serverSock.accept();
958                     // Hand this socket off to an appropriate processor
959
if ( running && (!paused) && socket != null ) processSocket(socket);
960                 } catch (Throwable JavaDoc t) {
961                     log.error(sm.getString("endpoint.accept.fail"), t);
962                 }
963
964                 // The processor will recycle itself when it finishes
965

966             }
967
968         }
969
970     }
971
972
973     // ----------------------------------------------------- Poller Inner Classes
974

975     /**
976      *
977      * PollerEvent, cacheable object for poller events to avoid GC
978      */

979     public class PollerEvent implements Runnable JavaDoc {
980         
981         protected NioChannel socket;
982         protected int interestOps;
983         protected KeyAttachment key;
984         public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
985             reset(ch, k, intOps);
986         }
987     
988         public void reset(NioChannel ch, KeyAttachment k, int intOps) {
989             socket = ch;
990             interestOps = intOps;
991             key = k;
992         }
993     
994         public void reset() {
995             reset(null, null, 0);
996         }
997     
998         public void run() {
999             if ( interestOps == OP_REGISTER ) {
1000                try {
1001                    socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
1002                } catch (Exception JavaDoc x) {
1003                    log.error("", x);
1004                }
1005            } else {
1006                final SelectionKey JavaDoc key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
1007                final KeyAttachment att = (KeyAttachment) key.attachment();
1008                try {
1009                    if (key != null) {
1010                        key.interestOps(interestOps);
1011                        att.interestOps(interestOps);
1012                    }
1013                }
1014                catch (CancelledKeyException JavaDoc ckx) {
1015                    try {
1016                        if (key != null && key.attachment() != null) {
1017                            KeyAttachment ka = (KeyAttachment) key.attachment();
1018                            ka.setError(true); //set to collect this socket immediately
1019
}
1020                        try {
1021                            socket.close();
1022                        }
1023                        catch (Exception JavaDoc ignore) {}
1024                        if (socket.isOpen())
1025                            socket.close(true);
1026                    }
1027                    catch (Exception JavaDoc ignore) {}
1028                }
1029            }//end if
1030
}//run
1031

1032        public String JavaDoc toString() {
1033            return super.toString()+"[intOps="+this.interestOps+"]";
1034        }
1035    }
1036    /**
1037     * Poller class.
1038     */

1039    public class Poller implements Runnable JavaDoc {
1040
1041        protected Selector JavaDoc selector;
1042        protected ConcurrentLinkedQueue JavaDoc<Runnable JavaDoc> events = new ConcurrentLinkedQueue JavaDoc<Runnable JavaDoc>();
1043        
1044        protected boolean close = false;
1045        protected long nextExpiration = 0;//optimize expiration handling
1046

1047        protected int keepAliveCount = 0;
1048        public int getKeepAliveCount() { return keepAliveCount; }
1049        
1050        protected AtomicLong JavaDoc wakeupCounter = new AtomicLong JavaDoc(0l);
1051
1052
1053
1054        public Poller() throws IOException JavaDoc {
1055            this.selector = Selector.open();
1056        }
1057        
1058        public Selector JavaDoc getSelector() { return selector;}
1059
1060        /**
1061         * Create the poller. With some versions of APR, the maximum poller size will
1062         * be 62 (reocmpiling APR is necessary to remove this limitation).
1063         */

1064        protected void init() {
1065            keepAliveCount = 0;
1066        }
1067
1068        /**
1069         * Destroy the poller.
1070         */

1071        protected void destroy() {
1072            // Wait for polltime before doing anything, so that the poller threads
1073
// exit, otherwise parallel descturction of sockets which are still
1074
// in the poller can cause problems
1075
close = true;
1076            events.clear();
1077            selector.wakeup();
1078        }
1079        
1080        public void addEvent(Runnable JavaDoc event) {
1081            events.offer(event);
1082            if ( wakeupCounter.incrementAndGet() < 3 ) selector.wakeup();
1083        }
1084
1085        /**
1086         * Add specified socket and associated pool to the poller. The socket will
1087         * be added to a temporary array, and polled first after a maximum amount
1088         * of time equal to pollTime (in most cases, latency will be much lower,
1089         * however).
1090         *
1091         * @param socket to add to the poller
1092         */

1093        public void add(final NioChannel socket) {
1094            add(socket,SelectionKey.OP_READ);
1095        }
1096        
1097        public void add(final NioChannel socket, final int interestOps) {
1098            PollerEvent r = eventCache.poll();
1099            if ( r==null) r = new PollerEvent(socket,null,interestOps);
1100            else r.reset(socket,null,interestOps);
1101            addEvent(r);
1102        }
1103        
1104        public boolean events() {
1105            boolean result = false;
1106            //synchronized (events) {
1107
Runnable JavaDoc r = null;
1108                result = (events.size() > 0);
1109                while ( (r = (Runnable JavaDoc)events.poll()) != null ) {
1110                    try {
1111                        r.run();
1112                        if ( r instanceof PollerEvent ) {
1113                            ((PollerEvent)r).reset();
1114                            eventCache.offer((PollerEvent)r);
1115                        }
1116                    } catch ( Exception JavaDoc x ) {
1117                        log.error("",x);
1118                    }
1119                }
1120                //events.clear();
1121
//}
1122
return result;
1123        }
1124        
1125        public void register(final NioChannel socket)
1126        {
1127            socket.setPoller(this);
1128            KeyAttachment key = keyCache.poll();
1129            final KeyAttachment ka = key!=null?key:new KeyAttachment();
1130            ka.reset(this,socket);
1131            PollerEvent r = eventCache.poll();
1132            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
1133
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
1134            else r.reset(socket,ka,OP_REGISTER);
1135            addEvent(r);
1136        }
1137        
1138        public void cancelledKey(SelectionKey JavaDoc key, SocketStatus status) {
1139            try {
1140                KeyAttachment ka = (KeyAttachment) key.attachment();
1141                if (ka != null && ka.getComet()) {
1142                    //the comet event takes care of clean up
1143
processSocket(ka.getChannel(), status);
1144                }else {
1145                    if (key.isValid()) key.cancel();
1146                    if (key.channel().isOpen()) key.channel().close();
1147                    key.attach(null);
1148                }
1149            } catch (Throwable JavaDoc e) {
1150                if ( log.isDebugEnabled() ) log.error("",e);
1151                // Ignore
1152
}
1153        }
1154        /**
1155         * The background thread that listens for incoming TCP/IP connections and
1156         * hands them off to an appropriate processor.
1157         */

1158        public void run() {
1159            // Loop until we receive a shutdown command
1160
while (running) {
1161                // Loop if endpoint is paused
1162
while (paused) {
1163                    try {
1164                        Thread.sleep(1000);
1165                    } catch (InterruptedException JavaDoc e) {
1166                        // Ignore
1167
}
1168                }
1169                boolean hasEvents = false;
1170
1171                hasEvents = (hasEvents | events());
1172                // Time to terminate?
1173
if (close) return;
1174
1175                int keyCount = 0;
1176                try {
1177                    keyCount = selector.select(selectorTimeout);
1178                    wakeupCounter.set(0);
1179                    if ( close ) { selector.close(); return; }
1180                } catch ( NullPointerException JavaDoc x ) {
1181                    //sun bug 5076772 on windows JDK 1.5
1182
if ( wakeupCounter == null || selector == null ) throw x;
1183                    continue;
1184                } catch ( CancelledKeyException JavaDoc x ) {
1185                    //sun bug 5076772 on windows JDK 1.5
1186
if ( wakeupCounter == null || selector == null ) throw x;
1187                    continue;
1188                } catch (Throwable JavaDoc x) {
1189                    log.error("",x);
1190                    continue;
1191                }
1192                //either we timed out or we woke up, process events first
1193
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
1194
1195                Iterator JavaDoc iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
1196                // Walk through the collection of ready keys and dispatch
1197
// any active event.
1198
while (iterator != null && iterator.hasNext()) {
1199                    SelectionKey JavaDoc sk = (SelectionKey JavaDoc) iterator.next();
1200                    iterator.remove();
1201                    KeyAttachment attachment = (KeyAttachment)sk.attachment();
1202                    try {
1203                        if ( sk.isValid() && attachment != null ) {
1204                            attachment.access();
1205                            sk.attach(attachment);
1206                            sk.interestOps(0); //this is a must, so that we don't have multiple threads messing with the socket
1207
attachment.interestOps(0);
1208                            NioChannel channel = attachment.getChannel();
1209                            if (sk.isReadable() || sk.isWritable() ) {
1210                                if ( attachment.getComet() ) {
1211                                    if (!processSocket(channel, SocketStatus.OPEN))
1212                                        processSocket(channel, SocketStatus.DISCONNECT);
1213                                } else {
1214                                    boolean close = (!processSocket(channel));
1215                                    if ( close ) {
1216                                        channel.close();
1217                                        channel.getIOChannel().socket().close();
1218                                    }
1219                                }
1220                            }
1221                        } else {
1222                            //invalid key
1223
cancelledKey(sk, SocketStatus.ERROR);
1224                        }
1225                    } catch ( CancelledKeyException JavaDoc ckx ) {
1226                        cancelledKey(sk, SocketStatus.ERROR);
1227                    } catch (Throwable JavaDoc t) {
1228                        log.error("",t);
1229                    }
1230                }//while
1231
//process timeouts
1232
timeout(keyCount,hasEvents);
1233            }//while
1234
synchronized (this) {
1235                this.notifyAll();
1236            }
1237
1238        }
1239        protected void timeout(int keyCount, boolean hasEvents) {
1240            long now = System.currentTimeMillis();
1241            //don't process timeouts too frequently, but if the selector simply timed out
1242
//then we can check timeouts to avoid gaps
1243
if ( (now < nextExpiration) && (keyCount>0 || hasEvents) ) return;
1244            nextExpiration = now + (long)socketProperties.getSoTimeout();
1245            //timeout
1246
Set JavaDoc<SelectionKey JavaDoc> keys = selector.keys();
1247            for (Iterator JavaDoc<SelectionKey JavaDoc> iter = keys.iterator(); iter.hasNext(); ) {
1248                SelectionKey JavaDoc key = iter.next();
1249                try {
1250                    KeyAttachment ka = (KeyAttachment) key.attachment();
1251                    if ( ka == null ) {
1252                        cancelledKey(key, SocketStatus.ERROR); //we don't support any keys without attachments
1253
} else if ( ka.getError() ) {
1254                        cancelledKey(key, SocketStatus.DISCONNECT);
1255                    }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) {
1256                        //only timeout sockets that we are waiting for a read from
1257
long delta = now - ka.getLastAccess();
1258                        long timeout = (ka.getTimeout()==-1)?((long) socketProperties.getSoTimeout()):(ka.getTimeout());
1259                        boolean isTimedout = delta > timeout;
1260                        if (isTimedout) {
1261                            key.interestOps(0);
1262                            ka.interestOps(0); //avoid duplicate timeout calls
1263
cancelledKey(key, SocketStatus.TIMEOUT);
1264                        } else {
1265                            long nextTime = now+(timeout-delta);
1266                            nextExpiration = (nextTime < nextExpiration)?nextTime:nextExpiration;
1267                        }
1268                    }//end if
1269
}catch ( CancelledKeyException JavaDoc ckx ) {
1270                    cancelledKey(key, SocketStatus.ERROR);
1271                }
1272            }//for
1273
}
1274    }
1275    
1276    public static class KeyAttachment {
1277        
1278        public KeyAttachment() {
1279            
1280        }
1281        public void reset(Poller poller, NioChannel channel) {
1282            this.channel = channel;
1283            this.poller = poller;
1284            lastAccess = System.currentTimeMillis();
1285            currentAccess = false;
1286            comet = false;
1287            timeout = -1;
1288            error = false;
1289        }
1290        
1291        public void reset() {
1292            reset(null,null);
1293        }
1294        
1295        public Poller getPoller() { return poller;}
1296        public void setPoller(Poller poller){this.poller = poller;}
1297        public long getLastAccess() { return lastAccess; }
1298        public void access() { access(System.currentTimeMillis()); }
1299        public void access(long access) { lastAccess = access; }
1300        public void setComet(boolean comet) { this.comet = comet; }
1301        public boolean getComet() { return comet; }
1302        public boolean getCurrentAccess() { return currentAccess; }
1303        public void setCurrentAccess(boolean access) { currentAccess = access; }
1304        public Object JavaDoc getMutex() {return mutex;}
1305        public void setTimeout(long timeout) {this.timeout = timeout;}
1306        public long getTimeout() {return this.timeout;}
1307        public boolean getError() { return error; }
1308        public void setError(boolean error) { this.error = error; }
1309        public NioChannel getChannel() { return channel;}
1310        public void setChannel(NioChannel channel) { this.channel = channel;}
1311        protected Poller poller = null;
1312        protected int interestOps = 0;
1313        public int interestOps() { return interestOps;}
1314        public int interestOps(int ops) { this.interestOps = ops; return ops; }
1315        protected Object JavaDoc mutex = new Object JavaDoc();
1316        protected long lastAccess = -1;
1317        protected boolean currentAccess = false;
1318        protected boolean comet = false;
1319        protected long timeout = -1;
1320        protected boolean error = false;
1321        protected NioChannel channel = null;
1322
1323    }
1324
1325
1326
1327    // ----------------------------------------------------- Worker Inner Class
1328

1329
1330    /**
1331     * Server processor class.
1332     */

1333    protected class Worker implements Runnable JavaDoc {
1334
1335
1336        protected Thread JavaDoc thread = null;
1337        protected boolean available = false;
1338        protected Object JavaDoc socket = null;
1339        protected SocketStatus status = null;
1340
1341
1342        /**
1343         * Process an incoming TCP/IP connection on the specified socket. Any
1344         * exception that occurs during processing must be logged and swallowed.
1345         * <b>NOTE</b>: This method is called from our Connector's thread. We
1346         * must assign it to our own thread so that multiple simultaneous
1347         * requests can be handled.
1348         *
1349         * @param socket TCP socket to process
1350         */

1351        protected synchronized void assign(Object JavaDoc socket) {
1352
1353            // Wait for the Processor to get the previous Socket
1354
while (available) {
1355                try {
1356                    wait();
1357                } catch (InterruptedException JavaDoc e) {
1358                }
1359            }
1360            // Store the newly available Socket and notify our thread
1361
this.socket = socket;
1362            status = null;
1363            available = true;
1364            notifyAll();
1365
1366        }
1367
1368
1369        protected synchronized void assign(Object JavaDoc socket, SocketStatus status) {
1370
1371            // Wait for the Processor to get the previous Socket
1372
while (available) {
1373                try {
1374                    wait();
1375                } catch (InterruptedException JavaDoc e) {
1376                }
1377            }
1378
1379            // Store the newly available Socket and notify our thread
1380
this.socket = socket;
1381            this.status = status;
1382            available = true;
1383            notifyAll();
1384        }
1385
1386
1387        /**
1388         * Await a newly assigned Socket from our Connector, or <code>null</code>
1389         * if we are supposed to shut down.
1390         */

1391        protected synchronized Object JavaDoc await() {
1392
1393            // Wait for the Connector to provide a new Socket
1394
while (!available) {
1395                try {
1396                    wait();
1397                } catch (InterruptedException JavaDoc e) {
1398                }
1399            }
1400
1401            // Notify the Connector that we have received this Socket
1402
Object JavaDoc socket = this.socket;
1403            available = false;
1404            notifyAll();
1405
1406            return (socket);
1407
1408        }
1409
1410
1411        /**
1412         * The background thread that listens for incoming TCP/IP connections and
1413         * hands them off to an appropriate processor.
1414         */

1415        public void run() {
1416
1417            // Process requests until we receive a shutdown signal
1418
while (running) {
1419                try {
1420                    // Wait for the next socket to be assigned
1421
Object JavaDoc channel = await();
1422                    if (channel == null)
1423                        continue;
1424
1425                    if ( channel instanceof SocketChannel JavaDoc) {
1426                        SocketChannel JavaDoc sc = (SocketChannel JavaDoc)channel;
1427                        if ( !setSocketOptions(sc) ) {
1428                            try {
1429                                sc.socket().close();
1430                                sc.close();
1431                            }catch ( IOException JavaDoc ix ) {
1432                                if ( log.isDebugEnabled() ) log.debug("",ix);
1433                            }
1434                        } else {
1435                            //now we have it registered, remove it from the cache
1436

1437                        }
1438                    } else {
1439                        
1440                        NioChannel socket = (NioChannel)channel;
1441
1442                        SelectionKey JavaDoc key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
1443                        int handshake = -1;
1444                        try {
1445                            handshake = socket.handshake(key.isReadable(), key.isWritable());
1446                        }catch ( IOException JavaDoc x ) {
1447                            handshake = -1;
1448                            if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
1449                        }catch ( CancelledKeyException JavaDoc ckx ) {
1450                            handshake = -1;
1451                        }
1452                        if ( handshake == 0 ) {
1453                            // Process the request from this socket
1454
if ((status != null) && (handler.event(socket, status) == Handler.SocketState.CLOSED)) {
1455                                // Close socket and pool
1456
try {
1457                                    KeyAttachment att = (KeyAttachment)socket.getAttachment(true);
1458                                    try {socket.close();}catch (Exception JavaDoc ignore){}
1459                                    if ( socket.isOpen() ) socket.close(true);
1460                                    key.cancel();
1461                                    key.attach(null);
1462                                    nioChannels.offer(socket);
1463                                    if ( att!=null ) keyCache.offer(att);
1464                                }catch ( Exception JavaDoc x ) {
1465                                    log.error("",x);
1466                                }
1467                            } else if ((status == null) && (handler.process(socket) == Handler.SocketState.CLOSED)) {
1468                                // Close socket and pool
1469
try {
1470                                    KeyAttachment att = (KeyAttachment)socket.getAttachment(true);
1471                                    try {socket.close();}catch (Exception JavaDoc ignore){}
1472                                    if ( socket.isOpen() ) socket.close(true);
1473                                    key.cancel();
1474                                    key.attach(null);
1475                                    nioChannels.offer(socket);
1476                                    if ( att!=null ) keyCache.offer(att);
1477                                }catch ( Exception JavaDoc x ) {
1478                                    log.error("",x);
1479                                }
1480                            }
1481                        } else if (handshake == -1 ) {
1482                            socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT);
1483                            try {socket.close(true);}catch (IOException JavaDoc ignore){}
1484                            nioChannels.offer(socket);
1485                        } else {
1486                            final SelectionKey JavaDoc fk = key;
1487                            final int intops = handshake;
1488                            final KeyAttachment ka = (KeyAttachment)fk.attachment();
1489                            ka.getPoller().add(socket,intops);
1490                        }
1491                    }
1492                } finally {
1493                    //dereference socket to let GC do its job
1494
socket = null;
1495                    // Finish up this request
1496
recycleWorkerThread(this);
1497                }
1498            }
1499        }
1500
1501
1502        /**
1503         * Start the background processing thread.
1504         */

1505        public void start() {
1506            thread = new Thread JavaDoc(this);
1507            thread.setName(getName() + "-" + (++curThreads));
1508            thread.setDaemon(true);
1509            thread.start();
1510        }
1511
1512
1513    }
1514
1515    // ------------------------------------------------ Application Buffer Handler
1516
public class NioBufferHandler implements ApplicationBufferHandler {
1517        protected ByteBuffer JavaDoc readbuf = null;
1518        protected ByteBuffer JavaDoc writebuf = null;
1519        
1520        public NioBufferHandler(int readsize, int writesize, boolean direct) {
1521            if ( direct ) {
1522                readbuf = ByteBuffer.allocateDirect(readsize);
1523                writebuf = ByteBuffer.allocateDirect(writesize);
1524            }else {
1525                readbuf = ByteBuffer.allocate(readsize);
1526                writebuf = ByteBuffer.allocate(writesize);
1527            }
1528        }
1529        
1530        public ByteBuffer JavaDoc expand(ByteBuffer JavaDoc buffer, int remaining) {return buffer;}
1531        public ByteBuffer JavaDoc getReadBuffer() {return readbuf;}
1532        public ByteBuffer JavaDoc getWriteBuffer() {return writebuf;}
1533
1534    }
1535
1536    // ------------------------------------------------ Handler Inner Interface
1537

1538
1539    /**
1540     * Bare bones interface used for socket processing. Per thread data is to be
1541     * stored in the ThreadWithAttributes extra folders, or alternately in
1542     * thread local fields.
1543     */

1544    public interface Handler {
1545        public enum SocketState {
1546            OPEN, CLOSED, LONG
1547        }
1548        public SocketState process(NioChannel socket);
1549        public SocketState event(NioChannel socket, SocketStatus status);
1550    }
1551
1552
1553    // ------------------------------------------------- WorkerStack Inner Class
1554

1555
1556    public class WorkerStack {
1557
1558        protected Worker[] workers = null;
1559        protected int end = 0;
1560
1561        public WorkerStack(int size) {
1562            workers = new Worker[size];
1563        }
1564
1565        /**
1566         * Put the object into the queue.
1567         *
1568         * @param object the object to be appended to the queue (first element).
1569         */

1570        public void push(Worker worker) {
1571            workers[end++] = worker;
1572        }
1573
1574        /**
1575         * Get the first object out of the queue. Return null if the queue
1576         * is empty.
1577         */

1578        public Worker pop() {
1579            if (end > 0) {
1580                return workers[--end];
1581            }
1582            return null;
1583        }
1584
1585        /**
1586         * Get the first object out of the queue, Return null if the queue
1587         * is empty.
1588         */

1589        public Worker peek() {
1590            return workers[end];
1591        }
1592
1593        /**
1594         * Is the queue empty?
1595         */

1596        public boolean isEmpty() {
1597            return (end == 0);
1598        }
1599
1600        /**
1601         * How many elements are there in this queue?
1602         */

1603        public int size() {
1604            return (end);
1605        }
1606    }
1607
1608
1609    // ---------------------------------------------- SocketOptionsProcessor Inner Class
1610

1611
1612    /**
1613     * This class is the equivalent of the Worker, but will simply use in an
1614     * external Executor thread pool.
1615     */

1616    protected class SocketOptionsProcessor implements Runnable JavaDoc {
1617
1618        protected SocketChannel JavaDoc sc = null;
1619
1620        public SocketOptionsProcessor(SocketChannel JavaDoc socket) {
1621            this.sc = socket;
1622        }
1623
1624        public void run() {
1625            if ( !setSocketOptions(sc) ) {
1626                try {
1627                    sc.socket().close();
1628                    sc.close();
1629                }catch ( IOException JavaDoc ix ) {
1630                    if ( log.isDebugEnabled() ) log.debug("",ix);
1631                }
1632            }
1633        }
1634    }
1635    // ---------------------------------------------- SocketProcessor Inner Class
1636

1637
1638    /**
1639     * This class is the equivalent of the Worker, but will simply use in an
1640     * external Executor thread pool.
1641     */

1642    protected class SocketProcessor implements Runnable JavaDoc {
1643
1644        protected NioChannel socket = null;
1645
1646        public SocketProcessor(NioChannel socket) {
1647            this.socket = socket;
1648        }
1649
1650        public void run() {
1651
1652            // Process the request from this socket
1653
if (handler.process(socket) == Handler.SocketState.CLOSED) {
1654                // Close socket and pool
1655
try {
1656                    try {socket.close();}catch (Exception JavaDoc ignore){}
1657                    if ( socket.isOpen() ) socket.close(true);
1658                } catch ( Exception JavaDoc x ) {
1659                    log.error("",x);
1660                }
1661                socket = null;
1662            }
1663
1664        }
1665
1666    }
1667
1668
1669    // --------------------------------------- SocketEventProcessor Inner Class
1670

1671
1672    /**
1673     * This class is the equivalent of the Worker, but will simply use in an
1674     * external Executor thread pool.
1675     */

1676    protected class SocketEventProcessor implements Runnable JavaDoc {
1677
1678        protected NioChannel socket = null;
1679        protected SocketStatus status = null;
1680
1681        public SocketEventProcessor(NioChannel socket, SocketStatus status) {
1682            this.socket = socket;
1683            this.status = status;
1684        }
1685
1686        public void run() {
1687
1688            // Process the request from this socket
1689
if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
1690                // Close socket and pool
1691
try {
1692                    try {socket.close();}catch (Exception JavaDoc ignore){}
1693                    if ( socket.isOpen() ) socket.close(true);
1694                } catch ( Exception JavaDoc x ) {
1695                    log.error("",x);
1696                }
1697                socket = null;
1698            }
1699
1700        }
1701
1702    }
1703
1704
1705}
1706
Popular Tags