package org.apache.tomcat.util.net;

import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;

/**
 * Blocking java.io based endpoint: one or more acceptor threads take
 * connections from a {@link ServerSocket} and hand each accepted
 * {@link Socket} either to an internal stack of {@link Worker} threads or,
 * when one is configured, to an external {@link Executor}.
 */
public class JIoEndpoint {


    // -------------------------------------------------------------- Constants

    protected static Log log = LogFactory.getLog(JIoEndpoint.class);

    protected StringManager sm =
        StringManager.getManager("org.apache.tomcat.util.net.res");


    /**
     * Attribute name constant (not read by this class; presumably the request
     * attribute under which the SSL cipher suite is published).
     */
    public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";

    /**
     * Attribute name constant (not read by this class; presumably the request
     * attribute for the SSL key size).
     */
    public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";

    /**
     * Attribute name constant (not read by this class; presumably the request
     * attribute for the client certificate chain).
     */
    public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";

    /**
     * Attribute name constant (not read by this class; presumably the request
     * attribute for the SSL session id).
     */
    public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";


    // ----------------------------------------------------------------- Fields

    /**
     * Idle worker threads. Created in {@link #start()} only when no
     * {@link Executor} is configured; otherwise stays <code>null</code>.
     */
    protected WorkerStack workers = null;


    /**
     * Set by {@link #start()}, cleared by {@link #stop()}.
     */
    protected volatile boolean running = false;


    /**
     * Set by {@link #pause()}, cleared by {@link #resume()} and
     * {@link #start()}.
     */
    protected volatile boolean paused = false;


    /**
     * Set once {@link #init()} has completed, cleared by {@link #destroy()}.
     */
    protected boolean initialized = false;


    /**
     * Incremented when a worker is handed out, decremented when it is
     * recycled.
     */
    protected int curThreadsBusy = 0;


    /**
     * Number of worker threads created so far; also used to number them.
     */
    protected int curThreads = 0;


    /**
     * Sequence counter (not used within this class).
     */
    protected int sequence = 0;


    /**
     * Listening socket, created in {@link #init()} unless already set.
     */
    protected ServerSocket serverSocket = null;


    // ------------------------------------------------------------- Properties

