KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > tomcat > util > net > AprEndpoint


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17
18 package org.apache.tomcat.util.net;
19
20 import java.net.InetAddress JavaDoc;
21 import java.util.ArrayList JavaDoc;
22 import java.util.HashMap JavaDoc;
23 import java.util.concurrent.Executor JavaDoc;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.tomcat.jni.Address;
28 import org.apache.tomcat.jni.Error;
29 import org.apache.tomcat.jni.File;
30 import org.apache.tomcat.jni.Library;
31 import org.apache.tomcat.jni.OS;
32 import org.apache.tomcat.jni.Poll;
33 import org.apache.tomcat.jni.Pool;
34 import org.apache.tomcat.jni.SSL;
35 import org.apache.tomcat.jni.SSLContext;
36 import org.apache.tomcat.jni.SSLSocket;
37 import org.apache.tomcat.jni.Socket;
38 import org.apache.tomcat.jni.Status;
39 import org.apache.tomcat.util.res.StringManager;
40
41 /**
42  * APR tailored thread pool, providing the following services:
43  * <ul>
44  * <li>Socket acceptor thread</li>
45  * <li>Socket poller thread</li>
46  * <li>Sendfile thread</li>
47  * <li>Worker threads pool</li>
48  * </ul>
49  *
50  * When switching to Java 5, there's an opportunity to use the virtual
51  * machine's thread pool.
52  *
53  * @author Mladen Turk
54  * @author Remy Maucherat
55  */

56 public class AprEndpoint {
57
58
59     // -------------------------------------------------------------- Constants
60

61
62     protected static Log log = LogFactory.getLog(AprEndpoint.class);
63
64     protected static StringManager sm =
65         StringManager.getManager("org.apache.tomcat.util.net.res");
66
67
68     /**
69      * The Request attribute key for the cipher suite.
70      */

71     public static final String JavaDoc CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
72
73     /**
74      * The Request attribute key for the key size.
75      */

76     public static final String JavaDoc KEY_SIZE_KEY = "javax.servlet.request.key_size";
77
78     /**
79      * The Request attribute key for the client certificate chain.
80      */

81     public static final String JavaDoc CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
82
83     /**
84      * The Request attribute key for the session id.
85      * This one is a Tomcat extension to the Servlet spec.
86      */

87     public static final String JavaDoc SESSION_ID_KEY = "javax.servlet.request.ssl_session";
88
89
90     // ----------------------------------------------------------------- Fields
91

92
93     /**
94      * Available workers.
95      */

96     protected WorkerStack workers = null;
97
98
99     /**
100      * Running state of the endpoint.
101      */

102     protected volatile boolean running = false;
103
104
105     /**
106      * Will be set to true whenever the endpoint is paused.
107      */

108     protected volatile boolean paused = false;
109
110
111     /**
112      * Track the initialization state of the endpoint.
113      */

114     protected boolean initialized = false;
115
116
117     /**
118      * Current worker threads busy count.
119      */

120     protected int curThreadsBusy = 0;
121
122
123     /**
124      * Current worker threads count.
125      */

126     protected int curThreads = 0;
127
128
129     /**
130      * Sequence number used to generate thread names.
131      */

132     protected int sequence = 0;
133
134
135     /**
136      * Root APR memory pool.
137      */

138     protected long rootPool = 0;
139
140
141     /**
142      * Server socket "pointer".
143      */

144     protected long serverSock = 0;
145
146
147     /**
148      * APR memory pool for the server socket.
149      */

150     protected long serverSockPool = 0;
151
152
153     /**
154      * SSL context.
155      */

156     protected long sslContext = 0;
157
158
159     // ------------------------------------------------------------- Properties
160

161
162     /**
163      * External Executor based thread pool.
164      */

165     protected Executor JavaDoc executor = null;
166     public void setExecutor(Executor JavaDoc executor) { this.executor = executor; }
167     public Executor JavaDoc getExecutor() { return executor; }
168
169
170     /**
171      * Maximum amount of worker threads.
172      */

173     protected int maxThreads = 40;
174     public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }
175     public int getMaxThreads() { return maxThreads; }
176
177
178     /**
179      * Priority of the acceptor and poller threads.
180      */

181     protected int threadPriority = Thread.NORM_PRIORITY;
182     public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; }
183     public int getThreadPriority() { return threadPriority; }
184
185
186     /**
187      * Size of the socket poller.
188      */

189     protected int pollerSize = 8 * 1024;
190     public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; }
191     public int getPollerSize() { return pollerSize; }
192
193
194     /**
195      * Size of the sendfile (= concurrent files which can be served).
196      */

197     protected int sendfileSize = 1 * 1024;
198     public void setSendfileSize(int sendfileSize) { this.sendfileSize = sendfileSize; }
199     public int getSendfileSize() { return sendfileSize; }
200
201
202     /**
203      * Server socket port.
204      */

205     protected int port;
206     public int getPort() { return port; }
207     public void setPort(int port ) { this.port=port; }
208
209
210     /**
211      * Address for the server socket.
212      */

213     protected InetAddress JavaDoc address;
214     public InetAddress JavaDoc getAddress() { return address; }
215     public void setAddress(InetAddress JavaDoc address) { this.address = address; }
216
217
218     /**
219      * Handling of accepted sockets.
220      */

221     protected Handler handler = null;
222     public void setHandler(Handler handler ) { this.handler = handler; }
223     public Handler getHandler() { return handler; }
224
225
226     /**
227      * Allows the server developer to specify the backlog that
228      * should be used for server sockets. By default, this value
229      * is 100.
230      */

231     protected int backlog = 100;
232     public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; }
233     public int getBacklog() { return backlog; }
234
235
236     /**
237      * Socket TCP no delay.
238      */

239     protected boolean tcpNoDelay = false;
240     public boolean getTcpNoDelay() { return tcpNoDelay; }
241     public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; }
242
243
244     /**
245      * Socket linger.
246      */

247     protected int soLinger = 100;
248     public int getSoLinger() { return soLinger; }
249     public void setSoLinger(int soLinger) { this.soLinger = soLinger; }
250
251
252     /**
253      * Socket timeout.
254      */

255     protected int soTimeout = -1;
256     public int getSoTimeout() { return soTimeout; }
257     public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; }
258
259
260     /**
261      * Timeout on first request read before going to the poller, in ms.
262      */

263     protected int firstReadTimeout = -1;
264     public int getFirstReadTimeout() { return firstReadTimeout; }
265     public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; }
266
267
268     /**
269      * Poll interval, in microseconds. The smaller the value, the more CPU the poller
270      * will use, but the more responsive to activity it will be.
271      */

272     protected int pollTime = 2000;
273     public int getPollTime() { return pollTime; }
274     public void setPollTime(int pollTime) { if (pollTime > 0) { this.pollTime = pollTime; } }
275
276
277     /**
278      * The default is true - the created threads will be
279      * in daemon mode. If set to false, the control thread
280      * will not be daemon - and will keep the process alive.
281      */

282     protected boolean daemon = true;
283     public void setDaemon(boolean b) { daemon = b; }
284     public boolean getDaemon() { return daemon; }
285
286
287     /**
288      * Name of the thread pool, which will be used for naming child threads.
289      */

290     protected String JavaDoc name = "TP";
291     public void setName(String JavaDoc name) { this.name = name; }
292     public String JavaDoc getName() { return name; }
293
294
295     /**
296      * Use endfile for sending static files.
297      */

