1 3 package org.jgroups.blocks; 4 5 import org.apache.commons.logging.Log; 6 import org.jgroups.Address; 7 import org.jgroups.Global; 8 import org.jgroups.stack.IpAddress; 9 import org.jgroups.util.DirectExecutor; 10 import org.jgroups.util.Util; 11 12 import java.io.IOException ; 13 import java.net.*; 14 import java.nio.ByteBuffer ; 15 import java.nio.channels.*; 16 import java.nio.channels.spi.SelectorProvider ; 17 import java.util.*; 18 import java.util.concurrent.*; 19 20 35 public class ConnectionTableNIO extends BasicConnectionTable implements Runnable { 36 37 private ServerSocketChannel m_serverSocketChannel; 38 private Selector m_acceptSelector; 39 40 private WriteHandler[] m_writeHandlers; 41 private int m_nextWriteHandler = 0; 42 private final Object m_lockNextWriteHandler = new Object (); 43 44 private ReadHandler[] m_readHandlers; 45 private int m_nextReadHandler = 0; 46 private final Object m_lockNextReadHandler = new Object (); 47 48 private Executor m_requestProcessors; 50 private volatile boolean serverStopping=false; 51 52 private final List<Thread > m_backGroundThreads = new LinkedList<Thread >(); 54 private int m_reader_threads = 3; 55 56 private int m_writer_threads = 3; 57 58 private int m_processor_threads = 5; private int m_processor_minThreads = 5; private int m_processor_maxThreads = 5; private int m_processor_queueSize=100; private long m_processor_keepAliveTime = Long.MAX_VALUE; 66 67 68 72 public ConnectionTableNIO(int srv_port) throws Exception { 73 this.srv_port=srv_port; 74 start(); 75 } 76 77 83 public ConnectionTableNIO(int srv_port, long reaper_interval, 84 long conn_expire_time) throws Exception { 85 this.srv_port=srv_port; 86 this.reaper_interval=reaper_interval; 87 this.conn_expire_time=conn_expire_time; 88 start(); 89 } 90 91 99 public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port 100 ) 101 throws Exception 102 { 103 setReceiver(r); 104 this.external_addr=external_addr; 105 this.bind_addr=bind_addr; 106 this.srv_port=srv_port; 107 this.max_port=max_port; 108 use_reaper=true; 109 start(); 110 } 111 112 113 public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port, 114 boolean doStart 115 ) 116 throws Exception 117 { 118 setReceiver(r); 119 this.external_addr=external_addr; 120 this.bind_addr=bind_addr; 121 this.srv_port=srv_port; 122 this.max_port=max_port; 123 use_reaper=true; 124 if(doStart) 125 start(); 126 } 127 128 129 139 public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port, 140 long reaper_interval, long conn_expire_time 141 ) throws Exception 142 { 143 setReceiver(r); 144 this.bind_addr=bind_addr; 145 this.external_addr=external_addr; 146 this.srv_port=srv_port; 147 this.max_port=max_port; 148 this.reaper_interval=reaper_interval; 149 this.conn_expire_time=conn_expire_time; 150 use_reaper=true; 151 start(); 152 } 153 154 155 public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port, 156 long reaper_interval, long conn_expire_time, boolean doStart 157 ) throws Exception 158 { 159 setReceiver(r); 160 this.bind_addr=bind_addr; 161 this.external_addr=external_addr; 162 this.srv_port=srv_port; 163 this.max_port=max_port; 164 this.reaper_interval=reaper_interval; 165 this.conn_expire_time=conn_expire_time; 166 use_reaper=true; 167 if(doStart) 168 start(); 169 } 170 171 172 173 public int getReaderThreads() { return m_reader_threads; } 174 175 public void setReaderThreads(int m_reader_threads) { 176 this.m_reader_threads=m_reader_threads; 177 } 178 179 public int getWriterThreads() { return m_writer_threads; } 180 181 public void setWriterThreads(int m_writer_threads) { 182 this.m_writer_threads=m_writer_threads; 183 } 184 185 public int getProcessorThreads() { return m_processor_threads; } 186 187 public void setProcessorThreads(int m_processor_threads) { 188 this.m_processor_threads=m_processor_threads; 189 } 190 191 public int getProcessorMinThreads() { return m_processor_minThreads;} 192 193 public void setProcessorMinThreads(int m_processor_minThreads) { 194 this.m_processor_minThreads=m_processor_minThreads; 195 } 196 197 public int getProcessorMaxThreads() { return m_processor_maxThreads;} 198 199 public void setProcessorMaxThreads(int m_processor_maxThreads) { 200 this.m_processor_maxThreads=m_processor_maxThreads; 201 } 202 203 public int getProcessorQueueSize() { return m_processor_queueSize; } 204 205 public void setProcessorQueueSize(int m_processor_queueSize) { 206 this.m_processor_queueSize=m_processor_queueSize; 207 } 208 209 public long getProcessorKeepAliveTime() { return m_processor_keepAliveTime; } 210 211 public void setProcessorKeepAliveTime(long m_processor_keepAliveTime) { 212 this.m_processor_keepAliveTime=m_processor_keepAliveTime; 213 } 214 215 216 219 ConnectionTable.Connection getConnection(Address dest) throws Exception 220 { 221 Connection conn; 222 SocketChannel sock_ch; 223 224 synchronized (conns) 225 { 226 conn = (Connection) conns.get(dest); 227 if (conn == null) 228 { 229 InetSocketAddress destAddress = new InetSocketAddress(((IpAddress) dest).getIpAddress(), 235 ((IpAddress) dest).getPort()); 236 sock_ch = SocketChannel.open(destAddress); 237 sock_ch.socket().setTcpNoDelay(tcp_nodelay); 238 conn = new Connection(sock_ch, dest); 239 240 conn.sendLocalAddress(local_addr); 241 243 sock_ch.configureBlocking(false); 244 245 try 246 { 247 if (log.isTraceEnabled()) 248 log.trace("About to change new connection send buff size from " + sock_ch.socket().getSendBufferSize() + " bytes"); 249 sock_ch.socket().setSendBufferSize(send_buf_size); 250 if (log.isTraceEnabled()) 251 log.trace("Changed new connection send buff size to " + sock_ch.socket().getSendBufferSize() + " bytes"); 252 } 253 catch (IllegalArgumentException ex) 254 { 255 if (log.isErrorEnabled()) log.error("exception setting send buffer size to " + 256 send_buf_size + " bytes: " + ex); 257 } 258 try 259 { 260 if (log.isTraceEnabled()) 261 log.trace("About to change new connection receive buff size from " + sock_ch.socket().getReceiveBufferSize() + " bytes"); 262 sock_ch.socket().setReceiveBufferSize(recv_buf_size); 263 if (log.isTraceEnabled()) 264 log.trace("Changed new connection receive buff size to " + sock_ch.socket().getReceiveBufferSize() + " bytes"); 265 } 266 catch (IllegalArgumentException ex) 267 { 268 if (log.isErrorEnabled()) log.error("exception setting receive buffer size to " + 269 send_buf_size + " bytes: " + ex); 270 } 271 272 int idx; 273 synchronized (m_lockNextWriteHandler) 274 { 275 idx = m_nextWriteHandler = (m_nextWriteHandler + 1) % m_writeHandlers.length; 276 } 277 conn.setupWriteHandler(m_writeHandlers[idx]); 278 279 try 281 { 282 synchronized (m_lockNextReadHandler) 283 { 284 idx = m_nextReadHandler = (m_nextReadHandler + 1) % m_readHandlers.length; 285 } 286 m_readHandlers[idx].add(conn); 287 288 } catch (InterruptedException e) 289 { 290 if (log.isWarnEnabled()) 291 log.warn("Thread (" +Thread.currentThread().getName() + ") was interrupted, closing connection", e); 292 conn.destroy(); 294 throw e; 295 } 296 297 addConnection(dest, conn); 299 300 notifyConnectionOpened(dest); 301 if (log.isTraceEnabled()) log.trace("created socket to " + dest); 302 } 303 return conn; 304 } 305 } 306 307 public final void start() throws Exception { 308 super.start(); 309 thread_group = new ThreadGroup (Util.getGlobalThreadGroup(), "ConnectionTableThreads"); 311 init(); 312 srv_sock=createServerSocket(srv_port, max_port); 313 314 if (external_addr!=null) 315 local_addr=new IpAddress(external_addr, srv_sock.getLocalPort()); 316 else if (bind_addr != null) 317 local_addr=new IpAddress(bind_addr, srv_sock.getLocalPort()); 318 else 319 local_addr=new IpAddress(srv_sock.getLocalPort()); 320 321 if(log.isDebugEnabled()) log.debug("server socket created on " + local_addr); 322 323 324 acceptor=new Thread (thread_group, this, "ConnectionTable.AcceptorThread"); 326 acceptor.setDaemon(true); 327 acceptor.start(); 328 m_backGroundThreads.add(acceptor); 329 330 if(use_reaper && reaper == null) { 332 reaper=new Reaper(); 333 reaper.start(); 334 } 335 } 336 337 protected void init() 338 throws Exception 339 { 340 341 if(getProcessorMaxThreads() <= 0) { 343 m_requestProcessors = new DirectExecutor(); 344 } 345 else 346 { 347 ThreadPoolExecutor requestProcessors = new ThreadPoolExecutor(getProcessorMinThreads(), getProcessorMaxThreads(), 349 getProcessorKeepAliveTime(), TimeUnit.MILLISECONDS, 350 new LinkedBlockingQueue<Runnable >(getProcessorQueueSize())); 351 352 requestProcessors.setThreadFactory(new ThreadFactory() { 353 public Thread newThread(Runnable runnable) { 354 Thread new_thread=new Thread (thread_group, runnable); 355 new_thread.setDaemon(true); 356 new_thread.setName("ConnectionTableNIO.Thread"); 357 m_backGroundThreads.add(new_thread); 358 return new_thread; 359 } 360 }); 361 m_requestProcessors = requestProcessors; 362 } 363 364 m_writeHandlers = WriteHandler.create(getWriterThreads(), thread_group, m_backGroundThreads, log); 365 m_readHandlers = ReadHandler.create(getReaderThreads(), this, thread_group, m_backGroundThreads, log); 366 } 367 368 369 372 public void stop() 373 { 374 super.stop(); 375 serverStopping = true; 376 377 if(reaper != null) 378 reaper.stop(); 379 380 m_acceptSelector.wakeup(); 382 383 for (int i = 0; i < m_readHandlers.length; i++) 385 { 386 try 387 { 388 m_readHandlers[i].add(new Shutdown ()); 389 } catch (InterruptedException e) 390 { 391 log.error("Thread ("+Thread.currentThread().getName() +") was interrupted, failed to shutdown selector", e); 392 } 393 } 394 for (int i = 0; i < m_writeHandlers.length; i++) 395 { 396 try 397 { 398 m_writeHandlers[i].queue.put(new Shutdown ()); 399 m_writeHandlers[i].selector.wakeup(); 400 } catch (InterruptedException e) 401 { 402 log.error("Thread ("+Thread.currentThread().getName() +") was interrupted, failed to shutdown selector", e); 403 } 404 } 405 406 if(m_requestProcessors instanceof ThreadPoolExecutor) 408 ((ThreadPoolExecutor)m_requestProcessors).shutdownNow(); 409 410 if(m_requestProcessors instanceof ThreadPoolExecutor){ 411 try{ 412 ((ThreadPoolExecutor) m_requestProcessors).awaitTermination(Global.THREADPOOL_SHUTDOWN_WAIT_TIME, 413 TimeUnit.MILLISECONDS); 414 }catch(InterruptedException e){ 415 } 416 } 417 418 synchronized(conns) { 420 Iterator it=conns.values().iterator(); 421 while(it.hasNext()) { 422 Connection conn=(Connection)it.next(); 423 conn.destroy(); 424 } 425 conns.clear(); 426 } 427 428 while(!m_backGroundThreads.isEmpty()) { 429 Thread t =m_backGroundThreads.remove(0); 430 try { 431 t.join(); 432 } catch(InterruptedException e) { 433 log.error("Thread ("+Thread.currentThread().getName() +") was interrupted while waiting on thread " + t.getName() + " to finish."); 434 } 435 } 436 m_backGroundThreads.clear(); 437 438 } 439 440 444 public void run() { 445 Connection conn; 446 447 while(m_serverSocketChannel.isOpen() && !serverStopping) { 448 int num; 449 try { 450 num=m_acceptSelector.select(); 451 } 452 catch(IOException e) { 453 if(log.isWarnEnabled()) 454 log.warn("Select operation on listening socket failed", e); 455 continue; } 457 458 if(num > 0) { 459 Set<SelectionKey> readyKeys=m_acceptSelector.selectedKeys(); 460 for(Iterator<SelectionKey> i=readyKeys.iterator(); i.hasNext();) { 461 SelectionKey key=i.next(); 462 i.remove(); 463 465 ServerSocketChannel readyChannel=(ServerSocketChannel)key.channel(); 466 SocketChannel client_sock_ch; 467 try { 468 client_sock_ch=readyChannel.accept(); 469 } 470 catch(IOException e) { 471 if(log.isWarnEnabled()) 472 log.warn("Attempt to accept new connection from listening socket failed", e); 473 continue; 475 } 476 477 if(log.isTraceEnabled()) 478 log.trace("accepted connection, client_sock=" + client_sock_ch.socket()); 479 480 try { 481 client_sock_ch.socket().setSendBufferSize(send_buf_size); 482 } 483 catch(IllegalArgumentException ex) { 484 if(log.isErrorEnabled()) log.error("exception setting send buffer size to " + send_buf_size + " bytes: ", ex); 485 } 486 catch(SocketException e) { 487 if(log.isErrorEnabled()) log.error("exception setting send buffer size to " + send_buf_size + " bytes: ", e); 488 } 489 490 try { 491 client_sock_ch.socket().setReceiveBufferSize(recv_buf_size); 492 } 493 catch(IllegalArgumentException ex) { 494 if(log.isErrorEnabled()) log.error("exception setting receive buffer size to " + send_buf_size + " bytes: ", ex); 495 } 496 catch(SocketException e) { 497 if(log.isErrorEnabled()) log.error("exception setting receive buffer size to " + recv_buf_size + " bytes: ", e); 498 } 499 500 conn=new Connection(client_sock_ch, null); 501 try { 502 conn.peer_addr=conn.readPeerAddress(client_sock_ch.socket()); 503 synchronized(conns) { 504 if(conns.containsKey(conn.getPeerAddress())) { 505 if(log.isTraceEnabled()) 506 log.trace(conn.peer_addr + " is already there, will reuse connection"); 507 continue; 508 509 } 522 else { 523 addConnection(conn.getPeerAddress(), conn); 524 } 525 } 526 notifyConnectionOpened(conn.getPeerAddress()); 527 client_sock_ch.configureBlocking(false); 528 } 529 catch(IOException e) { 530 if(log.isWarnEnabled()) 531 log.warn("Attempt to configure non-blocking mode failed", e); 532 conn.destroy(); 533 continue; 534 } 535 catch(Exception e) { 536 if(log.isWarnEnabled()) 537 log.warn("Attempt to handshake with other peer failed", e); 538 conn.destroy(); 539 continue; 540 } 541 542 int idx; 543 synchronized(m_lockNextWriteHandler) { 544 idx=m_nextWriteHandler=(m_nextWriteHandler + 1) % m_writeHandlers.length; 545 } 546 conn.setupWriteHandler(m_writeHandlers[idx]); 547 548 try { 549 synchronized(m_lockNextReadHandler) { 550 idx=m_nextReadHandler=(m_nextReadHandler + 1) % m_readHandlers.length; 551 } 552 m_readHandlers[idx].add(conn); 553 554 } 555 catch(InterruptedException e) { 556 if(log.isWarnEnabled()) 557 log.warn("Attempt to configure read handler for accepted connection failed", e); 558 conn.destroy(); 560 } 561 } } } 565 if(m_serverSocketChannel.isOpen()) { 566 try { 567 m_serverSocketChannel.close(); 568 } 569 catch(Exception e) { 570 log.error("exception closing server listening socket", e); 571 } 572 } 573 if(log.isTraceEnabled()) 574 log.trace("acceptor thread terminated"); 575 576 } 577 578 579 582 protected ServerSocket createServerSocket(int start_port, int end_port) throws Exception 583 { 584 this.m_acceptSelector = Selector.open(); 585 m_serverSocketChannel = ServerSocketChannel.open(); 586 m_serverSocketChannel.configureBlocking(false); 587 while (true) 588 { 589 try 590 { 591 SocketAddress sockAddr; 592 if (bind_addr == null) 593 { 594 sockAddr=new InetSocketAddress(start_port); 595 m_serverSocketChannel.socket().bind(sockAddr); 596 } 597 else 598 { 599 sockAddr=new InetSocketAddress(bind_addr, start_port); 600 m_serverSocketChannel.socket().bind(sockAddr, backlog); 601 } 602 } 603 catch (BindException bind_ex) 604 { 605 if (start_port == end_port) 606 throw (BindException) ((new BindException("No available port to bind to")).initCause(bind_ex)); 607 start_port++; 608 continue; 609 } 610 catch (SocketException bind_ex) 611 { 612 if (start_port == end_port) 613 throw (BindException) ((new BindException("No available port to bind to")).initCause(bind_ex)); 614 start_port++; 615 continue; 616 } 617 catch (IOException io_ex) 618 { 619 if (log.isErrorEnabled()) log.error("Attempt to bind serversocket failed, port="+start_port+", bind addr=" + bind_addr ,io_ex); 620 throw io_ex; 621 } 622 srv_port = start_port; 623 break; 624 } 625 m_serverSocketChannel.register(this.m_acceptSelector, SelectionKey.OP_ACCEPT); 626 return m_serverSocketChannel.socket(); 627 } 628 629 protected void runRequest(Address addr, ByteBuffer buf) throws InterruptedException { 630 m_requestProcessors.execute(new ExecuteTask(addr, buf)); 631 } 632 633 634 private static class Shutdown { 636 } 637 638 private static class ReadHandler implements Runnable { 640 private final Selector selector= initHandler(); 641 private final LinkedBlockingQueue<Object > queue= new LinkedBlockingQueue<Object >(); 642 private final ConnectionTableNIO connectTable; 643 private final Log log; 644 645 ReadHandler(ConnectionTableNIO ct, Log log) { 646 connectTable= ct; 647 this.log=log; 648 } 649 650 public Selector initHandler() 651 { 652 try 654 { 655 return Selector.open(); 656 } catch (IOException e) 657 { 658 if (log.isErrorEnabled()) log.error(e); 659 throw new IllegalStateException (e.getMessage()); 660 } 661 662 } 663 664 669 private static ReadHandler[] create(int workerThreads, ConnectionTableNIO ct, ThreadGroup tg, List<Thread > backGroundThreads, Log log) 670 { 671 ReadHandler[] handlers = new ReadHandler[workerThreads]; 672 for (int looper = 0; looper < workerThreads; looper++) 673 { 674 handlers[looper] = new ReadHandler(ct, log); 675 676 Thread thread = new Thread (tg, handlers[looper], "nioReadHandlerThread"); 677 thread.setDaemon(true); 678 thread.start(); 679 backGroundThreads.add(thread); 680 } 681 return handlers; 682 } 683 684 685 private void add(Object conn) throws InterruptedException 686 { 687 queue.put(conn); 688 wakeup(); 689 } 690 691 private void wakeup() 692 { 693 selector.wakeup(); 694 } 695 696 public void run() 697 { 698 while (true) 699 { int events; 701 try 702 { 703 events = selector.select(); 704 } catch (IOException e) 705 { 706 if (log.isWarnEnabled()) 707 log.warn("Select operation on socket failed", e); 708 continue; } catch (ClosedSelectorException e) 710 { 711 if (log.isWarnEnabled()) 712 log.warn("Select operation on socket failed" , e); 713 return; } 715 716 if (events > 0) 717 { Set readyKeys = selector.selectedKeys(); 719 try 720 { 721 for (Iterator i = readyKeys.iterator(); i.hasNext();) 722 { 723 SelectionKey key = (SelectionKey) i.next(); 724 i.remove(); 725 Connection conn = (Connection) key.attachment(); 727 if(conn != null && conn.getSocketChannel() != null) 728 { 729 try 730 { 731 if (conn.getSocketChannel().isOpen()) 732 readOnce(conn); 733 else 734 { conn.closed(); 736 } 737 } catch (IOException e) 738 { 739 if (log.isTraceEnabled()) log.trace("Read operation on socket failed" , e); 740 key.cancel(); 743 conn.destroy(); 744 conn.closed(); 745 } 746 } 747 } 748 } 749 catch(ConcurrentModificationException e) { 750 if (log.isTraceEnabled()) log.trace("Selection set changed", e); 751 } 753 } 754 755 Object o; 757 try 758 { 759 o = queue.poll(0L, TimeUnit.MILLISECONDS); } catch (InterruptedException e) 761 { 762 if (log.isTraceEnabled()) log.trace("Thread ("+Thread.currentThread().getName() +") was interrupted while polling queue" ,e); 763 continue; 765 } 766 if (null == o) 767 continue; 768 if (o instanceof Shutdown ) { try { 770 selector.close(); 771 } catch(IOException e) { 772 if (log.isTraceEnabled()) log.trace("Read selector close operation failed" , e); 773 } 774 return; } 776 Connection conn = (Connection) o; SocketChannel sc = conn.getSocketChannel(); 778 try 779 { 780 sc.register(selector, SelectionKey.OP_READ, conn); 781 } catch (ClosedChannelException e) 782 { 783 if (log.isTraceEnabled()) log.trace("Socket channel was closed while we were trying to register it to selector" , e); 784 conn.destroy(); 787 conn.closed(); 788 } 789 } } 791 792 private void readOnce(Connection conn) 793 throws IOException 794 { 795 ConnectionReadState readState = conn.getReadState(); 796 if (!readState.isHeadFinished()) 797 { int size = readHeader(conn); 800 if (0 == size) 801 { return; 803 } 804 } 805 if (readBody(conn) > 0) 807 { return; 809 } 810 Address addr = conn.getPeerAddress(); 811 ByteBuffer buf = readState.getReadBodyBuffer(); 812 readState.bodyFinished(); 814 try 816 { 817 connectTable.runRequest(addr, buf); 818 } catch (InterruptedException e) 819 { 820 log.error("Thread ("+Thread.currentThread().getName() +") was interrupted while assigning executor to process read request" , e); 823 } 824 } 825 826 834 private int readHeader(Connection conn) 835 throws IOException 836 { 837 ConnectionReadState readState = conn.getReadState(); 838 ByteBuffer headBuf = readState.getReadHeadBuffer(); 839 840 SocketChannel sc = conn.getSocketChannel(); 841 while (headBuf.remaining() > 0) 842 { 843 int num = sc.read(headBuf); 844 if (-1 == num) 845 { throw new IOException ("Peer closed socket"); 847 } 848 if (0 == num) return 0; 850 } 851 return readState.headFinished(); 853 } 854 855 863 private int readBody(Connection conn) 864 throws IOException 865 { 866 ByteBuffer bodyBuf = conn.getReadState().getReadBodyBuffer(); 867 868 SocketChannel sc = conn.getSocketChannel(); 869 while (bodyBuf.remaining() > 0) 870 { 871 int num = sc.read(bodyBuf); 872 if (-1 == num) throw new IOException ("Couldn't read from socket as peer closed the socket"); 874 if (0 == num) return bodyBuf.remaining(); 876 } 877 bodyBuf.flip(); 879 return 0; 880 } 881 } 882 883 private class ExecuteTask implements Runnable { 884 Address m_addr = null; 885 ByteBuffer m_buf = null; 886 887 public ExecuteTask(Address addr, ByteBuffer buf) 888 { 889 m_addr = addr; 890 m_buf = buf; 891 } 892 893 public void run() 894 { 895 receive(m_addr, m_buf.array(), m_buf.arrayOffset(), m_buf.limit()); 896 } 897 } 898 899 private class ConnectionReadState { 900 private final Connection m_conn; 901 902 private boolean m_headFinished = false; 904 private ByteBuffer m_readBodyBuf = null; 905 private final ByteBuffer m_readHeadBuf = ByteBuffer.allocate(Connection.HEADER_SIZE); 906 907 public ConnectionReadState(Connection conn) 908 { 909 m_conn = conn; 910 } 911 912 ByteBuffer getReadBodyBuffer() 913 { 914 return m_readBodyBuf; 915 } 916 917 ByteBuffer getReadHeadBuffer() 918 { 919 return m_readHeadBuf; 920 } 921 922 void bodyFinished() 923 { 924 m_headFinished = false; 925 m_readHeadBuf.clear(); 926 m_readBodyBuf = null; 927 m_conn.updateLastAccessed(); 928 } 929 930 935 int headFinished() 936 { 937 m_headFinished = true; 938 m_readHeadBuf.flip(); 939 int messageSize = m_readHeadBuf.getInt(); 940 m_readBodyBuf = ByteBuffer.allocate(messageSize); 941 m_conn.updateLastAccessed(); 942 return messageSize; 943 } 944 945 boolean isHeadFinished() 946 { 947 return m_headFinished; 948 } 949 } 950 951 class Connection extends ConnectionTable.Connection { 952 private SocketChannel sock_ch = null; 953 private WriteHandler m_writeHandler; 954 private SelectorWriteHandler m_selectorWriteHandler; 955 private final ConnectionReadState m_readState; 956 957 private static final int HEADER_SIZE = 4; 958 final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE); 959 960 Connection(SocketChannel s, Address peer_addr) 961 { 962 super(s.socket(), peer_addr); 963 sock_ch = s; 964 m_readState = new ConnectionReadState(this); 965 is_running=true; 966 } 967 968 private ConnectionReadState getReadState() 969 { 970 return m_readState; 971 } 972 973 private void setupWriteHandler(WriteHandler hdlr) 974 { 975 m_writeHandler = hdlr; 976 m_selectorWriteHandler = hdlr.add(sock_ch); 977 } 978 979 980 981 void doSend(byte[] buffie, int offset, int length) throws Exception 982 { 983 MyFuture result = new MyFuture(); 984 m_writeHandler.write(sock_ch, ByteBuffer.wrap(buffie, offset, length), result, m_selectorWriteHandler); 985 Object ex = result.get(); 986 if (ex instanceof Exception ) 987 { 988 if (log.isErrorEnabled()) 989 log.error("failed sending message", (Exception )ex); 990 if (((Exception )ex).getCause() instanceof IOException ) 991 throw (IOException ) ((Exception )ex).getCause(); 992 throw (Exception )ex; 993 } 994 result.get(); 995 } 996 997 998 SocketChannel getSocketChannel() 999 { 1000 return sock_ch; 1001 } 1002 1003 void closeSocket() 1004 { 1005 1006 if (sock_ch != null) 1007 { 1008 try 1009 { 1010 if(sock_ch.isConnected() && sock_ch.isOpen()) { 1011 sock_ch.close(); 1012 } 1013 } 1014 catch (Exception e) 1015 { 1016 log.error("error closing socket connection", e); 1017 } 1018 sock_ch = null; 1019 } 1020 } 1021 1022 1023 void closed() 1024 { 1025 Address peerAddr = getPeerAddress(); 1026 synchronized (conns) 1027 { 1028 conns.remove(peerAddr); 1029 } 1030 notifyConnectionClosed(peerAddr); 1031 } 1032 } 1033 1034 1035 1038 private static class WriteHandler implements Runnable { 1039 private final LinkedBlockingQueue<Object > queue= new LinkedBlockingQueue<Object >(); 1041 1042 private final Selector selector= initSelector(); 1043 private int m_pendingChannels; 1046 private ByteBuffer m_headerBuffer = ByteBuffer.allocate(Connection.HEADER_SIZE); 1048 private final Log log; 1049 1050 1051 public WriteHandler(Log log) { 1052 this.log=log; 1053 } 1054 1055 Selector initSelector() { 1056 try 1057 { 1058 return SelectorProvider.provider().openSelector(); 1059 } 1060 catch (IOException e) 1061 { 1062 if (log.isErrorEnabled()) log.error(e); 1063 throw new IllegalStateException (e.getMessage()); 1064 } 1065 } 1066 1067 1072 private static WriteHandler[] create(int workerThreads, ThreadGroup tg, List<Thread > backGroundThreads, Log log) 1073 { 1074 WriteHandler[] handlers = new WriteHandler[workerThreads]; 1075 for (int looper = 0; looper < workerThreads; looper++) 1076 { 1077 handlers[looper] = new WriteHandler(log); 1078 1079 Thread thread = new Thread (tg, handlers[looper], "nioWriteHandlerThread"); 1080 thread.setDaemon(true); 1081 thread.start(); 1082 backGroundThreads.add(thread); 1083 } 1084 return handlers; 1085 } 1086 1087 1092 private SelectorWriteHandler add(SocketChannel channel) 1093 { 1094 return new SelectorWriteHandler(channel, selector, m_headerBuffer); 1095 } 1096 1097 1108 private void write(SocketChannel channel, ByteBuffer buffer, MyFuture notification, SelectorWriteHandler hdlr) throws InterruptedException 1109 { 1110 queue.put(new WriteRequest(channel, buffer, notification, hdlr)); 1111 } 1112 1113 private static void close(SelectorWriteHandler entry) 1114 { 1115 entry.cancel(); 1116 } 1117 1118 private static void handleChannelError( SelectorWriteHandler entry, Throwable error) 1119 { 1120 do 1122 { 1123 if (error != null) 1124 entry.notifyError(error); 1125 } 1126 while (entry.next()); 1127 close(entry); 1128 } 1129 1130 private void processWrite(Selector selector) 1132 { 1133 Set keys = selector.selectedKeys(); 1134 Object arr[] = keys.toArray(); 1135 for (Object anArr : arr) { 1136 SelectionKey key = (SelectionKey) anArr; 1137 SelectorWriteHandler entry = (SelectorWriteHandler) key.attachment(); 1138 boolean needToDecrementPendingChannels = false; 1139 try { 1140 if (0 == entry.write()) { entry.notifyObject(entry.getBytesWritten()); 1143 if (!entry.next()) { 1145 needToDecrementPendingChannels = true; 1146 } 1147 } 1148 1149 } 1150 catch (IOException e) { 1151 needToDecrementPendingChannels = true; 1152 handleChannelError(entry, e); 1154 } 1155 finally { 1156 if (needToDecrementPendingChannels) 1157 m_pendingChannels--; 1158 } 1159 } 1160 keys.clear(); 1161 } 1162 1163 public void run() 1164 { 1165 while (selector.isOpen()) 1166 { 1167 try 1168 { 1169 WriteRequest queueEntry; 1170 Object o; 1171 1172 while (null != (o = queue.poll(0L, TimeUnit.MILLISECONDS))) 1174 { 1175 if (o instanceof Shutdown ) { 1177 try { 1178 selector.close(); 1179 } catch(IOException e) { 1180 if (log.isTraceEnabled()) log.trace("Write selector close operation failed" , e); 1181 } 1182 return; 1183 } 1184 queueEntry = (WriteRequest) o; 1185 1186 if (queueEntry.getHandler().add(queueEntry)) 1187 { 1188 m_pendingChannels++; 1197 } 1198 1199 try 1200 { 1201 if (selector.selectNow() > 0) 1203 { 1204 processWrite(selector); 1205 } 1206 } 1207 catch (IOException e) 1208 { if (log.isErrorEnabled()) log.error("SelectNow operation on write selector failed, didn't expect this to occur, please report this", e); 1210 return; } 1212 } 1213 1214 if (m_pendingChannels == 0) 1216 { 1217 o = queue.take(); 1218 if (o instanceof Shutdown ){ try { 1220 selector.close(); 1221 } catch(IOException e) { 1222 if (log.isTraceEnabled()) log.trace("Write selector close operation failed" , e); 1223 } 1224 return; 1225 } 1226 queueEntry = (WriteRequest) o; 1227 if (queueEntry.getHandler().add(queueEntry)) 1228 m_pendingChannels++; 1229 } 1230 else 1232 { 1233 try 1234 { 1235 if ((selector.select()) > 0) 1236 { 1237 processWrite(selector); 1238 } 1239 } 1240 catch (IOException e) 1241 { if (log.isErrorEnabled()) log.error("Failure while writing to socket",e); 1243 } 1244 } 1245 } 1246 catch (InterruptedException e) 1247 { 1248 if (log.isErrorEnabled()) log.error("Thread ("+Thread.currentThread().getName() +") was interrupted", e); 1249 } 1250 catch (Throwable e) { if (log.isErrorEnabled()) log.error("Thread ("+Thread.currentThread().getName() +") caught Throwable" , e); 1254 } 1255 } 1256 } 1257 } 1258 1259 1260 public static class SelectorWriteHandler { 1263 1264 private final List<WriteRequest> m_writeRequests = new LinkedList<WriteRequest>(); private boolean m_headerSent = false; 1266 private SocketChannel m_channel; 1267 private SelectionKey m_key; 1268 private Selector m_selector; 1269 private int m_bytesWritten = 0; 1270 private boolean m_enabled = false; 1271 private ByteBuffer m_headerBuffer; 1272 1273 SelectorWriteHandler(SocketChannel channel, Selector selector, ByteBuffer headerBuffer) 1274 { 1275 m_channel = channel; 1276 m_selector = selector; 1277 m_headerBuffer = headerBuffer; 1278 } 1279 1280 private void register(Selector selector, SocketChannel channel) throws ClosedChannelException 1281 { 1282 m_key = channel.register(selector, 0, this); 1284 } 1285 1286 private boolean enable() 1288 { 1289 boolean rc = false; 1290 1291 try 1292 { 1293 if (m_key == null) 1294 { register(m_selector, m_channel); 1297 } 1298 } 1299 catch (ClosedChannelException e) 1300 { 1301 return rc; 1302 } 1303 1304 if (!m_enabled) 1305 { 1306 rc = true; 1307 try 1308 { 1309 m_key.interestOps(SelectionKey.OP_WRITE); 1310 } 1311 catch (CancelledKeyException e) 1312 { return false; 1314 } 1315 m_enabled = true; 1316 } 1317 return rc; 1318 } 1319 1320 private void disable() 1321 { 1322 if (m_enabled) 1323 { 1324 try 1325 { 1326 m_key.interestOps(0); } 1329 catch (CancelledKeyException eat) { } 1333 m_enabled = false; 1334 } 1335 } 1336 1337 private void cancel() 1338 { 1339 m_key.cancel(); 1340 } 1341 1342 boolean add(WriteRequest entry) 1343 { 1344 m_writeRequests.add(entry); 1345 return enable(); 1346 } 1347 1348 WriteRequest getCurrentRequest() 1349 { 1350 return m_writeRequests.get(0); 1351 } 1352 1353 SocketChannel getChannel() 1354 { 1355 return m_channel; 1356 } 1357 1358 ByteBuffer getBuffer() 1359 { 1360 return getCurrentRequest().getBuffer(); 1361 } 1362 1363 MyFuture getCallback() 1364 { 1365 return getCurrentRequest().getCallback(); 1366 } 1367 1368 int getBytesWritten() 1369 { 1370 return m_bytesWritten; 1371 } 1372 1373 void notifyError(Throwable error) 1374 { 1375 if (getCallback() != null) 1376 getCallback().setException(error); 1377 } 1378 1379 void notifyObject(Object result) 1380 { 1381 if (getCallback() != null) 1382 getCallback().set(result); 1383 } 1384 1385 1390 boolean next() 1391 { 1392 m_headerSent = false; 1393 m_bytesWritten = 0; 1394 1395 m_writeRequests.remove(0); boolean rc = !m_writeRequests.isEmpty(); 1397 if (!rc) disable(); 1399 return rc; 1400 } 1401 1402 1409 int write() throws IOException 1410 { 1411 if (!m_headerSent) 1414 { 1415 m_headerSent = true; 1416 m_headerBuffer.clear(); 1417 m_headerBuffer.putInt(getBuffer().remaining()); 1418 m_headerBuffer.flip(); 1419 do 1420 { 1421 getChannel().write(m_headerBuffer); 1422 } while (m_headerBuffer.remaining() > 0); 1424 1425 } 1426 1427 m_bytesWritten += (getChannel().write(getBuffer())); 1428 1429 return getBuffer().remaining(); 1430 } 1431 1432 } 1433 1434 public static class WriteRequest { 1435 private final SocketChannel m_channel; 1436 private final ByteBuffer m_buffer; 1437 private final MyFuture m_callback; 1438 private final SelectorWriteHandler m_hdlr; 1439 1440 WriteRequest(SocketChannel channel, ByteBuffer buffer, MyFuture callback, SelectorWriteHandler hdlr) 1441 { 1442 m_channel = channel; 1443 m_buffer = buffer; 1444 m_callback = callback; 1445 m_hdlr = hdlr; 1446 } 1447 1448 SelectorWriteHandler getHandler() 1449 { 1450 return m_hdlr; 1451 } 1452 1453 SocketChannel getChannel() 1454 { 1455 return m_channel; 1456 } 1457 1458 ByteBuffer getBuffer() 1459 { 1460 return m_buffer; 1461 } 1462 1463 MyFuture getCallback() 1464 { 1465 return m_callback; 1466 } 1467 1468 } 1469 1470 private static class NullCallable implements Callable { 1471 1472 public Object call() { 1473 System.out.println("nullCallable.call invoked"); 1474 return null; 1475 } 1476 } 1477 private static final NullCallable NULLCALL = new NullCallable(); 1478 1479 public static class MyFuture extends FutureTask { public MyFuture() { 1481 super(NULLCALL); 1482 } 1483 1484 protected void set(Object o) { 1485 super.set(o); 1486 } 1487 1488 1489 protected void setException(Throwable t) { 1490 super.setException(t); 1491 } 1492 } 1493 1494} 1495 | Popular Tags |