1 16 package org.mortbay.http.nio; 17 18 import java.io.IOException ; 19 import java.net.InetSocketAddress ; 20 import java.net.Socket ; 21 import java.net.UnknownHostException ; 22 import java.nio.ByteBuffer ; 23 import java.nio.channels.SelectionKey ; 24 import java.nio.channels.Selector ; 25 import java.nio.channels.ServerSocketChannel ; 26 import java.nio.channels.SocketChannel ; 27 import java.util.Iterator ; 28 29 import org.apache.commons.logging.Log; 30 import org.mortbay.log.LogFactory; 31 import org.mortbay.http.HttpConnection; 32 import org.mortbay.http.HttpHandler; 33 import org.mortbay.http.HttpListener; 34 import org.mortbay.http.HttpMessage; 35 import org.mortbay.http.HttpRequest; 36 import org.mortbay.http.HttpServer; 37 import org.mortbay.util.LineInput; 38 import org.mortbay.util.LogSupport; 39 import org.mortbay.util.ThreadPool; 40 41 42 47 public class SocketChannelListener extends ThreadPool implements HttpListener 48 { 49 private static Log log= LogFactory.getLog(SocketChannelListener.class); 50 51 private InetSocketAddress _address; 52 private int _bufferSize= 4096; 53 private int _bufferReserve= 512; 54 private int _sslPort; 55 private int _lingerTimeSecs=5; 56 private HttpHandler _handler; 57 58 private transient HttpServer _server; 59 60 private transient ServerSocketChannel _acceptChannel; 61 private transient Selector _selector; 62 private transient SelectorThread _selectorThread; 63 private transient boolean _isLow=false; 64 private transient boolean _isOut=false; 65 private transient long _warned=0; 66 67 68 69 72 public SocketChannelListener() 73 { 74 super(); 75 } 76 77 78 81 public void setHttpServer(HttpServer server) 82 { 83 _server=server; 84 } 85 86 87 90 public HttpServer getHttpServer() 91 { 92 return _server; 93 } 94 95 96 99 public void setHost(String host) throws UnknownHostException 100 { 101 _address = new InetSocketAddress (host, _address == null ? 0 : _address.getPort()); 102 } 103 104 105 108 public String getHost() 109 { 110 if (_address == null || _address.getAddress() == null) 111 return null; 112 return _address.getHostName(); 113 } 114 115 116 119 public void setPort(int port) 120 { 121 if (_address == null || _address.getHostName() == null) 122 _address= new InetSocketAddress (port); 123 else 124 _address= new InetSocketAddress (_address.getHostName(), port); 125 } 126 127 128 131 public int getPort() 132 { 133 if (_address == null) 134 return 0; 135 return _address.getPort(); 136 } 137 138 139 public void setBufferSize(int size) 140 { 141 _bufferSize= size; 142 } 143 144 145 148 public int getBufferSize() 149 { 150 return _bufferSize; 151 } 152 153 154 public void setBufferReserve(int size) 155 { 156 _bufferReserve= size; 157 } 158 159 160 163 public int getBufferReserve() 164 { 165 return _bufferReserve; 166 } 167 168 169 172 public String getDefaultScheme() 173 { 174 return HttpMessage.__SCHEME; 175 } 176 177 178 181 public void customizeRequest(HttpConnection connection, HttpRequest request) 182 { 183 } 185 186 187 190 public void persistConnection(HttpConnection connection) 191 { 192 } 194 195 196 199 public boolean isLowOnResources() 200 { 201 boolean low = (getMaxThreads()-getThreads()+getIdleThreads())<getMinThreads(); 202 203 if (low && !_isLow) 204 { 205 log.info("LOW ON THREADS (("+ 206 getMaxThreads()+"-"+ 207 getThreads()+"+"+ 208 getIdleThreads()+")<"+ 209 getMinThreads()+") on "+ this); 210 _warned=System.currentTimeMillis(); 211 _isLow=true; 212 } 213 else if (!low && _isLow) 214 { 215 if (System.currentTimeMillis()-_warned > 1000) 216 { 217 _isOut=false; 218 _isLow=false; 219 } 220 } 221 return low; 222 } 223 224 225 228 public boolean isOutOfResources() 229 { 230 boolean out = 231 getThreads()==getMaxThreads() && 232 getIdleThreads()==0; 233 234 if (out && !_isOut) 235 { 236 log.warn("OUT OF THREADS: "+this); 237 _warned=System.currentTimeMillis(); 238 _isLow=true; 239 _isOut=true; 240 } 241 242 return out; 243 } 244 245 246 249 public int getSslPort() 250 { 251 return _sslPort; 252 } 253 254 255 258 public void setSslPort(int p) 259 { 260 _sslPort= p; 261 } 262 263 264 267 public boolean isIntegral(HttpConnection connection) 268 { 269 return false; 270 } 271 272 273 276 public String getIntegralScheme() 277 { 278 return HttpMessage.__SSL_SCHEME; 279 } 280 281 282 285 public int getIntegralPort() 286 { 287 return _sslPort; 288 } 289 290 291 294 public boolean isConfidential(HttpConnection connection) 295 { 296 return false; 297 } 298 299 300 303 public String getConfidentialScheme() 304 { 305 return HttpMessage.__SSL_SCHEME; 306 } 307 308 309 312 public int getConfidentialPort() 313 { 314 return _sslPort; 315 } 316 317 318 321 public void setLingerTimeSecs(int ls) 322 { 323 _lingerTimeSecs= ls; 324 } 325 326 327 330 public int getLingerTimeSecs() 331 { 332 return _lingerTimeSecs; 333 } 334 335 336 public void setHttpHandler(HttpHandler handler) 337 { 338 _handler= handler; 339 } 340 341 342 345 public HttpHandler getHttpHandler() 346 { 347 return _handler; 348 } 349 350 351 352 public void start() throws Exception 353 { 354 if (isStarted()) 355 throw new IllegalStateException ("Started"); 356 357 _acceptChannel= ServerSocketChannel.open(); 359 _acceptChannel.configureBlocking(false); 360 361 _acceptChannel.socket().bind(_address); 363 364 _address= (InetSocketAddress )_acceptChannel.socket().getLocalSocketAddress(); 367 368 _selector= Selector.open(); 370 371 _acceptChannel.register(_selector, SelectionKey.OP_ACCEPT); 373 374 _selectorThread= new SelectorThread(); 376 _selectorThread.start(); 377 378 super.start(); 380 log.info("Started SocketChannelListener on " + getHost()+":"+getPort()); 381 } 382 383 384 385 public void stop() throws InterruptedException 386 { 387 if (_selectorThread != null) 388 _selectorThread.doStop(); 389 390 super.stop(); 391 log.info("Stopped SocketChannelListener on " + getHost()+":"+getPort()); 392 } 393 394 395 396 397 398 private class SelectorThread extends Thread 399 { 400 boolean _running= false; 401 402 403 public void run() 404 { 405 try 406 { 407 _running= true; 408 while (_running) 409 { 410 411 SelectionKey key= null; 412 try 413 { 414 _selector.select(); 415 Iterator iter= _selector.selectedKeys().iterator(); 416 417 while (iter.hasNext()) 418 { 419 key= (SelectionKey )iter.next(); 420 if (key.isAcceptable()) 421 doAccept(key); 422 if (key.isReadable()) 423 doRead(key); 424 key= null; 425 iter.remove(); 426 } 427 } 428 catch (Exception e) 429 { 430 if (_running) 431 log.warn("selector", e); 432 if (key != null) 433 key.cancel(); 434 } 435 } 436 } 437 finally 438 { 439 log.info("Stopping " + this.getName()); 440 441 try 442 { 443 if (_acceptChannel != null) 444 _acceptChannel.close(); 445 } 446 catch (IOException e) 447 { 448 LogSupport.ignore(log, e); 449 } 450 try 451 { 452 if (_selector != null) 453 _selector.close(); 454 } 455 catch (IOException e) 456 { 457 LogSupport.ignore(log, e); 458 } 459 460 _selector= null; 461 _acceptChannel= null; 462 _selectorThread= null; 463 } 464 } 465 466 467 void doAccept(SelectionKey key) 468 throws IOException , InterruptedException 469 { 470 if (isLowOnResources()) 471 return; 472 473 ServerSocketChannel server = (ServerSocketChannel ) key.channel(); 474 SocketChannel channel = server.accept(); 475 channel.configureBlocking(false); 476 SelectionKey readKey = channel.register(_selector, SelectionKey.OP_READ); 477 478 Socket socket=channel.socket(); 479 try 480 { 481 if (getMaxIdleTimeMs() >= 0) 482 socket.setSoTimeout(getMaxIdleTimeMs()); 483 if (_lingerTimeSecs >= 0) 484 socket.setSoLinger(true, _lingerTimeSecs); 485 else 486 socket.setSoLinger(false, 0); 487 } 488 catch (Exception e) 489 { 490 LogSupport.ignore(log, e); 491 } 492 493 Connection connection=new Connection(channel,readKey, SocketChannelListener.this); 494 readKey.attach(connection); 495 } 496 497 498 void doRead(SelectionKey key) 499 throws IOException 500 { 501 Connection connection = (Connection)key.attachment(); 502 if (connection._idle && isOutOfResources()) 503 return; 505 ByteBuffer buf= connection._in.getBuffer(); 506 int count = ((SocketChannel )key.channel()).read(buf); 507 if (count<0) 508 { 509 connection.close(); 510 } 511 else 512 { 513 buf.flip(); 514 connection.write(buf); 515 } 516 } 517 518 void doStop() 519 { 520 _running=false; 521 _selector.wakeup(); 522 Thread.yield(); 523 } 524 } 525 526 527 528 529 530 private static class Connection 531 extends HttpConnection 532 implements Runnable 533 { 534 boolean _idle=true; 535 SocketChannel _channel; 536 SelectionKey _key; 537 ByteBufferInputStream _in; 538 SocketChannelOutputStream _out; 539 SocketChannelListener _listener; 540 541 Connection(SocketChannel channel,SelectionKey key, SocketChannelListener listener) 542 { 543 super(listener, 544 channel.socket().getInetAddress(), 545 new ByteBufferInputStream(listener.getBufferSize()), 546 new SocketChannelOutputStream(channel,listener.getBufferSize()), 547 channel); 548 _channel=channel; 549 _key=key; 550 _listener=listener; 551 _in=(ByteBufferInputStream) ((LineInput)(getInputStream().getInputStream())).getInputStream(); 552 _out=(SocketChannelOutputStream)(getOutputStream().getOutputStream()); 553 _in.setTimeout(listener.getMaxIdleTimeMs()); 554 } 555 556 557 558 void write(ByteBuffer buf) 559 { 560 if (!_idle) 561 _in.write(buf); 562 else 563 { 564 boolean written=false; 565 566 for (int i=buf.position();i<buf.limit();i++) 568 { 569 byte b = buf.get(i); 570 571 if (b>' ') 572 { 573 buf.position(i); 574 575 try 576 { 577 written=true; 578 _in.write(buf); 579 _listener.run(this); 580 _idle=false; 581 } 582 catch(InterruptedException e) 583 { 584 LogSupport.ignore(log, e); 585 } 586 finally 587 { 588 i=buf.limit(); 589 } 590 } 591 } 592 593 if (!written) 594 { 595 _in.recycle(buf); 596 } 597 } 598 } 599 600 601 603 public void run() 604 { 605 try 606 { 607 associateThread(); 608 while (_in!=null && _in.available()>0 && _listener.isStarted()) 609 { 610 if (handleNext()) 611 recycle(); 612 else 613 destroy(); 614 } 615 } 616 catch(IOException e) 617 { 618 log.warn(e.toString()); 619 log.debug(e); 620 destroy(); 621 } 622 finally 623 { 624 _idle=true; 625 disassociateThread(); 626 } 627 } 628 629 630 public synchronized void close() 631 throws IOException 632 { 633 _out.close(); 634 _in.close(); 635 if (!_channel.isOpen()) 636 return; 637 _key.cancel(); 638 _channel.socket().shutdownOutput(); 639 _channel.close(); 640 _channel.socket().close(); 641 super.close(); 642 _channel.close(); 643 } 644 645 646 public void destroy() 647 { 648 super.destroy(); 649 if (_in!=null) 650 _in.destroy(); 651 _in=null; 652 if (_out!=null) 653 _out.destroy(); 654 _out=null; 655 _channel=null; 656 _key=null; 657 _listener=null; 658 } 659 660 } 661 662 } 663 | Popular Tags |