298     protected boolean useSendfile = Library.APR_HAS_SENDFILE;
299     public void setUseSendfile(boolean useSendfile) { this.useSendfile = useSendfile; }
300     public boolean getUseSendfile() { return useSendfile; }
301
302
303     /**
304      * Allow comet request handling.
305      */

306     protected boolean useComet = true;
307     public void setUseComet(boolean useComet) { this.useComet = useComet; }
308     public boolean getUseComet() { return useComet; }
309
310
311     /**
312      * Acceptor thread count.
313      */

314     protected int acceptorThreadCount = 0;
315     public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; }
316     public int getAcceptorThreadCount() { return acceptorThreadCount; }
317
318
319     /**
320      * Sendfile thread count.
321      */

322     protected int sendfileThreadCount = 0;
323     public void setSendfileThreadCount(int sendfileThreadCount) { this.sendfileThreadCount = sendfileThreadCount; }
324     public int getSendfileThreadCount() { return sendfileThreadCount; }
325
326
327     /**
328      * Poller thread count.
329      */

330     protected int pollerThreadCount = 0;
331     public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; }
332     public int getPollerThreadCount() { return pollerThreadCount; }
333
334
335     /**
336      * The socket poller.
337      */

338     protected Poller[] pollers = null;
339     protected int pollerRoundRobin = 0;
340     public Poller getPoller() {
341         pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
342         return pollers[pollerRoundRobin];
343     }
344
345
346     /**
347      * The socket poller used for Comet support.
348      */

349     protected Poller[] cometPollers = null;
350     protected int cometPollerRoundRobin = 0;
351     public Poller getCometPoller() {
352         cometPollerRoundRobin = (cometPollerRoundRobin + 1) % cometPollers.length;
353         return cometPollers[cometPollerRoundRobin];
354     }
355
356
357     /**
358      * The static file sender.
359      */

360     protected Sendfile[] sendfiles = null;
361     protected int sendfileRoundRobin = 0;
362     public Sendfile getSendfile() {
363         sendfileRoundRobin = (sendfileRoundRobin + 1) % sendfiles.length;
364         return sendfiles[sendfileRoundRobin];
365     }
366
367
368     /**
369      * Dummy maxSpareThreads property.
370      */

371     public int getMaxSpareThreads() { return 0; }
372
373
374     /**
375      * Dummy minSpareThreads property.
376      */

377     public int getMinSpareThreads() { return 0; }
378
379
380     /**
381      * SSL engine.
382      */

383     protected boolean SSLEnabled = false;
384     public boolean isSSLEnabled() { return SSLEnabled; }
385     public void setSSLEnabled(boolean SSLEnabled) { this.SSLEnabled = SSLEnabled; }
386
387
388     /**
389      * SSL protocols.
390      */

391     protected String JavaDoc SSLProtocol = "all";
392     public String JavaDoc getSSLProtocol() { return SSLProtocol; }
393     public void setSSLProtocol(String JavaDoc SSLProtocol) { this.SSLProtocol = SSLProtocol; }
394
395
396     /**
397      * SSL password (if a cert is encrypted, and no password has been provided, a callback
398      * will ask for a password).
399      */

400     protected String JavaDoc SSLPassword = null;
401     public String JavaDoc getSSLPassword() { return SSLPassword; }
402     public void setSSLPassword(String JavaDoc SSLPassword) { this.SSLPassword = SSLPassword; }
403
404
405     /**
406      * SSL cipher suite.
407      */

408     protected String JavaDoc SSLCipherSuite = "ALL";
409     public String JavaDoc getSSLCipherSuite() { return SSLCipherSuite; }
410     public void setSSLCipherSuite(String JavaDoc SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; }
411
412
413     /**
414      * SSL certificate file.
415      */

416     protected String JavaDoc SSLCertificateFile = null;
417     public String JavaDoc getSSLCertificateFile() { return SSLCertificateFile; }
418     public void setSSLCertificateFile(String JavaDoc SSLCertificateFile) { this.SSLCertificateFile = SSLCertificateFile; }
419
420
421     /**
422      * SSL certificate key file.
423      */

424     protected String JavaDoc SSLCertificateKeyFile = null;
425     public String JavaDoc getSSLCertificateKeyFile() { return SSLCertificateKeyFile; }
426     public void setSSLCertificateKeyFile(String JavaDoc SSLCertificateKeyFile) { this.SSLCertificateKeyFile = SSLCertificateKeyFile; }
427
428
429     /**
430      * SSL certificate chain file.
431      */

432     protected String JavaDoc SSLCertificateChainFile = null;
433     public String JavaDoc getSSLCertificateChainFile() { return SSLCertificateChainFile; }
434     public void setSSLCertificateChainFile(String JavaDoc SSLCertificateChainFile) { this.SSLCertificateChainFile = SSLCertificateChainFile; }
435
436
437     /**
438      * SSL CA certificate path.
439      */

440     protected String JavaDoc SSLCACertificatePath = null;
441     public String JavaDoc getSSLCACertificatePath() { return SSLCACertificatePath; }
442     public void setSSLCACertificatePath(String JavaDoc SSLCACertificatePath) { this.SSLCACertificatePath = SSLCACertificatePath; }
443
444
445     /**
446      * SSL CA certificate file.
447      */

448     protected String JavaDoc SSLCACertificateFile = null;
449     public String JavaDoc getSSLCACertificateFile() { return SSLCACertificateFile; }
450     public void setSSLCACertificateFile(String JavaDoc SSLCACertificateFile) { this.SSLCACertificateFile = SSLCACertificateFile; }
451
452
453     /**
454      * SSL CA revocation path.
455      */

456     protected String JavaDoc SSLCARevocationPath = null;
457     public String JavaDoc getSSLCARevocationPath() { return SSLCARevocationPath; }
458     public void setSSLCARevocationPath(String JavaDoc SSLCARevocationPath) { this.SSLCARevocationPath = SSLCARevocationPath; }
459
460
461     /**
462      * SSL CA revocation file.
463      */

464     protected String JavaDoc SSLCARevocationFile = null;
465     public String JavaDoc getSSLCARevocationFile() { return SSLCARevocationFile; }
466     public void setSSLCARevocationFile(String JavaDoc SSLCARevocationFile) { this.SSLCARevocationFile = SSLCARevocationFile; }
467
468
469     /**
470      * SSL verify client.
471      */

472     protected String JavaDoc SSLVerifyClient = "none";
473     public String JavaDoc getSSLVerifyClient() { return SSLVerifyClient; }
474     public void setSSLVerifyClient(String JavaDoc SSLVerifyClient) { this.SSLVerifyClient = SSLVerifyClient; }
475
476
477     /**
478      * SSL verify depth.
479      */

480     protected int SSLVerifyDepth = 10;
481     public int getSSLVerifyDepth() { return SSLVerifyDepth; }
482     public void setSSLVerifyDepth(int SSLVerifyDepth) { this.SSLVerifyDepth = SSLVerifyDepth; }
483
484
485     // --------------------------------------------------------- Public Methods
486

487
488     /**
489      * Number of keepalive sockets.
490      */

491     public int getKeepAliveCount() {
492         if (pollers == null) {
493             return 0;
494         } else {
495             int keepAliveCount = 0;
496             for (int i = 0; i < pollers.length; i++) {
497                 keepAliveCount += pollers[i].getKeepAliveCount();
498             }
499             return keepAliveCount;
500         }
501     }
502
503
504     /**
505      * Number of sendfile sockets.
506      */

