KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > tomcat > util > net > JIoEndpoint


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17
18 package org.apache.tomcat.util.net;
19
20 import java.io.IOException JavaDoc;
21 import java.net.BindException JavaDoc;
22 import java.net.InetAddress JavaDoc;
23 import java.net.ServerSocket JavaDoc;
24 import java.net.Socket JavaDoc;
25 import java.util.concurrent.Executor JavaDoc;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.tomcat.util.res.StringManager;
30
31 /**
32  * Handle incoming TCP connections.
33  *
34  * This class implement a simple server model: one listener thread accepts on a socket and
35  * creates a new worker thread for each incoming connection.
36  *
37  * More advanced Endpoints will reuse the threads, use queues, etc.
38  *
39  * @author James Duncan Davidson
40  * @author Jason Hunter
41  * @author James Todd
42  * @author Costin Manolache
43  * @author Gal Shachor
44  * @author Yoav Shapira
45  * @author Remy Maucherat
46  */

47 public class JIoEndpoint {
48
49
50     // -------------------------------------------------------------- Constants
51

52
53     protected static Log log = LogFactory.getLog(JIoEndpoint.class);
54
55     protected StringManager sm =
56         StringManager.getManager("org.apache.tomcat.util.net.res");
57
58
59     /**
60      * The Request attribute key for the cipher suite.
61      */

62     public static final String JavaDoc CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
63
64     /**
65      * The Request attribute key for the key size.
66      */

67     public static final String JavaDoc KEY_SIZE_KEY = "javax.servlet.request.key_size";
68
69     /**
70      * The Request attribute key for the client certificate chain.
71      */

72     public static final String JavaDoc CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
73
74     /**
75      * The Request attribute key for the session id.
76      * This one is a Tomcat extension to the Servlet spec.
77      */

78     public static final String JavaDoc SESSION_ID_KEY = "javax.servlet.request.ssl_session";
79
80
81     // ----------------------------------------------------------------- Fields
82

83
84     /**
85      * Available workers.
86      */

87     protected WorkerStack workers = null;
88
89
90     /**
91      * Running state of the endpoint.
92      */

93     protected volatile boolean running = false;
94
95
96     /**
97      * Will be set to true whenever the endpoint is paused.
98      */

99     protected volatile boolean paused = false;
100
101
102     /**
103      * Track the initialization state of the endpoint.
104      */

105     protected boolean initialized = false;
106
107
108     /**
109      * Current worker threads busy count.
110      */

111     protected int curThreadsBusy = 0;
112
113
114     /**
115      * Current worker threads count.
116      */

117     protected int curThreads = 0;
118
119
120     /**
121      * Sequence number used to generate thread names.
122      */

123     protected int sequence = 0;
124
125
126     /**
127      * Associated server socket.
128      */

129     protected ServerSocket JavaDoc serverSocket = null;
130
131
132     // ------------------------------------------------------------- Properties
133

134
135     /**
136      * Acceptor thread count.
137      */

138     protected int acceptorThreadCount = 0;
139     public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; }
140     public int getAcceptorThreadCount() { return acceptorThreadCount; }
141
142
143     /**
144      * External Executor based thread pool.
145      */

146     protected Executor JavaDoc executor = null;
147     public void setExecutor(Executor JavaDoc executor) { this.executor = executor; }
148     public Executor JavaDoc getExecutor() { return executor; }
149
150
151     /**
152      * Maximum amount of worker threads.
153      */

154     protected int maxThreads = 40;
155     public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }
156     public int getMaxThreads() { return maxThreads; }
157
158
159     /**
160      * Priority of the acceptor and poller threads.
161      */

162     protected int threadPriority = Thread.NORM_PRIORITY;
163     public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; }
164     public int getThreadPriority() { return threadPriority; }
165
166     
167     /**
168      * Server socket port.
169      */

170     protected int port;
171     public int getPort() { return port; }
172     public void setPort(int port ) { this.port=port; }
173
174
175     /**
176      * Address for the server socket.
177      */

