1 22 package org.xsocket.stream; 23 24 import java.io.IOException ; 25 import java.io.UnsupportedEncodingException ; 26 import java.net.InetAddress ; 27 import java.net.InetSocketAddress ; 28 import java.net.SocketOptions ; 29 import java.nio.BufferUnderflowException ; 30 import java.nio.ByteBuffer ; 31 import java.nio.channels.WritableByteChannel ; 32 import java.util.HashMap ; 33 import java.util.LinkedList ; 34 import java.util.Map ; 35 import java.util.logging.Level ; 36 import java.util.logging.Logger ; 37 38 import javax.net.ssl.SSLContext; 39 40 41 import org.xsocket.ByteBufferQueue; 42 import org.xsocket.ClosedConnectionException; 43 import org.xsocket.DataConverter; 44 import org.xsocket.MaxReadSizeExceededException; 45 import org.xsocket.stream.ByteBufferParser.Index; 46 import org.xsocket.stream.io.impl.IoProvider; 47 import org.xsocket.stream.io.spi.IClientIoProvider; 48 import org.xsocket.stream.io.spi.IHandlerIoProvider; 49 import org.xsocket.stream.io.spi.IIoHandler; 50 import org.xsocket.stream.io.spi.IIoHandlerCallback; 51 import org.xsocket.stream.io.spi.IIoHandlerContext; 52 53 54 55 61 abstract class Connection implements IConnection { 62 63 private static final Logger LOG = Logger.getLogger(Connection.class.getName()); 64 65 static final FlushMode INITIAL_FLUSH_MODE = FlushMode.SYNC; 66 67 private static final long SEND_TIMEOUT = 60 * 1000; 68 69 70 private static final ByteBufferParser PARSER = new ByteBufferParser(); 72 73 74 static final IoProvider DEFAULT_CLIENT_IO_PROVIDER = new IoProvider(); private static IClientIoProvider clientIoProvider = null; 77 private IHandlerIoProvider ioProvider = null; 78 79 80 private boolean isClosed = false; 82 83 private final ByteBufferQueue writeQueue = new ByteBufferQueue(); 85 private final ByteBufferQueue readQueue = new ByteBufferQueue(); 86 87 88 private IIoHandler ioHandler = null; 90 private IIoHandlerContext ioHandlerCtx = null; 91 92 93 private String defaultEncoding = INITIAL_DEFAULT_ENCODING; 95 96 private boolean autoflush = INITIAL_AUTOFLUSH; 98 99 private FlushMode flushmode = INITIAL_FLUSH_MODE; 101 102 private Index cachedIndex = null; 104 105 private Object attachment = null; 107 108 109 private Object writeGuard = new Object (); 111 private IOException writeException = null; 112 113 114 private LinkedList <ByteBuffer > readMarkBuffer = null; 116 private boolean isReadMarked = false; 117 118 private WriteMarkBuffer writeMarkBuffer = null; 119 private boolean isWriteMarked = false; 120 121 122 private boolean idleTimeoutOccured = false; 124 private boolean connectionTimeoutOccured = false; 125 126 127 128 static { 129 String clientIoManagerClassname = System.getProperty(IClientIoProvider.PROVIDER_CLASSNAME_KEY, IoProvider.class.getName()); 131 try { 132 Class clientIoManagerClass = Class.forName(clientIoManagerClassname); 133 clientIoProvider = (IClientIoProvider) clientIoManagerClass.newInstance(); 134 } catch (Exception e) { 135 LOG.warning("error occured by creating ClientIoManager " + clientIoManagerClassname + ": " + e.toString()); 136 LOG.info("using default ClientIoManager " + DEFAULT_CLIENT_IO_PROVIDER.getClass().getName()); 137 clientIoProvider = DEFAULT_CLIENT_IO_PROVIDER; 138 } 139 } 140 141 142 145 public final int getPendingWriteDataSize() { 146 return writeQueue.getSize() + ioHandler.getPendingWriteDataSize(); 147 } 148 149 150 154 Connection(IIoHandlerContext ioHandlerCtx, InetSocketAddress remoteAddress, Map <String ,Object > options, SSLContext sslContext, boolean sslOn) throws IOException { 155 this.ioHandlerCtx = ioHandlerCtx; 156 157 if (sslContext != null) { 158 ioHandler = ((IoProvider) clientIoProvider).createSSLClientIoHandler(ioHandlerCtx, remoteAddress, options, sslContext, sslOn); 159 } else { 160 ioHandler = clientIoProvider.createClientIoHandler(ioHandlerCtx, remoteAddress, options); 161 } 162 163 ioProvider = clientIoProvider; 164 } 165 166 167 171 Connection(IIoHandlerContext ioHandlerCtx, IIoHandler ioHandler, IHandlerIoProvider ioProvider) throws IOException { 172 this.ioHandlerCtx = ioHandlerCtx; 173 this.ioHandler = ioHandler; 174 this.ioProvider = ioProvider; 175 } 176 177 178 protected final IHandlerIoProvider getIoProvider() { 179 return ioProvider; 180 } 181 182 183 protected final IIoHandlerContext getIoHandlerContext() { 184 return ioHandlerCtx; 185 } 186 187 protected final void init() throws IOException { 188 ioHandler.init(new HandlerCallback()); 189 190 if (LOG.isLoggable(Level.FINE)) { 191 LOG.fine("connection " + getId() + " created. IoHandler: " + ioHandler.toString()); 192 } 193 } 194 195 protected final IIoHandlerContext getioHandlerCtx() { 196 return ioHandlerCtx; 197 } 198 199 protected final void setIoHandler(IIoHandler ioHandler) { 200 this.ioHandler = ioHandler; 201 } 202 203 207 void reset() throws IOException { 208 writeQueue.drain(); 209 210 writeGuard = new Object (); 211 writeException = null; 212 213 resumeRead(); 214 setIdleTimeoutSec(Integer.MAX_VALUE); 215 setConnectionTimeoutSec(Integer.MAX_VALUE); 216 setAutoflush(IBlockingConnection.INITIAL_AUTOFLUSH); 217 setFlushmode(INITIAL_FLUSH_MODE); 218 setDefaultEncoding(IBlockingConnection.INITIAL_DEFAULT_ENCODING); 219 removeReadMark(); 220 removeWriteMark(); 221 resetCachedIndex(); 222 attachment = null; 223 224 idleTimeoutOccured = false; 225 connectionTimeoutOccured = false; 226 227 ioHandler.drainIncoming(); 228 readQueue.drain(); 229 } 230 231 232 236 protected final IIoHandler getIoHandler() { 237 return ioHandler; 238 } 239 240 241 242 245 final SocketOptions getSocketOptions() { 246 Map <String , Object > opt = new HashMap <String , Object >(); 247 248 Map <String , Class > setOpts = getOptions(); 249 for (String optionName : setOpts.keySet()) { 250 try { 251 opt.put(optionName, getOption(optionName)); 252 } catch (IOException ignore) { }; 253 } 254 255 return StreamSocketConfiguration.fromOptions(opt); 256 } 257 258 259 264 final ByteBufferQueue getReadQueue() { 265 return readQueue; 266 } 267 268 269 270 273 public final void close() throws IOException { 274 if (isOpen() && !isClosed) { 275 isClosed = true; 276 277 if (LOG.isLoggable(Level.FINE)) { 278 LOG.fine("closing connection -> flush all remaining data"); 279 } 280 281 flushWriteQueue(); 282 ioHandler.close(false); 283 } 284 } 285 286 287 290 public final boolean isOpen() { 291 if (isClosed) { 292 return false; 293 } else { 294 return ioHandler.isOpen(); 295 } 296 } 297 298 303 final void writeIncoming(ByteBuffer data) { 304 readQueue.append(data); 305 } 306 307 308 313 final void writeOutgoing(ByteBuffer data) { 314 writeQueue.append(data); 315 } 316 317 322 void writeOutgoing(LinkedList <ByteBuffer > datas) { 323 writeQueue.append(datas); 324 } 325 326 327 330 public void suspendRead() throws IOException { 331 ioHandler.suspendRead(); 332 } 333 334 335 338 public void resumeRead() throws IOException { 339 ioHandler.resumeRead(); 340 } 341 342 343 346 public void flush() throws ClosedConnectionException, IOException { 347 if (autoflush) { 348 LOG.warning("flush has been called for a connection which is in autoflush mode (since xSocket V1.1 autoflush is activated by default)"); 349 } 350 internalFlush(); 351 } 352 353 359 void internalFlush() throws ClosedConnectionException, IOException { 360 361 removeWriteMark(); 362 if (flushmode == FlushMode.SYNC) { 363 syncFlush(); 365 366 } else { 367 flushWriteQueue(); 369 } 370 } 371 372 373 377 public final void setFlushmode(FlushMode flushMode) { 378 this.flushmode = flushMode; 379 } 380 381 386 public final FlushMode getFlushmode() { 387 return flushmode; 388 } 389 390 391 395 public void setIdleTimeoutSec(int timeoutInSec) { 396 getIoHandler().setIdleTimeoutSec(timeoutInSec); 397 } 398 399 400 403 public void setConnectionTimeoutSec(int timeoutSec) { 404 getIoHandler().setConnectionTimeoutSec(timeoutSec); 405 } 406 407 408 411 public int getConnectionTimeoutSec() { 412 return getIoHandler().getConnectionTimeoutSec(); 413 } 414 415 416 417 421 public int getIdleTimeoutSec() { 422 return getIoHandler().getIdleTimeoutSec(); 423 } 424 425 426 427 430 private void syncFlush() throws ClosedConnectionException, IOException { 431 432 long start = System.currentTimeMillis(); 433 long remainingTime = SEND_TIMEOUT; 434 435 synchronized (writeGuard) { 436 flushWriteQueue(); 437 438 do { 439 if (ioHandler.getPendingWriteDataSize() == 0) { 441 return; 442 443 } else if(writeException != null) { 445 IOException ioe = writeException; 446 writeException = null; 447 throw ioe; 448 449 } else { 451 try { 452 writeGuard.wait(remainingTime); 453 } catch (InterruptedException ignore) { } 454 } 455 456 remainingTime = (start + SEND_TIMEOUT) - System.currentTimeMillis(); 457 } while (remainingTime > 0); 458 } 459 } 460 461 private void flushWriteQueue() throws ClosedConnectionException, IOException { 462 if (!writeQueue.isEmpty()) { 463 LinkedList <ByteBuffer > buffers = writeQueue.drain(); 464 ioHandler.writeOutgoing(buffers); 465 } 466 } 467 468 469 470 471 472 475 public final String getDefaultEncoding() { 476 return defaultEncoding; 477 } 478 479 480 483 public final void setDefaultEncoding(String defaultEncoding) { 484 this.defaultEncoding = defaultEncoding; 485 } 486 487 488 491 public final void setAutoflush(boolean autoflush) { 492 this.autoflush = autoflush; 493 } 494 495 496 499 public final boolean getAutoflush() { 500 return autoflush; 501 } 502 503 504 507 public final String getId() { 508 return ioHandler.getId(); 509 } 510 511 512 515 public final InetAddress getLocalAddress() { 516 return ioHandler.getLocalAddress(); 517 } 518 519 520 521 524 public final int getLocalPort() { 525 return ioHandler.getLocalPort(); 526 } 527 528 529 530 533 public final InetAddress getRemoteAddress() { 534 return ioHandler.getRemoteAddress(); 535 } 536 537 538 539 542 public final int getRemotePort() { 543 return ioHandler.getRemotePort(); 544 } 545 546 547 548 549 550 553 public void activateSecuredMode() throws IOException { 554 555 boolean isPrestarted = DEFAULT_CLIENT_IO_PROVIDER.preStartSecuredMode(ioHandler); 556 557 if (isPrestarted) { 558 internalFlush(); 559 DEFAULT_CLIENT_IO_PROVIDER.startSecuredMode(ioHandler, readQueue.drain()); 560 } 561 } 562 563 564 567 public final int write(String s) throws ClosedConnectionException, IOException { 568 return write(s, defaultEncoding); 569 } 570 571 572 575 public final int write(String s, String encoding) throws ClosedConnectionException, IOException { 576 ByteBuffer buffer = DataConverter.toByteBuffer(s, encoding); 577 return write(buffer); 578 } 579 580 583 public final int write(byte b) throws ClosedConnectionException, IOException { 584 ByteBuffer buffer = ByteBuffer.allocate(1).put(b); 585 buffer.flip(); 586 return write(buffer); 587 } 588 589 590 593 public final int write(byte... bytes) throws ClosedConnectionException, IOException { 594 return write(ByteBuffer.wrap(bytes)); 595 } 596 597 598 601 public final int write(byte[] bytes, int offset, int length) throws ClosedConnectionException, IOException { 602 return write(ByteBuffer.wrap(bytes, offset, length)); 603 } 604 605 606 609 public final long write(ByteBuffer [] buffers) throws ClosedConnectionException, IOException { 610 if (isOpen()) { 611 long written = 0; 612 for (ByteBuffer buffer : buffers) { 613 written += buffer.limit() - buffer.position(); 614 } 615 616 617 if (isWriteMarked) { 618 for (ByteBuffer buffer : buffers) { 619 writeMarkBuffer.add(buffer); 620 } 621 622 } else { 623 for (ByteBuffer buffer : buffers) { 624 writeQueue.append(buffer); 625 } 626 } 627 628 629 if (autoflush) { 630 internalFlush(); 631 } 632 633 return written; 634 635 } else { 636 throw new ClosedConnectionException("connection " + getId() + " is already closed"); 637 } 638 } 639 640 641 644 public final int write(ByteBuffer buffer) throws ClosedConnectionException, IOException { 645 if (isOpen()) { 646 int written = buffer.limit() - buffer.position(); 647 648 if (isWriteMarked) { 649 writeMarkBuffer.add(buffer); 650 651 } else { 652 writeQueue.append(buffer); 653 } 654 655 656 if (autoflush) { 657 internalFlush(); 658 } 659 660 return written; 661 } else { 662 throw new ClosedConnectionException("connection " + getId() + " is already closed"); 663 } 664 665 } 666 667 668 669 672 public final int write(int i) throws ClosedConnectionException, IOException { 673 ByteBuffer buffer = ByteBuffer.allocate(4).putInt(i); 674 buffer.flip(); 675 return (int) write(buffer); 676 } 677 678 679 682 public final int write(short s) throws ClosedConnectionException, IOException { 683 ByteBuffer buffer = ByteBuffer.allocate(2).putShort(s); 684 buffer.flip(); 685 return (int) write(buffer); 686 } 687 688 689 690 693 public final int write(long l) throws ClosedConnectionException, IOException { 694 ByteBuffer buffer = ByteBuffer.allocate(8).putLong(l); 695 buffer.flip(); 696 return (int) write(buffer); 697 } 698 699 700 703 public final int write(double d) throws ClosedConnectionException, IOException { 704 ByteBuffer buffer = ByteBuffer.allocate(8).putDouble(d); 705 buffer.flip(); 706 return (int) write(buffer); 707 } 708 709 710 713 public final long write(ByteBuffer [] srcs, int offset, int length) throws IOException { 714 ByteBuffer [] bufs = new ByteBuffer [length]; 715 System.arraycopy(srcs, offset, bufs, 0, length); 716 717 return write(bufs); 718 } 719 720 721 726 protected final LinkedList <ByteBuffer > extractAvailableFromReadQueue() { 727 resetCachedIndex(); 728 729 LinkedList <ByteBuffer > buffers = readQueue.drain(); 730 onExtracted(buffers); 731 732 return buffers; 733 } 734 735 744 protected final LinkedList <ByteBuffer > extractBytesByDelimiterFromReadQueue(byte[] delimiter, int maxLength) throws IOException , BufferUnderflowException , MaxReadSizeExceededException { 745 746 if (!readQueue.isEmpty()) { 747 LinkedList <ByteBuffer > buffers = readQueue.drain(); 748 assert (buffers != null); 749 750 ByteBufferParser.Index index = scanByDelimiter(buffers, delimiter); 751 752 753 if (index.getReadBytes() > maxLength) { 755 throw new MaxReadSizeExceededException(); 756 } 757 758 if (index.hasDelimiterFound()) { 760 LinkedList <ByteBuffer > extracted = PARSER.extract(buffers, index); 762 onExtracted(extracted, delimiter); 763 764 readQueue.addFirstSilence(buffers); 765 resetCachedIndex(); 766 767 return extracted; 768 769 } else { 771 readQueue.addFirstSilence(buffers); 772 cachedIndex = index; 773 } 774 } 775 776 777 if (isOpen()) { 778 throw new BufferUnderflowException (); 779 780 } else { 781 int read = retrieveIoHandlerData(); 783 784 if (read > 0) { 786 return extractBytesByDelimiterFromReadQueue(delimiter, maxLength); 787 788 } else { 790 throw new ClosedConnectionException("connection " + getId() + " is already closed"); 791 } 792 } 793 } 794 795 796 806 public int indexOf(String str) { 807 808 int length = 0; 809 810 if (!readQueue.isEmpty()) { 811 try { 812 LinkedList <ByteBuffer > buffers = readQueue.drain(); 813 ByteBufferParser.Index index = scanByDelimiter(buffers, str.getBytes(getDefaultEncoding())); 814 815 if (index.hasDelimiterFound()) { 817 length = index.getReadBytes() - str.length(); 818 819 } else { 821 length = -1; 822 } 823 824 readQueue.addFirstSilence(buffers); 825 cachedIndex = index; 826 } catch (UnsupportedEncodingException uce) { 827 throw new RuntimeException (uce); 828 } 829 } 830 831 return length; 832 } 833 834 835 836 protected final int readIndexOf(byte[] bytes, int maxReadSize) throws IOException , BufferUnderflowException , MaxReadSizeExceededException { 837 838 int length = 0; 839 840 if (!readQueue.isEmpty()) { 841 LinkedList <ByteBuffer > buffers = readQueue.drain(); 842 ByteBufferParser.Index index = scanByDelimiter(buffers, bytes); 843 844 if (index.hasDelimiterFound()) { 846 length = index.getReadBytes() - bytes.length; 847 848 } else { 850 length = -1; 851 } 852 853 readQueue.addFirstSilence(buffers); 854 cachedIndex = index; 855 } else { 856 length = -1; 857 } 858 859 if (length < 0) { 860 if (readQueue.getSize() >= maxReadSize) { 861 throw new MaxReadSizeExceededException(); 862 } 863 864 if (isOpen()) { 865 throw new BufferUnderflowException (); 866 } else { 867 throw new ClosedConnectionException("connection " + getId() + " is already closed"); 868 } 869 } 870 871 return length; 872 } 873 874 875 883 protected final LinkedList <ByteBuffer > extractBytesByLength(int length) throws IOException , BufferUnderflowException { 884 885 if (length == 0) { 886 return new LinkedList <ByteBuffer >(); 887 } 888 889 if (readQueue.getSize() >= length) { 891 LinkedList <ByteBuffer > buffers = readQueue.drain(); 892 assert (buffers != null); 893 894 LinkedList <ByteBuffer > extracted = PARSER.extract(buffers, length); 895 onExtracted(extracted); 896 897 readQueue.addFirstSilence(buffers); 898 resetCachedIndex(); 899 900 return extracted; 901 902 } else { 904 if (isOpen()) { 905 throw new BufferUnderflowException (); 906 907 } else { 908 int read = retrieveIoHandlerData(); 910 911 if (read > 0) { 913 return extractBytesByLength(length); 914 915 } else { 917 throw new ClosedConnectionException("connection " + getId() + " is already closed"); 918 } 919 } 920 } 921 } 922 923 924 925 933 @SuppressWarnings ("unchecked") 934 protected final boolean extractAvailableFromReadQueue(byte[] delimiter, WritableByteChannel outChannel) throws IOException { 935 936 if (!readQueue.isEmpty()) { 937 LinkedList <ByteBuffer > buffers = readQueue.drain(); 938 assert (buffers != null); 939 940 ByteBufferParser.Index index = scanByDelimiter(buffers, delimiter); 941 942 943 if (index.hasDelimiterFound()) { 945 LinkedList <ByteBuffer > extracted = PARSER.extract(buffers, index); 946 onExtracted(extracted, delimiter); 947 for (ByteBuffer buffer : extracted) { 948 outChannel.write(buffer); 949 } 950 951 readQueue.addFirstSilence(buffers); 952 resetCachedIndex(); 953 return true; 954 955 } else { 957 if (index.getDelimiterPos() == 0) { 959 int readBytes = index.getReadBytes(); 960 if (readBytes > 0) { 961 int availableBytes = readBytes - index.getDelimiterPos(); 962 if (availableBytes > 0) { 963 LinkedList <ByteBuffer > extracted = PARSER.extract(buffers, availableBytes); 964 onExtracted(extracted); 965 for (ByteBuffer buffer : extracted) { 966 outChannel.write(buffer); 967 } 968 969 resetCachedIndex(); 970 } 971 } 972 } 973 974 readQueue.addFirstSilence(buffers); 975 return false; 976 } 977 978 } else { 979 return false; 980 } 981 } 982 983 984 private ByteBufferParser.Index scanByDelimiter(LinkedList <ByteBuffer > buffers, byte[] delimiter) { 985 986 if (cachedIndex != null) { 988 if (cachedIndex.isDelimiterEquals(delimiter)) { 989 return PARSER.find(buffers, cachedIndex); 990 } else { 991 cachedIndex = null; 992 } 993 } 994 995 return PARSER.find(buffers, delimiter); 996 } 997 998 999 private void onExtracted(LinkedList <ByteBuffer > buffers) { 1000 if (isReadMarked) { 1001 for (ByteBuffer buffer : buffers) { 1002 onExtracted(buffer); 1003 } 1004 } 1005 } 1006 1007 1008 private void onExtracted(LinkedList <ByteBuffer > buffers, byte[] delimiter) { 1009 if (isReadMarked) { 1010 for (ByteBuffer buffer : buffers) { 1011 onExtracted(buffer); 1012 } 1013 onExtracted(ByteBuffer.wrap(delimiter)); 1014 } 1015 } 1016 1017 private void onExtracted(ByteBuffer buffer) { 1018 if (isReadMarked) { 1019 readMarkBuffer.addLast(buffer.duplicate()); 1020 } 1021 } 1022 1023 1024 1025 1026 1033 protected final byte[] extractBytesFromReadQueue(int length) throws BufferUnderflowException { 1034 resetCachedIndex(); 1035 1036 ByteBuffer buffer = readQueue.read(length); 1037 onExtracted(buffer); 1038 return DataConverter.toBytes(buffer); 1039 } 1040 1041 1042 1048 protected final int extractIntFromReadQueue() throws BufferUnderflowException { 1049 resetCachedIndex(); 1050 1051 ByteBuffer buffer = readQueue.read(4); 1052 onExtracted(buffer); 1053 return buffer.getInt(); 1054 } 1055 1056 1057 1063 protected final short extractShortFromReadQueue() throws BufferUnderflowException { 1064 resetCachedIndex(); 1065 1066 ByteBuffer buffer = readQueue.read(2); 1067 onExtracted(buffer); 1068 return buffer.getShort(); 1069 } 1070 1071 1077 protected final byte extractByteFromReadQueue() throws BufferUnderflowException { 1078 resetCachedIndex(); 1079 1080 ByteBuffer buffer = readQueue.read(1); 1081 onExtracted(buffer); 1082 1083 return buffer.get(); 1084 } 1085 1086 1087 1093 protected final double extractDoubleFromReadQueue() throws BufferUnderflowException { 1094 resetCachedIndex(); 1095 1096 ByteBuffer buffer = readQueue.read(8); 1097 onExtracted(buffer); 1098 1099 return buffer.getDouble(); 1100 } 1101 1102 1103 1104 1110 protected final long extractLongFromReadQueue() throws BufferUnderflowException { 1111 resetCachedIndex(); 1112 1113 ByteBuffer buffer = readQueue.read(8); 1114 onExtracted(buffer); 1115 1116 return buffer.getLong(); 1117 } 1118 1119 1120 private void resetCachedIndex() { 1121 cachedIndex = null; 1122 } 1123 1124 1125 1126 1129 public final Object attach(Object obj) { 1130 Object old = attachment; 1131 attachment = obj; 1132 return old; 1133 } 1134 1135 1136 1139 public final Object attachment() { 1140 return attachment; 1141 } 1142 1143 1144 1147 public void markReadPosition() { 1148 removeReadMark(); 1149 1150 isReadMarked = true; 1151 readMarkBuffer = new LinkedList <ByteBuffer >(); 1152 } 1153 1154 1155 1158 public void markWritePosition() { 1159 if (getAutoflush()) { 1160 throw new UnsupportedOperationException ("write mark is only supported for mode autoflush off"); 1161 } 1162 removeWriteMark(); 1163 1164 isWriteMarked = true; 1165 writeMarkBuffer = new WriteMarkBuffer(); 1166 } 1167 1168 1169 1172 public boolean resetToWriteMark() { 1173 if (isWriteMarked) { 1174 writeMarkBuffer.resetWritePosition(); 1175 return true; 1176 1177 } else { 1178 return false; 1179 } 1180 } 1181 1182 1183 1184 1187 public boolean resetToReadMark() { 1188 if (isReadMarked) { 1189 getReadQueue().addFirstSilence(readMarkBuffer); 1190 removeReadMark(); 1191 return true; 1192 1193 } else { 1194 return false; 1195 } 1196 } 1197 1198 1199 1202 public void removeReadMark() { 1203 isReadMarked = false; 1204 readMarkBuffer = null; 1205 } 1206 1207 1208 1211 public void removeWriteMark() { 1212 if (isWriteMarked) { 1213 isWriteMarked = false; 1214 writeQueue.append(writeMarkBuffer.drain()); 1215 writeMarkBuffer = null; 1216 } 1217 } 1218 1219 1220 protected int onDataEvent() { 1221 return retrieveIoHandlerData(); 1222 } 1223 1224 1225 private int retrieveIoHandlerData() { 1226 1227 try { 1228 1229 LinkedList <ByteBuffer > buffers = getIoHandler().drainIncoming(); 1230 1231 int addSize = 0; 1232 for (ByteBuffer buffer : buffers) { 1233 addSize += buffer.remaining(); 1234 } 1235 1236 1237 if (addSize > 0) { 1238 getReadQueue().append(buffers); 1239 } 1240 1241 return addSize; 1242 1243 } catch (RuntimeException e) { 1244 if (LOG.isLoggable(Level.FINE)) { 1245 LOG.fine("error occured by transfering data to connection's read queue " + e.toString()); 1246 } 1247 throw e; 1248 } 1249 } 1250 1251 1252 1253 protected void onConnectEvent() { 1254 1255 } 1256 1257 1258 protected void onDisconnectEvent() { 1259 1260 } 1261 1262 protected boolean onConnectionTimeoutEvent() { 1263 return false; 1264 } 1265 1266 protected boolean onIdleTimeoutEvent() { 1267 return false; 1268 } 1269 1270 1271 1272 public void onWritten() { 1273 if (flushmode == FlushMode.SYNC) { 1274 synchronized (writeGuard) { 1275 writeGuard.notifyAll(); 1276 } 1277 } 1278 } 1279 1280 1281 public void onWriteException(IOException ioException) { 1282 if (flushmode == FlushMode.SYNC) { 1283 synchronized (writeGuard) { 1284 writeException = ioException; 1285 writeGuard.notify(); 1286 } 1287 } 1288 } 1289 1290 1291 1294 public final Object getOption(String name) throws IOException { 1295 return ioHandler.getOption(name); 1296 } 1297 1298 1299 1302 public final Map <String , Class > getOptions() { 1303 return ioHandler.getOptions(); 1304 } 1305 1306 1307 1310 public IConnection setOption(String name, Object value) throws IOException { 1311 ioHandler.setOption(name, value); 1312 return this; 1313 } 1314 1315 1316 1317 1318 1319 private void initiateClose() { 1320 try { 1321 close(); 1322 } catch (IOException ioe) { 1323 if (LOG.isLoggable(Level.FINE)) { 1324 LOG.fine("[" + getId() + "] error occured closing connection. Reason: " + ioe.toString()); 1325 } 1326 } 1327 } 1328 1329 1330 1331 1334 @Override 1335 public String toString() { 1336 try { 1337 if (isOpen()) { 1338 return "id=" + getId() + ", remote=" + getRemoteAddress().getCanonicalHostName() + "(" + getRemoteAddress() + ":" + getRemotePort() + ")"; 1339 } else { 1340 return "id=" + getId() + " (closed)"; 1341 } 1342 } catch (Exception e) { 1343 return super.toString(); 1344 } 1345 } 1346 1347 1348 1349 1350 private static final class WriteMarkBuffer { 1351 private final Entry head = new Entry(null, null); 1352 private Entry tail = head; 1353 1354 1355 public LinkedList <ByteBuffer > drain() { 1356 LinkedList <ByteBuffer > result = new LinkedList <ByteBuffer >(); 1357 1358 Entry entry = head; 1359 do { 1360 entry = entry.next; 1361 if (entry!= null) { 1362 result.add(entry.element); 1363 } 1364 1365 } while (entry != null); 1366 1367 head.next = null; 1368 tail = head; 1369 1370 return result; 1371 } 1372 1373 public void add(ByteBuffer data) { 1374 int size = data.remaining(); 1375 1376 if (size == 0) { 1377 return; 1378 } 1379 1380 1381 Entry entry = new Entry(data, tail.next); 1383 tail.next = entry; 1384 tail = entry; 1385 1386 1387 while (size > 0) { 1389 if (tail.next != null) { 1390 int nextSize = tail.next.element.remaining(); 1391 1392 if (nextSize <= size) { 1394 size = size - nextSize; 1395 1396 tail.next = tail.next.next; 1397 if (tail.next == null) { 1398 break; 1399 } 1400 1401 } else { 1403 ByteBuffer buffer = tail.next.element; 1404 buffer.position(buffer.position() + size); 1405 ByteBuffer sliced = buffer.slice(); 1406 1407 Entry slicedEntry = new Entry(sliced, tail.next.next); 1408 tail.next = slicedEntry; 1409 break; 1410 } 1411 } else { 1412 break; 1413 } 1414 } 1415 } 1416 1417 1418 1419 public void resetWritePosition() { 1420 tail = head; 1421 } 1422 } 1423 1424 private static class Entry { 1425 private ByteBuffer element = null; 1426 private Entry next = null; 1427 1428 Entry(ByteBuffer element, Entry next) { 1429 this.element = element; 1430 this.next = next; 1431 } 1432 1433 @Override 1434 public String toString() { 1435 StringBuilder sb = new StringBuilder (); 1436 1437 if (element != null) { 1438 sb.append(DataConverter.toHexString(new ByteBuffer [] {element}, 100000)); 1439 } 1440 1441 if (next != null) { 1442 sb.append(next.toString()); 1443 } 1444 1445 return sb.toString(); 1446 } 1447 } 1448 1449 1450 1451 private final class HandlerCallback implements IIoHandlerCallback { 1452 1453 public void onWritten() { 1454 Connection.this.onWritten(); 1455 } 1456 1457 1458 public void onWriteException(IOException ioException) { 1459 Connection.this.onWriteException(ioException); 1460 } 1461 1462 public void onDataRead() { 1463 Connection.this.onDataEvent(); 1464 } 1465 1466 1467 public void onConnectionAbnormalTerminated() { 1468 Connection.this.initiateClose(); 1469 } 1470 1471 1472 public void onConnect() { 1473 Connection.this.onConnectEvent(); 1474 } 1475 1476 1477 public void onDisconnect() { 1478 Connection.this.onDisconnectEvent(); 1479 } 1480 1481 public void onConnectionTimeout() { 1482 if (!connectionTimeoutOccured) { 1483 connectionTimeoutOccured = true; 1484 boolean isHandled = Connection.this.onConnectionTimeoutEvent(); 1485 if (!isHandled) { 1486 try { 1487 close(); 1488 } catch (IOException ioe) { 1489 if (LOG.isLoggable(Level.FINE)) { 1490 LOG.fine("[" + getId() + "] error occured closing connection caused by connection timeout. Reason: " + ioe.toString()); 1491 } 1492 } 1493 } 1494 } 1495 } 1496 1497 1498 public void onIdleTimeout() { 1499 if (!idleTimeoutOccured) { 1500 idleTimeoutOccured = true; 1501 1502 boolean isHandled = Connection.this.onIdleTimeoutEvent(); 1503 if (!isHandled) { 1504 try { 1505 close(); 1506 } catch (IOException ioe) { 1507 if (LOG.isLoggable(Level.FINE)) { 1508 LOG.fine("[" + getId() + "] error occured closing connection caused by idle timeout. Reason: " + ioe.toString()); 1509 } 1510 } 1511 } 1512 } 1513 } 1514 1515 } 1516} 1517 | Popular Tags |