507     public int getSendfileCount() {
508         if (sendfiles == null) {
509             return 0;
510         } else {
511             int sendfileCount = 0;
512             for (int i = 0; i < sendfiles.length; i++) {
513                 sendfileCount += sendfiles[i].getSendfileCount();
514             }
515             return sendfileCount;
516         }
517     }
518
519
520     /**
521      * Return the amount of threads that are managed by the pool.
522      *
523      * @return the amount of threads that are managed by the pool
524      */

525     public int getCurrentThreadCount() {
526         return curThreads;
527     }
528
529
530     /**
531      * Return the amount of threads currently busy.
532      *
533      * @return the amount of threads currently busy
534      */

535     public int getCurrentThreadsBusy() {
536         return curThreadsBusy;
537     }
538
539
540     /**
541      * Return the state of the endpoint.
542      *
543      * @return true if the endpoint is running, false otherwise
544      */

545     public boolean isRunning() {
546         return running;
547     }
548
549
550     /**
551      * Return the state of the endpoint.
552      *
553      * @return true if the endpoint is paused, false otherwise
554      */

555     public boolean isPaused() {
556         return paused;
557     }
558
559
560     // ----------------------------------------------- Public Lifecycle Methods
561

562
563     /**
564      * Initialize the endpoint.
565      */

566     public void init()
567         throws Exception JavaDoc {
568
569         if (initialized)
570             return;
571         
572         // Create the root APR memory pool
573
rootPool = Pool.create(0);
574         // Create the pool for the server socket
575
serverSockPool = Pool.create(rootPool);
576         // Create the APR address that will be bound
577
String JavaDoc addressStr = null;
578         if (address == null) {
579             addressStr = null;
580         } else {
581             addressStr = address.getHostAddress();
582         }
583         int family = Socket.APR_INET;
584         if (Library.APR_HAVE_IPV6 && (addressStr == null || addressStr.indexOf(':') >= 0)) {
585             family = Socket.APR_UNSPEC;
586         }
587         long inetAddress = Address.info(addressStr, family,
588                 port, 0, rootPool);
589         // Create the APR server socket
590
serverSock = Socket.create(family, Socket.SOCK_STREAM,
591                 Socket.APR_PROTO_TCP, rootPool);
592         if (OS.IS_UNIX) {
593             Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
594         }
595         // Deal with the firewalls that tend to drop the inactive sockets
596
Socket.optSet(serverSock, Socket.APR_SO_KEEPALIVE, 1);
597         // Bind the server socket
598
int ret = Socket.bind(serverSock, inetAddress);
599         if (ret != 0) {
600             throw new Exception JavaDoc(sm.getString("endpoint.init.bind", "" + ret, Error.strerror(ret)));
601         }
602         // Start listening on the server socket
603
ret = Socket.listen(serverSock, backlog);
604         if (ret != 0) {
605             throw new Exception JavaDoc(sm.getString("endpoint.init.listen", "" + ret, Error.strerror(ret)));
606         }
607         if (OS.IS_WIN32 || OS.IS_WIN64) {
608             // On Windows set the reuseaddr flag after the bind/listen
609
Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
610         }
611
612         // Sendfile usage on systems which don't support it cause major problems
613
if (useSendfile && !Library.APR_HAS_SENDFILE) {
614             log.warn(sm.getString("endpoint.sendfile.nosupport"));
615             useSendfile = false;
616         }
617
618         // Initialize thread count defaults for acceptor, poller and sendfile
619
if (acceptorThreadCount == 0) {
620             // FIXME: Doesn't seem to work that well with multiple accept threads
621
acceptorThreadCount = 1;
622         }
623         if (pollerThreadCount == 0) {
624             if ((OS.IS_WIN32 || OS.IS_WIN64) && (pollerSize > 1024)) {
625                 // The maximum per poller to get reasonable performance is 1024
626
pollerThreadCount = pollerSize / 1024;
627                 // Adjust poller size so that it won't reach the limit
628
pollerSize = pollerSize - (pollerSize % 1024);
629             } else {
630                 // No explicit poller size limitation
631
pollerThreadCount = 1;
632             }
633         }
634         if (sendfileThreadCount == 0) {
635             if ((OS.IS_WIN32 || OS.IS_WIN64) && (sendfileSize > 1024)) {
636                 // The maximum per poller to get reasonable performance is 1024
637
sendfileThreadCount = sendfileSize / 1024;
638                 // Adjust poller size so that it won't reach the limit
639
sendfileSize = sendfileSize - (sendfileSize % 1024);
640             } else {
641                 // No explicit poller size limitation
642
// FIXME: Default to one per CPU ?
643
sendfileThreadCount = 1;
644             }
645         }
646         
647         // Delay accepting of new connections until data is available
648
// Only Linux kernels 2.4 + have that implemented
649
// on other platforms this call is noop and will return APR_ENOTIMPL.
650
Socket.optSet(serverSock, Socket.APR_TCP_DEFER_ACCEPT, 1);
651
652         // Initialize SSL if needed
653
if (SSLEnabled) {
654             
655             // SSL protocol
656
int value = SSL.SSL_PROTOCOL_ALL;
657             if ("SSLv2".equalsIgnoreCase(SSLProtocol)) {
658                 value = SSL.SSL_PROTOCOL_SSLV2;
659             } else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) {
660                 value = SSL.SSL_PROTOCOL_SSLV3;
661             } else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) {
662                 value = SSL.SSL_PROTOCOL_TLSV1;
663             } else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) {
664                 value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3;
665             }
666             // Create SSL Context
667
sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER);
668             // List the ciphers that the client is permitted to negotiate
669
SSLContext.setCipherSuite(sslContext, SSLCipherSuite);
670             // Load Server key and certificate
671
SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA);
672             // Set certificate chain file
673
SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false);
674             // Support Client Certificates
675
SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath);
676             // Set revocation
677
SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath);
678             // Client certificate verification
679
value = SSL.SSL_CVERIFY_NONE;
680             if ("optional".equalsIgnoreCase(SSLVerifyClient)) {
681                 value = SSL.SSL_CVERIFY_OPTIONAL;
682             } else if ("require".equalsIgnoreCase(SSLVerifyClient)) {
683                 value = SSL.SSL_CVERIFY_REQUIRE;
684             } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) {
685                 value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA;
686             }
687             SSLContext.setVerify(sslContext, value, SSLVerifyDepth);
688             // For now, sendfile is not supported with SSL
689
useSendfile = false;
690         }
691
692         initialized = true;
693
694     }
695
696
697     /**
698      * Start the APR endpoint, creating acceptor, poller and sendfile threads.
699      */