178     protected InetAddress JavaDoc address;
179     public InetAddress JavaDoc getAddress() { return address; }
180     public void setAddress(InetAddress JavaDoc address) { this.address = address; }
181
182
183     /**
184      * Handling of accepted sockets.
185      */

186     protected Handler handler = null;
187     public void setHandler(Handler handler ) { this.handler = handler; }
188     public Handler getHandler() { return handler; }
189
190
191     /**
192      * Allows the server developer to specify the backlog that
193      * should be used for server sockets. By default, this value
194      * is 100.
195      */

196     protected int backlog = 100;
197     public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; }
198     public int getBacklog() { return backlog; }
199
200
201     /**
202      * Socket TCP no delay.
203      */

204     protected boolean tcpNoDelay = false;
205     public boolean getTcpNoDelay() { return tcpNoDelay; }
206     public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; }
207
208
209     /**
210      * Socket linger.
211      */

212     protected int soLinger = 100;
213     public int getSoLinger() { return soLinger; }
214     public void setSoLinger(int soLinger) { this.soLinger = soLinger; }
215
216
217     /**
218      * Socket timeout.
219      */

220     protected int soTimeout = -1;
221     public int getSoTimeout() { return soTimeout; }
222     public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; }
223
224
225     /**
226      * The default is true - the created threads will be
227      * in daemon mode. If set to false, the control thread
228      * will not be daemon - and will keep the process alive.
229      */

230     protected boolean daemon = true;
231     public void setDaemon(boolean b) { daemon = b; }
232     public boolean getDaemon() { return daemon; }
233
234
235     /**
236      * Name of the thread pool, which will be used for naming child threads.
237      */

238     protected String JavaDoc name = "TP";
239     public void setName(String JavaDoc name) { this.name = name; }
240     public String JavaDoc getName() { return name; }
241
242
243     /**
244      * Server socket factory.
245      */

246     protected ServerSocketFactory serverSocketFactory = null;
247     public void setServerSocketFactory(ServerSocketFactory factory) { this.serverSocketFactory = factory; }
248     public ServerSocketFactory getServerSocketFactory() { return serverSocketFactory; }
249
250
251     public boolean isRunning() {
252         return running;
253     }
254     
255     public boolean isPaused() {
256         return paused;
257     }
258     
259     public int getCurrentThreadCount() {
260         return curThreads;
261     }
262     
263     public int getCurrentThreadsBusy() {
264         return curThreads - workers.size();
265     }
266     
267
268     // ------------------------------------------------ Handler Inner Interface
269

270
271     /**
272      * Bare bones interface used for socket processing. Per thread data is to be
273      * stored in the ThreadWithAttributes extra folders, or alternately in
274      * thread local fields.
275      */

276     public interface Handler {
277         public boolean process(Socket JavaDoc socket);
278     }
279
280
281     // --------------------------------------------------- Acceptor Inner Class
282

283
284     /**
285      * Server socket acceptor thread.
286      */

287     protected class Acceptor implements Runnable JavaDoc {
288
289
290         /**
291          * The background thread that listens for incoming TCP/IP connections and
292          * hands them off to an appropriate processor.
293          */

294         public void run() {
295
296             // Loop until we receive a shutdown command
297
while (running) {
298
299                 // Loop if endpoint is paused
300
while (paused) {
301                     try {
302                         Thread.sleep(1000);
303                     } catch (InterruptedException JavaDoc e) {
304                         // Ignore
305
}
306                 }
307
308                 // Accept the next incoming connection from the server socket
309
try {
310                     Socket JavaDoc socket = serverSocketFactory.acceptSocket(serverSocket);
311                     serverSocketFactory.initSocket(socket);
312                     // Hand this socket off to an appropriate processor
313
if (!processSocket(socket)) {
314                         // Close socket right away
315
try {
316                             socket.close();
317                         } catch (IOException JavaDoc e) {
318                             // Ignore
319
}
320                     }
321                 } catch (Throwable JavaDoc t) {
322                     log.error(sm.getString("endpoint.accept.fail"), t);
323                 }
324
325                 // The processor will recycle itself when it finishes
326

327             }
328
329         }
330
331     }
332
333
334     // ------------------------------------------- SocketProcessor Inner Class
335

