1 17 18 package org.apache.tomcat.util.net; 19 20 import java.net.InetAddress ; 21 import java.util.ArrayList ; 22 import java.util.HashMap ; 23 import java.util.concurrent.Executor ; 24 25 import org.apache.commons.logging.Log; 26 import org.apache.commons.logging.LogFactory; 27 import org.apache.tomcat.jni.Address; 28 import org.apache.tomcat.jni.Error; 29 import org.apache.tomcat.jni.File; 30 import org.apache.tomcat.jni.Library; 31 import org.apache.tomcat.jni.OS; 32 import org.apache.tomcat.jni.Poll; 33 import org.apache.tomcat.jni.Pool; 34 import org.apache.tomcat.jni.SSL; 35 import org.apache.tomcat.jni.SSLContext; 36 import org.apache.tomcat.jni.SSLSocket; 37 import org.apache.tomcat.jni.Socket; 38 import org.apache.tomcat.jni.Status; 39 import org.apache.tomcat.util.res.StringManager; 40 41 56 public class AprEndpoint { 57 58 59 61 62 protected static Log log = LogFactory.getLog(AprEndpoint.class); 63 64 protected static StringManager sm = 65 StringManager.getManager("org.apache.tomcat.util.net.res"); 66 67 68 71 public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; 72 73 76 public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; 77 78 81 public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate"; 82 83 87 public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; 88 89 90 92 93 96 protected WorkerStack workers = null; 97 98 99 102 protected volatile boolean running = false; 103 104 105 108 protected volatile boolean paused = false; 109 110 111 114 protected boolean initialized = false; 115 116 117 120 protected int curThreadsBusy = 0; 121 122 123 126 protected int curThreads = 0; 127 128 129 132 protected int sequence = 0; 133 134 135 138 protected long rootPool = 0; 139 140 141 144 protected long serverSock = 0; 145 146 147 150 protected long serverSockPool = 0; 151 152 153 156 protected long sslContext = 0; 157 158 159 161 162 165 protected Executor executor = null; 166 public void setExecutor(Executor executor) { this.executor = executor; } 167 public Executor getExecutor() { return executor; } 168 169 170 173 protected int maxThreads = 40; 174 public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; } 175 public int getMaxThreads() { return maxThreads; } 176 177 178 181 protected int threadPriority = Thread.NORM_PRIORITY; 182 public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; } 183 public int getThreadPriority() { return threadPriority; } 184 185 186 189 protected int pollerSize = 8 * 1024; 190 public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; } 191 public int getPollerSize() { return pollerSize; } 192 193 194 197 protected int sendfileSize = 1 * 1024; 198 public void setSendfileSize(int sendfileSize) { this.sendfileSize = sendfileSize; } 199 public int getSendfileSize() { return sendfileSize; } 200 201 202 205 protected int port; 206 public int getPort() { return port; } 207 public void setPort(int port ) { this.port=port; } 208 209 210 213 protected InetAddress address; 214 public InetAddress getAddress() { return address; } 215 public void setAddress(InetAddress address) { this.address = address; } 216 217 218 221 protected Handler handler = null; 222 public void setHandler(Handler handler ) { this.handler = handler; } 223 public Handler getHandler() { return handler; } 224 225 226 231 protected int backlog = 100; 232 public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } 233 public int getBacklog() { return backlog; } 234 235 236 239 protected boolean tcpNoDelay = false; 240 public boolean getTcpNoDelay() { return tcpNoDelay; } 241 public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; } 242 243 244 247 protected int soLinger = 100; 248 public int getSoLinger() { return soLinger; } 249 public void setSoLinger(int soLinger) { this.soLinger = soLinger; } 250 251 252 255 protected int soTimeout = -1; 256 public int getSoTimeout() { return soTimeout; } 257 public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; } 258 259 260 263 protected int firstReadTimeout = -1; 264 public int getFirstReadTimeout() { return firstReadTimeout; } 265 public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; } 266 267 268 272 protected int pollTime = 2000; 273 public int getPollTime() { return pollTime; } 274 public void setPollTime(int pollTime) { if (pollTime > 0) { this.pollTime = pollTime; } } 275 276 277 282 protected boolean daemon = true; 283 public void setDaemon(boolean b) { daemon = b; } 284 public boolean getDaemon() { return daemon; } 285 286 287 290 protected String name = "TP"; 291 public void setName(String name) { this.name = name; } 292 public String getName() { return name; } 293 294 295 298 protected boolean useSendfile = Library.APR_HAS_SENDFILE; 299 public void setUseSendfile(boolean useSendfile) { this.useSendfile = useSendfile; } 300 public boolean getUseSendfile() { return useSendfile; } 301 302 303 306 protected boolean useComet = true; 307 public void setUseComet(boolean useComet) { this.useComet = useComet; } 308 public boolean getUseComet() { return useComet; } 309 310 311 314 protected int acceptorThreadCount = 0; 315 public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; } 316 public int getAcceptorThreadCount() { return acceptorThreadCount; } 317 318 319 322 protected int sendfileThreadCount = 0; 323 public void setSendfileThreadCount(int sendfileThreadCount) { this.sendfileThreadCount = sendfileThreadCount; } 324 public int getSendfileThreadCount() { return sendfileThreadCount; } 325 326 327 330 protected int pollerThreadCount = 0; 331 public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; } 332 public int getPollerThreadCount() { return pollerThreadCount; } 333 334 335 338 protected Poller[] pollers = null; 339 protected int pollerRoundRobin = 0; 340 public Poller getPoller() { 341 pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length; 342 return pollers[pollerRoundRobin]; 343 } 344 345 346 349 protected Poller[] cometPollers = null; 350 protected int cometPollerRoundRobin = 0; 351 public Poller getCometPoller() { 352 cometPollerRoundRobin = (cometPollerRoundRobin + 1) % cometPollers.length; 353 return cometPollers[cometPollerRoundRobin]; 354 } 355 356 357 360 protected Sendfile[] sendfiles = null; 361 protected int sendfileRoundRobin = 0; 362 public Sendfile getSendfile() { 363 sendfileRoundRobin = (sendfileRoundRobin + 1) % sendfiles.length; 364 return sendfiles[sendfileRoundRobin]; 365 } 366 367 368 371 public int getMaxSpareThreads() { return 0; } 372 373 374 377 public int getMinSpareThreads() { return 0; } 378 379 380 383 protected boolean SSLEnabled = false; 384 public boolean isSSLEnabled() { return SSLEnabled; } 385 public void setSSLEnabled(boolean SSLEnabled) { this.SSLEnabled = SSLEnabled; } 386 387 388 391 protected String SSLProtocol = "all"; 392 public String getSSLProtocol() { return SSLProtocol; } 393 public void setSSLProtocol(String SSLProtocol) { this.SSLProtocol = SSLProtocol; } 394 395 396 400 protected String SSLPassword = null; 401 public String getSSLPassword() { return SSLPassword; } 402 public void setSSLPassword(String SSLPassword) { this.SSLPassword = SSLPassword; } 403 404 405 408 protected String SSLCipherSuite = "ALL"; 409 public String getSSLCipherSuite() { return SSLCipherSuite; } 410 public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; } 411 412 413 416 protected String SSLCertificateFile = null; 417 public String getSSLCertificateFile() { return SSLCertificateFile; } 418 public void setSSLCertificateFile(String SSLCertificateFile) { this.SSLCertificateFile = SSLCertificateFile; } 419 420 421 424 protected String SSLCertificateKeyFile = null; 425 public String getSSLCertificateKeyFile() { return SSLCertificateKeyFile; } 426 public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) { this.SSLCertificateKeyFile = SSLCertificateKeyFile; } 427 428 429 432 protected String SSLCertificateChainFile = null; 433 public String getSSLCertificateChainFile() { return SSLCertificateChainFile; } 434 public void setSSLCertificateChainFile(String SSLCertificateChainFile) { this.SSLCertificateChainFile = SSLCertificateChainFile; } 435 436 437 440 protected String SSLCACertificatePath = null; 441 public String getSSLCACertificatePath() { return SSLCACertificatePath; } 442 public void setSSLCACertificatePath(String SSLCACertificatePath) { this.SSLCACertificatePath = SSLCACertificatePath; } 443 444 445 448 protected String SSLCACertificateFile = null; 449 public String getSSLCACertificateFile() { return SSLCACertificateFile; } 450 public void setSSLCACertificateFile(String SSLCACertificateFile) { this.SSLCACertificateFile = SSLCACertificateFile; } 451 452 453 456 protected String SSLCARevocationPath = null; 457 public String getSSLCARevocationPath() { return SSLCARevocationPath; } 458 public void setSSLCARevocationPath(String SSLCARevocationPath) { this.SSLCARevocationPath = SSLCARevocationPath; } 459 460 461 464 protected String SSLCARevocationFile = null; 465 public String getSSLCARevocationFile() { return SSLCARevocationFile; } 466 public void setSSLCARevocationFile(String SSLCARevocationFile) { this.SSLCARevocationFile = SSLCARevocationFile; } 467 468 469 472 protected String SSLVerifyClient = "none"; 473 public String getSSLVerifyClient() { return SSLVerifyClient; } 474 public void setSSLVerifyClient(String SSLVerifyClient) { this.SSLVerifyClient = SSLVerifyClient; } 475 476 477 480 protected int SSLVerifyDepth = 10; 481 public int getSSLVerifyDepth() { return SSLVerifyDepth; } 482 public void setSSLVerifyDepth(int SSLVerifyDepth) { this.SSLVerifyDepth = SSLVerifyDepth; } 483 484 485 487 488 491 public int getKeepAliveCount() { 492 if (pollers == null) { 493 return 0; 494 } else { 495 int keepAliveCount = 0; 496 for (int i = 0; i < pollers.length; i++) { 497 keepAliveCount += pollers[i].getKeepAliveCount(); 498 } 499 return keepAliveCount; 500 } 501 } 502 503 504 507 public int getSendfileCount() { 508 if (sendfiles == null) { 509 return 0; 510 } else { 511 int sendfileCount = 0; 512 for (int i = 0; i < sendfiles.length; i++) { 513 sendfileCount += sendfiles[i].getSendfileCount(); 514 } 515 return sendfileCount; 516 } 517 } 518 519 520 525 public int getCurrentThreadCount() { 526 return curThreads; 527 } 528 529 530 535 public int getCurrentThreadsBusy() { 536 return curThreadsBusy; 537 } 538 539 540 545 public boolean isRunning() { 546 return running; 547 } 548 549 550 555 public boolean isPaused() { 556 return paused; 557 } 558 559 560 562 563 566 public void init() 567 throws Exception { 568 569 if (initialized) 570 return; 571 572 rootPool = Pool.create(0); 574 serverSockPool = Pool.create(rootPool); 576 String addressStr = null; 578 if (address == null) { 579 addressStr = null; 580 } else { 581 addressStr = address.getHostAddress(); 582 } 583 int family = Socket.APR_INET; 584 if (Library.APR_HAVE_IPV6 && (addressStr == null || addressStr.indexOf(':') >= 0)) { 585 family = Socket.APR_UNSPEC; 586 } 587 long inetAddress = Address.info(addressStr, family, 588 port, 0, rootPool); 589 serverSock = Socket.create(family, Socket.SOCK_STREAM, 591 Socket.APR_PROTO_TCP, rootPool); 592 if (OS.IS_UNIX) { 593 Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1); 594 } 595 Socket.optSet(serverSock, Socket.APR_SO_KEEPALIVE, 1); 597 int ret = Socket.bind(serverSock, inetAddress); 599 if (ret != 0) { 600 throw new Exception (sm.getString("endpoint.init.bind", "" + ret, Error.strerror(ret))); 601 } 602 ret = Socket.listen(serverSock, backlog); 604 if (ret != 0) { 605 throw new Exception (sm.getString("endpoint.init.listen", "" + ret, Error.strerror(ret))); 606 } 607 if (OS.IS_WIN32 || OS.IS_WIN64) { 608 Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1); 610 } 611 612 if (useSendfile && !Library.APR_HAS_SENDFILE) { 614 log.warn(sm.getString("endpoint.sendfile.nosupport")); 615 useSendfile = false; 616 } 617 618 if (acceptorThreadCount == 0) { 620 acceptorThreadCount = 1; 622 } 623 if (pollerThreadCount == 0) { 624 if ((OS.IS_WIN32 || OS.IS_WIN64) && (pollerSize > 1024)) { 625 pollerThreadCount = pollerSize / 1024; 627 pollerSize = pollerSize - (pollerSize % 1024); 629 } else { 630 pollerThreadCount = 1; 632 } 633 } 634 if (sendfileThreadCount == 0) { 635 if ((OS.IS_WIN32 || OS.IS_WIN64) && (sendfileSize > 1024)) { 636 sendfileThreadCount = sendfileSize / 1024; 638 sendfileSize = sendfileSize - (sendfileSize % 1024); 640 } else { 641 sendfileThreadCount = 1; 644 } 645 } 646 647 Socket.optSet(serverSock, Socket.APR_TCP_DEFER_ACCEPT, 1); 651 652 if (SSLEnabled) { 654 655 int value = SSL.SSL_PROTOCOL_ALL; 657 if ("SSLv2".equalsIgnoreCase(SSLProtocol)) { 658 value = SSL.SSL_PROTOCOL_SSLV2; 659 } else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) { 660 value = SSL.SSL_PROTOCOL_SSLV3; 661 } else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) { 662 value = SSL.SSL_PROTOCOL_TLSV1; 663 } else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) { 664 value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3; 665 } 666 sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER); 668 SSLContext.setCipherSuite(sslContext, SSLCipherSuite); 670 SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA); 672 SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false); 674 SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath); 676 SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath); 678 value = SSL.SSL_CVERIFY_NONE; 680 if ("optional".equalsIgnoreCase(SSLVerifyClient)) { 681 value = SSL.SSL_CVERIFY_OPTIONAL; 682 } else if ("require".equalsIgnoreCase(SSLVerifyClient)) { 683 value = SSL.SSL_CVERIFY_REQUIRE; 684 } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) { 685 value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA; 686 } 687 SSLContext.setVerify(sslContext, value, SSLVerifyDepth); 688 useSendfile = false; 690 } 691 692 initialized = true; 693 694 } 695 696 697 700 public void start() 701 throws Exception { 702 if (!initialized) { 704 init(); 705 } 706 if (!running) { 707 running = true; 708 paused = false; 709 710 if (executor == null) { 712 workers = new WorkerStack(maxThreads); 713 } 714 715 for (int i = 0; i < acceptorThreadCount; i++) { 717 Thread acceptorThread = new Thread (new Acceptor(), getName() + "-Acceptor-" + i); 718 acceptorThread.setPriority(threadPriority); 719 acceptorThread.setDaemon(daemon); 720 acceptorThread.start(); 721 } 722 723 pollers = new Poller[pollerThreadCount]; 725 for (int i = 0; i < pollerThreadCount; i++) { 726 pollers[i] = new Poller(false); 727 pollers[i].init(); 728 Thread pollerThread = new Thread (pollers[i], getName() + "-Poller-" + i); 729 pollerThread.setPriority(threadPriority); 730 pollerThread.setDaemon(true); 731 pollerThread.start(); 732 } 733 734 cometPollers = new Poller[pollerThreadCount]; 736 for (int i = 0; i < pollerThreadCount; i++) { 737 cometPollers[i] = new Poller(true); 738 cometPollers[i].init(); 739 Thread pollerThread = new Thread (cometPollers[i], getName() + "-CometPoller-" + i); 740 pollerThread.setPriority(threadPriority); 741 pollerThread.setDaemon(true); 742 pollerThread.start(); 743 } 744 745 if (useSendfile) { 747 sendfiles = new Sendfile[sendfileThreadCount]; 748 for (int i = 0; i < sendfileThreadCount; i++) { 749 sendfiles[i] = new Sendfile(); 750 sendfiles[i].init(); 751 Thread sendfileThread = new Thread (sendfiles[i], getName() + "-Sendfile-" + i); 752 sendfileThread.setPriority(threadPriority); 753 sendfileThread.setDaemon(true); 754 sendfileThread.start(); 755 } 756 } 757 } 758 } 759 760 761 764 public void pause() { 765 if (running && !paused) { 766 paused = true; 767 unlockAccept(); 768 } 769 } 770 771 772 776 public void resume() { 777 if (running) { 778 paused = false; 779 } 780 } 781 782 783 786 public void stop() { 787 if (running) { 788 running = false; 789 unlockAccept(); 790 for (int i = 0; i < pollers.length; i++) { 791 pollers[i].destroy(); 792 } 793 pollers = null; 794 for (int i = 0; i < cometPollers.length; i++) { 795 cometPollers[i].destroy(); 796 } 797 cometPollers = null; 798 if (useSendfile) { 799 for (int i = 0; i < sendfiles.length; i++) { 800 sendfiles[i].destroy(); 801 } 802 sendfiles = null; 803 } 804 } 805 } 806 807 808 811 public void destroy() throws Exception { 812 if (running) { 813 stop(); 814 } 815 Pool.destroy(serverSockPool); 816 serverSockPool = 0; 817 Socket.close(serverSock); 819 serverSock = 0; 820 sslContext = 0; 821 Pool.destroy(rootPool); 823 rootPool = 0; 824 initialized = false; 825 } 826 827 828 830 831 834 protected int getSequence() { 835 return sequence++; 836 } 837 838 839 842 protected void unlockAccept() { 843 java.net.Socket s = null; 844 try { 845 if (address == null) { 847 s = new java.net.Socket ("127.0.0.1", port); 848 } else { 849 s = new java.net.Socket (address, port); 850 s.setSoLinger(true, 0); 853 } 854 } catch(Exception e) { 855 if (log.isDebugEnabled()) { 856 log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); 857 } 858 } finally { 859 if (s != null) { 860 try { 861 s.close(); 862 } catch (Exception e) { 863 } 865 } 866 } 867 } 868 869 870 873 protected boolean setSocketOptions(long socket) { 874 int step = 1; 876 try { 877 878 if (soLinger >= 0) 880 Socket.optSet(socket, Socket.APR_SO_LINGER, soLinger); 881 if (tcpNoDelay) 882 Socket.optSet(socket, Socket.APR_TCP_NODELAY, (tcpNoDelay ? 1 : 0)); 883 if (soTimeout > 0) 884 Socket.timeoutSet(socket, soTimeout * 1000); 885 886 step = 2; 888 if (sslContext != 0) { 889 SSLSocket.attach(sslContext, socket); 890 if (SSLSocket.handshake(socket) != 0) { 891 if (log.isDebugEnabled()) { 892 log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError()); 893 } 894 return false; 895 } 896 } 897 898 } catch (Throwable t) { 899 if (log.isDebugEnabled()) { 900 if (step == 2) { 901 log.debug(sm.getString("endpoint.err.handshake"), t); 902 } else { 903 log.debug(sm.getString("endpoint.err.unexpected"), t); 904 } 905 } 906 return false; 908 } 909 return true; 910 } 911 912 913 919 protected Worker createWorkerThread() { 920 921 synchronized (workers) { 922 if (workers.size() > 0) { 923 curThreadsBusy++; 924 return (workers.pop()); 925 } 926 if ((maxThreads > 0) && (curThreads < maxThreads)) { 927 curThreadsBusy++; 928 return (newWorkerThread()); 929 } else { 930 if (maxThreads < 0) { 931 curThreadsBusy++; 932 return (newWorkerThread()); 933 } else { 934 return (null); 935 } 936 } 937 } 938 939 } 940 941 942 946 protected Worker newWorkerThread() { 947 948 Worker workerThread = new Worker(); 949 workerThread.start(); 950 return (workerThread); 951 952 } 953 954 955 958 protected Worker getWorkerThread() { 959 Worker workerThread = createWorkerThread(); 961 while (workerThread == null) { 962 try { 963 synchronized (workers) { 964 workers.wait(); 965 } 966 } catch (InterruptedException e) { 967 } 969 workerThread = createWorkerThread(); 970 } 971 return workerThread; 972 } 973 974 975 980 protected void recycleWorkerThread(Worker workerThread) { 981 synchronized (workers) { 982 workers.push(workerThread); 983 curThreadsBusy--; 984 workers.notify(); 985 } 986 } 987 988 989 992 protected long allocatePoller(int size, long pool, int timeout) { 993 try { 994 return Poll.create(size, pool, 0, timeout * 1000); 995 } catch (Error e) { 996 if (Status.APR_STATUS_IS_EINVAL(e.getError())) { 997 log.info(sm.getString("endpoint.poll.limitedpollsize", "" + size)); 998 return 0; 999 } else { 1000 log.error(sm.getString("endpoint.poll.initfail"), e); 1001 return -1; 1002 } 1003 } 1004 } 1005 1006 1007 1010 protected boolean processSocketWithOptions(long socket) { 1011 try { 1012 if (executor == null) { 1013 getWorkerThread().assignWithOptions(socket); 1014 } else { 1015 executor.execute(new SocketWithOptionsProcessor(socket)); 1016 } 1017 } catch (Throwable t) { 1018 log.error(sm.getString("endpoint.process.fail"), t); 1021 return false; 1022 } 1023 return true; 1024 } 1025 1026 1027 1030 protected boolean processSocket(long socket) { 1031 try { 1032 if (executor == null) { 1033 getWorkerThread().assign(socket); 1034 } else { 1035 executor.execute(new SocketProcessor(socket)); 1036 } 1037 } catch (Throwable t) { 1038 log.error(sm.getString("endpoint.process.fail"), t); 1041 return false; 1042 } 1043 return true; 1044 } 1045 1046 1047 1050 protected boolean processSocket(long socket, SocketStatus status) { 1051 try { 1052 if (executor == null) { 1053 getWorkerThread().assign(socket, status); 1054 } else { 1055 executor.execute(new SocketEventProcessor(socket, status)); 1056 } 1057 } catch (Throwable t) { 1058 log.error(sm.getString("endpoint.process.fail"), t); 1061 return false; 1062 } 1063 return true; 1064 } 1065 1066 1067 1069 1070 1073 protected class Acceptor implements Runnable { 1074 1075 1076 1080 public void run() { 1081 1082 while (running) { 1084 1085 while (paused) { 1087 try { 1088 Thread.sleep(1000); 1089 } catch (InterruptedException e) { 1090 } 1092 } 1093 1094 try { 1095 long socket = Socket.accept(serverSock); 1097 if (!processSocketWithOptions(socket)) { 1099 Socket.destroy(socket); 1101 } 1102 } catch (Throwable t) { 1103 log.error(sm.getString("endpoint.accept.fail"), t); 1104 } 1105 1106 1108 } 1109 1110 } 1111 1112 } 1113 1114 1115 1117 1118 1121 public class Poller implements Runnable { 1122 1123 protected long serverPollset = 0; 1124 protected long pool = 0; 1125 protected long[] desc; 1126 1127 protected long[] addS; 1128 protected int addCount = 0; 1129 1130 protected boolean comet = true; 1131 1132 protected int keepAliveCount = 0; 1133 public int getKeepAliveCount() { return keepAliveCount; } 1134 1135 public Poller(boolean comet) { 1136 this.comet = comet; 1137 } 1138 1139 1143 protected void init() { 1144 pool = Pool.create(serverSockPool); 1145 int size = pollerSize / pollerThreadCount; 1146 int timeout = soTimeout; 1147 if (comet) { 1148 timeout = soTimeout * 50; 1151 } 1152 serverPollset = allocatePoller(size, pool, timeout); 1153 if (serverPollset == 0 && size > 1024) { 1154 size = 1024; 1155 serverPollset = allocatePoller(size, pool, timeout); 1156 } 1157 if (serverPollset == 0) { 1158 size = 62; 1159 serverPollset = allocatePoller(size, pool, timeout); 1160 } 1161 desc = new long[size * 2]; 1162 keepAliveCount = 0; 1163 addS = new long[size]; 1164 addCount = 0; 1165 } 1166 1167 1170 protected void destroy() { 1171 try { 1175 synchronized (this) { 1176 this.wait(pollTime / 1000); 1177 } 1178 } catch (InterruptedException e) { 1179 } 1181 for (int i = 0; i < addCount; i++) { 1183 if (comet) { 1184 processSocket(addS[i], SocketStatus.STOP); 1185 } else { 1186 Socket.destroy(addS[i]); 1187 } 1188 } 1189 int rv = Poll.pollset(serverPollset, desc); 1191 if (rv > 0) { 1192 for (int n = 0; n < rv; n++) { 1193 if (comet) { 1194 processSocket(desc[n*2+1], SocketStatus.STOP); 1195 } else { 1196 Socket.destroy(desc[n*2+1]); 1197 } 1198 } 1199 } 1200 Pool.destroy(pool); 1201 keepAliveCount = 0; 1202 addCount = 0; 1203 } 1204 1205 1213 public void add(long socket) { 1214 synchronized (this) { 1215 if (addCount >= addS.length) { 1218 if (comet) { 1220 processSocket(socket, SocketStatus.ERROR); 1221 } else { 1222 Socket.destroy(socket); 1223 } 1224 return; 1225 } 1226 addS[addCount] = socket; 1227 addCount++; 1228 this.notify(); 1229 } 1230 } 1231 1232 1236 public void run() { 1237 1238 long maintainTime = 0; 1239 while (running) { 1241 while (paused) { 1243 try { 1244 Thread.sleep(1000); 1245 } catch (InterruptedException e) { 1246 } 1248 } 1249 1250 while (keepAliveCount < 1 && addCount < 1) { 1251 maintainTime = 0; 1253 try { 1254 synchronized (this) { 1255 this.wait(); 1256 } 1257 } catch (InterruptedException e) { 1258 } 1260 } 1261 1262 try { 1263 if (addCount > 0) { 1265 synchronized (this) { 1266 for (int i = (addCount - 1); i >= 0; i--) { 1267 int rv = Poll.add 1268 (serverPollset, addS[i], Poll.APR_POLLIN); 1269 if (rv == Status.APR_SUCCESS) { 1270 keepAliveCount++; 1271 } else { 1272 if (comet) { 1274 processSocket(addS[i], SocketStatus.ERROR); 1275 } else { 1276 Socket.destroy(addS[i]); 1277 } 1278 } 1279 } 1280 addCount = 0; 1281 } 1282 } 1283 1284 maintainTime += pollTime; 1285 int rv = Poll.poll(serverPollset, pollTime, desc, true); 1287 if (rv > 0) { 1288 keepAliveCount -= rv; 1289 for (int n = 0; n < rv; n++) { 1290 if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) 1292 || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR) 1293 || (comet && (!processSocket(desc[n*2+1], SocketStatus.OPEN))) 1294 || (!comet && (!processSocket(desc[n*2+1])))) { 1295 if (comet) { 1297 processSocket(desc[n*2+1], SocketStatus.DISCONNECT); 1298 } else { 1299 Socket.destroy(desc[n*2+1]); 1300 } 1301 continue; 1302 } 1303 } 1304 } else if (rv < 0) { 1305 int errn = -rv; 1306 1307 if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) { 1308 if (errn > Status.APR_OS_START_USERERR) { 1309 errn -= Status.APR_OS_START_USERERR; 1310 } 1311 log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn))); 1312 synchronized (this) { 1314 destroy(); 1315 init(); 1316 } 1317 continue; 1318 } 1319 } 1320 if (soTimeout > 0 && maintainTime > 1000000L && running) { 1321 rv = Poll.maintain(serverPollset, desc, true); 1322 maintainTime = 0; 1323 if (rv > 0) { 1324 keepAliveCount -= rv; 1325 for (int n = 0; n < rv; n++) { 1326 if (comet) { 1328 processSocket(desc[n], SocketStatus.TIMEOUT); 1329 } else { 1330 Socket.destroy(desc[n]); 1331 } 1332 } 1333 } 1334 } 1335 } catch (Throwable t) { 1336 log.error(sm.getString("endpoint.poll.error"), t); 1337 } 1338 1339 } 1340 1341 synchronized (this) { 1342 this.notifyAll(); 1343 } 1344 1345 } 1346 1347 } 1348 1349 1350 1352 1353 1356 protected class Worker implements Runnable { 1357 1358 1359 protected Thread thread = null; 1360 protected boolean available = false; 1361 protected long socket = 0; 1362 protected SocketStatus status = null; 1363 protected boolean options = false; 1364 1365 1366 1375 protected synchronized void assignWithOptions(long socket) { 1376 1377 while (available) { 1379 try { 1380 wait(); 1381 } catch (InterruptedException e) { 1382 } 1383 } 1384 1385 this.socket = socket; 1387 status = null; 1388 options = true; 1389 available = true; 1390 notifyAll(); 1391 1392 } 1393 1394 1395 1404 protected synchronized void assign(long socket) { 1405 1406 while (available) { 1408 try { 1409 wait(); 1410 } catch (InterruptedException e) { 1411 } 1412 } 1413 1414 this.socket = socket; 1416 status = null; 1417 options = false; 1418 available = true; 1419 notifyAll(); 1420 1421 } 1422 1423 1424 protected synchronized void assign(long socket, SocketStatus status) { 1425 1426 while (available) { 1428 try { 1429 wait(); 1430 } catch (InterruptedException e) { 1431 } 1432 } 1433 1434 this.socket = socket; 1436 this.status = status; 1437 options = false; 1438 available = true; 1439 notifyAll(); 1440 1441 } 1442 1443 1444 1448 protected synchronized long await() { 1449 1450 while (!available) { 1452 try { 1453 wait(); 1454 } catch (InterruptedException e) { 1455 } 1456 } 1457 1458 long socket = this.socket; 1460 available = false; 1461 notifyAll(); 1462 1463 return (socket); 1464 1465 } 1466 1467 1468 1472 public void run() { 1473 1474 while (running) { 1476 1477 long socket = await(); 1479 if (socket == 0) 1480 continue; 1481 1482 if ((status != null) && (handler.event(socket, status) == Handler.SocketState.CLOSED)) { 1484 Socket.destroy(socket); 1486 socket = 0; 1487 } else if ((status == null) && ((options && !setSocketOptions(socket)) 1488 || handler.process(socket) == Handler.SocketState.CLOSED)) { 1489 Socket.destroy(socket); 1491 socket = 0; 1492 } 1493 1494 recycleWorkerThread(this); 1496 1497 } 1498 1499 } 1500 1501 1502 1505 public void start() { 1506 thread = new Thread (this); 1507 thread.setName(getName() + "-" + (++curThreads)); 1508 thread.setDaemon(true); 1509 thread.start(); 1510 } 1511 1512 1513 } 1514 1515 1516 1518 1519 1522 public static class SendfileData { 1523 public String fileName; 1525 public long fd; 1526 public long fdpool; 1527 public long start; 1529 public long end; 1530 public long socket; 1532 public long pos; 1534 public boolean keepAlive; 1536 } 1537 1538 1539 1541 1542 1545 public class Sendfile implements Runnable { 1546 1547 protected long sendfilePollset = 0; 1548 protected long pool = 0; 1549 protected long[] desc; 1550 protected HashMap <Long , SendfileData> sendfileData; 1551 1552 protected int sendfileCount; 1553 public int getSendfileCount() { return sendfileCount; } 1554 1555 protected ArrayList <SendfileData> addS; 1556 1557 1561 protected void init() { 1562 pool = Pool.create(serverSockPool); 1563 int size = sendfileSize / sendfileThreadCount; 1564 sendfilePollset = allocatePoller(size, pool, soTimeout); 1565 if (sendfilePollset == 0 && size > 1024) { 1566 size = 1024; 1567 sendfilePollset = allocatePoller(size, pool, soTimeout); 1568 } 1569 if (sendfilePollset == 0) { 1570 size = 62; 1571 sendfilePollset = allocatePoller(size, pool, soTimeout); 1572 } 1573 desc = new long[size * 2]; 1574 sendfileData = new HashMap <Long , SendfileData>(size); 1575 addS = new ArrayList <SendfileData>(); 1576 } 1577 1578 1581 protected void destroy() { 1582 try { 1586 synchronized (this) { 1587 this.wait(pollTime / 1000); 1588 } 1589 } catch (InterruptedException e) { 1590 } 1592 for (int i = (addS.size() - 1); i >= 0; i--) { 1594 SendfileData data = addS.get(i); 1595 Socket.destroy(data.socket); 1596 } 1597 int rv = Poll.pollset(sendfilePollset, desc); 1599 if (rv > 0) { 1600 for (int n = 0; n < rv; n++) { 1601 Socket.destroy(desc[n*2+1]); 1602 } 1603 } 1604 Pool.destroy(pool); 1605 sendfileData.clear(); 1606 } 1607 1608 1618 public boolean add(SendfileData data) { 1619 try { 1621 data.fdpool = Socket.pool(data.socket); 1622 data.fd = File.open 1623 (data.fileName, File.APR_FOPEN_READ 1624 | File.APR_FOPEN_SENDFILE_ENABLED | File.APR_FOPEN_BINARY, 1625 0, data.fdpool); 1626 data.pos = data.start; 1627 Socket.timeoutSet(data.socket, 0); 1629 while (true) { 1630 long nw = Socket.sendfilen(data.socket, data.fd, 1631 data.pos, data.end - data.pos, 0); 1632 if (nw < 0) { 1633 if (!(-nw == Status.EAGAIN)) { 1634 Socket.destroy(data.socket); 1635 data.socket = 0; 1636 return false; 1637 } else { 1638 break; 1640 } 1641 } else { 1642 data.pos = data.pos + nw; 1643 if (data.pos >= data.end) { 1644 Pool.destroy(data.fdpool); 1646 Socket.timeoutSet(data.socket, soTimeout * 1000); 1648 return true; 1649 } 1650 } 1651 } 1652 } catch (Exception e) { 1653 log.error(sm.getString("endpoint.sendfile.error"), e); 1654 return false; 1655 } 1656 synchronized (this) { 1659 addS.add(data); 1660 this.notify(); 1661 } 1662 return false; 1663 } 1664 1665 1670 protected void remove(SendfileData data) { 1671 int rv = Poll.remove(sendfilePollset, data.socket); 1672 if (rv == Status.APR_SUCCESS) { 1673 sendfileCount--; 1674 } 1675 sendfileData.remove(data); 1676 } 1677 1678 1682 public void run() { 1683 1684 while (running) { 1686 1687 while (paused) { 1689 try { 1690 Thread.sleep(1000); 1691 } catch (InterruptedException e) { 1692 } 1694 } 1695 1696 while (sendfileCount < 1 && addS.size() < 1) { 1697 try { 1698 synchronized (this) { 1699 this.wait(); 1700 } 1701 } catch (InterruptedException e) { 1702 } 1704 } 1705 1706 try { 1707 if (addS.size() > 0) { 1709 synchronized (this) { 1710 for (int i = (addS.size() - 1); i >= 0; i--) { 1711 SendfileData data = addS.get(i); 1712 int rv = Poll.add(sendfilePollset, data.socket, Poll.APR_POLLOUT); 1713 if (rv == Status.APR_SUCCESS) { 1714 sendfileData.put(new Long (data.socket), data); 1715 sendfileCount++; 1716 } else { 1717 log.warn(sm.getString("endpoint.sendfile.addfail", "" + rv, Error.strerror(rv))); 1718 Socket.destroy(data.socket); 1720 } 1721 } 1722 addS.clear(); 1723 } 1724 } 1725 int rv = Poll.poll(sendfilePollset, pollTime, desc, false); 1727 if (rv > 0) { 1728 for (int n = 0; n < rv; n++) { 1729 SendfileData state = 1731 sendfileData.get(new Long (desc[n*2+1])); 1732 if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) 1734 || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) { 1735 remove(state); 1737 Socket.destroy(state.socket); 1740 continue; 1741 } 1742 long nw = Socket.sendfilen(state.socket, state.fd, 1744 state.pos, 1745 state.end - state.pos, 0); 1746 if (nw < 0) { 1747 remove(state); 1749 Socket.destroy(state.socket); 1752 continue; 1753 } 1754 1755 state.pos = state.pos + nw; 1756 if (state.pos >= state.end) { 1757 remove(state); 1758 if (state.keepAlive) { 1759 Pool.destroy(state.fdpool); 1761 Socket.timeoutSet(state.socket, soTimeout * 1000); 1762 if (!processSocket(state.socket)) { 1765 Socket.destroy(state.socket); 1766 } 1767 } else { 1768 Socket.destroy(state.socket); 1771 } 1772 } 1773 } 1774 } else if (rv < 0) { 1775 int errn = -rv; 1776 1777 if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) { 1778 if (errn > Status.APR_OS_START_USERERR) { 1779 errn -= Status.APR_OS_START_USERERR; 1780 } 1781 log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn))); 1782 synchronized (this) { 1784 destroy(); 1785 init(); 1786 } 1787 continue; 1788 } 1789 } 1790 1791 } catch (Throwable t) { 1792 log.error(sm.getString("endpoint.poll.error"), t); 1793 } 1794 } 1795 1796 synchronized (this) { 1797 this.notifyAll(); 1798 } 1799 1800 } 1801 1802 } 1803 1804 1805 1807 1808 1813 public interface Handler { 1814 public enum SocketState { 1815 OPEN, CLOSED, LONG 1816 } 1817 public SocketState process(long socket); 1818 public SocketState event(long socket, SocketStatus status); 1819 } 1820 1821 1822 1824 1825 public class WorkerStack { 1826 1827 protected Worker[] workers = null; 1828 protected int end = 0; 1829 1830 public WorkerStack(int size) { 1831 workers = new Worker[size]; 1832 } 1833 1834 1839 public void push(Worker worker) { 1840 workers[end++] = worker; 1841 } 1842 1843 1847 public Worker pop() { 1848 if (end > 0) { 1849 return workers[--end]; 1850 } 1851 return null; 1852 } 1853 1854 1858 public Worker peek() { 1859 return workers[end]; 1860 } 1861 1862 1865 public boolean isEmpty() { 1866 return (end == 0); 1867 } 1868 1869 1872 public int size() { 1873 return (end); 1874 } 1875 } 1876 1877 1878 1880 1881 1886 protected class SocketWithOptionsProcessor implements Runnable { 1887 1888 protected long socket = 0; 1889 1890 public SocketWithOptionsProcessor(long socket) { 1891 this.socket = socket; 1892 } 1893 1894 public void run() { 1895 1896 if (!setSocketOptions(socket) 1898 || handler.process(socket) == Handler.SocketState.CLOSED) { 1899 Socket.destroy(socket); 1901 socket = 0; 1902 } 1903 1904 } 1905 1906 } 1907 1908 1909 1911 1912 1916 protected class SocketProcessor implements Runnable { 1917 1918 protected long socket = 0; 1919 1920 public SocketProcessor(long socket) { 1921 this.socket = socket; 1922 } 1923 1924 public void run() { 1925 1926 if (handler.process(socket) == Handler.SocketState.CLOSED) { 1928 Socket.destroy(socket); 1930 socket = 0; 1931 } 1932 1933 } 1934 1935 } 1936 1937 1938 1940 1941 1945 protected class SocketEventProcessor implements Runnable { 1946 1947 protected long socket = 0; 1948 protected SocketStatus status = null; 1949 1950 public SocketEventProcessor(long socket, SocketStatus status) { 1951 this.socket = socket; 1952 this.status = status; 1953 } 1954 1955 public void run() { 1956 1957 if (handler.event(socket, status) == Handler.SocketState.CLOSED) { 1959 Socket.destroy(socket); 1961 socket = 0; 1962 } 1963 1964 } 1965 1966 } 1967 1968 1969} 1970 | Popular Tags |