700     public void start()
701         throws Exception JavaDoc {
702         // Initialize socket if not done before
703
if (!initialized) {
704             init();
705         }
706         if (!running) {
707             running = true;
708             paused = false;
709
710             // Create worker collection
711
if (executor == null) {
712                 workers = new WorkerStack(maxThreads);
713             }
714
715             // Start acceptor threads
716
for (int i = 0; i < acceptorThreadCount; i++) {
717                 Thread JavaDoc acceptorThread = new Thread JavaDoc(new Acceptor(), getName() + "-Acceptor-" + i);
718                 acceptorThread.setPriority(threadPriority);
719                 acceptorThread.setDaemon(daemon);
720                 acceptorThread.start();
721             }
722
723             // Start poller threads
724
pollers = new Poller[pollerThreadCount];
725             for (int i = 0; i < pollerThreadCount; i++) {
726                 pollers[i] = new Poller(false);
727                 pollers[i].init();
728                 Thread JavaDoc pollerThread = new Thread JavaDoc(pollers[i], getName() + "-Poller-" + i);
729                 pollerThread.setPriority(threadPriority);
730                 pollerThread.setDaemon(true);
731                 pollerThread.start();
732             }
733
734             // Start comet poller threads
735
cometPollers = new Poller[pollerThreadCount];
736             for (int i = 0; i < pollerThreadCount; i++) {
737                 cometPollers[i] = new Poller(true);
738                 cometPollers[i].init();
739                 Thread JavaDoc pollerThread = new Thread JavaDoc(cometPollers[i], getName() + "-CometPoller-" + i);
740                 pollerThread.setPriority(threadPriority);
741                 pollerThread.setDaemon(true);
742                 pollerThread.start();
743             }
744
745             // Start sendfile threads
746
if (useSendfile) {
747                 sendfiles = new Sendfile[sendfileThreadCount];
748                 for (int i = 0; i < sendfileThreadCount; i++) {
749                     sendfiles[i] = new Sendfile();
750                     sendfiles[i].init();
751                     Thread JavaDoc sendfileThread = new Thread JavaDoc(sendfiles[i], getName() + "-Sendfile-" + i);
752                     sendfileThread.setPriority(threadPriority);
753                     sendfileThread.setDaemon(true);
754                     sendfileThread.start();
755                 }
756             }
757         }
758     }
759
760
761     /**
762      * Pause the endpoint, which will make it stop accepting new sockets.
763      */

764     public void pause() {
765         if (running && !paused) {
766             paused = true;
767             unlockAccept();
768         }
769     }
770
771
772     /**
773      * Resume the endpoint, which will make it start accepting new sockets
774      * again.
775      */

776     public void resume() {
777         if (running) {
778             paused = false;
779         }
780     }
781
782
783     /**
784      * Stop the endpoint. This will cause all processing threads to stop.
785      */

786     public void stop() {
787         if (running) {
788             running = false;
789             unlockAccept();
790             for (int i = 0; i < pollers.length; i++) {
791                 pollers[i].destroy();
792             }
793             pollers = null;
794             for (int i = 0; i < cometPollers.length; i++) {
795                 cometPollers[i].destroy();
796             }
797             cometPollers = null;
798             if (useSendfile) {
799                 for (int i = 0; i < sendfiles.length; i++) {
800                     sendfiles[i].destroy();
801                 }
802                 sendfiles = null;
803             }
804         }
805     }
806
807
808     /**
809      * Deallocate APR memory pools, and close server socket.
810      */

811     public void destroy() throws Exception JavaDoc {
812         if (running) {
813             stop();
814         }
815         Pool.destroy(serverSockPool);
816         serverSockPool = 0;
817         // Close server socket
818
Socket.close(serverSock);
819         serverSock = 0;
820         sslContext = 0;
821         // Close all APR memory pools and resources
822
Pool.destroy(rootPool);
823         rootPool = 0;
824         initialized = false;
825     }
826
827
828     // ------------------------------------------------------ Protected Methods
829

830
831     /**
832      * Get a sequence number used for thread naming.
833      */

834     protected int getSequence() {
835         return sequence++;
836     }
837
838
839     /**
840      * Unlock the server socket accept using a bugus connection.
841      */

842     protected void unlockAccept() {
843         java.net.Socket JavaDoc s = null;
844         try {
845             // Need to create a connection to unlock the accept();
846
if (address == null) {
847                 s = new java.net.Socket JavaDoc("127.0.0.1", port);
848             } else {
849                 s = new java.net.Socket JavaDoc(address, port);
850                 // setting soLinger to a small value will help shutdown the
851
// connection quicker
852
s.setSoLinger(true, 0);
853             }
854         } catch(Exception JavaDoc e) {
855             if (log.isDebugEnabled()) {
856                 log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
857             }
858         } finally {
859             if (s != null) {
860                 try {
861                     s.close();
862                 } catch (Exception JavaDoc e) {
863                     // Ignore
864
}
865             }
866         }
867     }
868
869
870     /**
871      * Process the specified connection.
872      */

873     protected boolean setSocketOptions(long socket) {
874         // Process the connection
875
int step = 1;
876         try {
877
878             // 1: Set socket options: timeout, linger, etc
879
if (soLinger >= 0)
880                 Socket.optSet(socket, Socket.APR_SO_LINGER, soLinger);
881             if (tcpNoDelay)
882                 Socket.optSet(socket, Socket.APR_TCP_NODELAY, (tcpNoDelay ? 1 : 0));
883             if (soTimeout > 0)
884                 Socket.timeoutSet(socket, soTimeout * 1000);
885
886             // 2: SSL handshake
887
step = 2;
888             if (sslContext != 0) {
889                 SSLSocket.attach(sslContext, socket);
890                 if (SSLSocket.handshake(socket) != 0) {
891                     if (log.isDebugEnabled()) {
892                         log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError());
893                     }
894                     return false;
895                 }
896             }
897
898         } catch (Throwable JavaDoc t) {
899             if (log.isDebugEnabled()) {
900                 if (step == 2) {
901                     log.debug(sm.getString("endpoint.err.handshake"), t);
902                 } else {
903                     log.debug(sm.getString("endpoint.err.unexpected"), t);
904                 }
905             }
906             // Tell to close the socket
907
return false;
908         }
909         return true;
910     }
911
912
913     /**
914      * Create (or allocate) and return an available processor for use in
915      * processing a specific HTTP request, if possible. If the maximum
916      * allowed processors have already been created and are in use, return
917      * <code>null</code> instead.
918      */

919     protected Worker createWorkerThread() {
920
921         synchronized (workers) {
922             if (workers.size() > 0) {
923                 curThreadsBusy++;
924                 return (workers.pop());
925             }
926             if ((maxThreads > 0) && (curThreads < maxThreads)) {
927                 curThreadsBusy++;
928                 return (newWorkerThread());
929             } else {
930                 if (maxThreads < 0) {
931                     curThreadsBusy++;
932                     return (newWorkerThread());
933                 } else {
934                     return (null);
935                 }
936             }
937         }
938
939     }
940
941
942     /**
943      * Create and return a new processor suitable for processing HTTP
944      * requests and returning the corresponding responses.
945      */

946     protected Worker newWorkerThread() {
947
948         Worker workerThread = new Worker();
949         workerThread.start();
950         return (workerThread);
951
952     }
953
954
955     /**
956      * Return a new worker thread, and block while to worker is available.
957      */

958     protected Worker getWorkerThread() {
959         // Allocate a new worker thread
960
Worker workerThread = createWorkerThread();
961         while (workerThread == null) {
962             try {
963                 synchronized (workers) {
964                     workers.wait();
965                 }
966             } catch (InterruptedException JavaDoc e) {
967                 // Ignore
968
}
969             workerThread = createWorkerThread();
970         }
971         return workerThread;
972     }
973
974
975     /**
976      * Recycle the specified Processor so that it can be used again.
977      *
978      * @param workerThread The processor to be recycled
979      */

980     protected void recycleWorkerThread(Worker workerThread) {
981         synchronized (workers) {
982             workers.push(workerThread);
983             curThreadsBusy--;
984             workers.notify();
985         }
986     }
987
988     
989     /**
990      * Allocate a new poller of the specified size.
991      */