336
337     /**
338      * This class is the equivalent of the Worker, but will simply use in an
339      * external Executor thread pool.
340      */

341     protected class SocketProcessor implements Runnable JavaDoc {
342         
343         protected Socket JavaDoc socket = null;
344         
345         public SocketProcessor(Socket JavaDoc socket) {
346             this.socket = socket;
347         }
348
349         public void run() {
350
351             // Process the request from this socket
352
if (!setSocketOptions(socket) || !handler.process(socket)) {
353                 // Close socket
354
try {
355                     socket.close();
356                 } catch (IOException JavaDoc e) {
357                 }
358             }
359
360             // Finish up this request
361
socket = null;
362
363         }
364         
365     }
366     
367     
368     // ----------------------------------------------------- Worker Inner Class
369

370
371     protected class Worker implements Runnable JavaDoc {
372
373         protected Thread JavaDoc thread = null;
374         protected boolean available = false;
375         protected Socket JavaDoc socket = null;
376
377         
378         /**
379          * Process an incoming TCP/IP connection on the specified socket. Any
380          * exception that occurs during processing must be logged and swallowed.
381          * <b>NOTE</b>: This method is called from our Connector's thread. We
382          * must assign it to our own thread so that multiple simultaneous
383          * requests can be handled.
384          *
385          * @param socket TCP socket to process
386          */

387         synchronized void assign(Socket JavaDoc socket) {
388
389             // Wait for the Processor to get the previous Socket
390
while (available) {
391                 try {
392                     wait();
393                 } catch (InterruptedException JavaDoc e) {
394                 }
395             }
396
397             // Store the newly available Socket and notify our thread
398
this.socket = socket;
399             available = true;
400             notifyAll();
401
402         }
403
404         
405         /**
406          * Await a newly assigned Socket from our Connector, or <code>null</code>
407          * if we are supposed to shut down.
408          */

409         private synchronized Socket JavaDoc await() {
410
411             // Wait for the Connector to provide a new Socket
412
while (!available) {
413                 try {
414                     wait();
415                 } catch (InterruptedException JavaDoc e) {
416                 }
417             }
418
419             // Notify the Connector that we have received this Socket
420
Socket JavaDoc socket = this.socket;
421             available = false;
422             notifyAll();
423
424             return (socket);
425
426         }
427
428
429
430         /**
431          * The background thread that listens for incoming TCP/IP connections and
432          * hands them off to an appropriate processor.
433          */

434         public void run() {
435
436             // Process requests until we receive a shutdown signal
437
while (running) {
438
439                 // Wait for the next socket to be assigned
440
Socket JavaDoc socket = await();
441                 if (socket == null)
442                     continue;
443
444                 // Process the request from this socket
445
if (!setSocketOptions(socket) || !handler.process(socket)) {
446                     // Close socket
447
try {
448                         socket.close();
449                     } catch (IOException JavaDoc e) {
450                     }
451                 }
452
453                 // Finish up this request
454
socket = null;
455                 recycleWorkerThread(this);
456
457             }
458
459         }
460
461
462         /**
463          * Start the background processing thread.
464          */

465         public void start() {
466             thread = new Thread JavaDoc(this);
467             thread.setName(getName() + "-" + (++curThreads));
468             thread.setDaemon(true);
469             thread.start();
470         }
471
472
473     }
474
475
476     // -------------------- Public methods --------------------
477

