KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mortbay > http > nio > SocketChannelListener


// ========================================================================
// $Id: SocketChannelListener.java,v 1.6 2005/11/03 18:21:59 gregwilkins Exp $
// Copyright 2003-2004 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================================================================

16 package org.mortbay.http.nio;
17
18 import java.io.IOException JavaDoc;
19 import java.net.InetSocketAddress JavaDoc;
20 import java.net.Socket JavaDoc;
21 import java.net.UnknownHostException JavaDoc;
22 import java.nio.ByteBuffer JavaDoc;
23 import java.nio.channels.SelectionKey JavaDoc;
24 import java.nio.channels.Selector JavaDoc;
25 import java.nio.channels.ServerSocketChannel JavaDoc;
26 import java.nio.channels.SocketChannel JavaDoc;
27 import java.util.Iterator JavaDoc;
28
29 import org.apache.commons.logging.Log;
30 import org.mortbay.log.LogFactory;
31 import org.mortbay.http.HttpConnection;
32 import org.mortbay.http.HttpHandler;
33 import org.mortbay.http.HttpListener;
34 import org.mortbay.http.HttpMessage;
35 import org.mortbay.http.HttpRequest;
36 import org.mortbay.http.HttpServer;
37 import org.mortbay.util.LineInput;
38 import org.mortbay.util.LogSupport;
39 import org.mortbay.util.ThreadPool;
40
41 /* ------------------------------------------------------------------------------- */
42 /** EXPERIMENTAL NIO listener!
43  *
44  * @version $Revision: 1.6 $
45  * @author gregw
46  */