992     protected long allocatePoller(int size, long pool, int timeout) {
993         try {
994             return Poll.create(size, pool, 0, timeout * 1000);
995         } catch (Error JavaDoc e) {
996             if (Status.APR_STATUS_IS_EINVAL(e.getError())) {
997                 log.info(sm.getString("endpoint.poll.limitedpollsize", "" + size));
998                 return 0;
999             } else {
1000                log.error(sm.getString("endpoint.poll.initfail"), e);
1001                return -1;
1002            }
1003        }
1004    }
1005
1006    
1007    /**
1008     * Process given socket.
1009     */

1010    protected boolean processSocketWithOptions(long socket) {
1011        try {
1012            if (executor == null) {
1013                getWorkerThread().assignWithOptions(socket);
1014            } else {
1015                executor.execute(new SocketWithOptionsProcessor(socket));
1016            }
1017        } catch (Throwable JavaDoc t) {
1018            // This means we got an OOM or similar creating a thread, or that
1019
// the pool and its queue are full
1020
log.error(sm.getString("endpoint.process.fail"), t);
1021            return false;
1022        }
1023        return true;
1024    }
1025    
1026
1027    /**
1028     * Process given socket.
1029     */

1030    protected boolean processSocket(long socket) {
1031        try {
1032            if (executor == null) {
1033                getWorkerThread().assign(socket);
1034            } else {
1035                executor.execute(new SocketProcessor(socket));
1036            }
1037        } catch (Throwable JavaDoc t) {
1038            // This means we got an OOM or similar creating a thread, or that
1039
// the pool and its queue are full
1040
log.error(sm.getString("endpoint.process.fail"), t);
1041            return false;
1042        }
1043        return true;
1044    }
1045    
1046
1047    /**
1048     * Process given socket for an event.
1049     */

1050    protected boolean processSocket(long socket, SocketStatus status) {
1051        try {
1052            if (executor == null) {
1053                getWorkerThread().assign(socket, status);
1054            } else {
1055                executor.execute(new SocketEventProcessor(socket, status));
1056            }
1057        } catch (Throwable JavaDoc t) {
1058            // This means we got an OOM or similar creating a thread, or that
1059
// the pool and its queue are full
1060
log.error(sm.getString("endpoint.process.fail"), t);
1061            return false;
1062        }
1063        return true;
1064    }
1065    
1066
1067    // --------------------------------------------------- Acceptor Inner Class
1068

1069
1070    /**
1071     * Server socket acceptor thread.
1072     */

1073    protected class Acceptor implements Runnable JavaDoc {
1074
1075
1076        /**
1077         * The background thread that listens for incoming TCP/IP connections and
1078         * hands them off to an appropriate processor.
1079         */

1080        public void run() {
1081
1082            // Loop until we receive a shutdown command
1083
while (running) {
1084
1085                // Loop if endpoint is paused
1086
while (paused) {
1087                    try {
1088                        Thread.sleep(1000);
1089                    } catch (InterruptedException JavaDoc e) {
1090                        // Ignore
1091
}
1092                }
1093
1094                try {
1095                    // Accept the next incoming connection from the server socket
1096
long socket = Socket.accept(serverSock);
1097                    // Hand this socket off to an appropriate processor
1098
if (!processSocketWithOptions(socket)) {
1099                        // Close socket and pool right away
1100
Socket.destroy(socket);
1101                    }
1102                } catch (Throwable JavaDoc t) {
1103                    log.error(sm.getString("endpoint.accept.fail"), t);
1104                }
1105
1106                // The processor will recycle itself when it finishes
1107

1108            }
1109
1110        }
1111
1112    }
1113
1114
1115    // ----------------------------------------------------- Poller Inner Class
1116

1117
1118    /**
1119     * Poller class.
1120     */