478     public void init()
479         throws Exception JavaDoc {
480
481         if (initialized)
482             return;
483         
484         // Initialize thread count defaults for acceptor
485
if (acceptorThreadCount == 0) {
486             acceptorThreadCount = 1;
487         }
488         if (serverSocketFactory == null) {
489             serverSocketFactory = ServerSocketFactory.getDefault();
490         }
491         if (serverSocket == null) {
492             try {
493                 if (address == null) {
494                     serverSocket = serverSocketFactory.createSocket(port, backlog);
495                 } else {
496                     serverSocket = serverSocketFactory.createSocket(port, backlog, address);
497                 }
498             } catch (BindException JavaDoc be) {
499                 throw new BindException JavaDoc(be.getMessage() + ":" + port);
500             }
501         }
502         //if( serverTimeout >= 0 )
503
// serverSocket.setSoTimeout( serverTimeout );
504

505         initialized = true;
506         
507     }
508     
509     public void start()
510         throws Exception JavaDoc {
511         // Initialize socket if not done before
512
if (!initialized) {
513             init();
514         }
515         if (!running) {
516             running = true;
517             paused = false;
518
519             // Create worker collection
520
if (executor == null) {
521                 workers = new WorkerStack(maxThreads);
522             }
523
524             // Start acceptor threads
525
for (int i = 0; i < acceptorThreadCount; i++) {
526                 Thread JavaDoc acceptorThread = new Thread JavaDoc(new Acceptor(), getName() + "-Acceptor-" + i);
527                 acceptorThread.setPriority(threadPriority);
528                 acceptorThread.setDaemon(daemon);
529                 acceptorThread.start();
530             }
531         }
532     }
533
534     public void pause() {
535         if (running && !paused) {
536             paused = true;
537             unlockAccept();
538         }
539     }
540
541     public void resume() {
542         if (running) {
543             paused = false;
544         }
545     }
546
547     public void stop() {
548         if (running) {
549             running = false;
550             unlockAccept();
551         }
552     }
553
554     /**
555      * Deallocate APR memory pools, and close server socket.
556      */

557     public void destroy() throws Exception JavaDoc {
558         if (running) {
559             stop();
560         }
561         if (serverSocket != null) {
562             try {
563                 if (serverSocket != null)
564                     serverSocket.close();
565             } catch (Exception JavaDoc e) {
566                 log.error(sm.getString("endpoint.err.close"), e);
567             }
568             serverSocket = null;
569         }
570         initialized = false ;
571     }
572
573     
574     /**
575      * Unlock the accept by using a local connection.
576      */

577     protected void unlockAccept() {
578         Socket JavaDoc s = null;
579         try {
580             // Need to create a connection to unlock the accept();
581
if (address == null) {
582                 s = new Socket JavaDoc("127.0.0.1", port);
583             } else {
584                 s = new Socket JavaDoc(address, port);
585                     // setting soLinger to a small value will help shutdown the
586
// connection quicker
587
s.setSoLinger(true, 0);
588             }
589         } catch (Exception JavaDoc e) {
590             if (log.isDebugEnabled()) {
591                 log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
592             }
593         } finally {
594             if (s != null) {
595                 try {
596                     s.close();
597                 } catch (Exception JavaDoc e) {
598                     // Ignore
599
}
600             }
601         }
602     }
603
604
605     /**
606      * Set the options for the current socket.
607      */

608     protected boolean setSocketOptions(Socket JavaDoc socket) {
609         // Process the connection
610
int step = 1;
611         try {
612
613             // 1: Set socket options: timeout, linger, etc
614
if (soLinger >= 0) {
615                 socket.setSoLinger(true, soLinger);
616             }
617             if (tcpNoDelay) {
618                 socket.setTcpNoDelay(tcpNoDelay);
619             }
620             if (soTimeout > 0) {
621                 socket.setSoTimeout(soTimeout);
622             }
623
624             // 2: SSL handshake
625
step = 2;
626             serverSocketFactory.handshake(socket);
627
628         } catch (Throwable JavaDoc t) {
629             if (log.isDebugEnabled()) {
630                 if (step == 2) {
631                     log.debug(sm.getString("endpoint.err.handshake"), t);
632                 } else {
633                     log.debug(sm.getString("endpoint.err.unexpected"), t);
634                 }
635             }
636             // Tell to close the socket
637
return false;
638         }
639         return true;
640     }
641
642     
643     /**
644      * Create (or allocate) and return an available processor for use in
645      * processing a specific HTTP request, if possible. If the maximum
646      * allowed processors have already been created and are in use, return
647      * <code>null</code> instead.
648      */

