package org.xsocket.stream;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.net.ssl.SSLContext;

import org.xsocket.ClosedConnectionException;
import org.xsocket.DataConverter;
import org.xsocket.MaxReadSizeExceededException;
import org.xsocket.stream.io.spi.IHandlerIoProvider;
import org.xsocket.stream.io.spi.IIoHandler;
import org.xsocket.stream.io.spi.IIoHandlerContext;


/**
 * Implementation of {@link INonBlockingConnection}.
 *
 * <p>The public constructors create a client-side connection to a remote
 * endpoint, optionally with socket options, an SSL context, an application
 * handler and a worker pool. The package-private constructor wraps an already
 * existing I/O handler.
 *
 * <p>All read methods delegate to the read-queue helpers inherited from
 * {@link Connection}. NOTE(review): {@link BufferUnderflowException} presumably
 * signals that the requested data is not (yet) available in the read queue;
 * confirm against {@code Connection}.
 */
public final class NonBlockingConnection extends Connection implements INonBlockingConnection {

    // Was Logger.getLogger(BlockingConnection.class...), which attributed this
    // class's log records to the wrong logger.
    private static final Logger LOG = Logger.getLogger(NonBlockingConnection.class.getName());

    /** Worker pool used by the constructors that take a handler but no executor. */
    private static final Executor DEFAULT_WORKER_POOL = Executors.newCachedThreadPool();

    /** Warning emitted by the constructors that still accept the ignored preallocation size. */
    private static final String PREALLOCATION_NOT_SUPPORTED_WARNING =
            "parameter preallocation memory size is not more supported. use System.property instead (see JavaDoc org.xsocket.stream.io.impl.IoProvider)";

    /** Application handler receiving the connection events; may be {@code null}. */
    private final IHandler appHandler;

    /** Set once the disconnect event has been processed, so the handler is notified at most once. */
    private boolean disconnectOccured = false;


    /**
     * Creates a connection without an application handler.
     *
     * @param hostname the remote host name (resolved via {@link InetAddress#getByName(String)})
     * @param port     the remote port
     * @throws IOException if the host cannot be resolved or the connection cannot be set up
     */
    public NonBlockingConnection(String hostname, int port) throws IOException {
        this(InetAddress.getByName(hostname), port);
    }


    /**
     * Creates a connection without an application handler.
     *
     * @param hostname            the remote host name (resolved via {@link InetAddress#getByName(String)})
     * @param port                the remote port
     * @param socketConfiguration the socket configuration; its options are applied to the connection
     * @throws IOException if the host cannot be resolved or the connection cannot be set up
     */
    public NonBlockingConnection(String hostname, int port, StreamSocketConfiguration socketConfiguration) throws IOException {
        this(InetAddress.getByName(hostname), port, socketConfiguration);
    }