1121    public class Poller implements Runnable JavaDoc {
1122
1123        protected long serverPollset = 0;
1124        protected long pool = 0;
1125        protected long[] desc;
1126
1127        protected long[] addS;
1128        protected int addCount = 0;
1129        
1130        protected boolean comet = true;
1131
1132        protected int keepAliveCount = 0;
1133        public int getKeepAliveCount() { return keepAliveCount; }
1134
1135        public Poller(boolean comet) {
1136            this.comet = comet;
1137        }
1138        
1139        /**
1140         * Create the poller. With some versions of APR, the maximum poller size will
1141         * be 62 (reocmpiling APR is necessary to remove this limitation).
1142         */

1143        protected void init() {
1144            pool = Pool.create(serverSockPool);
1145            int size = pollerSize / pollerThreadCount;
1146            int timeout = soTimeout;
1147            if (comet) {
1148                // FIXME: Find an appropriate timeout value, for now, "longer than usual"
1149
// semms appropriate
1150
timeout = soTimeout * 50;
1151            }
1152            serverPollset = allocatePoller(size, pool, timeout);
1153            if (serverPollset == 0 && size > 1024) {
1154                size = 1024;
1155                serverPollset = allocatePoller(size, pool, timeout);
1156            }
1157            if (serverPollset == 0) {
1158                size = 62;
1159                serverPollset = allocatePoller(size, pool, timeout);
1160            }
1161            desc = new long[size * 2];
1162            keepAliveCount = 0;
1163            addS = new long[size];
1164            addCount = 0;
1165        }
1166
1167        /**
1168         * Destroy the poller.
1169         */

1170        protected void destroy() {
1171            // Wait for polltime before doing anything, so that the poller threads
1172
// exit, otherwise parallel descturction of sockets which are still
1173
// in the poller can cause problems
1174
try {
1175                synchronized (this) {
1176                    this.wait(pollTime / 1000);
1177                }
1178            } catch (InterruptedException JavaDoc e) {
1179                // Ignore
1180
}
1181            // Close all sockets in the add queue
1182
for (int i = 0; i < addCount; i++) {
1183                if (comet) {
1184                    processSocket(addS[i], SocketStatus.STOP);
1185                } else {
1186                    Socket.destroy(addS[i]);
1187                }
1188            }
1189            // Close all sockets still in the poller
1190
int rv = Poll.pollset(serverPollset, desc);
1191            if (rv > 0) {
1192                for (int n = 0; n < rv; n++) {
1193                    if (comet) {
1194                        processSocket(desc[n*2+1], SocketStatus.STOP);
1195                    } else {
1196                        Socket.destroy(desc[n*2+1]);
1197                    }
1198                }
1199            }
1200            Pool.destroy(pool);
1201            keepAliveCount = 0;
1202            addCount = 0;
1203        }
1204
1205        /**
1206         * Add specified socket and associated pool to the poller. The socket will
1207         * be added to a temporary array, and polled first after a maximum amount
1208         * of time equal to pollTime (in most cases, latency will be much lower,
1209         * however).
1210         *
1211         * @param socket to add to the poller
1212         */

1213        public void add(long socket) {
1214            synchronized (this) {
1215                // Add socket to the list. Newly added sockets will wait
1216
// at most for pollTime before being polled
1217
if (addCount >= addS.length) {
1218                    // Can't do anything: close the socket right away
1219
if (comet) {
1220                        processSocket(socket, SocketStatus.ERROR);
1221                    } else {
1222                        Socket.destroy(socket);
1223                    }
1224                    return;
1225                }
1226                addS[addCount] = socket;
1227                addCount++;
1228                this.notify();
1229            }
1230        }
1231
1232        /**
1233         * The background thread that listens for incoming TCP/IP connections and
1234         * hands them off to an appropriate processor.
1235         */

1236        public void run() {
1237
1238            long maintainTime = 0;
1239            // Loop until we receive a shutdown command
1240
while (running) {
1241                // Loop if endpoint is paused
1242
while (paused) {
1243                    try {
1244                        Thread.sleep(1000);
1245                    } catch (InterruptedException JavaDoc e) {
1246                        // Ignore
1247
}
1248                }
1249
1250                while (keepAliveCount < 1 && addCount < 1) {
1251                    // Reset maintain time.
1252
maintainTime = 0;
1253                    try {
1254                        synchronized (this) {
1255                            this.wait();
1256                        }
1257                    } catch (InterruptedException JavaDoc e) {
1258                        // Ignore
1259
}
1260                }
1261
1262                try {
1263                    // Add sockets which are waiting to the poller
1264
if (addCount > 0) {
1265                        synchronized (this) {
1266                            for (int i = (addCount - 1); i >= 0; i--) {
1267                                int rv = Poll.add
1268                                    (serverPollset, addS[i], Poll.APR_POLLIN);
1269                                if (rv == Status.APR_SUCCESS) {
1270                                    keepAliveCount++;
1271                                } else {
1272                                    // Can't do anything: close the socket right away
1273
if (comet) {
1274                                        processSocket(addS[i], SocketStatus.ERROR);
1275                                    } else {
1276                                        Socket.destroy(addS[i]);
1277                                    }
1278                                }
1279                            }
1280                            addCount = 0;
1281                        }
1282                    }
1283
1284                    maintainTime += pollTime;
1285                    // Pool for the specified interval
1286
int rv = Poll.poll(serverPollset, pollTime, desc, true);
1287                    if (rv > 0) {
1288                        keepAliveCount -= rv;
1289                        for (int n = 0; n < rv; n++) {
1290                            // Check for failed sockets and hand this socket off to a worker
1291
if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
1292                                    || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
1293                                    || (comet && (!processSocket(desc[n*2+1], SocketStatus.OPEN)))
1294                                    || (!comet && (!processSocket(desc[n*2+1])))) {
1295                                // Close socket and clear pool
1296
if (comet) {
1297                                    processSocket(desc[n*2+1], SocketStatus.DISCONNECT);
1298                                } else {
1299                                    Socket.destroy(desc[n*2+1]);
1300                                }
1301                                continue;
1302                            }
1303                        }
1304                    } else if (rv < 0) {
1305                        int errn = -rv;
1306                        /* Any non timeup or interrupted error is critical */
1307                        if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
1308                            if (errn > Status.APR_OS_START_USERERR) {
1309                                errn -= Status.APR_OS_START_USERERR;
1310                            }
1311                            log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));
1312                            // Handle poll critical failure
1313
synchronized (this) {
1314                                destroy();
1315                                init();
1316                            }
1317                            continue;
1318                        }
1319                    }
1320                    if (soTimeout > 0 && maintainTime > 1000000L && running) {
1321                        rv = Poll.maintain(serverPollset, desc, true);
1322                        maintainTime = 0;
1323                        if (rv > 0) {
1324                            keepAliveCount -= rv;
1325                            for (int n = 0; n < rv; n++) {
1326                                // Close socket and clear pool
1327
if (comet) {
1328                                    processSocket(desc[n], SocketStatus.TIMEOUT);
1329                                } else {
1330                                    Socket.destroy(desc[n]);
1331                                }
1332                            }
1333                        }
1334                    }
1335                } catch (Throwable JavaDoc t) {
1336                    log.error(sm.getString("endpoint.poll.error"), t);
1337                }
1338
1339            }
1340
1341            synchronized (this) {
1342                this.notifyAll();
1343            }
1344
1345        }
1346        
1347    }
1348
1349
1350    // ----------------------------------------------------- Worker Inner Class
1351

1352
1353    /**
1354     * Server processor class.
1355     */

1356    protected class Worker implements Runnable JavaDoc {
1357
1358
1359        protected Thread JavaDoc thread = null;
1360        protected boolean available = false;
1361        protected long socket = 0;
1362        protected SocketStatus status = null;
1363        protected boolean options = false;
1364
1365
1366        /**
1367         * Process an incoming TCP/IP connection on the specified socket. Any
1368         * exception that occurs during processing must be logged and swallowed.
1369         * <b>NOTE</b>: This method is called from our Connector's thread. We
1370         * must assign it to our own thread so that multiple simultaneous
1371         * requests can be handled.
1372         *
1373         * @param socket TCP socket to process
1374         */

1375        protected synchronized void assignWithOptions(long socket) {
1376
1377            // Wait for the Processor to get the previous Socket
1378
while (available) {
1379                try {
1380                    wait();
1381                } catch (InterruptedException JavaDoc e) {
1382                }
1383            }
1384
1385            // Store the newly available Socket and notify our thread
1386
this.socket = socket;
1387            status = null;
1388            options = true;
1389            available = true;
1390            notifyAll();
1391
1392        }
1393
1394
1395        /**
1396         * Process an incoming TCP/IP connection on the specified socket. Any
1397         * exception that occurs during processing must be logged and swallowed.
1398         * <b>NOTE</b>: This method is called from our Connector's thread. We
1399         * must assign it to our own thread so that multiple simultaneous
1400         * requests can be handled.
1401         *
1402         * @param socket TCP socket to process
1403         */

1404        protected synchronized void assign(long socket) {
1405
1406            // Wait for the Processor to get the previous Socket
1407
while (available) {
1408                try {
1409                    wait();
1410                } catch (InterruptedException JavaDoc e) {
1411                }
1412            }
1413
1414            // Store the newly available Socket and notify our thread
1415
this.socket = socket;
1416            status = null;
1417            options = false;
1418            available = true;
1419            notifyAll();
1420
1421        }
1422
1423
1424        protected synchronized void assign(long socket, SocketStatus status) {
1425
1426            // Wait for the Processor to get the previous Socket
1427
while (available) {
1428                try {
1429                    wait();
1430                } catch (InterruptedException JavaDoc e) {
1431                }
1432            }
1433
1434            // Store the newly available Socket and notify our thread
1435
this.socket = socket;
1436            this.status = status;
1437            options = false;
1438            available = true;
1439            notifyAll();
1440
1441        }
1442
1443
1444        /**
1445         * Await a newly assigned Socket from our Connector, or <code>null</code>
1446         * if we are supposed to shut down.
1447         */

1448        protected synchronized long await() {
1449
1450            // Wait for the Connector to provide a new Socket
1451
while (!available) {
1452                try {
1453                    wait();
1454                } catch (InterruptedException JavaDoc e) {
1455                }
1456            }
1457
1458            // Notify the Connector that we have received this Socket
1459
long socket = this.socket;
1460            available = false;
1461            notifyAll();
1462
1463            return (socket);
1464
1465        }
1466
1467
1468        /**
1469         * The background thread that listens for incoming TCP/IP connections and
1470         * hands them off to an appropriate processor.
1471         */

1472        public void run() {
1473
1474            // Process requests until we receive a shutdown signal
1475
while (running) {
1476
1477                // Wait for the next socket to be assigned
1478
long socket = await();
1479                if (socket == 0)
1480                    continue;
1481
1482                // Process the request from this socket
1483
if ((status != null) && (handler.event(socket, status) == Handler.SocketState.CLOSED)) {
1484                    // Close socket and pool
1485
Socket.destroy(socket);
1486                    socket = 0;
1487                } else if ((status == null) && ((options && !setSocketOptions(socket))
1488                        || handler.process(socket) == Handler.SocketState.CLOSED)) {
1489                    // Close socket and pool
1490
Socket.destroy(socket);
1491                    socket = 0;
1492                }
1493
1494                // Finish up this request
1495
recycleWorkerThread(this);
1496
1497            }
1498
1499        }
1500
1501
1502        /**
1503         * Start the background processing thread.
1504         */

1505        public void start() {
1506            thread = new Thread JavaDoc(this);
1507            thread.setName(getName() + "-" + (++curThreads));
1508            thread.setDaemon(true);
1509            thread.start();
1510        }
1511
1512
1513    }
1514
1515
1516    // ----------------------------------------------- SendfileData Inner Class
1517

