1 16 package org.mortbay.util; 17 18 import java.io.IOException ; 19 import java.io.InputStream ; 20 import java.io.InterruptedIOException ; 21 import java.io.OutputStream ; 22 import java.net.InetAddress ; 23 import java.net.ServerSocket ; 24 import java.net.Socket ; 25 import java.net.UnknownHostException ; 26 27 import org.apache.commons.logging.Log; 28 import org.mortbay.log.LogFactory; 29 30 31 44 abstract public class ThreadedServer extends ThreadPool 45 { 46 private static Log log = LogFactory.getLog(ThreadedServer.class); 47 48 49 private InetAddrPort _address = null; 50 private int _soTimeOut = -1; 51 private int _lingerTimeSecs = 30; 52 private boolean _tcpNoDelay = true; 53 private int _acceptQueueSize = 0; 54 private int _acceptors = 1; 55 56 private transient Acceptor[] _acceptor; 57 private transient ServerSocket _listen = null; 58 private transient boolean _running = false; 59 60 61 64 public ThreadedServer() 65 { 66 } 67 68 69 72 public ServerSocket getServerSocket() 73 { 74 return _listen; 75 } 76 77 78 81 public ThreadedServer(int port) 82 { 83 setInetAddrPort(new InetAddrPort(port)); 84 } 85 86 87 90 public ThreadedServer(InetAddress address, int port) 91 { 92 setInetAddrPort(new InetAddrPort(address, port)); 93 } 94 95 96 99 public ThreadedServer(String host, int port) throws UnknownHostException 100 { 101 setInetAddrPort(new InetAddrPort(host, port)); 102 } 103 104 105 108 public ThreadedServer(InetAddrPort address) 109 { 110 setInetAddrPort(address); 111 } 112 113 114 119 public synchronized void setInetAddrPort(InetAddrPort address) 120 { 121 if (_address != null && _address.equals(address)) return; 122 123 if (isStarted()) log.warn(this + " is started"); 124 125 _address = address; 126 } 127 128 129 132 public InetAddrPort getInetAddrPort() 133 { 134 if (_address == null) return null; 135 return new InetAddrPort(_address); 136 } 137 138 139 142 public synchronized void setHost(String host) throws UnknownHostException 143 { 144 if (_address != null && _address.getHost() != null && _address.getHost().equals(host)) 145 return; 146 147 if (isStarted()) log.warn(this + " is started"); 148 149 if (_address == null) 150 _address = new InetAddrPort(host, 0); 151 else 152 _address.setHost(host); 153 } 154 155 156 159 public String getHost() 160 { 161 if (_address == null || _address.getInetAddress() == null) return null; 162 return _address.getHost(); 163 } 164 165 166 169 public synchronized void setInetAddress(InetAddress addr) 170 { 171 if (_address != null && _address.getInetAddress() != null 172 && _address.getInetAddress().equals(addr)) return; 173 174 if (isStarted()) log.warn(this + " is started"); 175 176 if (_address == null) 177 _address = new InetAddrPort(addr, 0); 178 else 179 _address.setInetAddress(addr); 180 } 181 182 183 186 public InetAddress getInetAddress() 187 { 188 if (_address == null) return null; 189 return _address.getInetAddress(); 190 } 191 192 193 196 public synchronized void setPort(int port) 197 { 198 if (_address != null && _address.getPort() == port) return; 199 200 if (isStarted()) log.warn(this + " is started"); 201 202 if (_address == null) 203 _address = new InetAddrPort(port); 204 else 205 _address.setPort(port); 206 } 207 208 209 212 public int getPort() 213 { 214 if (_address == null) return 0; 215 return _address.getPort(); 216 } 217 218 219 224 public void setMaxReadTimeMs(int ms) 225 { 226 log.warn("setMaxReadTimeMs is deprecated. Use setMaxIdleTimeMs()"); 227 } 228 229 230 233 public int getMaxReadTimeMs() 234 { 235 return getMaxIdleTimeMs(); 236 } 237 238 239 242 public void setLingerTimeSecs(int ls) 243 { 244 _lingerTimeSecs = ls; 245 } 246 247 248 251 public int getLingerTimeSecs() 252 { 253 return _lingerTimeSecs; 254 } 255 256 257 260 public void setTcpNoDelay(boolean tcpNoDelay) 261 { 262 _tcpNoDelay = tcpNoDelay; 263 } 264 265 266 269 public boolean getTcpNoDelay() 270 { 271 return _tcpNoDelay; 272 } 273 274 275 278 public int getAcceptQueueSize() 279 { 280 return _acceptQueueSize; 281 } 282 283 284 290 public void setAcceptQueueSize(int acceptQueueSize) 291 { 292 _acceptQueueSize = acceptQueueSize; 293 } 294 295 296 300 public void setAcceptorThreads(int n) 301 { 302 _acceptors = n; 303 } 304 305 306 309 public int getAcceptorThreads() 310 { 311 return _acceptors; 312 } 313 314 315 320 protected void handleConnection(InputStream in, OutputStream out) 321 { 322 throw new Error ("Either handlerConnection must be overridden"); 323 } 324 325 326 331 protected void handleConnection(Socket connection) throws IOException 332 { 333 if (log.isDebugEnabled()) log.debug("Handle " + connection); 334 InputStream in = connection.getInputStream(); 335 OutputStream out = connection.getOutputStream(); 336 337 handleConnection(in, out); 338 out.flush(); 339 340 in = null; 341 out = null; 342 connection.close(); 343 } 344 345 346 351 public void handle(Object job) 352 { 353 Socket socket = (Socket ) job; 354 try 355 { 356 if (_tcpNoDelay) socket.setTcpNoDelay(true); 357 handleConnection(socket); 358 } 359 catch (Exception e) 360 { 361 log.debug("Connection problem", e); 362 } 363 finally 364 { 365 try 366 { 367 socket.close(); 368 } 369 catch (Exception e) 370 { 371 log.debug("Connection problem", e); 372 } 373 } 374 } 375 376 377 386 protected ServerSocket newServerSocket(InetAddrPort address, int acceptQueueSize) 387 throws java.io.IOException 388 { 389 if (address == null) return new ServerSocket (0, acceptQueueSize); 390 391 return new ServerSocket (address.getPort(), acceptQueueSize, address.getInetAddress()); 392 } 393 394 395 404 protected Socket acceptSocket(ServerSocket ignored, int timeout) 405 { 406 return acceptSocket(timeout); 407 } 408 409 410 418 protected Socket acceptSocket(int timeout) 419 { 420 try 421 { 422 Socket s = null; 423 424 if (_listen != null) 425 { 426 if (_soTimeOut != timeout) 427 { 428 _soTimeOut = timeout; 429 _listen.setSoTimeout(_soTimeOut); 430 } 431 432 s = _listen.accept(); 433 434 try 435 { 436 if (getMaxIdleTimeMs() >= 0) s.setSoTimeout(getMaxIdleTimeMs()); 437 if (_lingerTimeSecs >= 0) 438 s.setSoLinger(true, _lingerTimeSecs); 439 else 440 s.setSoLinger(false, 0); 441 } 442 catch (Exception e) 443 { 444 LogSupport.ignore(log, e); 445 } 446 } 447 return s; 448 } 449 catch (java.net.SocketException e) 450 { 451 LogSupport.ignore(log, e); 454 } 455 catch (InterruptedIOException e) 456 { 457 LogSupport.ignore(log, e); 458 } 459 catch (IOException e) 460 { 461 log.warn(LogSupport.EXCEPTION, e); 462 } 463 return null; 464 } 465 466 467 473 public void open() throws IOException 474 { 475 if (_listen == null) 476 { 477 _listen = newServerSocket(_address, _acceptQueueSize); 478 479 if (_address == null) 480 _address = new InetAddrPort(_listen.getInetAddress(), _listen.getLocalPort()); 481 else 482 { 483 if (_address.getInetAddress() == null) 484 _address.setInetAddress(_listen.getInetAddress()); 485 if (_address.getPort() == 0) _address.setPort(_listen.getLocalPort()); 486 } 487 488 _soTimeOut = getMaxIdleTimeMs(); 489 if (_soTimeOut >= 0) _listen.setSoTimeout(_soTimeOut); 490 } 491 } 492 493 494 497 public synchronized void start() throws Exception 498 { 499 try 500 { 501 if (isStarted()) return; 502 503 open(); 504 505 _running = true; 506 _acceptor = new Acceptor[_acceptors]; 507 for (int a = 0; a < _acceptor.length; a++) 508 { 509 _acceptor[a] = new Acceptor(); 510 _acceptor[a].setDaemon(isDaemon()); 511 _acceptor[a].start(); 512 } 513 514 super.start(); 515 } 516 catch (Exception e) 517 { 518 log.warn("Failed to start: " + this); 519 throw e; 520 } 521 } 522 523 524 public void stop() throws InterruptedException 525 { 526 synchronized (this) 527 { 528 _running = false; 530 531 if (log.isDebugEnabled()) log.debug("closing " + _listen); 533 try 534 { 535 if (_listen != null) _listen.close(); 536 _listen=null; 537 } 538 catch (IOException e) 539 { 540 log.warn(LogSupport.EXCEPTION, e); 541 } 542 543 Thread.yield(); 545 for (int a = 0; _acceptor!=null && a<_acceptor.length; a++) 546 { 547 Acceptor acc = _acceptor[a]; 548 if (acc != null) 549 acc.interrupt(); 550 } 551 Thread.sleep(100); 552 553 for (int a = 0; _acceptor!=null && a<_acceptor.length; a++) 554 { 555 Acceptor acc = _acceptor[a]; 556 557 if (acc != null) 558 { 559 acc.forceStop(); 560 _acceptor[a] = null; 561 } 562 } 563 } 564 565 try 567 { 568 super.stop(); 569 } 570 catch (Exception e) 571 { 572 log.warn(LogSupport.EXCEPTION, e); 573 } 574 finally 575 { 576 synchronized (this) 577 { 578 _acceptor = null; 579 } 580 } 581 } 582 583 584 585 591 protected void stopJob(Thread thread, Object job) 592 { 593 if (job instanceof Socket ) 594 { 595 try 596 { 597 ((Socket ) job).close(); 598 } 599 catch (Exception e) 600 { 601 LogSupport.ignore(log, e); 602 } 603 } 604 super.stopJob(thread, job); 605 } 606 607 608 public String toString() 609 { 610 if (_address == null) return getName() + "@0.0.0.0:0"; 611 if (_listen != null) 612 return getName() + "@" + _listen.getInetAddress().getHostAddress() + ":" 613 + _listen.getLocalPort(); 614 return getName() + "@" + getInetAddrPort(); 615 } 616 617 618 619 620 private class Acceptor extends Thread 621 { 622 623 public void run() 624 { 625 ThreadedServer threadedServer = ThreadedServer.this; 626 try 627 { 628 this.setName("Acceptor " + _listen); 629 while (_running) 630 { 631 try 632 { 633 Socket socket = acceptSocket(_soTimeOut); 635 636 if (socket != null) 638 { 639 if (_running) 640 threadedServer.run(socket); 641 else 642 socket.close(); 643 } 644 } 645 catch (Throwable e) 646 { 647 if (_running) 648 log.warn(LogSupport.EXCEPTION, e); 649 else 650 log.debug(LogSupport.EXCEPTION, e); 651 } 652 } 653 } 654 finally 655 { 656 if (_running) 657 log.warn("Stopping " + this.getName()); 658 else 659 log.info("Stopping " + this.getName()); 660 synchronized (threadedServer) 661 { 662 if (_acceptor != null) 663 { 664 for (int a = 0; a < _acceptor.length; a++) 665 if (_acceptor[a] == this) 666 _acceptor[a] = null; 667 } 668 threadedServer.notifyAll(); 669 } 670 } 671 } 672 673 674 void forceStop() 675 { 676 if (_listen != null && _address != null) 677 { 678 InetAddress addr = _address.getInetAddress(); 679 try 680 { 681 if (addr == null || addr.toString().startsWith("0.0.0.0")) 682 addr = InetAddress.getByName("127.0.0.1"); 683 if (log.isDebugEnabled()) 684 log.debug("Self connect to close listener " + addr + ":" 685 + _address.getPort()); 686 Socket socket = new Socket (addr, _address.getPort()); 687 Thread.yield(); 688 socket.close(); 689 Thread.yield(); 690 } 691 catch (IOException e) 692 { 693 if (log.isDebugEnabled()) 694 log.debug("problem stopping acceptor " + addr + ": ", e); 695 } 696 } 697 } 698 } 699 700 } 701 | Popular Tags |