KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mortbay > util > ThreadedServer


1 // ========================================================================
2
// $Id: ThreadedServer.java,v 1.41 2005/12/10 00:38:20 gregwilkins Exp $
3
// Copyright 199-2004 Mort Bay Consulting Pty. Ltd.
4
// ------------------------------------------------------------------------
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
// ========================================================================
15

16 package org.mortbay.util;
17
18 import java.io.IOException JavaDoc;
19 import java.io.InputStream JavaDoc;
20 import java.io.InterruptedIOException JavaDoc;
21 import java.io.OutputStream JavaDoc;
22 import java.net.InetAddress JavaDoc;
23 import java.net.ServerSocket JavaDoc;
24 import java.net.Socket JavaDoc;
25 import java.net.UnknownHostException JavaDoc;
26
27 import org.apache.commons.logging.Log;
28 import org.mortbay.log.LogFactory;
29
30 /* ======================================================================= */
31 /**
32  * Threaded socket server. This class listens at a socket and gives the connections received to a
33  * pool of Threads
34  * <P>
35  * The class is abstract and derived classes must provide the handling for the connections.
36  * <P>
37  * The properties THREADED_SERVER_MIN_THREADS and THREADED_SERVER_MAX_THREADS can be set to control
38  * the number of threads created.
39  * <P>
40  *
41  * @version $Id: ThreadedServer.java,v 1.41 2005/12/10 00:38:20 gregwilkins Exp $
42  * @author Greg Wilkins
43  */