1518
1519    /**
1520     * SendfileData class.
1521     */

1522    public static class SendfileData {
1523        // File
1524
public String JavaDoc fileName;
1525        public long fd;
1526        public long fdpool;
1527        // Range information
1528
public long start;
1529        public long end;
1530        // Socket and socket pool
1531
public long socket;
1532        // Position
1533
public long pos;
1534        // KeepAlive flag
1535
public boolean keepAlive;
1536    }
1537
1538
1539    // --------------------------------------------------- Sendfile Inner Class
1540

1541
1542    /**
1543     * Sendfile class.
1544     */

1545    public class Sendfile implements Runnable JavaDoc {
1546
1547        protected long sendfilePollset = 0;
1548        protected long pool = 0;
1549        protected long[] desc;
1550        protected HashMap JavaDoc<Long JavaDoc, SendfileData> sendfileData;
1551        
1552        protected int sendfileCount;
1553        public int getSendfileCount() { return sendfileCount; }
1554
1555        protected ArrayList JavaDoc<SendfileData> addS;
1556
1557        /**
1558         * Create the sendfile poller. With some versions of APR, the maximum poller size will
1559         * be 62 (reocmpiling APR is necessary to remove this limitation).
1560         */

1561        protected void init() {
1562            pool = Pool.create(serverSockPool);
1563            int size = sendfileSize / sendfileThreadCount;
1564            sendfilePollset = allocatePoller(size, pool, soTimeout);
1565            if (sendfilePollset == 0 && size > 1024) {
1566                size = 1024;
1567                sendfilePollset = allocatePoller(size, pool, soTimeout);
1568            }
1569            if (sendfilePollset == 0) {
1570                size = 62;
1571                sendfilePollset = allocatePoller(size, pool, soTimeout);
1572            }
1573            desc = new long[size * 2];
1574            sendfileData = new HashMap JavaDoc<Long JavaDoc, SendfileData>(size);
1575            addS = new ArrayList JavaDoc<SendfileData>();
1576        }
1577
1578        /**
1579         * Destroy the poller.
1580         */

1581        protected void destroy() {
1582            // Wait for polltime before doing anything, so that the poller threads
1583
// exit, otherwise parallel descturction of sockets which are still
1584
// in the poller can cause problems
1585
try {
1586                synchronized (this) {
1587                    this.wait(pollTime / 1000);
1588                }
1589            } catch (InterruptedException JavaDoc e) {
1590                // Ignore
1591
}
1592            // Close any socket remaining in the add queue
1593
for (int i = (addS.size() - 1); i >= 0; i--) {
1594                SendfileData data = addS.get(i);
1595                Socket.destroy(data.socket);
1596            }
1597            // Close all sockets still in the poller
1598
int rv = Poll.pollset(sendfilePollset, desc);
1599            if (rv > 0) {
1600                for (int n = 0; n < rv; n++) {
1601                    Socket.destroy(desc[n*2+1]);
1602                }
1603            }
1604            Pool.destroy(pool);
1605            sendfileData.clear();
1606        }
1607
1608        /**
1609         * Add the sendfile data to the sendfile poller. Note that in most cases,
1610         * the initial non blocking calls to sendfile will return right away, and
1611         * will be handled asynchronously inside the kernel. As a result,
1612         * the poller will never be used.
1613         *
1614         * @param data containing the reference to the data which should be snet
1615         * @return true if all the data has been sent right away, and false
1616         * otherwise
1617         */

1618        public boolean add(SendfileData data) {
1619            // Initialize fd from data given
1620
try {
1621                data.fdpool = Socket.pool(data.socket);
1622                data.fd = File.open
1623                    (data.fileName, File.APR_FOPEN_READ
1624                     | File.APR_FOPEN_SENDFILE_ENABLED | File.APR_FOPEN_BINARY,
1625                     0, data.fdpool);
1626                data.pos = data.start;
1627                // Set the socket to nonblocking mode
1628
Socket.timeoutSet(data.socket, 0);
1629                while (true) {
1630                    long nw = Socket.sendfilen(data.socket, data.fd,
1631                                               data.pos, data.end - data.pos, 0);
1632                    if (nw < 0) {
1633                        if (!(-nw == Status.EAGAIN)) {
1634                            Socket.destroy(data.socket);
1635                            data.socket = 0;
1636                            return false;
1637                        } else {
1638                            // Break the loop and add the socket to poller.
1639
break;
1640                        }
1641                    } else {
1642                        data.pos = data.pos + nw;
1643                        if (data.pos >= data.end) {
1644                            // Entire file has been sent
1645
Pool.destroy(data.fdpool);
1646                            // Set back socket to blocking mode
1647
Socket.timeoutSet(data.socket, soTimeout * 1000);
1648                            return true;
1649                        }
1650                    }
1651                }
1652            } catch (Exception JavaDoc e) {
1653                log.error(sm.getString("endpoint.sendfile.error"), e);
1654                return false;
1655            }
1656            // Add socket to the list. Newly added sockets will wait
1657
// at most for pollTime before being polled
1658
synchronized (this) {
1659                addS.add(data);
1660                this.notify();
1661            }
1662            return false;
1663        }
1664
1665        /**
1666         * Remove socket from the poller.
1667         *
1668         * @param data the sendfile data which should be removed
1669         */

1670        protected void remove(SendfileData data) {
1671            int rv = Poll.remove(sendfilePollset, data.socket);
1672            if (rv == Status.APR_SUCCESS) {
1673                sendfileCount--;
1674            }
1675            sendfileData.remove(data);
1676        }
1677
1678        /**
1679         * The background thread that listens for incoming TCP/IP connections and
1680         * hands them off to an appropriate processor.
1681         */

1682        public void run() {
1683
1684            // Loop until we receive a shutdown command
1685
while (running) {
1686
1687                // Loop if endpoint is paused
1688
while (paused) {
1689                    try {
1690                        Thread.sleep(1000);
1691                    } catch (InterruptedException JavaDoc e) {
1692                        // Ignore
1693
}
1694                }
1695
1696                while (sendfileCount < 1 && addS.size() < 1) {
1697                    try {
1698                        synchronized (this) {
1699                            this.wait();
1700                        }
1701                    } catch (InterruptedException JavaDoc e) {
1702                        // Ignore
1703
}
1704                }
1705
1706                try {
1707                    // Add socket to the poller
1708
if (addS.size() > 0) {
1709                        synchronized (this) {
1710                            for (int i = (addS.size() - 1); i >= 0; i--) {
1711                                SendfileData data = addS.get(i);
1712                                int rv = Poll.add(sendfilePollset, data.socket, Poll.APR_POLLOUT);
1713                                if (rv == Status.APR_SUCCESS) {
1714                                    sendfileData.put(new Long JavaDoc(data.socket), data);
1715                                    sendfileCount++;
1716                                } else {
1717                                    log.warn(sm.getString("endpoint.sendfile.addfail", "" + rv, Error.strerror(rv)));
1718                                    // Can't do anything: close the socket right away
1719
Socket.destroy(data.socket);
1720                                }
1721                            }
1722                            addS.clear();
1723                        }
1724                    }
1725                    // Pool for the specified interval
1726
int rv = Poll.poll(sendfilePollset, pollTime, desc, false);
1727                    if (rv > 0) {
1728                        for (int n = 0; n < rv; n++) {
1729                            // Get the sendfile state
1730
SendfileData state =
1731                                sendfileData.get(new Long JavaDoc(desc[n*2+1]));
1732                            // Problem events
1733
if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
1734                                    || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {
1735                                // Close socket and clear pool
1736
remove(state);
1737                                // Destroy file descriptor pool, which should close the file
1738
// Close the socket, as the reponse would be incomplete
1739
Socket.destroy(state.socket);
1740                                continue;
1741                            }
1742                            // Write some data using sendfile
1743
long nw = Socket.sendfilen(state.socket, state.fd,
1744                                                       state.pos,
1745                                                       state.end - state.pos, 0);
1746                            if (nw < 0) {
1747                                // Close socket and clear pool
1748
remove(state);
1749                                // Close the socket, as the reponse would be incomplete
1750
// This will close the file too.
1751
Socket.destroy(state.socket);
1752                                continue;
1753                            }
1754
1755                            state.pos = state.pos + nw;
1756                            if (state.pos >= state.end) {
1757                                remove(state);
1758                                if (state.keepAlive) {
1759                                    // Destroy file descriptor pool, which should close the file
1760
Pool.destroy(state.fdpool);
1761                                    Socket.timeoutSet(state.socket, soTimeout * 1000);
1762                                    // If all done hand this socket off to a worker for
1763
// processing of further requests
1764
if (!processSocket(state.socket)) {
1765                                        Socket.destroy(state.socket);
1766                                    }
1767                                } else {
1768                                    // Close the socket since this is
1769
// the end of not keep-alive request.
1770
Socket.destroy(state.socket);
1771                                }
1772                            }
1773                        }
1774                    } else if (rv < 0) {
1775                        int errn = -rv;
1776                        /* Any non timeup or interrupted error is critical */
1777                        if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
1778                            if (errn > Status.APR_OS_START_USERERR) {
1779                                errn -= Status.APR_OS_START_USERERR;
1780                            }
1781                            log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));
1782                            // Handle poll critical failure
1783
synchronized (this) {
1784                                destroy();
1785                                init();
1786                            }
1787                            continue;
1788                        }
1789                    }
1790                    /* TODO: See if we need to call the maintain for sendfile poller */
1791                } catch (Throwable JavaDoc t) {
1792                    log.error(sm.getString("endpoint.poll.error"), t);
1793                }
1794            }
1795
1796            synchronized (this) {
1797                this.notifyAll();
1798            }
1799
1800        }
1801
1802    }
1803
1804
1805    // ------------------------------------------------ Handler Inner Interface
1806