649     protected Worker createWorkerThread() {
650
651         synchronized (workers) {
652             if (workers.size() > 0) {
653                 curThreadsBusy++;
654                 return workers.pop();
655             }
656             if ((maxThreads > 0) && (curThreads < maxThreads)) {
657                 curThreadsBusy++;
658                 return (newWorkerThread());
659             } else {
660                 if (maxThreads < 0) {
661                     curThreadsBusy++;
662                     return (newWorkerThread());
663                 } else {
664                     return (null);
665                 }
666             }
667         }
668
669     }
670
671
672     /**
673      * Create and return a new processor suitable for processing HTTP
674      * requests and returning the corresponding responses.
675      */

676     protected Worker newWorkerThread() {
677
678         Worker workerThread = new Worker();
679         workerThread.start();
680         return (workerThread);
681
682     }
683
684
685     /**
686      * Return a new worker thread, and block while to worker is available.
687      */

688     protected Worker getWorkerThread() {
689         // Allocate a new worker thread
690
Worker workerThread = createWorkerThread();
691         while (workerThread == null) {
692             try {
693                 synchronized (workers) {
694                     workers.wait();
695                 }
696             } catch (InterruptedException JavaDoc e) {
697                 // Ignore
698
}
699             workerThread = createWorkerThread();
700         }
701         return workerThread;
702     }
703
704
705     /**
706      * Recycle the specified Processor so that it can be used again.
707      *
708      * @param workerThread The processor to be recycled
709      */

710     protected void recycleWorkerThread(Worker workerThread) {
711         synchronized (workers) {
712             workers.push(workerThread);
713             curThreadsBusy--;
714             workers.notify();
715         }
716     }
717
718
719     /**
720      * Process given socket.
721      */

722     protected boolean processSocket(Socket JavaDoc socket) {
723         try {
724             if (executor == null) {
725                 getWorkerThread().assign(socket);
726             } else {
727                 executor.execute(new SocketProcessor(socket));
728             }
729         } catch (Throwable JavaDoc t) {
730             // This means we got an OOM or similar creating a thread, or that
731
// the pool and its queue are full
732
log.error(sm.getString("endpoint.process.fail"), t);
733             return false;
734         }
735         return true;
736     }
737     
738
739     // ------------------------------------------------- WorkerStack Inner Class
740

741
742     public class WorkerStack {
743         
744         protected Worker[] workers = null;
745         protected int end = 0;
746         
747         public WorkerStack(int size) {
748             workers = new Worker[size];
749         }
750         
751         /**
752          * Put the object into the queue.
753          *
754          * @param object the object to be appended to the queue (first element).
755          */

756         public void push(Worker worker) {
757             workers[end++] = worker;
758         }
759         
760         /**
761          * Get the first object out of the queue. Return null if the queue
762          * is empty.
763          */

764         public Worker pop() {
765             if (end > 0) {
766                 return workers[--end];
767             }
768             return null;
769         }
770         
771         /**
772          * Get the first object out of the queue, Return null if the queue
773          * is empty.
774          */

775         public Worker peek() {
776             return workers[end];
777         }
778         
779         /**
780          * Is the queue empty?
781          */

782         public boolean isEmpty() {
783             return (end == 0);
784         }
785         
786         /**
787          * How many elements are there in this queue?
788          */

789         public int size() {
790             return (end);
791         }
792     }
793
794 }
795
Popular Tags