44 abstract public class ThreadedServer extends ThreadPool
45 {
46     private static Log log = LogFactory.getLog(ThreadedServer.class);
47
48     /* ------------------------------------------------------------------- */
49     private InetAddrPort _address = null;
50     private int _soTimeOut = -1;
51     private int _lingerTimeSecs = 30;
52     private boolean _tcpNoDelay = true;
53     private int _acceptQueueSize = 0;
54     private int _acceptors = 1;
55
56     private transient Acceptor[] _acceptor;
57     private transient ServerSocket JavaDoc _listen = null;
58     private transient boolean _running = false;
59
60     /* ------------------------------------------------------------------- */
61     /*
62      * Construct
63      */

64     public ThreadedServer()
65     {
66     }
67
68     /* ------------------------------------------------------------ */
69     /**
70      * @return The ServerSocket
71      */

72     public ServerSocket JavaDoc getServerSocket()
73     {
74         return _listen;
75     }
76
77     /* ------------------------------------------------------------------- */
78     /**
79      * Construct for specific port.
80      */

81     public ThreadedServer(int port)
82     {
83         setInetAddrPort(new InetAddrPort(port));
84     }
85
86     /* ------------------------------------------------------------------- */
87     /**
88      * Construct for specific address and port.
89      */

90     public ThreadedServer(InetAddress JavaDoc address, int port)
91     {
92         setInetAddrPort(new InetAddrPort(address, port));
93     }
94
95     /* ------------------------------------------------------------------- */
96     /**
97      * Construct for specific address and port.
98      */

99     public ThreadedServer(String JavaDoc host, int port) throws UnknownHostException JavaDoc
100     {
101         setInetAddrPort(new InetAddrPort(host, port));
102     }
103
104     /* ------------------------------------------------------------------- */
105     /**
106      * Construct for specific address and port.
107      */

108     public ThreadedServer(InetAddrPort address)
109     {
110         setInetAddrPort(address);
111     }
112
113     /* ------------------------------------------------------------ */
114     /**
115      * Set the server InetAddress and port.
116      *
117      * @param address The Address to listen on, or 0.0.0.0:port for all interfaces.
118      */

119     public synchronized void setInetAddrPort(InetAddrPort address)
120     {
121         if (_address != null && _address.equals(address)) return;
122
123         if (isStarted()) log.warn(this + " is started");
124
125         _address = address;
126     }
127
128     /* ------------------------------------------------------------ */
129     /**
130      * @return IP Address and port in a new Instance of InetAddrPort.
131      */

132     public InetAddrPort getInetAddrPort()
133     {
134         if (_address == null) return null;
135         return new InetAddrPort(_address);
136     }
137
138     /* ------------------------------------------------------------ */
139     /**
140      * @param host
141      */

142     public synchronized void setHost(String JavaDoc host) throws UnknownHostException JavaDoc
143     {
144         if (_address != null && _address.getHost() != null && _address.getHost().equals(host))
145                 return;
146
147         if (isStarted()) log.warn(this + " is started");
148
149         if (_address == null)
150             _address = new InetAddrPort(host, 0);
151         else
152             _address.setHost(host);
153     }
154
155     /* ------------------------------------------------------------ */
156     /**
157      * @return Host name
158      */

159     public String JavaDoc getHost()
160     {
161         if (_address == null || _address.getInetAddress() == null) return null;
162         return _address.getHost();
163     }
164
165     /* ------------------------------------------------------------ */
166     /**
167      * @param addr
168      */

169     public synchronized void setInetAddress(InetAddress JavaDoc addr)
170     {
171         if (_address != null && _address.getInetAddress() != null
172                 && _address.getInetAddress().equals(addr)) return;
173
174         if (isStarted()) log.warn(this + " is started");
175
176         if (_address == null)
177             _address = new InetAddrPort(addr, 0);
178         else
179             _address.setInetAddress(addr);
180     }
181
182     /* ------------------------------------------------------------ */
183     /**
184      * @return IP Address
185      */

186     public InetAddress JavaDoc getInetAddress()
187     {
188         if (_address == null) return null;
189         return _address.getInetAddress();
190     }
191
192     /* ------------------------------------------------------------ */
193     /**
194      * @param port
195      */

196     public synchronized void setPort(int port)
197     {
198         if (_address != null && _address.getPort() == port) return;
199
200         if (isStarted()) log.warn(this + " is started");
201
202         if (_address == null)
203             _address = new InetAddrPort(port);
204         else
205             _address.setPort(port);
206     }
207
208     /* ------------------------------------------------------------ */
209     /**
210      * @return port number
211      */

212     public int getPort()
213     {
214         if (_address == null) return 0;
215         return _address.getPort();
216     }
217
218     /* ------------------------------------------------------------ */
219     /**
220      * Set Max Read Time.
221      *
222      * @deprecated maxIdleTime is used instead.
223      */

224     public void setMaxReadTimeMs(int ms)
225     {
226         log.warn("setMaxReadTimeMs is deprecated. Use setMaxIdleTimeMs()");
227     }
228
229     /* ------------------------------------------------------------ */
230     /**
231      * @return milliseconds
232      */

233     public int getMaxReadTimeMs()
234     {
235         return getMaxIdleTimeMs();
236     }
237
238     /* ------------------------------------------------------------ */
239     /**
240      * @param ls seconds to linger or -1 to disable linger.
241      */

242     public void setLingerTimeSecs(int ls)
243     {
244         _lingerTimeSecs = ls;
245     }
246
247     /* ------------------------------------------------------------ */
248     /**
249      * @return seconds.
250      */

251     public int getLingerTimeSecs()
252     {
253         return _lingerTimeSecs;
254     }
255
256     /* ------------------------------------------------------------ */
257     /**
258      * @param tcpNoDelay if true then setTcpNoDelay(true) is called on accepted sockets.
259      */

260     public void setTcpNoDelay(boolean tcpNoDelay)
261     {
262         _tcpNoDelay = tcpNoDelay;
263     }
264
265     /* ------------------------------------------------------------ */
266     /**
267      * @return true if setTcpNoDelay(true) is called on accepted sockets.
268      */

269     public boolean getTcpNoDelay()
270     {
271         return _tcpNoDelay;
272     }
273
274     /* ------------------------------------------------------------ */
275     /**
276      * @return Returns the acceptQueueSize or -1 if not set.
277      */

278     public int getAcceptQueueSize()
279     {
280         return _acceptQueueSize;
281     }
282
283     /* ------------------------------------------------------------ */
284     /**
285      * The size of the queue for unaccepted connections. If not set, will default to greater of
286      * maxThreads or 50.
287      *
288      * @param acceptQueueSize The acceptQueueSize to set.
289      */

290     public void setAcceptQueueSize(int acceptQueueSize)
291     {
292         _acceptQueueSize = acceptQueueSize;
293     }
294
295     /* ------------------------------------------------------------ */
296     /**
297      * Set the number of threads used to accept connections. This should normally be 1, except when
298      * multiple CPUs are available and low latency is a high priority.
299      */

300     public void setAcceptorThreads(int n)
301     {
302         _acceptors = n;
303     }
304
305     /* ------------------------------------------------------------ */
306     /**
307      * Get the nmber of threads used to accept connections
308      */

309     public int getAcceptorThreads()
310     {
311         return _acceptors;
312     }
313
314     /* ------------------------------------------------------------------- */
315     /**
316      * Handle new connection. This method should be overridden by the derived class to implement the
317      * required handling. It is called by a thread created for it and does not need to return until
318      * it has finished it's task
319      */

320     protected void handleConnection(InputStream JavaDoc in, OutputStream JavaDoc out)
321     {
322         throw new Error JavaDoc("Either handlerConnection must be overridden");
323     }
324
325     /* ------------------------------------------------------------------- */
326     /**
327      * Handle new connection. If access is required to the actual socket, override this method
328      * instead of handleConnection(InputStream in,OutputStream out). The default implementation of
329      * this just calls handleConnection(InputStream in,OutputStream out).
330      */

331     protected void handleConnection(Socket JavaDoc connection) throws IOException JavaDoc
332     {
333         if (log.isDebugEnabled()) log.debug("Handle " + connection);
334         InputStream JavaDoc in = connection.getInputStream();
335         OutputStream JavaDoc out = connection.getOutputStream();
336
337         handleConnection(in, out);
338         out.flush();
339
340         in = null;
341         out = null;
342         connection.close();
343     }
344
345     /* ------------------------------------------------------------ */
346     /**
347      * Handle Job. Implementation of ThreadPool.handle(), calls handleConnection.
348      *
349      * @param job A Connection.
350      */

351     public void handle(Object JavaDoc job)
352     {
353         Socket JavaDoc socket = (Socket JavaDoc) job;
354         try
355         {
356             if (_tcpNoDelay) socket.setTcpNoDelay(true);
357             handleConnection(socket);
358         }
359         catch (Exception JavaDoc e)
360         {
361             log.debug("Connection problem", e);
362         }
363         finally
364         {
365             try
366             {
367                 socket.close();
368             }
369             catch (Exception JavaDoc e)
370             {
371                 log.debug("Connection problem", e);
372             }
373         }
374     }
375
376     /* ------------------------------------------------------------ */
377     /**
378      * New server socket. Creates a new servers socket. May be overriden by derived class to create
379      * specialist serversockets (eg SSL).
380      *
381      * @param address Address and port
382      * @param acceptQueueSize Accept queue size
383      * @return The new ServerSocket
384      * @exception java.io.IOException
385      */

386     protected ServerSocket JavaDoc newServerSocket(InetAddrPort address, int acceptQueueSize)
387             throws java.io.IOException JavaDoc
388     {
389         if (address == null) return new ServerSocket JavaDoc(0, acceptQueueSize);
390
391         return new ServerSocket JavaDoc(address.getPort(), acceptQueueSize, address.getInetAddress());
392     }
393
394     /* ------------------------------------------------------------ */
395     /**
396      * Accept socket connection. May be overriden by derived class to create specialist
397      * serversockets (eg SSL).
398      *
399      * @deprecated use acceptSocket(int timeout)
400      * @param ignored
401      * @param timeout The time to wait for a connection. Normally passed the ThreadPool maxIdleTime.
402      * @return Accepted Socket
403      */

404     protected Socket JavaDoc acceptSocket(ServerSocket JavaDoc ignored, int timeout)
405     {
406         return acceptSocket(timeout);
407     }
408     
409     /* ------------------------------------------------------------ */
410     /**
411      * Accept socket connection. May be overriden by derived class to create specialist
412      * serversockets (eg SSL).
413      *
414      * @param serverSocket
415      * @param timeout The time to wait for a connection. Normally passed the ThreadPool maxIdleTime.
416      * @return Accepted Socket
417      */

418     protected Socket JavaDoc acceptSocket(int timeout)
419     {
420         try
421         {
422             Socket JavaDoc s = null;
423
424             if (_listen != null)
425             {
426                 if (_soTimeOut != timeout)
427                 {
428                     _soTimeOut = timeout;
429                     _listen.setSoTimeout(_soTimeOut);
430                 }
431
432                 s = _listen.accept();
433
434                 try
435                 {
436                     if (getMaxIdleTimeMs() >= 0) s.setSoTimeout(getMaxIdleTimeMs());
437                     if (_lingerTimeSecs >= 0)
438                         s.setSoLinger(true, _lingerTimeSecs);
439                     else
440                         s.setSoLinger(false, 0);
441                 }
442                 catch (Exception JavaDoc e)
443                 {
444                     LogSupport.ignore(log, e);
445                 }
446             }
447             return s;
448         }
449         catch (java.net.SocketException JavaDoc e)
450         {
451             // TODO - this is caught and ignored due strange
452
// exception from linux java1.2.v1a
453
LogSupport.ignore(log, e);
454         }
455         catch (InterruptedIOException JavaDoc e)
456         {
457             LogSupport.ignore(log, e);
458         }
459         catch (IOException JavaDoc e)
460         {
461             log.warn(LogSupport.EXCEPTION, e);
462         }
463         return null;
464     }
465
466     /* ------------------------------------------------------------------- */
467     /**
468      * Open the server socket. This method can be called to open the server socket in advance of
469      * starting the listener. This can be used to test if the port is available.
470      *
471      * @exception IOException if an error occurs
472      */

473     public void open() throws IOException JavaDoc
474     {
475         if (_listen == null)
476         {
477             _listen = newServerSocket(_address, _acceptQueueSize);
478
479             if (_address == null)
480                 _address = new InetAddrPort(_listen.getInetAddress(), _listen.getLocalPort());
481             else
482             {
483                 if (_address.getInetAddress() == null)
484                         _address.setInetAddress(_listen.getInetAddress());
485                 if (_address.getPort() == 0) _address.setPort(_listen.getLocalPort());
486             }
487
488             _soTimeOut = getMaxIdleTimeMs();
489             if (_soTimeOut >= 0) _listen.setSoTimeout(_soTimeOut);
490         }
491     }
492
493     /* ------------------------------------------------------------------- */
494     /*
495      * Start the ThreadedServer listening
496      */

497     public synchronized void start() throws Exception JavaDoc
498     {
499         try
500         {
501             if (isStarted()) return;
502
503             open();
504
505             _running = true;
506             _acceptor = new Acceptor[_acceptors];
507             for (int a = 0; a < _acceptor.length; a++)
508             {
509                 _acceptor[a] = new Acceptor();
510                 _acceptor[a].setDaemon(isDaemon());
511                 _acceptor[a].start();
512             }
513
514             super.start();
515         }
516         catch (Exception JavaDoc e)
517         {
518             log.warn("Failed to start: " + this);
519             throw e;
520         }
521     }
522
523     /* --------------------------------------------------------------- */
524     public void stop() throws InterruptedException JavaDoc
525     {
526         synchronized (this)
527         {
528             // Signal that we are stopping
529
_running = false;
530
531             // Close the listener socket.
532
if (log.isDebugEnabled()) log.debug("closing " + _listen);
533             try
534             {
535                 if (_listen != null) _listen.close();
536                 _listen=null;
537             }
538             catch (IOException JavaDoc e)
539             {
540                 log.warn(LogSupport.EXCEPTION, e);
541             }
542
543             // Do we have an acceptor thread (running or not)
544
Thread.yield();
545             for (int a = 0; _acceptor!=null && a<_acceptor.length; a++)
546             {
547                 Acceptor acc = _acceptor[a];
548                 if (acc != null)
549                     acc.interrupt();
550             }
551             Thread.sleep(100);
552
553             for (int a = 0; _acceptor!=null && a<_acceptor.length; a++)
554             {
555                 Acceptor acc = _acceptor[a];
556
557                 if (acc != null)
558                 {
559                     acc.forceStop();
560                     _acceptor[a] = null;
561                 }
562             }
563         }
564
565         // Stop the thread pool
566
try
567         {
568             super.stop();
569         }
570         catch (Exception JavaDoc e)
571         {
572             log.warn(LogSupport.EXCEPTION, e);
573         }
574         finally
575         {
576             synchronized (this)
577             {
578                 _acceptor = null;
579             }
580         }
581     }
582
583     /* ------------------------------------------------------------ */
584     /* ------------------------------------------------------------ */
585     /**
586      * Kill a job. This method closes IDLE and socket associated with a job
587      *
588      * @param thread
589      * @param job
590      */

591     protected void stopJob(Thread JavaDoc thread, Object JavaDoc job)
592     {
593         if (job instanceof Socket JavaDoc)
594         {
595             try
596             {
597                 ((Socket JavaDoc) job).close();
598             }
599             catch (Exception JavaDoc e)
600             {
601                 LogSupport.ignore(log, e);
602             }
603         }
604         super.stopJob(thread, job);
605     }
606
607     /* ------------------------------------------------------------ */
608     public String JavaDoc toString()
609     {
610         if (_address == null) return getName() + "@0.0.0.0:0";
611         if (_listen != null)
612                 return getName() + "@" + _listen.getInetAddress().getHostAddress() + ":"
613                         + _listen.getLocalPort();
614         return getName() + "@" + getInetAddrPort();
615     }
616
617     /* ------------------------------------------------------------ */
618     /* ------------------------------------------------------------ */
619     /* ------------------------------------------------------------ */
620     private class Acceptor extends Thread JavaDoc
621     {
622         /* ------------------------------------------------------------ */
623         public void run()
624         {
625             ThreadedServer threadedServer = ThreadedServer.this;
626             try
627             {
628                 this.setName("Acceptor " + _listen);
629                 while (_running)
630                 {
631                     try
632                     {
633                         // Accept a socket
634
Socket JavaDoc socket = acceptSocket(_soTimeOut);
635                         
636                         // Handle the socket
637
if (socket != null)
638                         {
639                             if (_running)
640                                 threadedServer.run(socket);
641                             else
642                                 socket.close();
643                         }
644                     }
645                     catch (Throwable JavaDoc e)
646                     {
647                         if (_running)
648                             log.warn(LogSupport.EXCEPTION, e);
649                         else
650                             log.debug(LogSupport.EXCEPTION, e);
651                     }
652                 }
653             }
654             finally
655             {
656                 if (_running)
657                     log.warn("Stopping " + this.getName());
658                 else
659                     log.info("Stopping " + this.getName());
660                 synchronized (threadedServer)
661                 {
662                     if (_acceptor != null)
663                     {
664                         for (int a = 0; a < _acceptor.length; a++)
665                             if (_acceptor[a] == this)
666                                 _acceptor[a] = null;
667                     }
668                     threadedServer.notifyAll();
669                 }
670             }
671         }
672
673         /* ------------------------------------------------------------ */
674         void forceStop()
675         {
676             if (_listen != null && _address != null)
677             {
678                 InetAddress JavaDoc addr = _address.getInetAddress();
679                 try
680                 {
681                     if (addr == null || addr.toString().startsWith("0.0.0.0"))
682                             addr = InetAddress.getByName("127.0.0.1");
683                     if (log.isDebugEnabled())
684                             log.debug("Self connect to close listener " + addr + ":"
685                                     + _address.getPort());
686                     Socket JavaDoc socket = new Socket JavaDoc(addr, _address.getPort());
687                     Thread.yield();
688                     socket.close();
689                     Thread.yield();
690                 }
691                 catch (IOException JavaDoc e)
692                 {
693                     if (log.isDebugEnabled())
694                             log.debug("problem stopping acceptor " + addr + ": ", e);
695                 }
696             }
697         }
698     }
699
700 }
701
Popular Tags