    /**
     * Creates a connection without an application handler.
     *
     * @param address the remote address
     * @param port    the remote port
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(InetAddress address, int port) throws IOException {
        this(new InetSocketAddress(address, port), new HashMap<String, Object>(), null, false, null, null);
    }


    /**
     * Creates a connection without an application handler.
     *
     * @param address             the remote address
     * @param port                the remote port
     * @param socketConfiguration the socket configuration; its options are applied to the connection
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(InetAddress address, int port, StreamSocketConfiguration socketConfiguration) throws IOException {
        this(new InetSocketAddress(address, port), socketConfiguration.toOptions(), null, false, null, null);
    }


    /**
     * Creates a connection without an application handler.
     *
     * @param address the remote address
     * @param port    the remote port
     * @param options the socket options
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(InetAddress address, int port, Map<String, Object> options) throws IOException {
        this(new InetSocketAddress(address, port), options, null, false, null, null);
    }


    /**
     * Creates a connection whose events are dispatched to the given handler
     * using the default worker pool.
     *
     * @param address    the remote address
     * @param port       the remote port
     * @param appHandler the application handler
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(InetAddress address, int port, IHandler appHandler) throws IOException {
        this(new InetSocketAddress(address, port), new HashMap<String, Object>(), null, false, appHandler, DEFAULT_WORKER_POOL);
    }


    /**
     * Creates a connection whose events are dispatched to the given handler
     * using the default worker pool.
     *
     * @param address             the remote address
     * @param port                the remote port
     * @param socketConfiguration the socket configuration; its options are applied to the connection
     * @param appHandler          the application handler
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(InetAddress address, int port, StreamSocketConfiguration socketConfiguration, IHandler appHandler) throws IOException {
        this(new InetSocketAddress(address, port), socketConfiguration.toOptions(), null, false, appHandler, DEFAULT_WORKER_POOL);
    }


    /**
     * Same as {@link #NonBlockingConnection(InetAddress, int, StreamSocketConfiguration, IHandler)}.
     * The {@code preallocationMemorySize} argument is ignored and a warning is logged.
     *
     * @param address                 the remote address
     * @param port                    the remote port
     * @param socketConfiguration     the socket configuration; its options are applied to the connection
     * @param appHandler              the application handler
     * @param preallocationMemorySize ignored
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(InetAddress address, int port, StreamSocketConfiguration socketConfiguration, IHandler appHandler, int preallocationMemorySize) throws IOException {
        this(new InetSocketAddress(address, port), socketConfiguration.toOptions(), null, false, appHandler, DEFAULT_WORKER_POOL);
        LOG.warning(PREALLOCATION_NOT_SUPPORTED_WARNING);
    }


    /**
     * Creates a connection whose events are dispatched to the given handler
     * using the default worker pool.
     *
     * @param address    the remote address
     * @param port       the remote port
     * @param options    the socket options
     * @param appHandler the application handler
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(InetAddress address, int port, Map<String, Object> options, IHandler appHandler) throws IOException {
        this(new InetSocketAddress(address, port), options, null, false, appHandler, DEFAULT_WORKER_POOL);
    }


    /**
     * Creates a connection whose events are dispatched to the given handler
     * using the default worker pool.
     *
     * @param hostname   the remote host name
     * @param port       the remote port
     * @param appHandler the application handler
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(String hostname, int port, IHandler appHandler) throws IOException {
        this(new InetSocketAddress(hostname, port), new HashMap<String, Object>(), null, false, appHandler, DEFAULT_WORKER_POOL);
    }


    /**
     * Creates a connection whose events are dispatched to the given handler
     * using the default worker pool.
     *
     * @param hostname            the remote host name
     * @param port                the remote port
     * @param socketConfiguration the socket configuration; its options are applied to the connection
     * @param appHandler          the application handler
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(String hostname, int port, StreamSocketConfiguration socketConfiguration, IHandler appHandler) throws IOException {
        this(new InetSocketAddress(hostname, port), socketConfiguration.toOptions(), null, false, appHandler, DEFAULT_WORKER_POOL);
    }


    /**
     * Same as {@link #NonBlockingConnection(String, int, StreamSocketConfiguration, IHandler)}.
     * The {@code preallocationMemorySize} argument is ignored and a warning is logged.
     *
     * @param hostname                the remote host name
     * @param port                    the remote port
     * @param socketConfiguration     the socket configuration; its options are applied to the connection
     * @param appHandler              the application handler
     * @param preallocationMemorySize ignored
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(String hostname, int port, StreamSocketConfiguration socketConfiguration, IHandler appHandler, int preallocationMemorySize) throws IOException {
        this(new InetSocketAddress(hostname, port), socketConfiguration.toOptions(), null, false, appHandler, DEFAULT_WORKER_POOL);
        LOG.warning(PREALLOCATION_NOT_SUPPORTED_WARNING);
    }


    /**
     * Creates a connection whose events are dispatched to the given handler
     * using the default worker pool.
     *
     * @param hostname   the remote host name
     * @param port       the remote port
     * @param options    the socket options
     * @param appHandler the application handler
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(String hostname, int port, Map<String, Object> options, IHandler appHandler) throws IOException {
        this(new InetSocketAddress(hostname, port), options, null, false, appHandler, DEFAULT_WORKER_POOL);
    }


    /**
     * Creates an SSL-capable connection without an application handler.
     *
     * @param address    the remote address
     * @param port       the remote port
     * @param sslContext the SSL context
     * @param sslOn      SSL activation flag, passed through to {@code Connection}
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(InetAddress address, int port, SSLContext sslContext, boolean sslOn) throws IOException {
        this(new InetSocketAddress(address, port), new HashMap<String, Object>(), sslContext, sslOn, null, null);
    }


    /**
     * Creates an SSL-capable connection without an application handler.
     *
     * @param address             the remote address
     * @param port                the remote port
     * @param socketConfiguration the socket configuration; its options are applied to the connection
     * @param sslContext          the SSL context
     * @param sslOn               SSL activation flag, passed through to {@code Connection}
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(InetAddress address, int port, StreamSocketConfiguration socketConfiguration, SSLContext sslContext, boolean sslOn) throws IOException {
        this(new InetSocketAddress(address, port), socketConfiguration.toOptions(), sslContext, sslOn, null, null);
    }


    /**
     * Creates an SSL-capable connection without an application handler.
     *
     * @param address    the remote address
     * @param port       the remote port
     * @param options    the socket options
     * @param sslContext the SSL context
     * @param sslOn      SSL activation flag, passed through to {@code Connection}
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(InetAddress address, int port, Map<String, Object> options, SSLContext sslContext, boolean sslOn) throws IOException {
        this(new InetSocketAddress(address, port), options, sslContext, sslOn, null, null);
    }


    /**
     * Creates an SSL-capable connection without an application handler.
     *
     * @param hostname   the remote host name
     * @param port       the remote port
     * @param sslContext the SSL context
     * @param sslOn      SSL activation flag, passed through to {@code Connection}
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(String hostname, int port, SSLContext sslContext, boolean sslOn) throws IOException {
        this(new InetSocketAddress(hostname, port), new HashMap<String, Object>(), sslContext, sslOn, null, null);
    }


    /**
     * Creates an SSL-capable connection without an application handler.
     *
     * @param hostname            the remote host name
     * @param port                the remote port
     * @param socketConfiguration the socket configuration; its options are applied to the connection
     * @param sslContext          the SSL context
     * @param sslOn               SSL activation flag, passed through to {@code Connection}
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(String hostname, int port, StreamSocketConfiguration socketConfiguration, SSLContext sslContext, boolean sslOn) throws IOException {
        this(new InetSocketAddress(hostname, port), socketConfiguration.toOptions(), sslContext, sslOn, null, null);
    }


    /**
     * Creates an SSL-capable connection without an application handler.
     *
     * @param hostname   the remote host name
     * @param port       the remote port
     * @param options    the socket options
     * @param sslContext the SSL context
     * @param sslOn      SSL activation flag, passed through to {@code Connection}
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(String hostname, int port, Map<String, Object> options, SSLContext sslContext, boolean sslOn) throws IOException {
        this(new InetSocketAddress(hostname, port), options, sslContext, sslOn, null, null);
    }


    /**
     * Creates a connection whose events are dispatched to the given handler
     * using the given worker pool.
     *
     * @param address    the remote address
     * @param port       the remote port
     * @param appHandler the application handler
     * @param workerPool the executor handed to the I/O handler context
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(InetAddress address, int port, IHandler appHandler, Executor workerPool) throws IOException {
        this(new InetSocketAddress(address, port), new HashMap<String, Object>(), null, false, appHandler, workerPool);
    }


    /**
     * Same as {@link #NonBlockingConnection(InetAddress, int, IHandler, Executor)}.
     * The {@code preallocationMemorySize} argument is ignored and a warning is logged.
     *
     * @param address                 the remote address
     * @param port                    the remote port
     * @param appHandler              the application handler
     * @param workerPool              the executor handed to the I/O handler context
     * @param preallocationMemorySize ignored
     * @throws IOException if the connection cannot be set up
     */
    public NonBlockingConnection(InetAddress address, int port, IHandler appHandler, Executor workerPool, int preallocationMemorySize) throws IOException {
        this(new InetSocketAddress(address, port), new HashMap<String, Object>(), null, false, appHandler, workerPool);
        LOG.warning(PREALLOCATION_NOT_SUPPORTED_WARNING);
    }