47 public class SocketChannelListener extends ThreadPool implements HttpListener
48 {
49     private static Log log= LogFactory.getLog(SocketChannelListener.class);
50     
51     private InetSocketAddress JavaDoc _address;
52     private int _bufferSize= 4096;
53     private int _bufferReserve= 512;
54     private int _sslPort;
55     private int _lingerTimeSecs=5;
56     private HttpHandler _handler;
57     
58     private transient HttpServer _server;
59
60     private transient ServerSocketChannel JavaDoc _acceptChannel;
61     private transient Selector JavaDoc _selector;
62     private transient SelectorThread _selectorThread;
63     private transient boolean _isLow=false;
64     private transient boolean _isOut=false;
65     private transient long _warned=0;
66     
67     
68     /* ------------------------------------------------------------------------------- */
69     /** Constructor.
70      *
71      */

72     public SocketChannelListener()
73     {
74         super();
75     }
76
77     /* ------------------------------------------------------------------------------- */
78     /*
79      * @see org.mortbay.http.HttpListener#setHttpServer(org.mortbay.http.HttpServer)
80      */

81     public void setHttpServer(HttpServer server)
82     {
83         _server=server;
84     }
85
86     /* ------------------------------------------------------------------------------- */
87     /*
88      * @see org.mortbay.http.HttpListener#getHttpServer()
89      */

90     public HttpServer getHttpServer()
91     {
92         return _server;
93     }
94
95     /* ------------------------------------------------------------------------------- */
96     /**
97      * @see org.mortbay.http.HttpListener#setHost(java.lang.String)
98      */

99     public void setHost(String JavaDoc host) throws UnknownHostException JavaDoc
100     {
101         _address = new InetSocketAddress JavaDoc(host, _address == null ? 0 : _address.getPort());
102     }
103
104     /* ------------------------------------------------------------------------------- */
105     /*
106      * @see org.mortbay.http.HttpListener#getHost()
107      */

108     public String JavaDoc getHost()
109     {
110         if (_address == null || _address.getAddress() == null)
111             return null;
112         return _address.getHostName();
113     }
114
115     /* ------------------------------------------------------------------------------- */
116     /*
117      * @see org.mortbay.http.HttpListener#setPort(int)
118      */

119     public void setPort(int port)
120     {
121         if (_address == null || _address.getHostName() == null)
122             _address= new InetSocketAddress JavaDoc(port);
123         else
124             _address= new InetSocketAddress JavaDoc(_address.getHostName(), port);
125     }
126
127     /* ------------------------------------------------------------------------------- */
128     /*
129      * @see org.mortbay.http.HttpListener#getPort()
130      */

131     public int getPort()
132     {
133         if (_address == null)
134             return 0;
135         return _address.getPort();
136     }
137
138     /* ------------------------------------------------------------ */
139     public void setBufferSize(int size)
140     {
141         _bufferSize= size;
142     }
143     
144     /* ------------------------------------------------------------------------------- */
145     /*
146      * @see org.mortbay.http.HttpListener#getBufferSize()
147      */

148     public int getBufferSize()
149     {
150         return _bufferSize;
151     }
152
153     /* ------------------------------------------------------------ */
154     public void setBufferReserve(int size)
155     {
156         _bufferReserve= size;
157     }
158     
159     /* ------------------------------------------------------------------------------- */
160     /*
161      * @see org.mortbay.http.HttpListener#getBufferReserve()
162      */

163     public int getBufferReserve()
164     {
165         return _bufferReserve;
166     }
167
168     /* ------------------------------------------------------------------------------- */
169     /*
170      * @see org.mortbay.http.HttpListener#getDefaultScheme()
171      */

172     public String JavaDoc getDefaultScheme()
173     {
174         return HttpMessage.__SCHEME;
175     }
176
177     /* ------------------------------------------------------------------------------- */
178     /*
179      * @see org.mortbay.http.HttpListener#customizeRequest(org.mortbay.http.HttpConnection, org.mortbay.http.HttpRequest)
180      */

181     public void customizeRequest(HttpConnection connection, HttpRequest request)
182     {
183         // Nothing to do
184
}
185
186     /* ------------------------------------------------------------------------------- */
187     /*
188      * @see org.mortbay.http.HttpListener#persistConnection(org.mortbay.http.HttpConnection)
189      */

190     public void persistConnection(HttpConnection connection)
191     {
192         // TODO low resources check?
193
}
194
195     /* ------------------------------------------------------------------------------- */
196     /*
197      * @see org.mortbay.http.HttpListener#isLowOnResources()
198      */

199     public boolean isLowOnResources()
200     {
201         boolean low = (getMaxThreads()-getThreads()+getIdleThreads())<getMinThreads();
202         
203         if (low && !_isLow)
204         {
205             log.info("LOW ON THREADS (("+
206                       getMaxThreads()+"-"+
207                       getThreads()+"+"+
208                       getIdleThreads()+")<"+
209                       getMinThreads()+") on "+ this);
210             _warned=System.currentTimeMillis();
211             _isLow=true;
212         }
213         else if (!low && _isLow)
214         {
215             if (System.currentTimeMillis()-_warned > 1000)
216             {
217                 _isOut=false;
218                 _isLow=false;
219             }
220         }
221         return low;
222     }
223
224     /* ------------------------------------------------------------------------------- */
225     /*
226      * @see org.mortbay.http.HttpListener#isOutOfResources()
227      */

228     public boolean isOutOfResources()
229     {
230         boolean out =
231             getThreads()==getMaxThreads() &&
232             getIdleThreads()==0;
233         
234         if (out && !_isOut)
235         {
236             log.warn("OUT OF THREADS: "+this);
237             _warned=System.currentTimeMillis();
238             _isLow=true;
239             _isOut=true;
240         }
241         
242         return out;
243     }
244
245     /* ------------------------------------------------------------------------------- */
246     /** get_sslPort.
247      * @return Port to redirect integral and confidential requests to.
248      */

249     public int getSslPort()
250     {
251         return _sslPort;
252     }
253
254     /* ------------------------------------------------------------------------------- */
255     /** set_sslPort.
256      * @param p Port to redirect integral and confidential requests to.
257      */

258     public void setSslPort(int p)
259     {
260         _sslPort= p;
261     }
262     
263     /* ------------------------------------------------------------------------------- */
264     /*
265      * @see org.mortbay.http.HttpListener#isIntegral(org.mortbay.http.HttpConnection)
266      */

267     public boolean isIntegral(HttpConnection connection)
268     {
269         return false;
270     }
271
272     /* ------------------------------------------------------------------------------- */
273     /*
274      * @see org.mortbay.http.HttpListener#getIntegralScheme()
275      */

276     public String JavaDoc getIntegralScheme()
277     {
278         return HttpMessage.__SSL_SCHEME;
279     }
280
281     /* ------------------------------------------------------------------------------- */
282     /*
283      * @see org.mortbay.http.HttpListener#getIntegralPort()
284      */

285     public int getIntegralPort()
286     {
287         return _sslPort;
288     }
289
290     /* ------------------------------------------------------------------------------- */
291     /*
292      * @see org.mortbay.http.HttpListener#isConfidential(org.mortbay.http.HttpConnection)
293      */

294     public boolean isConfidential(HttpConnection connection)
295     {
296         return false;
297     }
298
299     /* ------------------------------------------------------------------------------- */
300     /*
301      * @see org.mortbay.http.HttpListener#getConfidentialScheme()
302      */

303     public String JavaDoc getConfidentialScheme()
304     {
305         return HttpMessage.__SSL_SCHEME;
306     }
307
308     /* ------------------------------------------------------------------------------- */
309     /*
310      * @see org.mortbay.http.HttpListener#getConfidentialPort()
311      */

312     public int getConfidentialPort()
313     {
314         return _sslPort;
315     }
316
317     /* ------------------------------------------------------------ */
318     /**
319      * @param sec seconds to linger or -1 to disable linger.
320      */

321     public void setLingerTimeSecs(int ls)
322     {
323         _lingerTimeSecs= ls;
324     }
325
326     /* ------------------------------------------------------------ */
327     /**
328      * @return seconds.
329      */

330     public int getLingerTimeSecs()
331     {
332         return _lingerTimeSecs;
333     }
334     
335     /* ------------------------------------------------------------ */
336     public void setHttpHandler(HttpHandler handler)
337     {
338         _handler= handler;
339     }
340     
341     /* ------------------------------------------------------------------------------- */
342     /**
343      * @see org.mortbay.http.HttpListener#getHttpHandler()
344      */

345     public HttpHandler getHttpHandler()
346     {
347         return _handler;
348     }
349
350
351     /* ------------------------------------------------------------ */
352     public void start() throws Exception JavaDoc
353     {
354         if (isStarted())
355             throw new IllegalStateException JavaDoc("Started");
356
357         // Create a new server socket and set to non blocking mode
358
_acceptChannel= ServerSocketChannel.open();
359         _acceptChannel.configureBlocking(false);
360
361         // Bind the server socket to the local host and port
362
_acceptChannel.socket().bind(_address);
363
364         // Read the address back from the server socket to fix issues
365
// with listeners on anonymous ports
366
_address= (InetSocketAddress JavaDoc)_acceptChannel.socket().getLocalSocketAddress();
367
368         // create a selector;
369
_selector= Selector.open();
370
371         // Register accepts on the server socket with the selector.
372
_acceptChannel.register(_selector, SelectionKey.OP_ACCEPT);
373
374         // Start selector thread
375
_selectorThread= new SelectorThread();
376         _selectorThread.start();
377
378         // Start the thread Pool
379
super.start();
380         log.info("Started SocketChannelListener on " + getHost()+":"+getPort());
381     }
382     
383
384     /* ------------------------------------------------------------ */
385     public void stop() throws InterruptedException JavaDoc
386     {
387         if (_selectorThread != null)
388             _selectorThread.doStop();
389             
390         super.stop();
391         log.info("Stopped SocketChannelListener on " + getHost()+":"+getPort());
392     }
393
394
395     /* ------------------------------------------------------------ */
396     /* ------------------------------------------------------------ */
397     /* ------------------------------------------------------------ */
398     private class SelectorThread extends Thread JavaDoc
399     {
400         boolean _running= false;
401
402         /* ------------------------------------------------------------ */
403         public void run()
404         {
405             try
406             {
407                 _running= true;
408                 while (_running)
409                 {
410
411                     SelectionKey JavaDoc key= null;
412                     try
413                     {
414                         _selector.select();
415                         Iterator JavaDoc iter= _selector.selectedKeys().iterator();
416
417                         while (iter.hasNext())
418                         {
419                             key= (SelectionKey JavaDoc)iter.next();
420                             if (key.isAcceptable())
421                                 doAccept(key);
422                             if (key.isReadable())
423                                 doRead(key);
424                             key= null;
425                             iter.remove();
426                         }
427                     }
428                     catch (Exception JavaDoc e)
429                     {
430                         if (_running)
431                             log.warn("selector", e);
432                         if (key != null)
433                             key.cancel();
434                     }
435                 }
436             }
437             finally
438             {
439                 log.info("Stopping " + this.getName());
440
441                 try
442                 {
443                     if (_acceptChannel != null)
444                         _acceptChannel.close();
445                 }
446                 catch (IOException JavaDoc e)
447                 {
448                     LogSupport.ignore(log, e);
449                 }
450                 try
451                 {
452                     if (_selector != null)
453                         _selector.close();
454                 }
455                 catch (IOException JavaDoc e)
456                 {
457                     LogSupport.ignore(log, e);
458                 }
459
460                 _selector= null;
461                 _acceptChannel= null;
462                 _selectorThread= null;
463             }
464         }
465
466         /* ------------------------------------------------------------ */
467         void doAccept(SelectionKey JavaDoc key)
468             throws IOException JavaDoc, InterruptedException JavaDoc
469         {
470             if (isLowOnResources())
471                 return;
472                 
473             ServerSocketChannel JavaDoc server = (ServerSocketChannel JavaDoc) key.channel();
474             SocketChannel JavaDoc channel = server.accept();
475             channel.configureBlocking(false);
476             SelectionKey JavaDoc readKey = channel.register(_selector, SelectionKey.OP_READ);
477             
478             Socket JavaDoc socket=channel.socket();
479             try
480             {
481                 if (getMaxIdleTimeMs() >= 0)
482                     socket.setSoTimeout(getMaxIdleTimeMs());
483                 if (_lingerTimeSecs >= 0)
484                     socket.setSoLinger(true, _lingerTimeSecs);
485                 else
486                     socket.setSoLinger(false, 0);
487             }
488             catch (Exception JavaDoc e)
489             {
490                 LogSupport.ignore(log, e);
491             }
492
493             Connection connection=new Connection(channel,readKey, SocketChannelListener.this);
494             readKey.attach(connection);
495         }
496
497         /* ------------------------------------------------------------ */
498         void doRead(SelectionKey JavaDoc key)
499             throws IOException JavaDoc
500         {
501             Connection connection = (Connection)key.attachment();
502             if (connection._idle && isOutOfResources())
503                 // Don't handle idle connections if out of resources.
504
return;
505             ByteBuffer JavaDoc buf= connection._in.getBuffer();
506             int count = ((SocketChannel JavaDoc)key.channel()).read(buf);
507             if (count<0)
508             {
509                 connection.close();
510             }
511             else
512             {
513                 buf.flip();
514                 connection.write(buf);
515             }
516         }
517
518         void doStop()
519         {
520             _running=false;
521             _selector.wakeup();
522             Thread.yield();
523         }
524     }
525
526
527     /* ------------------------------------------------------------------------------- */
528     /* ------------------------------------------------------------------------------- */
529     /* ------------------------------------------------------------------------------- */
530     private static class Connection
531       extends HttpConnection
532       implements Runnable JavaDoc
533     {
534         boolean _idle=true;
535         SocketChannel JavaDoc _channel;
536         SelectionKey JavaDoc _key;
537         ByteBufferInputStream _in;
538         SocketChannelOutputStream _out;
539         SocketChannelListener _listener;
540         
541         Connection(SocketChannel JavaDoc channel,SelectionKey JavaDoc key, SocketChannelListener listener)
542         {
543             super(listener,
544                          channel.socket().getInetAddress(),
545                          new ByteBufferInputStream(listener.getBufferSize()),
546                          new SocketChannelOutputStream(channel,listener.getBufferSize()),
547                          channel);
548             _channel=channel;
549             _key=key;
550             _listener=listener;
551             _in=(ByteBufferInputStream) ((LineInput)(getInputStream().getInputStream())).getInputStream();
552             _out=(SocketChannelOutputStream)(getOutputStream().getOutputStream());
553             _in.setTimeout(listener.getMaxIdleTimeMs());
554         }
555         
556
557         /* ------------------------------------------------------------------------------- */
558         void write(ByteBuffer JavaDoc buf)
559         {
560             if (!_idle)
561                 _in.write(buf);
562             else
563             {
564                 boolean written=false;
565                 
566                 // Is there any actual content there?
567
for (int i=buf.position();i<buf.limit();i++)
568                 {
569                     byte b = buf.get(i);
570                     
571                     if (b>' ')
572                     {
573                         buf.position(i);
574                     
575                         try
576                         {
577                             written=true;
578                             _in.write(buf);
579                             _listener.run(this);
580                             _idle=false;
581                         }
582                         catch(InterruptedException JavaDoc e)
583                         {
584                             LogSupport.ignore(log, e);
585                         }
586                         finally
587                         {
588                             i=buf.limit();
589                         }
590                     }
591                 }
592                 
593                 if (!written)
594                 {
595                     _in.recycle(buf);
596                 }
597             }
598         }
599         
600         /* ------------------------------------------------------------------------------- */
601         /**
602          */

603         public void run()
604         {
605             try
606             {
607                 associateThread();
608                 while (_in!=null && _in.available()>0 && _listener.isStarted())
609                 {
610                     if (handleNext())
611                         recycle();
612                     else
613                         destroy();
614                 }
615             }
616             catch(IOException JavaDoc e)
617             {
618                 log.warn(e.toString());
619                 log.debug(e);
620                 destroy();
621             }
622             finally
623             {
624                 _idle=true;
625                 disassociateThread();
626             }
627         }
628
629         /* ------------------------------------------------------------------------------- */
630         public synchronized void close()
631             throws IOException JavaDoc
632          {
633                  _out.close();
634                  _in.close();
635                  if (!_channel.isOpen())
636                     return;
637                  _key.cancel();
638                  _channel.socket().shutdownOutput();
639                  _channel.close();
640                  _channel.socket().close();
641                  super.close();
642                  _channel.close();
643          }
644          
645         /* ------------------------------------------------------------------------------- */
646         public void destroy()
647         {
648             super.destroy();
649             if (_in!=null)
650                 _in.destroy();
651             _in=null;
652             if (_out!=null)
653                 _out.destroy();
654             _out=null;
655             _channel=null;
656             _key=null;
657             _listener=null;
658         }
659         
660     }
661     
662 }
663
Popular Tags