1 17 18 package org.apache.tomcat.util.net; 19 20 import java.io.FileInputStream ; 21 import java.io.IOException ; 22 import java.net.InetAddress ; 23 import java.net.InetSocketAddress ; 24 import java.net.Socket ; 25 import java.nio.ByteBuffer ; 26 import java.nio.channels.CancelledKeyException ; 27 import java.nio.channels.SelectionKey ; 28 import java.nio.channels.Selector ; 29 import java.nio.channels.ServerSocketChannel ; 30 import java.nio.channels.SocketChannel ; 31 import java.security.KeyStore ; 32 import java.util.HashMap ; 33 import java.util.Iterator ; 34 import java.util.Set ; 35 import java.util.StringTokenizer ; 36 import java.util.concurrent.ConcurrentLinkedQueue ; 37 import java.util.concurrent.Executor ; 38 import java.util.concurrent.atomic.AtomicLong ; 39 import javax.net.ssl.KeyManagerFactory; 40 import javax.net.ssl.SSLContext; 41 import javax.net.ssl.SSLEngine; 42 import javax.net.ssl.TrustManagerFactory; 43 44 import org.apache.commons.logging.Log; 45 import org.apache.commons.logging.LogFactory; 46 import org.apache.tomcat.util.IntrospectionUtils; 47 import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler; 48 import org.apache.tomcat.util.res.StringManager; 49 import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment; 50 import java.util.concurrent.atomic.AtomicInteger ; 51 52 67 public class NioEndpoint { 68 69 70 72 73 protected static Log log = LogFactory.getLog(NioEndpoint.class); 74 75 protected static StringManager sm = 76 StringManager.getManager("org.apache.tomcat.util.net.res"); 77 78 79 82 public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; 83 84 87 public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; 88 89 92 public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate"; 93 94 98 public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; 99 100 public static final int OP_REGISTER = -1; 103 104 107 protected WorkerStack workers = null; 108 109 110 113 protected volatile boolean running = false; 114 115 116 119 protected volatile boolean paused = false; 120 121 122 125 protected boolean initialized = false; 126 127 128 131 protected int curThreadsBusy = 0; 132 133 134 137 protected int curThreads = 0; 138 139 140 143 protected int sequence = 0; 144 145 protected NioSelectorPool selectorPool = new NioSelectorPool(); 146 147 150 protected ServerSocketChannel serverSock = null; 151 152 155 protected ConcurrentLinkedQueue <KeyAttachment> keyCache = new ConcurrentLinkedQueue <KeyAttachment>(); 156 157 160 protected ConcurrentLinkedQueue <PollerEvent> eventCache = new ConcurrentLinkedQueue <PollerEvent>(); 161 162 165 protected ConcurrentLinkedQueue <NioChannel> nioChannels = new ConcurrentLinkedQueue <NioChannel>() { 166 protected AtomicInteger size = new AtomicInteger (0); 167 protected AtomicInteger bytes = new AtomicInteger (0); 168 public boolean offer(NioChannel socket, KeyAttachment att) { 169 boolean offer = socketProperties.getBufferPool()==-1?true:size.get()<socketProperties.getBufferPool(); 170 offer = offer && (socketProperties.getBufferPoolSize()==-1?true:(bytes.get()+socket.getBufferSize())<socketProperties.getBufferPoolSize()); 171 if ( running && (!paused) && (offer) ) { 173 boolean result = super.offer(socket); 174 if ( result ) { 175 size.incrementAndGet(); 176 bytes.addAndGet(socket.getBufferSize()); 177 } 178 return result; 179 } 180 else return false; 181 } 182 183 public NioChannel poll() { 184 NioChannel result = super.poll(); 185 if ( result != null ) { 186 size.decrementAndGet(); 187 bytes.addAndGet(-result.getBufferSize()); 188 } 189 return result; 190 } 191 192 public void clear() { 193 super.clear(); 194 size.set(0); 195 } 196 }; 197 198 199 200 202 203 206 protected Executor executor = null; 207 public void setExecutor(Executor executor) { this.executor = executor; } 208 public Executor getExecutor() { return executor; } 209 210 211 214 protected int maxThreads = 400; 215 public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; } 216 public int getMaxThreads() { return maxThreads; } 217 218 219 222 protected int threadPriority = Thread.NORM_PRIORITY; 223 public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; } 224 public int getThreadPriority() { return threadPriority; } 225 226 227 230 protected int port; 231 public int getPort() { return port; } 232 public void setPort(int port ) { this.port=port; } 233 234 235 238 protected InetAddress address; 239 public InetAddress getAddress() { return address; } 240 public void setAddress(InetAddress address) { this.address = address; } 241 242 243 246 protected Handler handler = null; 247 public void setHandler(Handler handler ) { this.handler = handler; } 248 public Handler getHandler() { return handler; } 249 250 251 256 protected int backlog = 100; 257 public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } 258 public int getBacklog() { return backlog; } 259 260 protected SocketProperties socketProperties = new SocketProperties(); 261 262 265 public boolean getTcpNoDelay() { return socketProperties.getTcpNoDelay();} 266 public void setTcpNoDelay(boolean tcpNoDelay) { socketProperties.setTcpNoDelay(tcpNoDelay); } 267 268 269 272 public int getSoLinger() { return socketProperties.getSoLingerTime(); } 273 public void setSoLinger(int soLinger) { 274 socketProperties.setSoLingerTime(soLinger); 275 socketProperties.setSoLingerOn(soLinger>=0); 276 } 277 278 279 282 public int getSoTimeout() { return socketProperties.getSoTimeout(); } 283 public void setSoTimeout(int soTimeout) { socketProperties.setSoTimeout(soTimeout); } 284 285 286 289 protected int firstReadTimeout = 60000; 290 public int getFirstReadTimeout() { return firstReadTimeout; } 291 public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; } 292 293 294 299 protected boolean daemon = true; 300 public void setDaemon(boolean b) { daemon = b; } 301 public boolean getDaemon() { return daemon; } 302 303 304 307 protected String name = "TP"; 308 public void setName(String name) { this.name = name; } 309 public String getName() { return name; } 310 311 312 313 316 protected boolean useComet = true; 317 public void setUseComet(boolean useComet) { this.useComet = useComet; } 318 public boolean getUseComet() { return useComet; } 319 320 321 324 protected int acceptorThreadCount = 0; 325 public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; } 326 public int getAcceptorThreadCount() { return acceptorThreadCount; } 327 328 329 330 333 protected int pollerThreadCount = 0; 334 public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; } 335 public int getPollerThreadCount() { return pollerThreadCount; } 336 337 protected long selectorTimeout = 1000; 338 public void setSelectorTimeout(long timeout){ this.selectorTimeout = timeout;} 339 public long getSelectorTimeout(){ return this.selectorTimeout; } 340 343 protected Poller[] pollers = null; 344 protected int pollerRoundRobin = 0; 345 public Poller getPoller0() { 346 pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length; 347 Poller poller = pollers[pollerRoundRobin]; 348 return poller; 349 } 350 351 352 355 public Poller getCometPoller0() { 356 Poller poller = getPoller0(); 357 return poller; 358 } 359 360 361 364 public int getMaxSpareThreads() { return Math.min(getMaxThreads(),5); } 365 366 367 370 public int getMinSpareThreads() { return Math.min(getMaxThreads(),5); } 371 372 375 public void setProperty(String name, String value) { 376 final String selectorPoolName = "selectorPool."; 377 final String socketName = "socket."; 378 try { 379 if (name.startsWith(selectorPoolName)) { 380 IntrospectionUtils.setProperty(selectorPool, name.substring(selectorPoolName.length()), value); 381 } else if (name.startsWith(socketName)) { 382 IntrospectionUtils.setProperty(socketProperties, name.substring(socketName.length()), value); 383 } 384 }catch ( Exception x ) { 385 log.error("Unable to set attribute \""+name+"\" to \""+value+"\"",x); 386 } 387 } 388 389 390 protected String keystoreFile = System.getProperty("user.home")+"/.keystore"; 392 public String getKeystoreFile() { return keystoreFile;} 393 public void setKeystoreFile(String s ) { this.keystoreFile = s; } 394 public void setKeystore(String s ) { setKeystoreFile(s);} 395 public String getKeystore() { return getKeystoreFile();} 396 397 protected String algorithm = "SunX509"; 398 public String getAlgorithm() { return algorithm;} 399 public void setAlgorithm(String s ) { this.algorithm = s;} 400 401 protected boolean clientAuth = false; 402 public boolean getClientAuth() { return clientAuth;} 403 public void setClientAuth(boolean b ) { this.clientAuth = b;} 404 405 protected String keystorePass = "changeit"; 406 public String getKeystorePass() { return keystorePass;} 407 public void setKeystorePass(String s ) { this.keystorePass = s;} 408 409 protected String keystoreType = "JKS"; 410 public String getKeystoreType() { return keystoreType;} 411 public void setKeystoreType(String s ) { this.keystoreType = s;} 412 413 protected String sslProtocol = "TLS"; 414 415 public String getSslProtocol() { return sslProtocol;} 416 public void setSslProtocol(String s) { sslProtocol = s;} 417 418 protected String sslEnabledProtocols=null; protected String [] sslEnabledProtocolsarr = new String [0]; 420 public void setSslEnabledProtocols(String s) { 421 this.sslEnabledProtocols = s; 422 StringTokenizer t = new StringTokenizer (s,","); 423 sslEnabledProtocolsarr = new String [t.countTokens()]; 424 for (int i=0; i<sslEnabledProtocolsarr.length; i++ ) sslEnabledProtocolsarr[i] = t.nextToken(); 425 } 426 427 428 protected String ciphers = null; 429 protected String [] ciphersarr = new String [0]; 430 public String getCiphers() { return ciphers;} 431 public void setCiphers(String s) { 432 ciphers = s; 433 if ( s == null ) ciphersarr = new String [0]; 434 else { 435 StringTokenizer t = new StringTokenizer (s,","); 436 ciphersarr = new String [t.countTokens()]; 437 for (int i=0; i<ciphersarr.length; i++ ) ciphersarr[i] = t.nextToken(); 438 } 439 } 440 441 444 protected boolean SSLEnabled = false; 445 public boolean isSSLEnabled() { return SSLEnabled;} 446 public void setSSLEnabled(boolean SSLEnabled) {this.SSLEnabled = SSLEnabled;} 447 448 protected boolean secure = false; 449 public boolean getSecure() { return secure;} 450 public void setSecure(boolean b) { secure = b;} 451 452 public void setSelectorPool(NioSelectorPool selectorPool) { 453 this.selectorPool = selectorPool; 454 } 455 456 public void setSocketProperties(SocketProperties socketProperties) { 457 this.socketProperties = socketProperties; 458 } 459 460 protected SSLContext sslContext = null; 461 public SSLContext getSSLContext() { return sslContext;} 462 public void setSSLContext(SSLContext c) { sslContext = c;} 463 464 466 467 470 public int getKeepAliveCount() { 471 if (pollers == null) { 472 return 0; 473 } else { 474 int keepAliveCount = 0; 475 for (int i = 0; i < pollers.length; i++) { 476 keepAliveCount += pollers[i].getKeepAliveCount(); 477 } 478 return keepAliveCount; 479 } 480 } 481 482 483 484 489 public int getCurrentThreadCount() { 490 return curThreads; 491 } 492 493 494 499 public int getCurrentThreadsBusy() { 500 return curThreadsBusy; 501 } 502 503 504 509 public boolean isRunning() { 510 return running; 511 } 512 513 514 519 public boolean isPaused() { 520 return paused; 521 } 522 523 524 526 527 530 public void init() 531 throws Exception { 532 533 if (initialized) 534 return; 535 536 serverSock = ServerSocketChannel.open(); 537 InetSocketAddress addr = (address!=null?new InetSocketAddress (address,port):new InetSocketAddress (port)); 538 serverSock.socket().bind(addr,100); serverSock.configureBlocking(true); 541 if (acceptorThreadCount == 0) { 543 acceptorThreadCount = 1; 545 } 546 if (pollerThreadCount <= 0) { 547 pollerThreadCount = 1; 549 } 550 551 if (isSSLEnabled()) { 553 char[] passphrase = getKeystorePass().toCharArray(); 555 556 KeyStore ks = KeyStore.getInstance(getKeystoreType()); 557 ks.load(new FileInputStream (getKeystoreFile()), passphrase); 558 KeyStore ts = KeyStore.getInstance(getKeystoreType()); 559 ts.load(new FileInputStream (getKeystoreFile()), passphrase); 560 561 KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm()); 562 kmf.init(ks, passphrase); 563 564 TrustManagerFactory tmf = TrustManagerFactory.getInstance(getAlgorithm()); 565 tmf.init(ts); 566 567 sslContext = SSLContext.getInstance(getSslProtocol()); 568 sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); 569 570 } 571 572 initialized = true; 573 574 } 575 576 577 580 public void start() 581 throws Exception { 582 if (!initialized) { 584 init(); 585 } 586 if (!running) { 587 running = true; 588 paused = false; 589 590 if (executor == null) { 592 workers = new WorkerStack(maxThreads); 593 } 595 596 for (int i = 0; i < acceptorThreadCount; i++) { 598 Thread acceptorThread = new Thread (new Acceptor(), getName() + "-Acceptor-" + i); 599 acceptorThread.setPriority(threadPriority); 600 acceptorThread.setDaemon(daemon); 601 acceptorThread.start(); 602 } 603 604 pollers = new Poller[pollerThreadCount]; 606 for (int i = 0; i < pollerThreadCount; i++) { 607 pollers[i] = new Poller(); 608 pollers[i].init(); 609 Thread pollerThread = new Thread (pollers[i], getName() + "-Poller-" + i); 610 pollerThread.setPriority(threadPriority); 611 pollerThread.setDaemon(true); 612 pollerThread.start(); 613 } 614 } 615 } 616 617 618 621 public void pause() { 622 if (running && !paused) { 623 paused = true; 624 unlockAccept(); 625 } 626 } 627 628 629 633 public void resume() { 634 if (running) { 635 paused = false; 636 } 637 } 638 639 640 643 public void stop() { 644 if (running) { 645 running = false; 646 unlockAccept(); 647 for (int i = 0; i < pollers.length; i++) { 648 pollers[i].destroy(); 649 } 650 pollers = null; 651 } 652 eventCache.clear(); 653 keyCache.clear(); 654 nioChannels.clear(); 655 } 656 657 658 661 public void destroy() throws Exception { 662 if (running) { 663 stop(); 664 } 665 serverSock.socket().close(); 667 serverSock.close(); 668 serverSock = null; 669 sslContext = null; 670 initialized = false; 671 nioChannels.clear(); 672 } 673 674 675 677 678 681 protected int getSequence() { 682 return sequence++; 683 } 684 685 public int getWriteBufSize() { 686 return socketProperties.getTxBufSize(); 687 } 688 689 public int getReadBufSize() { 690 return socketProperties.getRxBufSize(); 691 } 692 693 public NioSelectorPool getSelectorPool() { 694 return selectorPool; 695 } 696 697 public SocketProperties getSocketProperties() { 698 return socketProperties; 699 } 700 701 704 protected void unlockAccept() { 705 java.net.Socket s = null; 706 try { 707 if (address == null) { 709 s = new java.net.Socket ("127.0.0.1", port); 710 } else { 711 s = new java.net.Socket (address, port); 712 s.setSoLinger(true, 0); 715 } 716 } catch(Exception e) { 717 if (log.isDebugEnabled()) { 718 log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); 719 } 720 } finally { 721 if (s != null) { 722 try { 723 s.close(); 724 } catch (Exception e) { 725 } 727 } 728 } 729 } 730 731 732 735 protected boolean setSocketOptions(SocketChannel socket) { 736 int step = 1; 738 try { 739 socket.configureBlocking(false); 741 Socket sock = socket.socket(); 742 socketProperties.setProperties(sock); 743 744 NioChannel channel = nioChannels.poll(); 745 if ( channel == null ) { 746 step = 2; 748 749 if (sslContext != null) { 750 SSLEngine engine = createSSLEngine(); 751 int appbufsize = engine.getSession().getApplicationBufferSize(); 752 NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,getReadBufSize()), 753 Math.max(appbufsize,getWriteBufSize()), 754 socketProperties.getDirectBuffer()); 755 channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool); 756 } else { 757 NioBufferHandler bufhandler = new NioBufferHandler(getReadBufSize(), 758 getWriteBufSize(), 759 socketProperties.getDirectBuffer()); 760 761 channel = new NioChannel(socket, bufhandler); 762 } 763 } else { 764 765 channel.setIOChannel(socket); 766 if ( channel instanceof SecureNioChannel ) { 767 SSLEngine engine = createSSLEngine(); 768 ((SecureNioChannel)channel).reset(engine); 769 } else { 770 channel.reset(); 771 } 772 } 773 getPoller0().register(channel); 774 775 } catch (Throwable t) { 776 try { 777 log.error("",t); 778 }catch ( Throwable tt){} 779 return false; 781 } 782 return true; 783 } 784 785 protected SSLEngine createSSLEngine() { 786 SSLEngine engine = sslContext.createSSLEngine(); 787 engine.setNeedClientAuth(getClientAuth()); 788 engine.setUseClientMode(false); 789 if ( ciphersarr.length > 0 ) engine.setEnabledCipherSuites(ciphersarr); 790 if ( sslEnabledProtocolsarr.length > 0 ) engine.setEnabledProtocols(sslEnabledProtocolsarr); 791 792 return engine; 793 } 794 795 796 802 protected Worker createWorkerThread() { 803 804 synchronized (workers) { 805 if (workers.size() > 0) { 806 curThreadsBusy++; 807 return (workers.pop()); 808 } 809 if ((maxThreads > 0) && (curThreads < maxThreads)) { 810 curThreadsBusy++; 811 return (newWorkerThread()); 812 } else { 813 if (maxThreads < 0) { 814 curThreadsBusy++; 815 return (newWorkerThread()); 816 } else { 817 return (null); 818 } 819 } 820 } 821 822 } 823 824 825 829 protected Worker newWorkerThread() { 830 831 Worker workerThread = new Worker(); 832 workerThread.start(); 833 return (workerThread); 834 835 } 836 837 838 841 protected Worker getWorkerThread() { 842 Worker workerThread = createWorkerThread(); 844 while (workerThread == null) { 845 try { 846 synchronized (workers) { 847 workerThread = createWorkerThread(); 848 if ( workerThread == null ) workers.wait(); 849 } 850 } catch (InterruptedException e) { 851 } 853 if ( workerThread == null ) workerThread = createWorkerThread(); 854 } 855 return workerThread; 856 } 857 858 859 864 protected void recycleWorkerThread(Worker workerThread) { 865 synchronized (workers) { 866 workers.push(workerThread); 867 curThreadsBusy--; 868 workers.notify(); 869 } 870 } 871 872 873 protected boolean processSocket(SocketChannel socket) { 874 try { 875 if (executor == null) { 876 getWorkerThread().assign(socket); 877 } else { 878 executor.execute(new SocketOptionsProcessor(socket)); 879 } 880 } catch (Throwable t) { 881 log.error(sm.getString("endpoint.process.fail"), t); 884 return false; 885 } 886 return true; 887 } 888 891 protected boolean processSocket(NioChannel socket) { 892 try { 893 if (executor == null) { 894 getWorkerThread().assign(socket); 895 } else { 896 executor.execute(new SocketProcessor(socket)); 897 } 898 } catch (Throwable t) { 899 log.error(sm.getString("endpoint.process.fail"), t); 902 return false; 903 } 904 return true; 905 } 906 907 908 911 protected boolean processSocket(NioChannel socket, SocketStatus status) { 912 try { 913 if (executor == null) { 914 getWorkerThread().assign(socket, status); 915 } else { 916 executor.execute(new SocketEventProcessor(socket, status)); 917 } 918 } catch (Throwable t) { 919 log.error(sm.getString("endpoint.process.fail"), t); 922 return false; 923 } 924 return true; 925 } 926 927 928 930 931 934 protected class Acceptor implements Runnable { 935 936 937 941 public void run() { 942 943 while (running) { 945 946 while (paused) { 948 try { 949 Thread.sleep(1000); 950 } catch (InterruptedException e) { 951 } 953 } 954 955 try { 956 SocketChannel socket = serverSock.accept(); 958 if ( running && (!paused) && socket != null ) processSocket(socket); 960 } catch (Throwable t) { 961 log.error(sm.getString("endpoint.accept.fail"), t); 962 } 963 964 966 } 967 968 } 969 970 } 971 972 973 975 979 public class PollerEvent implements Runnable { 980 981 protected NioChannel socket; 982 protected int interestOps; 983 protected KeyAttachment key; 984 public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) { 985 reset(ch, k, intOps); 986 } 987 988 public void reset(NioChannel ch, KeyAttachment k, int intOps) { 989 socket = ch; 990 interestOps = intOps; 991 key = k; 992 } 993 994 public void reset() { 995 reset(null, null, 0); 996 } 997 998 public void run() { 999 if ( interestOps == OP_REGISTER ) { 1000 try { 1001 socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key); 1002 } catch (Exception x) { 1003 log.error("", x); 1004 } 1005 } else { 1006 final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); 1007 final KeyAttachment att = (KeyAttachment) key.attachment(); 1008 try { 1009 if (key != null) { 1010 key.interestOps(interestOps); 1011 att.interestOps(interestOps); 1012 } 1013 } 1014 catch (CancelledKeyException ckx) { 1015 try { 1016 if (key != null && key.attachment() != null) { 1017 KeyAttachment ka = (KeyAttachment) key.attachment(); 1018 ka.setError(true); } 1020 try { 1021 socket.close(); 1022 } 1023 catch (Exception ignore) {} 1024 if (socket.isOpen()) 1025 socket.close(true); 1026 } 1027 catch (Exception ignore) {} 1028 } 1029 } } 1032 public String toString() { 1033 return super.toString()+"[intOps="+this.interestOps+"]"; 1034 } 1035 } 1036 1039 public class Poller implements Runnable { 1040 1041 protected Selector selector; 1042 protected ConcurrentLinkedQueue <Runnable > events = new ConcurrentLinkedQueue <Runnable >(); 1043 1044 protected boolean close = false; 1045 protected long nextExpiration = 0; 1047 protected int keepAliveCount = 0; 1048 public int getKeepAliveCount() { return keepAliveCount; } 1049 1050 protected AtomicLong wakeupCounter = new AtomicLong (0l); 1051 1052 1053 1054 public Poller() throws IOException { 1055 this.selector = Selector.open(); 1056 } 1057 1058 public Selector getSelector() { return selector;} 1059 1060 1064 protected void init() { 1065 keepAliveCount = 0; 1066 } 1067 1068 1071 protected void destroy() { 1072 close = true; 1076 events.clear(); 1077 selector.wakeup(); 1078 } 1079 1080 public void addEvent(Runnable event) { 1081 events.offer(event); 1082 if ( wakeupCounter.incrementAndGet() < 3 ) selector.wakeup(); 1083 } 1084 1085 1093 public void add(final NioChannel socket) { 1094 add(socket,SelectionKey.OP_READ); 1095 } 1096 1097 public void add(final NioChannel socket, final int interestOps) { 1098 PollerEvent r = eventCache.poll(); 1099 if ( r==null) r = new PollerEvent(socket,null,interestOps); 1100 else r.reset(socket,null,interestOps); 1101 addEvent(r); 1102 } 1103 1104 public boolean events() { 1105 boolean result = false; 1106 Runnable r = null; 1108 result = (events.size() > 0); 1109 while ( (r = (Runnable )events.poll()) != null ) { 1110 try { 1111 r.run(); 1112 if ( r instanceof PollerEvent ) { 1113 ((PollerEvent)r).reset(); 1114 eventCache.offer((PollerEvent)r); 1115 } 1116 } catch ( Exception x ) { 1117 log.error("",x); 1118 } 1119 } 1120 return result; 1123 } 1124 1125 public void register(final NioChannel socket) 1126 { 1127 socket.setPoller(this); 1128 KeyAttachment key = keyCache.poll(); 1129 final KeyAttachment ka = key!=null?key:new KeyAttachment(); 1130 ka.reset(this,socket); 1131 PollerEvent r = eventCache.poll(); 1132 ka.interestOps(SelectionKey.OP_READ); if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); 1134 else r.reset(socket,ka,OP_REGISTER); 1135 addEvent(r); 1136 } 1137 1138 public void cancelledKey(SelectionKey key, SocketStatus status) { 1139 try { 1140 KeyAttachment ka = (KeyAttachment) key.attachment(); 1141 if (ka != null && ka.getComet()) { 1142 processSocket(ka.getChannel(), status); 1144 }else { 1145 if (key.isValid()) key.cancel(); 1146 if (key.channel().isOpen()) key.channel().close(); 1147 key.attach(null); 1148 } 1149 } catch (Throwable e) { 1150 if ( log.isDebugEnabled() ) log.error("",e); 1151 } 1153 } 1154 1158 public void run() { 1159 while (running) { 1161 while (paused) { 1163 try { 1164 Thread.sleep(1000); 1165 } catch (InterruptedException e) { 1166 } 1168 } 1169 boolean hasEvents = false; 1170 1171 hasEvents = (hasEvents | events()); 1172 if (close) return; 1174 1175 int keyCount = 0; 1176 try { 1177 keyCount = selector.select(selectorTimeout); 1178 wakeupCounter.set(0); 1179 if ( close ) { selector.close(); return; } 1180 } catch ( NullPointerException x ) { 1181 if ( wakeupCounter == null || selector == null ) throw x; 1183 continue; 1184 } catch ( CancelledKeyException x ) { 1185 if ( wakeupCounter == null || selector == null ) throw x; 1187 continue; 1188 } catch (Throwable x) { 1189 log.error("",x); 1190 continue; 1191 } 1192 if ( keyCount == 0 ) hasEvents = (hasEvents | events()); 1194 1195 Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; 1196 while (iterator != null && iterator.hasNext()) { 1199 SelectionKey sk = (SelectionKey ) iterator.next(); 1200 iterator.remove(); 1201 KeyAttachment attachment = (KeyAttachment)sk.attachment(); 1202 try { 1203 if ( sk.isValid() && attachment != null ) { 1204 attachment.access(); 1205 sk.attach(attachment); 1206 sk.interestOps(0); attachment.interestOps(0); 1208 NioChannel channel = attachment.getChannel(); 1209 if (sk.isReadable() || sk.isWritable() ) { 1210 if ( attachment.getComet() ) { 1211 if (!processSocket(channel, SocketStatus.OPEN)) 1212 processSocket(channel, SocketStatus.DISCONNECT); 1213 } else { 1214 boolean close = (!processSocket(channel)); 1215 if ( close ) { 1216 channel.close(); 1217 channel.getIOChannel().socket().close(); 1218 } 1219 } 1220 } 1221 } else { 1222 cancelledKey(sk, SocketStatus.ERROR); 1224 } 1225 } catch ( CancelledKeyException ckx ) { 1226 cancelledKey(sk, SocketStatus.ERROR); 1227 } catch (Throwable t) { 1228 log.error("",t); 1229 } 1230 } timeout(keyCount,hasEvents); 1233 } synchronized (this) { 1235 this.notifyAll(); 1236 } 1237 1238 } 1239 protected void timeout(int keyCount, boolean hasEvents) { 1240 long now = System.currentTimeMillis(); 1241 if ( (now < nextExpiration) && (keyCount>0 || hasEvents) ) return; 1244 nextExpiration = now + (long)socketProperties.getSoTimeout(); 1245 Set <SelectionKey > keys = selector.keys(); 1247 for (Iterator <SelectionKey > iter = keys.iterator(); iter.hasNext(); ) { 1248 SelectionKey key = iter.next(); 1249 try { 1250 KeyAttachment ka = (KeyAttachment) key.attachment(); 1251 if ( ka == null ) { 1252 cancelledKey(key, SocketStatus.ERROR); } else if ( ka.getError() ) { 1254 cancelledKey(key, SocketStatus.DISCONNECT); 1255 }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) { 1256 long delta = now - ka.getLastAccess(); 1258 long timeout = (ka.getTimeout()==-1)?((long) socketProperties.getSoTimeout()):(ka.getTimeout()); 1259 boolean isTimedout = delta > timeout; 1260 if (isTimedout) { 1261 key.interestOps(0); 1262 ka.interestOps(0); cancelledKey(key, SocketStatus.TIMEOUT); 1264 } else { 1265 long nextTime = now+(timeout-delta); 1266 nextExpiration = (nextTime < nextExpiration)?nextTime:nextExpiration; 1267 } 1268 } }catch ( CancelledKeyException ckx ) { 1270 cancelledKey(key, SocketStatus.ERROR); 1271 } 1272 } } 1274 } 1275 1276 public static class KeyAttachment { 1277 1278 public KeyAttachment() { 1279 1280 } 1281 public void reset(Poller poller, NioChannel channel) { 1282 this.channel = channel; 1283 this.poller = poller; 1284 lastAccess = System.currentTimeMillis(); 1285 currentAccess = false; 1286 comet = false; 1287 timeout = -1; 1288 error = false; 1289 } 1290 1291 public void reset() { 1292 reset(null,null); 1293 } 1294 1295 public Poller getPoller() { return poller;} 1296 public void setPoller(Poller poller){this.poller = poller;} 1297 public long getLastAccess() { return lastAccess; } 1298 public void access() { access(System.currentTimeMillis()); } 1299 public void access(long access) { lastAccess = access; } 1300 public void setComet(boolean comet) { this.comet = comet; } 1301 public boolean getComet() { return comet; } 1302 public boolean getCurrentAccess() { return currentAccess; } 1303 public void setCurrentAccess(boolean access) { currentAccess = access; } 1304 public Object getMutex() {return mutex;} 1305 public void setTimeout(long timeout) {this.timeout = timeout;} 1306 public long getTimeout() {return this.timeout;} 1307 public boolean getError() { return error; } 1308 public void setError(boolean error) { this.error = error; } 1309 public NioChannel getChannel() { return channel;} 1310 public void setChannel(NioChannel channel) { this.channel = channel;} 1311 protected Poller poller = null; 1312 protected int interestOps = 0; 1313 public int interestOps() { return interestOps;} 1314 public int interestOps(int ops) { this.interestOps = ops; return ops; } 1315 protected Object mutex = new Object (); 1316 protected long lastAccess = -1; 1317 protected boolean currentAccess = false; 1318 protected boolean comet = false; 1319 protected long timeout = -1; 1320 protected boolean error = false; 1321 protected NioChannel channel = null; 1322 1323 } 1324 1325 1326 1327 1329 1330 1333 protected class Worker implements Runnable { 1334 1335 1336 protected Thread thread = null; 1337 protected boolean available = false; 1338 protected Object socket = null; 1339 protected SocketStatus status = null; 1340 1341 1342 1351 protected synchronized void assign(Object socket) { 1352 1353 while (available) { 1355 try { 1356 wait(); 1357 } catch (InterruptedException e) { 1358 } 1359 } 1360 this.socket = socket; 1362 status = null; 1363 available = true; 1364 notifyAll(); 1365 1366 } 1367 1368 1369 protected synchronized void assign(Object socket, SocketStatus status) { 1370 1371 while (available) { 1373 try { 1374 wait(); 1375 } catch (InterruptedException e) { 1376 } 1377 } 1378 1379 this.socket = socket; 1381 this.status = status; 1382 available = true; 1383 notifyAll(); 1384 } 1385 1386 1387 1391 protected synchronized Object await() { 1392 1393 while (!available) { 1395 try { 1396 wait(); 1397 } catch (InterruptedException e) { 1398 } 1399 } 1400 1401 Object socket = this.socket; 1403 available = false; 1404 notifyAll(); 1405 1406 return (socket); 1407 1408 } 1409 1410 1411 1415 public void run() { 1416 1417 while (running) { 1419 try { 1420 Object channel = await(); 1422 if (channel == null) 1423 continue; 1424 1425 if ( channel instanceof SocketChannel ) { 1426 SocketChannel sc = (SocketChannel )channel; 1427 if ( !setSocketOptions(sc) ) { 1428 try { 1429 sc.socket().close(); 1430 sc.close(); 1431 }catch ( IOException ix ) { 1432 if ( log.isDebugEnabled() ) log.debug("",ix); 1433 } 1434 } else { 1435 1437 } 1438 } else { 1439 1440 NioChannel socket = (NioChannel)channel; 1441 1442 SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); 1443 int handshake = -1; 1444 try { 1445 handshake = socket.handshake(key.isReadable(), key.isWritable()); 1446 }catch ( IOException x ) { 1447 handshake = -1; 1448 if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x); 1449 }catch ( CancelledKeyException ckx ) { 1450 handshake = -1; 1451 } 1452 if ( handshake == 0 ) { 1453 if ((status != null) && (handler.event(socket, status) == Handler.SocketState.CLOSED)) { 1455 try { 1457 KeyAttachment att = (KeyAttachment)socket.getAttachment(true); 1458 try {socket.close();}catch (Exception ignore){} 1459 if ( socket.isOpen() ) socket.close(true); 1460 key.cancel(); 1461 key.attach(null); 1462 nioChannels.offer(socket); 1463 if ( att!=null ) keyCache.offer(att); 1464 }catch ( Exception x ) { 1465 log.error("",x); 1466 } 1467 } else if ((status == null) && (handler.process(socket) == Handler.SocketState.CLOSED)) { 1468 try { 1470 KeyAttachment att = (KeyAttachment)socket.getAttachment(true); 1471 try {socket.close();}catch (Exception ignore){} 1472 if ( socket.isOpen() ) socket.close(true); 1473 key.cancel(); 1474 key.attach(null); 1475 nioChannels.offer(socket); 1476 if ( att!=null ) keyCache.offer(att); 1477 }catch ( Exception x ) { 1478 log.error("",x); 1479 } 1480 } 1481 } else if (handshake == -1 ) { 1482 socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT); 1483 try {socket.close(true);}catch (IOException ignore){} 1484 nioChannels.offer(socket); 1485 } else { 1486 final SelectionKey fk = key; 1487 final int intops = handshake; 1488 final KeyAttachment ka = (KeyAttachment)fk.attachment(); 1489 ka.getPoller().add(socket,intops); 1490 } 1491 } 1492 } finally { 1493 socket = null; 1495 recycleWorkerThread(this); 1497 } 1498 } 1499 } 1500 1501 1502 1505 public void start() { 1506 thread = new Thread (this); 1507 thread.setName(getName() + "-" + (++curThreads)); 1508 thread.setDaemon(true); 1509 thread.start(); 1510 } 1511 1512 1513 } 1514 1515 public class NioBufferHandler implements ApplicationBufferHandler { 1517 protected ByteBuffer readbuf = null; 1518 protected ByteBuffer writebuf = null; 1519 1520 public NioBufferHandler(int readsize, int writesize, boolean direct) { 1521 if ( direct ) { 1522 readbuf = ByteBuffer.allocateDirect(readsize); 1523 writebuf = ByteBuffer.allocateDirect(writesize); 1524 }else { 1525 readbuf = ByteBuffer.allocate(readsize); 1526 writebuf = ByteBuffer.allocate(writesize); 1527 } 1528 } 1529 1530 public ByteBuffer expand(ByteBuffer buffer, int remaining) {return buffer;} 1531 public ByteBuffer getReadBuffer() {return readbuf;} 1532 public ByteBuffer getWriteBuffer() {return writebuf;} 1533 1534 } 1535 1536 1538 1539 1544 public interface Handler { 1545 public enum SocketState { 1546 OPEN, CLOSED, LONG 1547 } 1548 public SocketState process(NioChannel socket); 1549 public SocketState event(NioChannel socket, SocketStatus status); 1550 } 1551 1552 1553 1555 1556 public class WorkerStack { 1557 1558 protected Worker[] workers = null; 1559 protected int end = 0; 1560 1561 public WorkerStack(int size) { 1562 workers = new Worker[size]; 1563 } 1564 1565 1570 public void push(Worker worker) { 1571 workers[end++] = worker; 1572 } 1573 1574 1578 public Worker pop() { 1579 if (end > 0) { 1580 return workers[--end]; 1581 } 1582 return null; 1583 } 1584 1585 1589 public Worker peek() { 1590 return workers[end]; 1591 } 1592 1593 1596 public boolean isEmpty() { 1597 return (end == 0); 1598 } 1599 1600 1603 public int size() { 1604 return (end); 1605 } 1606 } 1607 1608 1609 1611 1612 1616 protected class SocketOptionsProcessor implements Runnable { 1617 1618 protected SocketChannel sc = null; 1619 1620 public SocketOptionsProcessor(SocketChannel socket) { 1621 this.sc = socket; 1622 } 1623 1624 public void run() { 1625 if ( !setSocketOptions(sc) ) { 1626 try { 1627 sc.socket().close(); 1628 sc.close(); 1629 }catch ( IOException ix ) { 1630 if ( log.isDebugEnabled() ) log.debug("",ix); 1631 } 1632 } 1633 } 1634 } 1635 1637 1638 1642 protected class SocketProcessor implements Runnable { 1643 1644 protected NioChannel socket = null; 1645 1646 public SocketProcessor(NioChannel socket) { 1647 this.socket = socket; 1648 } 1649 1650 public void run() { 1651 1652 if (handler.process(socket) == Handler.SocketState.CLOSED) { 1654 try { 1656 try {socket.close();}catch (Exception ignore){} 1657 if ( socket.isOpen() ) socket.close(true); 1658 } catch ( Exception x ) { 1659 log.error("",x); 1660 } 1661 socket = null; 1662 } 1663 1664 } 1665 1666 } 1667 1668 1669 1671 1672 1676 protected class SocketEventProcessor implements Runnable { 1677 1678 protected NioChannel socket = null; 1679 protected SocketStatus status = null; 1680 1681 public SocketEventProcessor(NioChannel socket, SocketStatus status) { 1682 this.socket = socket; 1683 this.status = status; 1684 } 1685 1686 public void run() { 1687 1688 if (handler.event(socket, status) == Handler.SocketState.CLOSED) { 1690 try { 1692 try {socket.close();}catch (Exception ignore){} 1693 if ( socket.isOpen() ) socket.close(true); 1694 } catch ( Exception x ) { 1695 log.error("",x); 1696 } 1697 socket = null; 1698 } 1699 1700 } 1701 1702 } 1703 1704 1705} 1706 | Popular Tags |