    /**
     * Client-side constructor that all public constructors funnel into.
     *
     * @param remoteAddress the remote endpoint
     * @param options       the socket options
     * @param sslContext    the SSL context, or {@code null}
     * @param sslOn         SSL activation flag, passed through to {@code Connection}
     * @param appHandler    the application handler, or {@code null}
     * @param workerPool    the executor handed to the I/O handler context, or {@code null}
     * @throws IOException if the connection cannot be set up
     */
    private NonBlockingConnection(InetSocketAddress remoteAddress, Map<String, Object> options, SSLContext sslContext, boolean sslOn, IHandler appHandler, Executor workerPool) throws IOException {
        super(new IoHandlerContext(appHandler, workerPool), remoteAddress, options, sslContext, sslOn);
        this.appHandler = appHandler;

        // Server-side handler features have no effect on a client connection; only hint at FINE level.
        if (LOG.isLoggable(Level.FINE)) {
            if (appHandler instanceof IConnectionScoped) {
                LOG.fine("handler type IConnectionScoped is not supported in the client context");
            }

            if (appHandler instanceof org.xsocket.ILifeCycle) {
                LOG.fine("ILifeCycle is not supported in the client context");
            }
        }

        init();
    }


    /**
     * Wraps an existing I/O handler (used from within the package).
     *
     * @param ctx        the I/O handler context
     * @param ioHandler  the I/O handler to use
     * @param appHandler the application handler, or {@code null}
     * @param ioProvider the I/O provider
     * @throws IOException if the initialization fails
     */
    NonBlockingConnection(IIoHandlerContext ctx, IIoHandler ioHandler, IHandler appHandler, IHandlerIoProvider ioProvider) throws IOException {
        super(ctx, ioHandler, ioProvider);
        this.appHandler = appHandler;

        init();
    }


    /**
     * Returns the application handler this connection dispatches to.
     *
     * @return the handler, or {@code null} if none has been set
     */
    IHandler getAppHandler() {
        return appHandler;
    }


