KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > tomcat > util > net > PoolTcpEndpoint


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17
18 package org.apache.tomcat.util.net;
19
20 import java.io.IOException JavaDoc;
21 import java.io.InterruptedIOException JavaDoc;
22 import java.net.BindException JavaDoc;
23 import java.net.InetAddress JavaDoc;
24 import java.net.ServerSocket JavaDoc;
25 import java.net.Socket JavaDoc;
26 import java.net.SocketException JavaDoc;
27 import java.security.AccessControlException JavaDoc;
28 import java.util.Stack JavaDoc;
29 import java.util.Vector JavaDoc;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.tomcat.util.res.StringManager;
34 import org.apache.tomcat.util.threads.ThreadPool;
35 import org.apache.tomcat.util.threads.ThreadPoolRunnable;
36
/* Similar to the MPM module in Apache 2.0. Handles all the details related to
   "tcp server" functionality - thread management, accept policy, etc.
   It should do nothing more - as soon as it gets a socket (and all socket options
   are set, etc.), it just hands the stream to ConnectionHandler.processConnection. (costin)
*/

42
43
44
45 /**
46  * Handle incoming TCP connections.
47  *
48  * This class implement a simple server model: one listener thread accepts on a socket and
49  * creates a new worker thread for each incoming connection.
50  *
51  * More advanced Endpoints will reuse the threads, use queues, etc.
52  *
53  * @author James Duncan Davidson [duncan@eng.sun.com]
54  * @author Jason Hunter [jch@eng.sun.com]
55  * @author James Todd [gonzo@eng.sun.com]
56  * @author Costin@eng.sun.com
57  * @author Gal Shachor [shachor@il.ibm.com]
58  * @author Yoav Shapira <yoavs@apache.org>
59  */