    /**
     * Number of acceptor threads; a value of 0 is replaced by 1 in
     * {@link #init()}.
     */
    protected int acceptorThreadCount = 0;
    public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; }
    public int getAcceptorThreadCount() { return acceptorThreadCount; }


    /**
     * Optional external executor. When non-null, sockets are processed by
     * submitting a {@link SocketProcessor} to it instead of using the
     * internal worker stack.
     */
    protected Executor executor = null;
    public void setExecutor(Executor executor) { this.executor = executor; }
    public Executor getExecutor() { return executor; }


    /**
     * Upper bound on internally created worker threads. A negative value
     * means no limit; 0 means no worker is ever created.
     */
    protected int maxThreads = 40;
    public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }
    public int getMaxThreads() { return maxThreads; }


    /**
     * Priority applied to the acceptor threads.
     */
    protected int threadPriority = Thread.NORM_PRIORITY;
    public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; }
    public int getThreadPriority() { return threadPriority; }


    /**
     * Port to listen on.
     */
    protected int port;
    public int getPort() { return port; }
    public void setPort(int port ) { this.port=port; }


    /**
     * Address to bind to; when <code>null</code> the server socket is
     * created without an explicit address.
     */
    protected InetAddress address;
    public InetAddress getAddress() { return address; }
    public void setAddress(InetAddress address) { this.address = address; }


    /**
     * Handler that processes accepted sockets.
     */
    protected Handler handler = null;
    public void setHandler(Handler handler ) { this.handler = handler; }
    public Handler getHandler() { return handler; }


    /**
     * Backlog passed to the server socket factory. The setter ignores
     * values that are not strictly positive.
     */
    protected int backlog = 100;
    public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; }
    public int getBacklog() { return backlog; }


    /**
     * When true, TCP_NODELAY is enabled on accepted sockets.
     */
    protected boolean tcpNoDelay = false;
    public boolean getTcpNoDelay() { return tcpNoDelay; }
    public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; }


    /**
     * SO_LINGER value applied to accepted sockets when it is &gt;= 0.
     */
    protected int soLinger = 100;
    public int getSoLinger() { return soLinger; }
    public void setSoLinger(int soLinger) { this.soLinger = soLinger; }


    /**
     * SO_TIMEOUT value applied to accepted sockets when it is &gt; 0.
     */
    protected int soTimeout = -1;
    public int getSoTimeout() { return soTimeout; }
    public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; }


    /**
     * Daemon flag applied to the acceptor threads. Worker threads are always
     * created as daemon threads regardless of this value.
     */
    protected boolean daemon = true;
    public void setDaemon(boolean b) { daemon = b; }
    public boolean getDaemon() { return daemon; }


    /**
     * Prefix used when naming acceptor and worker threads.
     */
    protected String name = "TP";
    public void setName(String name) { this.name = name; }
    public String getName() { return name; }


    /**
     * Factory used to create the server socket and to accept, initialise and
     * handshake client sockets; defaulted in {@link #init()} when unset.
     */
    protected ServerSocketFactory serverSocketFactory = null;
    public void setServerSocketFactory(ServerSocketFactory factory) { this.serverSocketFactory = factory; }
    public ServerSocketFactory getServerSocketFactory() { return serverSocketFactory; }


    public boolean isRunning() {
        return running;
    }

    public boolean isPaused() {
        return paused;
    }

    public int getCurrentThreadCount() {
        return curThreads;
    }

    /**
     * Returns the number of created worker threads that are not currently
     * sitting idle on the worker stack.
     *
     * @return created workers minus idle workers, or 0 when the internal
     *         worker stack does not exist (endpoint not started, or an
     *         external executor is in use)
     */
    public int getCurrentThreadsBusy() {
        // Local copy: the field is only assigned in start().
        WorkerStack idle = workers;
        return (idle != null) ? curThreads - idle.size() : 0;
    }


    // ------------------------------------------------ Handler Inner Interface

    /**
     * Callback that processes an accepted socket.
     */
    public interface Handler {
        /**
         * Process the given socket.
         *
         * @param socket the accepted socket
         * @return <code>false</code> to have the caller close the socket
         */
        public boolean process(Socket socket);
    }


    // --------------------------------------------------- Acceptor Inner Class

    /**
     * Accept loop: takes connections from the server socket and hands them
     * to {@link #processSocket(Socket)}.
     */
    protected class Acceptor implements Runnable {


        /**
         * Runs until the endpoint is stopped. While the endpoint is paused
         * the thread sleeps in one second steps instead of accepting.
         */
        public void run() {

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused; also re-check running so that a
                // stop() issued while paused lets this thread terminate.
                while (running && paused) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    // Stopped while paused: do not touch the server socket.
                    break;
                }

                // Accept the next incoming connection from the server socket
                try {
                    Socket socket = serverSocketFactory.acceptSocket(serverSocket);
                    serverSocketFactory.initSocket(socket);
                    // Hand this socket off to an appropriate processor
                    if (!processSocket(socket)) {
                        // Close socket right away
                        try {
                            socket.close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    }
                } catch (Throwable t) {
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }

            }

        }

    }


    // ------------------------------------------- SocketProcessor Inner Class

    /**
     * Task submitted to the external executor: applies the socket options
     * and invokes the handler for a single socket.
     */
    protected class SocketProcessor implements Runnable {

        protected Socket socket = null;

        public SocketProcessor(Socket socket) {
            this.socket = socket;
        }

        public void run() {

            // Close the socket if option setup/handshake or processing fails
            if (!setSocketOptions(socket) || !handler.process(socket)) {
                try {
                    socket.close();
                } catch (IOException e) {
                    // Ignore
                }
            }

            // Finish up this request
            socket = null;

        }

    }


    // ----------------------------------------------------- Worker Inner Class

    /**
     * Pooled worker thread. A socket is handed over through
     * {@link #assign(Socket)} and picked up by the worker's own thread in
     * {@link #await()}.
     */
    protected class Worker implements Runnable {

        protected Thread thread = null;
        protected boolean available = false;
        protected Socket socket = null;


        /**
         * Hands a socket to this worker. Blocks while a previously assigned
         * socket has not yet been picked up, then stores the socket and wakes
         * the waiting worker thread.
         *
         * @param socket the socket to process
         */
        synchronized void assign(Socket socket) {

            // Wait for the worker to take the previous socket
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Ignore; the loop re-checks the condition
                }
            }

            // Store the incoming socket and notify our thread
            this.socket = socket;
            available = true;
            notifyAll();

        }


        /**
         * Blocks until a socket has been assigned, then returns it and marks
         * this worker ready to be assigned another one.
         *
         * @return the assigned socket
         */
        private synchronized Socket await() {

            // Wait for the acceptor to assign a socket
            while (!available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Ignore; the loop re-checks the condition
                }
            }

            // Notify the assigner that we have received this socket
            Socket socket = this.socket;
            available = false;
            notifyAll();

            return (socket);

        }


        /**
         * Processes assigned sockets until the endpoint stops running, and
         * puts this worker back on the idle stack after each one.
         */
        public void run() {

            // Process requests until we receive a shutdown signal
            while (running) {

                // Wait for the next socket to be assigned
                Socket socket = await();
                if (socket == null)
                    continue;

                // Close the socket if option setup/handshake or processing fails
                if (!setSocketOptions(socket) || !handler.process(socket)) {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        // Ignore
                    }
                }

                // Finish up this request
                socket = null;
                recycleWorkerThread(this);

            }

        }


        /**
         * Starts the background thread for this worker. The thread is named
         * from the endpoint name and the incremented thread counter, and is
         * always a daemon thread.
         */
        public void start() {
            thread = new Thread(this);
            thread.setName(getName() + "-" + (++curThreads));
            thread.setDaemon(true);
            thread.start();
        }


    }


    // -------------------- Public methods --------------------

    /**
     * Initialises the endpoint: defaults the acceptor count and the socket
     * factory, and creates the server socket if none has been set. Does
     * nothing when already initialised.
     *
     * @throws Exception if the server socket cannot be created; a
     *         {@link BindException} is rethrown with the port appended to
     *         its message and the original exception as its cause
     */
    public void init()
        throws Exception {

        if (initialized)
            return;

        // Initialize thread count defaults for acceptor
        if (acceptorThreadCount == 0) {
            acceptorThreadCount = 1;
        }
        if (serverSocketFactory == null) {
            serverSocketFactory = ServerSocketFactory.getDefault();
        }
        if (serverSocket == null) {
            try {
                if (address == null) {
                    serverSocket = serverSocketFactory.createSocket(port, backlog);
                } else {
                    serverSocket = serverSocketFactory.createSocket(port, backlog, address);
                }
            } catch (BindException be) {
                // BindException has no cause constructor; attach it explicitly
                // so the original stack trace is not lost.
                BindException withPort = new BindException(be.getMessage() + ":" + port);
                withPort.initCause(be);
                throw withPort;
            }
        }

        initialized = true;

    }

    /**
     * Starts the endpoint: initialises it if needed, creates the internal
     * worker stack when no executor is configured, and starts the acceptor
     * threads. Does nothing when already running.
     *
     * @throws Exception if initialisation fails
     */
    public void start()
        throws Exception {
        // Initialize socket if not done before
        if (!initialized) {
            init();
        }
        if (!running) {
            running = true;
            paused = false;

            // Create worker collection
            if (executor == null) {
                workers = new WorkerStack(maxThreads);
            }

            // Start acceptor threads
            for (int i = 0; i < acceptorThreadCount; i++) {
                Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
                acceptorThread.setPriority(threadPriority);
                acceptorThread.setDaemon(daemon);
                acceptorThread.start();
            }
        }
    }

    /**
     * Pauses a running endpoint and opens a connection to the listening
     * port via {@link #unlockAccept()}.
     */
    public void pause() {
        if (running && !paused) {
            paused = true;
            unlockAccept();
        }
    }

    /**
     * Clears the paused flag of a running endpoint.
     */
    public void resume() {
        if (running) {
            paused = false;
        }
    }

    /**
     * Clears the running flag and opens a connection to the listening port
     * via {@link #unlockAccept()}.
     */
    public void stop() {
        if (running) {
            running = false;
            unlockAccept();
        }
    }

    /**
     * Stops the endpoint if it is running, closes the server socket and
     * marks the endpoint as uninitialised. A failure to close the socket is
     * logged, not propagated.
     *
     * @throws Exception declared for callers; not thrown by the visible code
     */
    public void destroy() throws Exception {
        if (running) {
            stop();
        }
        if (serverSocket != null) {
            try {
                serverSocket.close();
            } catch (Exception e) {
                log.error(sm.getString("endpoint.err.close"), e);
            }
            serverSocket = null;
        }
        initialized = false;
    }


    /**
     * Opens and immediately closes a client connection to the listening
     * port (presumably so that an acceptor blocked in accept returns and
     * re-checks the endpoint state). Failures are logged at debug level only.
     */
    protected void unlockAccept() {
        Socket s = null;
        try {
            // Need to create a connection to unlock the accept()
            if (address == null) {
                s = new Socket("127.0.0.1", port);
            } else {
                s = new Socket(address, port);
                // Linger of 0 is only applied on this branch
                s.setSoLinger(true, 0);
            }
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
            }
        } finally {
            if (s != null) {
                try {
                    s.close();
                } catch (Exception e) {
                    // Ignore
                }
            }
        }
    }


    /**
     * Applies the configured socket options and then asks the socket factory
     * to perform its handshake on the socket.
     *
     * @param socket the accepted socket
     * @return <code>true</code> on success, <code>false</code> if setting an
     *         option or the handshake threw; the failure is logged at debug
     *         level
     */
    protected boolean setSocketOptions(Socket socket) {
        // Tracks which phase failed so the right message is logged
        int step = 1;
        try {

            // 1: Set socket options: timeout, linger, etc
            if (soLinger >= 0) {
                socket.setSoLinger(true, soLinger);
            }
            if (tcpNoDelay) {
                socket.setTcpNoDelay(tcpNoDelay);
            }
            if (soTimeout > 0) {
                socket.setSoTimeout(soTimeout);
            }

            // 2: Handshake
            step = 2;
            serverSocketFactory.handshake(socket);

        } catch (Throwable t) {
            if (log.isDebugEnabled()) {
                if (step == 2) {
                    log.debug(sm.getString("endpoint.err.handshake"), t);
                } else {
                    log.debug(sm.getString("endpoint.err.unexpected"), t);
                }
            }
            // Tell the caller to close the socket
            return false;
        }
        return true;
    }


    /**
     * Returns an idle worker if one is on the stack, otherwise creates a new
     * one when the thread limit allows it.
     *
     * @return a worker, or <code>null</code> when none is idle and the limit
     *         has been reached (or <code>maxThreads</code> is 0)
     */
    protected Worker createWorkerThread() {

        synchronized (workers) {
            if (workers.size() > 0) {
                curThreadsBusy++;
                return workers.pop();
            }
            if ((maxThreads > 0) && (curThreads < maxThreads)) {
                curThreadsBusy++;
                return (newWorkerThread());
            } else {
                if (maxThreads < 0) {
                    // Negative limit: unbounded
                    curThreadsBusy++;
                    return (newWorkerThread());
                } else {
                    return (null);
                }
            }
        }

    }


    /**
     * Creates and starts a new worker.
     *
     * @return the started worker
     */
    protected Worker newWorkerThread() {

        Worker workerThread = new Worker();
        workerThread.start();
        return (workerThread);

    }


    /**
     * Returns a worker, blocking until one becomes available.
     * <p>
     * The availability check and the wait happen while holding the
     * <code>workers</code> monitor throughout, so a notification sent by
     * {@link #recycleWorkerThread(Worker)} cannot slip in between a failed
     * check and the subsequent wait.
     *
     * @return a worker, never <code>null</code>
     */
    protected Worker getWorkerThread() {
        synchronized (workers) {
            Worker workerThread;
            while ((workerThread = createWorkerThread()) == null) {
                try {
                    // Wait for a worker to be recycled
                    workers.wait();
                } catch (InterruptedException e) {
                    // Ignore; the loop re-checks for a free worker
                }
            }
            return workerThread;
        }
    }


    /**
     * Puts a worker back on the idle stack and wakes one thread waiting in
     * {@link #getWorkerThread()}.
     *
     * @param workerThread the worker that finished its socket
     */
    protected void recycleWorkerThread(Worker workerThread) {
        synchronized (workers) {
            workers.push(workerThread);
            curThreadsBusy--;
            workers.notify();
        }
    }


    /**
     * Dispatches an accepted socket to a worker thread, or to the external
     * executor when one is configured.
     *
     * @param socket the accepted socket
     * @return <code>true</code> if the socket was handed off,
     *         <code>false</code> if dispatching threw (the error is logged
     *         and the caller is expected to close the socket)
     */
    protected boolean processSocket(Socket socket) {
        try {
            if (executor == null) {
                getWorkerThread().assign(socket);
            } else {
                executor.execute(new SocketProcessor(socket));
            }
        } catch (Throwable t) {
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }


    // ------------------------------------------------- WorkerStack Inner Class

    /**
     * Array-backed LIFO stack of idle workers. The methods do no locking of
     * their own; the endpoint synchronizes on the stack instance around
     * push and pop.
     */
    public class WorkerStack {

        protected Worker[] workers = null;
        protected int end = 0;

        /**
         * Creates a stack with the given initial capacity. A negative size
         * (as used for an unbounded thread limit) is treated as 0; the stack
         * grows on demand.
         *
         * @param size initial capacity
         */
        public WorkerStack(int size) {
            workers = new Worker[size > 0 ? size : 0];
        }

        /**
         * Puts a worker on top of the stack, enlarging the backing array
         * when it is full.
         *
         * @param worker the worker to push
         */
        public void push(Worker worker) {
            if (end == workers.length) {
                int grownLength = (workers.length > 0) ? workers.length * 2 : 1;
                Worker[] grown = new Worker[grownLength];
                System.arraycopy(workers, 0, grown, 0, end);
                workers = grown;
            }
            workers[end++] = worker;
        }

        /**
         * Removes and returns the top worker.
         *
         * @return the top worker, or <code>null</code> if the stack is empty
         */
        public Worker pop() {
            if (end > 0) {
                Worker top = workers[--end];
                // Drop the stale reference held by the array slot
                workers[end] = null;
                return top;
            }
            return null;
        }

        /**
         * Returns the top worker without removing it.
         *
         * @return the top worker, or <code>null</code> if the stack is empty
         */
        public Worker peek() {
            return (end > 0) ? workers[end - 1] : null;
        }

        /**
         * @return <code>true</code> if the stack holds no workers
         */
        public boolean isEmpty() {
            return (end == 0);
        }

        /**
         * @return the number of workers on the stack
         */
        public int size() {
            return (end);
        }
    }

}