1807
1808    /**
1809     * Bare bones interface used for socket processing. Per thread data is to be
1810     * stored in the ThreadWithAttributes extra folders, or alternately in
1811     * thread local fields.
1812     */

1813    public interface Handler {
1814        public enum SocketState {
1815            OPEN, CLOSED, LONG
1816        }
1817        public SocketState process(long socket);
1818        public SocketState event(long socket, SocketStatus status);
1819    }
1820
1821
1822    // ------------------------------------------------- WorkerStack Inner Class
1823

1824
1825    public class WorkerStack {
1826        
1827        protected Worker[] workers = null;
1828        protected int end = 0;
1829        
1830        public WorkerStack(int size) {
1831            workers = new Worker[size];
1832        }
1833        
1834        /**
1835         * Put the object into the queue.
1836         *
1837         * @param object the object to be appended to the queue (first element).
1838         */

1839        public void push(Worker worker) {
1840            workers[end++] = worker;
1841        }
1842        
1843        /**
1844         * Get the first object out of the queue. Return null if the queue
1845         * is empty.
1846         */

1847        public Worker pop() {
1848            if (end > 0) {
1849                return workers[--end];
1850            }
1851            return null;
1852        }
1853        
1854        /**
1855         * Get the first object out of the queue, Return null if the queue
1856         * is empty.
1857         */

1858        public Worker peek() {
1859            return workers[end];
1860        }
1861        
1862        /**
1863         * Is the queue empty?
1864         */

1865        public boolean isEmpty() {
1866            return (end == 0);
1867        }
1868        
1869        /**
1870         * How many elements are there in this queue?
1871         */

1872        public int size() {
1873            return (end);
1874        }
1875    }
1876
1877
1878    // ---------------------------------------------- SocketProcessor Inner Class
1879

1880
1881    /**
1882     * This class is the equivalent of the Worker, but will simply use in an
1883     * external Executor thread pool. This will also set the socket options
1884     * and do the handshake.
1885     */

1886    protected class SocketWithOptionsProcessor implements Runnable JavaDoc {
1887        
1888        protected long socket = 0;
1889        
1890        public SocketWithOptionsProcessor(long socket) {
1891            this.socket = socket;
1892        }
1893
1894        public void run() {
1895
1896            // Process the request from this socket
1897
if (!setSocketOptions(socket)
1898                    || handler.process(socket) == Handler.SocketState.CLOSED) {
1899                // Close socket and pool
1900
Socket.destroy(socket);
1901                socket = 0;
1902            }
1903
1904        }
1905        
1906    }
1907    
1908    
1909    // ---------------------------------------------- SocketProcessor Inner Class
1910

1911
1912    /**
1913     * This class is the equivalent of the Worker, but will simply use in an
1914     * external Executor thread pool.
1915     */

1916    protected class SocketProcessor implements Runnable JavaDoc {
1917        
1918        protected long socket = 0;
1919        
1920        public SocketProcessor(long socket) {
1921            this.socket = socket;
1922        }
1923
1924        public void run() {
1925
1926            // Process the request from this socket
1927
if (handler.process(socket) == Handler.SocketState.CLOSED) {
1928                // Close socket and pool
1929
Socket.destroy(socket);
1930                socket = 0;
1931            }
1932
1933        }
1934        
1935    }
1936    
1937    
1938    // --------------------------------------- SocketEventProcessor Inner Class
1939

1940
1941    /**
1942     * This class is the equivalent of the Worker, but will simply use in an
1943     * external Executor thread pool.
1944     */

1945    protected class SocketEventProcessor implements Runnable JavaDoc {
1946        
1947        protected long socket = 0;
1948        protected SocketStatus status = null;
1949        
1950        public SocketEventProcessor(long socket, SocketStatus status) {
1951            this.socket = socket;
1952            this.status = status;
1953        }
1954
1955        public void run() {
1956
1957            // Process the request from this socket
1958
if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
1959                // Close socket and pool
1960
Socket.destroy(socket);
1961                socket = 0;
1962            }
1963
1964        }
1965        
1966    }
1967    
1968    
1969}
1970
Popular Tags