60 public class PoolTcpEndpoint implements Runnable JavaDoc { // implements Endpoint {
61

62     static Log log=LogFactory.getLog(PoolTcpEndpoint.class );
63
64     private StringManager sm =
65         StringManager.getManager("org.apache.tomcat.util.net.res");
66
67     private static final int BACKLOG = 100;
68     private static final int TIMEOUT = 1000;
69
70     private final Object JavaDoc threadSync = new Object JavaDoc();
71
72     private int backlog = BACKLOG;
73     private int serverTimeout = TIMEOUT;
74
75     private InetAddress JavaDoc inet;
76     private int port;
77
78     private ServerSocketFactory factory;
79     private ServerSocket JavaDoc serverSocket;
80
81     private volatile boolean running = false;
82     private volatile boolean paused = false;
83     private boolean initialized = false;
84     private boolean reinitializing = false;
85     static final int debug=0;
86
87     protected boolean tcpNoDelay=false;
88     protected int linger=100;
89     protected int socketTimeout=-1;
90     private boolean lf = true;
91
92     
93     // ------ Leader follower fields
94

95     
96     TcpConnectionHandler handler;
97     ThreadPoolRunnable listener;
98     ThreadPool tp;
99
100     
101     // ------ Master slave fields
102

103     /* The background thread. */
104     private Thread JavaDoc thread = null;
105     /* Available processors. */
106     private Stack JavaDoc workerThreads = new Stack JavaDoc();
107     private int curThreads = 0;
108     private int maxThreads = 20;
109     /* All processors which have been created. */
110     private Vector JavaDoc created = new Vector JavaDoc();
111
112     
113     public PoolTcpEndpoint() {
114     tp = new ThreadPool();
115     }
116
117     public PoolTcpEndpoint( ThreadPool tp ) {
118         this.tp=tp;
119     }
120
121     // -------------------- Configuration --------------------
122

123     public void setMaxThreads(int maxThreads) {
124     if( maxThreads > 0)
125         tp.setMaxThreads(maxThreads);
126     }
127
128     public int getMaxThreads() {
129         return tp.getMaxThreads();
130     }
131
132     public void setMaxSpareThreads(int maxThreads) {
133     if(maxThreads > 0)
134         tp.setMaxSpareThreads(maxThreads);
135     }
136
137     public int getMaxSpareThreads() {
138         return tp.getMaxSpareThreads();
139     }
140
141     public void setMinSpareThreads(int minThreads) {
142     if(minThreads > 0)
143         tp.setMinSpareThreads(minThreads);
144     }
145
146     public int getMinSpareThreads() {
147         return tp.getMinSpareThreads();
148     }
149
150     public void setThreadPriority(int threadPriority) {
151       tp.setThreadPriority(threadPriority);
152     }
153
154     public int getThreadPriority() {
155       return tp.getThreadPriority();
156     }
157
158     public int getPort() {
159         return port;
160     }
161
162     public void setPort(int port ) {
163         this.port=port;
164     }
165
166     public InetAddress JavaDoc getAddress() {
167         return inet;
168     }
169
170     public void setAddress(InetAddress JavaDoc inet) {
171         this.inet=inet;
172     }
173
174     public void setServerSocket(ServerSocket JavaDoc ss) {
175         serverSocket = ss;
176     }
177
178     public void setServerSocketFactory( ServerSocketFactory factory ) {
179         this.factory=factory;
180     }
181
182    ServerSocketFactory getServerSocketFactory() {
183         return factory;
184    }
185
186     public void setConnectionHandler( TcpConnectionHandler handler ) {
187         this.handler=handler;
188     }
189
190     public TcpConnectionHandler getConnectionHandler() {
191         return handler;
192     }
193
194     public boolean isRunning() {
195     return running;
196     }
197     
198     public boolean isPaused() {
199     return paused;
200     }
201     
202     /**
203      * Allows the server developer to specify the backlog that
204      * should be used for server sockets. By default, this value
205      * is 100.
206      */

207     public void setBacklog(int backlog) {
208     if( backlog>0)
209         this.backlog = backlog;
210     }
211
212     public int getBacklog() {
213         return backlog;
214     }
215
216     /**
217      * Sets the timeout in ms of the server sockets created by this
218      * server. This method allows the developer to make servers
219      * more or less responsive to having their server sockets
220      * shut down.
221      *
222      * <p>By default this value is 1000ms.
223      */

224     public void setServerTimeout(int timeout) {
225     this.serverTimeout = timeout;
226     }
227
228     public boolean getTcpNoDelay() {
229         return tcpNoDelay;
230     }
231     
232     public void setTcpNoDelay( boolean b ) {
233     tcpNoDelay=b;
234     }
235
236     public int getSoLinger() {
237         return linger;
238     }
239     
240     public void setSoLinger( int i ) {
241     linger=i;
242     }
243
244     public int getSoTimeout() {
245         return socketTimeout;
246     }
247     
248     public void setSoTimeout( int i ) {
249     socketTimeout=i;
250     }
251     
252     public int getServerSoTimeout() {
253         return serverTimeout;
254     }
255     
256     public void setServerSoTimeout( int i ) {
257     serverTimeout=i;
258     }
259
260     public String JavaDoc getStrategy() {
261         if (lf) {
262             return "lf";
263         } else {
264             return "ms";
265         }
266     }
267     
268     public void setStrategy(String JavaDoc strategy) {
269         if ("ms".equals(strategy)) {
270             lf = false;
271         } else {
272             lf = true;
273         }
274     }
275
276     public int getCurrentThreadCount() {
277         return curThreads;
278     }
279     
280     public int getCurrentThreadsBusy() {
281         return curThreads - workerThreads.size();
282     }
283     
284     // -------------------- Public methods --------------------
285

286     public void initEndpoint() throws IOException JavaDoc, InstantiationException JavaDoc {
287         try {
288             if(factory==null)
289                 factory=ServerSocketFactory.getDefault();
290             if(serverSocket==null) {
291                 try {
292                     if (inet == null) {
293                         serverSocket = factory.createSocket(port, backlog);
294                     } else {
295                         serverSocket = factory.createSocket(port, backlog, inet);
296                     }
297                 } catch ( BindException JavaDoc be ) {
298                     throw new BindException JavaDoc(be.getMessage() + ":" + port);
299                 }
300             }
301             if( serverTimeout >= 0 )
302                 serverSocket.setSoTimeout( serverTimeout );
303         } catch( IOException JavaDoc ex ) {
304             throw ex;
305         } catch( InstantiationException JavaDoc ex1 ) {
306             throw ex1;
307         }
308         initialized = true;
309     }
310     
311     public void startEndpoint() throws IOException JavaDoc, InstantiationException JavaDoc {
312         if (!initialized) {
313             initEndpoint();
314         }
315         if (lf) {
316             tp.start();
317         }
318         running = true;
319         paused = false;
320         if (lf) {
321             listener = new LeaderFollowerWorkerThread(this);
322             tp.runIt(listener);
323         } else {
324             maxThreads = getMaxThreads();
325             threadStart();
326         }
327     }
328
329     public void pauseEndpoint() {
330         if (running && !paused) {
331             paused = true;
332             unlockAccept();
333         }
334     }
335
336     public void resumeEndpoint() {
337         if (running) {
338             paused = false;
339         }
340     }
341
342     public void stopEndpoint() {
343         if (running) {
344             if (lf) {
345                 tp.shutdown();
346             }
347             running = false;
348             if (serverSocket != null) {
349                 closeServerSocket();
350             }
351             if (!lf) {
352                 threadStop();
353             }
354             initialized=false ;
355         }
356     }
357
358     protected void closeServerSocket() {
359         if (!paused)
360             unlockAccept();
361         try {
362             if( serverSocket!=null)
363                 serverSocket.close();
364         } catch(Exception JavaDoc e) {
365             log.error(sm.getString("endpoint.err.close"), e);
366         }
367         serverSocket = null;
368     }
369
370     protected void unlockAccept() {
371         Socket JavaDoc s = null;
372         try {
373             // Need to create a connection to unlock the accept();
374
if (inet == null) {
375                 s = new Socket JavaDoc("127.0.0.1", port);
376             } else {
377                 s = new Socket JavaDoc(inet, port);
378                     // setting soLinger to a small value will help shutdown the
379
// connection quicker
380
s.setSoLinger(true, 0);
381             }
382         } catch(Exception JavaDoc e) {
383             if (log.isDebugEnabled()) {
384                 log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
385             }
386         } finally {
387             if (s != null) {
388                 try {
389                     s.close();
390                 } catch (Exception JavaDoc e) {
391                     // Ignore
392
}
393             }
394         }
395     }
396
397     // -------------------- Private methods
398

399     Socket JavaDoc acceptSocket() {
400         if( !running || serverSocket==null ) return null;
401
402         Socket JavaDoc accepted = null;
403
404         try {
405             if(factory==null) {
406                 accepted = serverSocket.accept();
407             } else {
408                 accepted = factory.acceptSocket(serverSocket);
409             }
410             if (null == accepted) {
411                 log.warn(sm.getString("endpoint.warn.nullSocket"));
412             } else {
413                 if (!running) {
414                     accepted.close(); // rude, but unlikely!
415
accepted = null;
416                 } else if (factory != null) {
417                     factory.initSocket( accepted );
418                 }
419             }
420         }
421         catch(InterruptedIOException JavaDoc iioe) {
422             // normal part -- should happen regularly so
423
// that the endpoint can release if the server
424
// is shutdown.
425
}
426         catch (AccessControlException JavaDoc ace) {
427             // When using the Java SecurityManager this exception
428
// can be thrown if you are restricting access to the
429
// socket with SocketPermission's.
430
// Log the unauthorized access and continue
431
String JavaDoc msg = sm.getString("endpoint.warn.security",
432                                       serverSocket, ace);
433             log.warn(msg);
434         }
435         catch (IOException JavaDoc e) {
436
437             String JavaDoc msg = null;
438
439             if (running) {
440                 msg = sm.getString("endpoint.err.nonfatal",
441                         serverSocket, e);
442                 log.error(msg, e);
443             }
444
445             if (accepted != null) {
446                 try {
447                     accepted.close();
448                 } catch(Throwable JavaDoc ex) {
449                     msg = sm.getString("endpoint.err.nonfatal",
450                                        accepted, ex);
451                     log.warn(msg, ex);
452                 }
453                 accepted = null;
454             }
455
456             if( ! running ) return null;
457             reinitializing = true;
458             // Restart endpoint when getting an IOException during accept
459
synchronized (threadSync) {
460                 if (reinitializing) {
461                     reinitializing = false;
462                     // 1) Attempt to close server socket
463
closeServerSocket();
464                     initialized = false;
465                     // 2) Reinit endpoint (recreate server socket)
466
try {
467                         msg = sm.getString("endpoint.warn.reinit");
468                         log.warn(msg);
469                         initEndpoint();
470                     } catch (Throwable JavaDoc t) {
471                         msg = sm.getString("endpoint.err.nonfatal",
472                                            serverSocket, t);
473                         log.error(msg, t);
474                     }
475                     // 3) If failed, attempt to restart endpoint
476
if (!initialized) {
477                         msg = sm.getString("endpoint.warn.restart");
478                         log.warn(msg);
479                         try {
480                             stopEndpoint();
481                             initEndpoint();
482                             startEndpoint();
483                         } catch (Throwable JavaDoc t) {
484                             msg = sm.getString("endpoint.err.fatal",
485                                                serverSocket, t);
486                             log.error(msg, t);
487                         }
488                         // Current thread is now invalid: kill it
489
throw new ThreadDeath JavaDoc();
490                     }
491                 }
492             }
493
494         }
495
496         return accepted;
497     }
498
499     void setSocketOptions(Socket JavaDoc socket)
500         throws SocketException JavaDoc {
501         if(linger >= 0 )
502             socket.setSoLinger( true, linger);
503         if( tcpNoDelay )
504             socket.setTcpNoDelay(tcpNoDelay);
505         if( socketTimeout > 0 )
506             socket.setSoTimeout( socketTimeout );
507     }
508
509     
510     void processSocket(Socket JavaDoc s, TcpConnection con, Object JavaDoc[] threadData) {
511         // Process the connection
512
int step = 1;
513         try {
514             
515             // 1: Set socket options: timeout, linger, etc
516
setSocketOptions(s);
517             
518             // 2: SSL handshake
519
step = 2;
520             if (getServerSocketFactory() != null) {
521                 getServerSocketFactory().handshake(s);
522             }
523             
524             // 3: Process the connection
525
step = 3;
526             con.setEndpoint(this);
527             con.setSocket(s);
528             getConnectionHandler().processConnection(con, threadData);
529             
530         } catch (SocketException JavaDoc se) {
531             log.debug(sm.getString("endpoint.err.socket", s.getInetAddress()),
532                     se);
533             // Try to close the socket
534
try {
535                 s.close();
536             } catch (IOException JavaDoc e) {
537             }
538         } catch (Throwable JavaDoc t) {
539             if (step == 2) {
540                 if (log.isDebugEnabled()) {
541                     log.debug(sm.getString("endpoint.err.handshake"), t);
542                 }
543             } else {
544                 log.error(sm.getString("endpoint.err.unexpected"), t);
545             }
546             // Try to close the socket
547
try {
548                 s.close();
549             } catch (IOException JavaDoc e) {
550             }
551         } finally {
552             if (con != null) {
553                 con.recycle();
554             }
555         }
556     }
557     
558
559     // -------------------------------------------------- Master Slave Methods
560

561
562     /**
563      * Create (or allocate) and return an available processor for use in
564      * processing a specific HTTP request, if possible. If the maximum
565      * allowed processors have already been created and are in use, return
566      * <code>null</code> instead.
567      */

568     private MasterSlaveWorkerThread createWorkerThread() {
569
570         synchronized (workerThreads) {
571             if (workerThreads.size() > 0) {
572                 return ((MasterSlaveWorkerThread) workerThreads.pop());
573             }
574             if ((maxThreads > 0) && (curThreads < maxThreads)) {
575                 return (newWorkerThread());
576             } else {
577                 if (maxThreads < 0) {
578                     return (newWorkerThread());
579                 } else {
580                     return (null);
581                 }
582             }
583         }
584
585     }
586
587     
588     /**
589      * Create and return a new processor suitable for processing HTTP
590      * requests and returning the corresponding responses.
591      */

592     private MasterSlaveWorkerThread newWorkerThread() {
593
594         MasterSlaveWorkerThread workerThread =
595             new MasterSlaveWorkerThread(this, tp.getName() + "-" + (++curThreads));
596         workerThread.start();
597         created.addElement(workerThread);
598         return (workerThread);
599
600     }
601
602
603     /**
604      * Recycle the specified Processor so that it can be used again.
605      *
606      * @param processor The processor to be recycled
607      */

608     void recycleWorkerThread(MasterSlaveWorkerThread workerThread) {
609         workerThreads.push(workerThread);
610     }
611
612     
613     /**
614      * The background thread that listens for incoming TCP/IP connections and
615      * hands them off to an appropriate processor.
616      */

617     public void run() {
618
619         // Loop until we receive a shutdown command
620
while (running) {
621
622             // Loop if endpoint is paused
623
while (paused) {
624                 try {
625                     Thread.sleep(1000);
626                 } catch (InterruptedException JavaDoc e) {
627                     // Ignore
628
}
629             }
630
631             // Allocate a new worker thread
632
MasterSlaveWorkerThread workerThread = createWorkerThread();
633             if (workerThread == null) {
634                 try {
635                     // Wait a little for load to go down: as a result,
636
// no accept will be made until the concurrency is
637
// lower than the specified maxThreads, and current
638
// connections will wait for a little bit instead of
639
// failing right away.
640
Thread.sleep(100);
641                 } catch (InterruptedException JavaDoc e) {
642                     // Ignore
643
}
644                 continue;
645             }
646             
647             // Accept the next incoming connection from the server socket
648
Socket JavaDoc socket = acceptSocket();
649
650             // Hand this socket off to an appropriate processor
651
workerThread.assign(socket);
652
653             // The processor will recycle itself when it finishes
654

655         }
656
657         // Notify the threadStop() method that we have shut ourselves down
658
synchronized (threadSync) {
659             threadSync.notifyAll();
660         }
661
662     }
663
664
665     /**
666      * Start the background processing thread.
667      */

668     private void threadStart() {
669         thread = new Thread JavaDoc(this, tp.getName());
670         thread.setPriority(getThreadPriority());
671         thread.setDaemon(true);
672         thread.start();
673     }
674
675
676     /**
677      * Stop the background processing thread.
678      */

679     private void threadStop() {
680         thread = null;
681     }
682
683
684 }
685
Popular Tags