    /**
     * Resets the connection: the write transfer rate goes back to
     * {@code UNLIMITED} (best effort, failures are only logged), then the
     * superclass state is reset and the flush mode is restored to
     * {@link INonBlockingConnection#INITIAL_FLUSH_MODE}.
     *
     * @throws IOException if the superclass reset or the flush mode update fails
     */
    @Override
    void reset() throws IOException {
        try {
            setWriteTransferRate(UNLIMITED);
        } catch (Exception e) {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("error occured by reseting (setWriteTransferRate). Reason: " + e.toString());
            }
        }
        super.reset();

        setFlushmode(INonBlockingConnection.INITIAL_FLUSH_MODE);
    }


    /**
     * Sets the write transfer rate. A limited rate is applied only when the
     * flush mode is {@code ASYNC}; otherwise a warning is logged and the
     * request is ignored. {@code UNLIMITED} is always applied.
     *
     * @param bytesPerSecond the rate in bytes per second, or {@code UNLIMITED}
     * @throws ClosedConnectionException declared by the interface
     * @throws IOException               if the I/O handler cannot be updated
     */
    public void setWriteTransferRate(int bytesPerSecond) throws ClosedConnectionException, IOException {

        if ((bytesPerSecond != UNLIMITED) && (getFlushmode() != FlushMode.ASYNC)) {
            LOG.warning("setWriteTransferRate is only supported for FlushMode ASYNC. Ignore update of the transfer rate");
            return;
        }

        // The provider may hand back a different (decorated) handler, so it is re-installed.
        setIoHandler(getIoProvider().setWriteTransferRate(getIoHandler(), bytesPerSecond));
    }


    /**
     * Returns the current size of the read queue.
     *
     * @return the number of bytes available for reading
     */
    public int getNumberOfAvailableBytes() {
        return getReadQueue().getSize();
    }


    /**
     * Extracts everything currently available in the read queue.
     *
     * @return the extracted buffers; an empty array if nothing was extracted
     * @throws IOException               if the extraction fails
     * @throws ClosedConnectionException declared by the interface
     */
    public ByteBuffer[] readAvailable() throws IOException, ClosedConnectionException {
        LinkedList<ByteBuffer> buffers = extractAvailableFromReadQueue();
        if (buffers == null) {
            return new ByteBuffer[0];
        }
        return buffers.toArray(new ByteBuffer[buffers.size()]);
    }


    /**
     * Variant of {@link #readAvailableByDelimiter(String, String, WritableByteChannel)}
     * using the default encoding.
     *
     * @param delimiter     the delimiter
     * @param outputChannel the channel receiving the extracted data
     * @return the result of the underlying queue extraction
     * @throws IOException               if the extraction fails
     * @throws ClosedConnectionException declared by the interface
     */
    public boolean readAvailableByDelimiter(String delimiter, WritableByteChannel outputChannel) throws IOException, ClosedConnectionException {
        return readAvailableByDelimiter(delimiter, getDefaultEncoding(), outputChannel);
    }


    /**
     * Extracts available data up to the delimiter into the given channel.
     *
     * @param delimiter     the delimiter
     * @param encoding      the encoding used to convert the delimiter to bytes
     * @param outputChannel the channel receiving the extracted data
     * @return the result of the underlying queue extraction
     *         (NOTE(review): presumably {@code true} if the delimiter was found; confirm)
     * @throws IOException               if the extraction fails or the encoding is unsupported
     * @throws ClosedConnectionException declared by the interface
     */
    public boolean readAvailableByDelimiter(String delimiter, String encoding, WritableByteChannel outputChannel) throws IOException, ClosedConnectionException {
        return extractAvailableFromReadQueue(delimiter.getBytes(encoding), outputChannel);
    }


    /**
     * Copies available data into the given buffer. At most
     * {@code buffer.remaining()} bytes and at most the currently available
     * number of bytes are transferred.
     *
     * @param buffer the target buffer
     * @return the number of bytes requested from the read queue
     * @throws IOException if the read fails
     */
    public int read(ByteBuffer buffer) throws IOException {
        int size = buffer.remaining();

        int available = getNumberOfAvailableBytes();
        if (available < size) {
            size = available;
        }

        ByteBuffer[] bufs = readByteBufferByLength(size);
        for (ByteBuffer buf : bufs) {
            while (buf.hasRemaining()) {
                buffer.put(buf);
            }
        }

        return size;
    }


    /**
     * Reads a single byte from the read queue.
     *
     * @return the byte
     * @throws IOException               if the read fails
     * @throws ClosedConnectionException declared by the interface
     * @throws BufferUnderflowException  declared by the interface
     */
    public byte readByte() throws IOException, ClosedConnectionException, BufferUnderflowException {
        return extractByteFromReadQueue();
    }


    /**
     * Reads buffers up to the delimiter using the default encoding and no
     * length limit ({@link Integer#MAX_VALUE}).
     *
     * @param delimiter the delimiter
     * @return the extracted buffers
     * @throws IOException               if the read fails
     * @throws ClosedConnectionException declared by the interface
     * @throws BufferUnderflowException  declared by the interface
     */
    public ByteBuffer[] readByteBufferByDelimiter(String delimiter) throws IOException, ClosedConnectionException, BufferUnderflowException {
        return readByteBufferByDelimiter(delimiter, Integer.MAX_VALUE);
    }


    /**
     * Reads buffers up to the delimiter using the default encoding.
     *
     * @param delimiter the delimiter
     * @param maxLength the maximum length passed to the queue extraction
     * @return the extracted buffers
     * @throws IOException                  if the read fails
     * @throws ClosedConnectionException    declared by the interface
     * @throws MaxReadSizeExceededException declared by the interface
     * @throws BufferUnderflowException     declared by the interface
     */
    public ByteBuffer[] readByteBufferByDelimiter(String delimiter, int maxLength) throws IOException, ClosedConnectionException, MaxReadSizeExceededException, BufferUnderflowException {
        return readByteBufferByDelimiter(delimiter, getDefaultEncoding(), maxLength);
    }


    /**
     * Reads buffers up to the delimiter.
     *
     * @param delimiter the delimiter
     * @param encoding  the encoding used to convert the delimiter to bytes
     * @param maxLength the maximum length passed to the queue extraction
     * @return the extracted buffers
     * @throws IOException                  if the read fails or the encoding is unsupported
     * @throws ClosedConnectionException    declared by the interface
     * @throws MaxReadSizeExceededException declared by the interface
     */
    public ByteBuffer[] readByteBufferByDelimiter(String delimiter, String encoding, int maxLength) throws IOException, ClosedConnectionException, MaxReadSizeExceededException {
        LinkedList<ByteBuffer> result = extractBytesByDelimiterFromReadQueue(delimiter.getBytes(encoding), maxLength);
        return result.toArray(new ByteBuffer[result.size()]);
    }


    /**
     * Reads the given number of bytes as buffers.
     *
     * @param length the number of bytes to read
     * @return the extracted buffers
     * @throws IOException               if the read fails
     * @throws ClosedConnectionException declared by the interface
     * @throws BufferUnderflowException  declared by the interface
     */
    public ByteBuffer[] readByteBufferByLength(int length) throws IOException, ClosedConnectionException, BufferUnderflowException {
        LinkedList<ByteBuffer> extracted = extractBytesByLength(length);

        return extracted.toArray(new ByteBuffer[extracted.size()]);
    }


    /**
     * Reads bytes up to the delimiter using the default encoding and no
     * length limit ({@link Integer#MAX_VALUE}).
     *
     * @param delimiter the delimiter
     * @return the extracted bytes
     * @throws IOException               if the read fails
     * @throws ClosedConnectionException declared by the interface
     * @throws BufferUnderflowException  declared by the interface
     */
    public byte[] readBytesByDelimiter(String delimiter) throws IOException, ClosedConnectionException, BufferUnderflowException {
        return readBytesByDelimiter(delimiter, Integer.MAX_VALUE);
    }


    /**
     * Reads bytes up to the delimiter using the default encoding.
     *
     * @param delimiter the delimiter
     * @param maxLength the maximum length passed to the queue extraction
     * @return the extracted bytes
     * @throws IOException                  if the read fails
     * @throws ClosedConnectionException    declared by the interface
     * @throws MaxReadSizeExceededException declared by the interface
     * @throws BufferUnderflowException     declared by the interface
     */
    public byte[] readBytesByDelimiter(String delimiter, int maxLength) throws IOException, ClosedConnectionException, MaxReadSizeExceededException, BufferUnderflowException {
        return readBytesByDelimiter(delimiter, getDefaultEncoding(), maxLength);
    }


    /**
     * Reads bytes up to the delimiter.
     *
     * @param delimiter the delimiter
     * @param encoding  the encoding used to convert the delimiter to bytes
     * @param maxLength the maximum length passed to the queue extraction
     * @return the extracted bytes
     * @throws IOException                  if the read fails or the encoding is unsupported
     * @throws ClosedConnectionException    declared by the interface
     * @throws MaxReadSizeExceededException declared by the interface
     */
    public byte[] readBytesByDelimiter(String delimiter, String encoding, int maxLength) throws IOException, ClosedConnectionException, MaxReadSizeExceededException {
        return DataConverter.toBytes(readByteBufferByDelimiter(delimiter, encoding, maxLength));
    }


    /**
     * Reads the given number of bytes.
     *
     * @param length the number of bytes to read
     * @return the extracted bytes
     * @throws IOException               if the read fails
     * @throws ClosedConnectionException declared by the interface
     * @throws BufferUnderflowException  declared by the interface
     */
    public byte[] readBytesByLength(int length) throws IOException, ClosedConnectionException, BufferUnderflowException {
        return DataConverter.toBytes(readByteBufferByLength(length));
    }


    /**
     * Reads a double from the read queue.
     *
     * @return the value
     * @throws IOException               if the read fails
     * @throws ClosedConnectionException declared by the interface
     * @throws BufferUnderflowException  declared by the interface
     */
    public double readDouble() throws IOException, ClosedConnectionException, BufferUnderflowException {
        return extractDoubleFromReadQueue();
    }


    /**
     * Reads an int from the read queue.
     *
     * @return the value
     * @throws IOException               if the read fails
     * @throws ClosedConnectionException declared by the interface
     * @throws BufferUnderflowException  declared by the interface
     */
    public int readInt() throws IOException, ClosedConnectionException, BufferUnderflowException {
        return extractIntFromReadQueue();
    }


    /**
     * Reads a short from the read queue.
     *
     * @return the value
     * @throws IOException               if the read fails
     * @throws ClosedConnectionException declared by the interface
     * @throws BufferUnderflowException  declared by the interface
     */
    public short readShort() throws IOException, ClosedConnectionException, BufferUnderflowException {
        return extractShortFromReadQueue();
    }


    /**
     * Reads a long from the read queue.
     *
     * @return the value
     * @throws IOException               if the read fails
     * @throws ClosedConnectionException declared by the interface
     * @throws BufferUnderflowException  declared by the interface
     */
    public long readLong() throws IOException, ClosedConnectionException, BufferUnderflowException {
        return extractLongFromReadQueue();
    }


    /**
     * Reads a string up to the delimiter using the default encoding and no
     * length limit ({@link Integer#MAX_VALUE}).
     *
     * @param delimiter the delimiter
     * @return the extracted string
     * @throws IOException                  if the read fails
     * @throws ClosedConnectionException    declared by the interface
     * @throws BufferUnderflowException     declared by the interface
     * @throws UnsupportedEncodingException if the encoding is not supported
     */
    public String readStringByDelimiter(String delimiter) throws IOException, ClosedConnectionException, BufferUnderflowException, UnsupportedEncodingException {
        return readStringByDelimiter(delimiter, Integer.MAX_VALUE);
    }


    /**
     * Reads a string up to the delimiter using the default encoding.
     *
     * @param delimiter the delimiter
     * @param maxLength the maximum length passed to the queue extraction
     * @return the extracted string
     * @throws IOException                  if the read fails
     * @throws ClosedConnectionException    declared by the interface
     * @throws BufferUnderflowException     declared by the interface
     * @throws UnsupportedEncodingException if the encoding is not supported
     * @throws MaxReadSizeExceededException declared by the interface
     */
    public String readStringByDelimiter(String delimiter, int maxLength) throws IOException, ClosedConnectionException, BufferUnderflowException, UnsupportedEncodingException, MaxReadSizeExceededException {
        return readStringByDelimiter(delimiter, getDefaultEncoding(), maxLength);
    }


    /**
     * Reads a string up to the delimiter with no length limit
     * ({@link Integer#MAX_VALUE}).
     *
     * @param delimiter the delimiter
     * @param encoding  the encoding of the delimiter and of the data
     * @return the extracted string
     * @throws IOException                  if the read fails
     * @throws ClosedConnectionException    declared by the interface
     * @throws BufferUnderflowException     declared by the interface
     * @throws UnsupportedEncodingException if the encoding is not supported
     */
    public String readStringByDelimiter(String delimiter, String encoding) throws IOException, ClosedConnectionException, BufferUnderflowException, UnsupportedEncodingException {
        return readStringByDelimiter(delimiter, encoding, Integer.MAX_VALUE);
    }


    /**
     * Reads a string up to the delimiter.
     *
     * @param delimiter the delimiter
     * @param encoding  the encoding of the delimiter and of the data
     * @param maxLength the maximum length passed to the queue extraction
     * @return the extracted string
     * @throws IOException                  if the read fails
     * @throws ClosedConnectionException    declared by the interface
     * @throws BufferUnderflowException     declared by the interface
     * @throws UnsupportedEncodingException if the encoding is not supported
     * @throws MaxReadSizeExceededException declared by the interface
     */
    public String readStringByDelimiter(String delimiter, String encoding, int maxLength) throws IOException, ClosedConnectionException, BufferUnderflowException, UnsupportedEncodingException, MaxReadSizeExceededException {
        LinkedList<ByteBuffer> extracted = extractBytesByDelimiterFromReadQueue(delimiter.getBytes(encoding), maxLength);

        return DataConverter.toString(extracted, encoding);
    }


    /**
     * Reads a string of the given byte length using the default encoding.
     *
     * @param length the number of bytes to read
     * @return the extracted string
     * @throws IOException                  if the read fails
     * @throws ClosedConnectionException    declared by the interface
     * @throws BufferUnderflowException     declared by the interface
     * @throws UnsupportedEncodingException if the encoding is not supported
     */
    public String readStringByLength(int length) throws IOException, ClosedConnectionException, BufferUnderflowException, UnsupportedEncodingException {
        return readStringByLength(length, getDefaultEncoding());
    }


    /**
     * Reads a string of the given byte length.
     *
     * @param length   the number of bytes to read
     * @param encoding the encoding of the data
     * @return the extracted string
     * @throws IOException                  if the read fails
     * @throws ClosedConnectionException    declared by the interface
     * @throws BufferUnderflowException     declared by the interface
     * @throws UnsupportedEncodingException if the encoding is not supported
     */
    public String readStringByLength(int length, String encoding) throws IOException, ClosedConnectionException, BufferUnderflowException, UnsupportedEncodingException {
        LinkedList<ByteBuffer> extracted = extractBytesByLength(length);
        return DataConverter.toString(extracted, encoding);
    }


    /**
     * Looks up the given string in the read queue using the default encoding
     * and no length limit ({@link Integer#MAX_VALUE}).
     *
     * @param str the string to look for
     * @return the index reported by the underlying queue lookup
     * @throws IOException               if the lookup fails
     * @throws ClosedConnectionException declared by the interface
     * @throws BufferUnderflowException  declared by the interface
     */
    public int getIndexOf(String str) throws IOException, ClosedConnectionException, BufferUnderflowException {
        return getIndexOf(str, Integer.MAX_VALUE);
    }


    /**
     * Looks up the given string in the read queue using the default encoding.
     *
     * @param str       the string to look for
     * @param maxLength the maximum length passed to the queue lookup
     * @return the index reported by the underlying queue lookup
     * @throws IOException                  if the lookup fails
     * @throws ClosedConnectionException    declared by the interface
     * @throws BufferUnderflowException     declared by the interface
     * @throws MaxReadSizeExceededException declared by the interface
     */
    public int getIndexOf(String str, int maxLength) throws IOException, ClosedConnectionException, BufferUnderflowException, MaxReadSizeExceededException {
        return getIndexOf(str, getDefaultEncoding(), maxLength);
    }


    /**
     * Looks up the given string in the read queue.
     *
     * @param str       the string to look for
     * @param encoding  the encoding used to convert the string to bytes
     * @param maxLength the maximum length passed to the queue lookup
     * @return the index reported by the underlying queue lookup
     * @throws IOException                  if the lookup fails or the encoding is unsupported
     * @throws ClosedConnectionException    declared by the interface
     * @throws MaxReadSizeExceededException declared by the interface
     */
    public int getIndexOf(String str, String encoding, int maxLength) throws IOException, ClosedConnectionException, MaxReadSizeExceededException {
        return readIndexOf(str.getBytes(encoding), maxLength);
    }


    /**
     * Sets a connection option; narrows the return type of the superclass method.
     *
     * @param name  the option name
     * @param value the option value
     * @return the value returned by the superclass, cast to {@link INonBlockingConnection}
     * @throws IOException if the option cannot be set
     */
    @Override
    public INonBlockingConnection setOption(String name, Object value) throws IOException {
        return (INonBlockingConnection) super.setOption(name, value);
    }


    /**
     * Lets the superclass process the data event and, if data has been added
     * and a handler is set, calls the handler's {@code onData} callback. The
     * callback is repeated as long as the read queue is non-empty and the
     * previous call changed the queue (insert version or size).
     *
     * <p>A {@link MaxReadSizeExceededException} or any other exception from
     * the handler closes the connection; a {@link BufferUnderflowException}
     * just ends the processing.
     *
     * @return the size reported by the superclass
     */
    @Override
    protected int onDataEvent() {

        int addSize = super.onDataEvent();

        if (addSize > 0) {
            if (appHandler != null) {
                boolean remaingDataToHandle = false;
                try {
                    do {
                        remaingDataToHandle = false;
                        int insertVersion = getReadQueue().getInsertVersionVersion();
                        int sizeBeforeHandle = getReadQueue().getSize();

                        try {
                            ((IDataHandler) appHandler).onData(this);

                        } catch (MaxReadSizeExceededException mee) {
                            try {
                                close();
                            } catch (Exception ignored) {
                                // best-effort close; the connection is given up anyway
                            }
                            return addSize;

                        } catch (BufferUnderflowException bue) {
                            // the handler could not read what it asked for; leave the connection open
                            return addSize;

                        } catch (Exception e) {
                            if (LOG.isLoggable(Level.FINE)) {
                                LOG.fine("[" + getId() + "] closing connection because an error has been occured by handling data by appHandler. " + appHandler + " Reason: " + e.toString());
                            }
                            try {
                                close();
                            } catch (IOException ignored) {
                                // best-effort close; the connection is given up anyway
                            }
                            return addSize;
                        }

                        // Call the handler again only if data is left AND the last call changed the
                        // queue (new data inserted, or some data consumed); otherwise it would spin.
                        if (!getReadQueue().isEmpty()) {
                            if (insertVersion != getReadQueue().getInsertVersionVersion()) {
                                remaingDataToHandle = true;
                            } else if (sizeBeforeHandle != getReadQueue().getSize()) {
                                remaingDataToHandle = true;
                            }
                        }

                    } while (remaingDataToHandle);

                } catch (Exception e) {
                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine("[" + getId() + "] closing connection because an error has been occured by handling data. Reason: " + e.toString());
                    }
                    try {
                        close();
                    } catch (IOException ignored) {
                        // best-effort close; the connection is given up anyway
                    }
                }
            }
        }

        return addSize;
    }


    /**
     * Calls the handler's {@code onConnect} callback if a handler is set and
     * the context reports that it listens for connect events. A
     * {@link MaxReadSizeExceededException} or any other exception closes the
     * connection; a {@link BufferUnderflowException} is ignored.
     */
    @Override
    protected void onConnectEvent() {

        try {
            if (appHandler != null) {
                if (getIoHandlerContext().isAppHandlerListenForConnectEvent()) {
                    ((IConnectHandler) appHandler).onConnect(this);
                }
            }

        } catch (MaxReadSizeExceededException mee) {
            try {
                close();
            } catch (Exception ignored) {
                // best-effort close; the connection is given up anyway
            }

        } catch (BufferUnderflowException bue) {
            // deliberately ignored: the connection stays open

        } catch (Exception e) {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("[" + getId() + "] closing connection because an error has been occured by on connect data. Reason: " + e.toString());
            }
            try {
                close();
            } catch (IOException ignored) {
                // best-effort close; the connection is given up anyway
            }
        }
    }


    /**
     * Calls the handler's {@code onDisconnect} callback the first time this
     * event is seen, provided a handler is set and the context reports that it
     * listens for disconnect events. Handler failures are only logged.
     */
    @Override
    protected void onDisconnectEvent() {
        if (!disconnectOccured) {
            disconnectOccured = true;
            try {
                if (appHandler != null) {
                    if (getIoHandlerContext().isAppHandlerListenforDisconnectEvent()) {
                        ((IDisconnectHandler) appHandler).onDisconnect(this);
                    }
                }
            } catch (Exception e) {
                if (LOG.isLoggable(Level.FINE)) {
                    // message used to say "connect", which pointed at the wrong callback
                    LOG.fine("[" + getId() + "] error occured by handling disconnect. Reason: " + e.toString());
                }
            }
        }
    }


    /**
     * Calls the handler's {@code onConnectionTimeout} callback if the context
     * reports that the handler listens for timeout events and a handler is set.
     * A {@link MaxReadSizeExceededException} or any other exception closes the
     * connection; a {@link BufferUnderflowException} is ignored.
     *
     * @return the handler's result; {@code false} if the handler was not
     *         called or threw an exception
     */
    @Override
    protected boolean onConnectionTimeoutEvent() {
        if (getIoHandlerContext().isAppHandlerListenForTimeoutEvent()) {
            try {
                if (appHandler != null) {
                    return ((ITimeoutHandler) appHandler).onConnectionTimeout(this);
                }
            } catch (MaxReadSizeExceededException mee) {
                try {
                    close();
                } catch (Exception ignored) {
                    // best-effort close; the connection is given up anyway
                }

            } catch (BufferUnderflowException bue) {
                // deliberately ignored: the connection stays open

            } catch (Exception e) {
                if (LOG.isLoggable(Level.FINE)) {
                    LOG.fine("[" + getId() + "] closing connection because an error has been occured by on connect timeout. Reason: " + e.toString());
                }
                try {
                    close();
                } catch (IOException ignored) {
                    // best-effort close; the connection is given up anyway
                }
            }
        }

        return false;
    }


    /**
     * Calls the handler's {@code onIdleTimeout} callback if the context
     * reports that the handler listens for timeout events and a handler is set.
     * A {@link MaxReadSizeExceededException} or any other exception closes the
     * connection; a {@link BufferUnderflowException} is ignored.
     *
     * @return the handler's result; {@code false} if the handler was not
     *         called or threw an exception
     */
    @Override
    protected boolean onIdleTimeoutEvent() {
        if (getIoHandlerContext().isAppHandlerListenForTimeoutEvent()) {
            try {
                if (appHandler != null) {
                    return ((ITimeoutHandler) appHandler).onIdleTimeout(this);
                }
            } catch (MaxReadSizeExceededException mee) {
                try {
                    close();
                } catch (Exception ignored) {
                    // best-effort close; the connection is given up anyway
                }

            } catch (BufferUnderflowException bue) {
                // deliberately ignored: the connection stays open

            } catch (Exception e) {
                if (LOG.isLoggable(Level.FINE)) {
                    LOG.fine("[" + getId() + "] closing connection because an error has been occured by on idle timeout. Reason: " + e.toString());
                }
                try {
                    close();
                } catch (IOException ignored) {
                    // best-effort close; the connection is given up anyway
                }
            }
        }

        return false;
    }
}