1 22 package org.xsocket.stream.io.impl; 23 24 import java.io.IOException ; 25 import java.net.InetAddress ; 26 import java.nio.ByteBuffer ; 27 import java.nio.channels.SelectionKey ; 28 import java.nio.channels.SocketChannel ; 29 import java.util.Collections ; 30 import java.util.HashMap ; 31 import java.util.LinkedList ; 32 import java.util.Map ; 33 import java.util.logging.Level ; 34 import java.util.logging.Logger ; 35 36 import org.xsocket.ByteBufferQueue; 37 import org.xsocket.ClosedConnectionException; 38 import org.xsocket.IDispatcher; 39 import org.xsocket.IHandle; 40 import org.xsocket.DataConverter; 41 import org.xsocket.stream.io.spi.IClientIoProvider; 42 import org.xsocket.stream.io.spi.IIoHandlerCallback; 43 import org.xsocket.stream.io.spi.IIoHandlerContext; 44 45 46 47 48 53 final class IoSocketHandler extends ChainableIoHandler implements IHandle { 54 55 private static final Logger LOG = Logger.getLogger(IoSocketHandler.class.getName()); 56 57 58 private static final int MIN_READ_BUFFER_SIZE = 64; 60 private static final Map <String ,Class > SUPPORTED_OPTIONS = new HashMap <String , Class >(); 61 62 static { 63 SUPPORTED_OPTIONS.put(IClientIoProvider.SO_RCVBUF, Integer .class); 64 SUPPORTED_OPTIONS.put(IClientIoProvider.SO_SNDBUF, Integer .class); 65 SUPPORTED_OPTIONS.put(IClientIoProvider.SO_REUSEADDR, Boolean .class); 66 SUPPORTED_OPTIONS.put(IClientIoProvider.SO_KEEPALIVE, Boolean .class); 67 SUPPORTED_OPTIONS.put(IClientIoProvider.TCP_NODELAY, Boolean .class); 68 SUPPORTED_OPTIONS.put(IClientIoProvider.SO_LINGER, Integer .class); 69 } 70 71 72 private boolean isLogicalOpen = true; 74 private boolean isDisconnect = false; 75 76 private SocketChannel channel = null; 78 79 80 private IoSocketDispatcher dispatcher = null; 82 83 84 private IMemoryManager memoryManager = null; 86 87 88 private final ByteBufferQueue sendQueue = new ByteBufferQueue(); 90 private final ByteBufferQueue receiveQueue = new ByteBufferQueue(); 91 92 93 private String id = null; 95 96 97 private long idleTimeout = Long.MAX_VALUE; 99 private long connectionTimeout = Long.MAX_VALUE; 100 101 102 private boolean suspendRead = false; 104 105 106 private long openTime = -1; 108 private long lastTimeReceived = System.currentTimeMillis(); 109 private long receivedBytes = 0; 111 private long sendBytes = 0; 112 113 114 122 @SuppressWarnings ("unchecked") 123 IoSocketHandler(SocketChannel channel, IoSocketDispatcher dispatcher, IIoHandlerContext ctx, String connectionId) throws IOException { 124 super(null); 125 126 assert (channel != null); 127 this.channel = channel; 128 129 openTime = System.currentTimeMillis(); 130 131 channel.configureBlocking(false); 132 133 this.dispatcher = dispatcher; 134 this.id = connectionId; 135 } 136 137 138 public void init(IIoHandlerCallback callbackHandler) throws IOException { 139 setPreviousCallback(callbackHandler); 140 141 blockUntilIsConnected(); 142 dispatcher.register(this, SelectionKey.OP_READ); 143 } 144 145 146 147 void setMemoryManager(IMemoryManager memoryManager) { 148 this.memoryManager = memoryManager; 149 } 150 151 @Override 152 public String getId() { 153 return id; 154 } 155 156 157 160 @Override 161 public int getPendingWriteDataSize() { 162 return sendQueue.getSize() + super.getPendingWriteDataSize(); 163 } 164 165 166 int getPendingReceiveDataSize() { 167 return receiveQueue.getSize() + super.getPendingReceiveDataSize(); 168 } 169 170 171 174 public void setOption(String name, Object value) throws IOException { 175 176 if (name.equals(IClientIoProvider.SO_SNDBUF)) { 177 channel.socket().setSendBufferSize((Integer ) value); 178 179 } else if (name.equals(IClientIoProvider.SO_REUSEADDR)) { 180 channel.socket().setReuseAddress((Boolean ) value); 181 182 } else if (name.equals(IClientIoProvider.SO_RCVBUF)) { 183 channel.socket().setReceiveBufferSize((Integer ) value); 184 185 } else if (name.equals(IClientIoProvider.SO_KEEPALIVE)) { 186 channel.socket().setKeepAlive((Boolean ) value); 187 188 } else if (name.equals(IClientIoProvider.SO_LINGER)) { 189 if (value instanceof Integer ) { 190 channel.socket().setSoLinger(true, (Integer ) value); 191 } else if (value instanceof Boolean ) { 192 if (((Boolean ) value).equals(Boolean.FALSE)) { 193 channel.socket().setSoLinger(Boolean.FALSE, 0); 194 } 195 } 196 197 } else if (name.equals(IClientIoProvider.TCP_NODELAY)) { 198 channel.socket().setTcpNoDelay((Boolean ) value); 199 200 201 } else { 202 LOG.warning("option " + name + " is not supproted for " + this.getClass().getName()); 203 } 204 } 205 206 207 208 209 212 public Object getOption(String name) throws IOException { 213 214 if (name.equals(IClientIoProvider.SO_SNDBUF)) { 215 return channel.socket().getSendBufferSize(); 216 217 } else if (name.equals(IClientIoProvider.SO_REUSEADDR)) { 218 return channel.socket().getReuseAddress(); 219 220 } else if (name.equals(IClientIoProvider.SO_RCVBUF)) { 221 return channel.socket().getReceiveBufferSize(); 222 223 } else if (name.equals(IClientIoProvider.SO_KEEPALIVE)) { 224 return channel.socket().getKeepAlive(); 225 226 } else if (name.equals(IClientIoProvider.TCP_NODELAY)) { 227 return channel.socket().getTcpNoDelay(); 228 229 } else if (name.equals(IClientIoProvider.SO_LINGER)) { 230 return channel.socket().getSoLinger(); 231 232 233 } else { 234 LOG.warning("option " + name + " is not supproted for " + this.getClass().getName()); 235 return null; 236 } 237 } 238 239 240 243 public Map <String , Class > getOptions() { 244 return Collections.unmodifiableMap(SUPPORTED_OPTIONS); 245 } 246 247 248 251 public void setIdleTimeoutSec(int timeout) { 252 long timeoutMillis = ((long) timeout) * 1000L; 253 this.idleTimeout = timeoutMillis; 254 255 dispatcher.updateTimeoutCheckPeriod((long) timeout * 250L); 256 } 257 258 259 260 261 266 public void setConnectionTimeoutSec(int timeout) { 267 long timeoutMillis = ((long) timeout) * 1000L; 268 this.connectionTimeout = timeoutMillis; 269 270 dispatcher.updateTimeoutCheckPeriod((long) timeout * 250L); 271 } 272 273 274 279 public int getConnectionTimeoutSec() { 280 return (int) (connectionTimeout / 1000); 281 } 282 283 284 287 public int getIdleTimeoutSec() { 288 return (int) (idleTimeout / 1000); 289 } 290 291 292 293 public int getReceiveQueueSize() { 294 return receiveQueue.getSize(); 295 } 296 297 298 int getSendQueueSize() { 299 return sendQueue.getSize(); 300 } 301 302 308 boolean checkIdleTimeout(Long current) { 309 long maxTime = lastTimeReceived + idleTimeout; 310 if (maxTime < 0) { 311 maxTime = Long.MAX_VALUE; 312 } 313 boolean timeoutReached = maxTime < current; 314 315 if (timeoutReached) { 316 getPreviousCallback().onIdleTimeout(); 317 } 318 return timeoutReached; 319 } 320 321 322 323 329 void checkConnection() { 330 if (!channel.isOpen()) { 331 getPreviousCallback().onConnectionAbnormalTerminated(); 332 } 333 } 334 335 336 337 343 boolean checkConnectionTimeout(Long current) { 344 long maxTime = openTime + connectionTimeout; 345 if (maxTime < 0) { 346 maxTime = Long.MAX_VALUE; 347 } 348 boolean timeoutReached = maxTime < current; 349 if (timeoutReached) { 350 getPreviousCallback().onConnectionTimeout(); 351 } 352 return timeoutReached; 353 } 354 355 356 361 int getIncomingQueueSize() { 362 return receiveQueue.getSize(); 363 } 364 365 366 void onConnectEvent() throws IOException { 367 getPreviousCallback().onConnect(); 368 } 369 370 371 void onReadableEvent() throws IOException { 372 assert (IoSocketDispatcher.isDispatcherThread()); 373 374 try { 375 readSocketIntoReceiveQueue(); 377 378 380 if (getReceiveQueueSize() > 0) { 381 getPreviousCallback().onDataRead(); 382 } 383 384 checkPreallocatedReadMemory(); 386 387 388 } catch (ClosedConnectionException ce) { 389 close(false); 390 391 } catch (Throwable t) { 392 close(false); 393 if (LOG.isLoggable(Level.FINE)) { 394 LOG.fine("error occured by handling readable event. reason: " + t.toString()); 395 } 396 } 397 } 398 399 400 401 void onWriteableEvent() throws IOException { 402 assert (IoSocketDispatcher.isDispatcherThread()); 403 404 if (suspendRead) { 405 updateInterestedSetNonen(); 406 407 } else { 408 updateInterestedSetRead(); 409 } 410 411 try { 413 writeSendQueueDataToSocket(); 414 415 getPreviousCallback().onWritten(); 417 } catch(IOException ioe) { 418 getPreviousCallback().onWriteException(ioe); 419 } 420 421 422 if (getSendQueueSize() > 0) { 424 getDispatcher().updateInterestSet(this, SelectionKey.OP_WRITE); 425 426 } else { 428 if (shouldClosedPhysically()) { 429 realClose(); 430 } 431 } 432 } 433 434 435 private void blockUntilIsConnected() throws IOException { 436 while (!getChannel().finishConnect()) { 438 try { 439 Thread.sleep(25); 440 } catch (InterruptedException ignore) { } 441 } 442 } 443 444 445 private boolean shouldClosedPhysically() { 446 if (!isLogicalOpen) { 448 449 if (sendQueue.isEmpty()) { 451 return true; 452 } 453 } 454 455 return false; 456 } 457 458 459 462 @SuppressWarnings ("unchecked") 463 public void writeOutgoing(ByteBuffer buffer) throws IOException { 464 if (buffer != null) { 465 sendQueue.append(buffer); 466 updateInterestedSetWrite(); 467 } 468 } 469 470 471 474 @SuppressWarnings ("unchecked") 475 public void writeOutgoing(LinkedList <ByteBuffer > buffers) throws IOException { 476 if (buffers != null) { 477 sendQueue.append(buffers); 478 updateInterestedSetWrite(); 479 } 480 } 481 482 483 486 public LinkedList <ByteBuffer > drainIncoming() { 487 return receiveQueue.drain(); 488 } 489 490 491 492 495 @SuppressWarnings ("unchecked") 496 public void close(boolean immediate) throws IOException { 497 if (immediate || sendQueue.isEmpty()) { 498 realClose(); 499 500 } else { 501 if (LOG.isLoggable(Level.FINE)) { 502 LOG.fine("postpone close until remaning data to write (" + sendQueue.getSize() + ") has been written"); 503 } 504 505 isLogicalOpen = false; 506 updateInterestedSetWrite(); 507 } 508 } 509 510 511 private void realClose() { 512 try { 513 getDispatcher().deregister(this); 514 } catch (Exception e) { 515 if (LOG.isLoggable(Level.FINE)) { 516 LOG.fine("error occured by deregistering connection " + id + " on dispatcher. reason: " + e.toString()); 517 } 518 } 519 520 try { 521 channel.close(); 522 if (LOG.isLoggable(Level.FINE)) { 523 LOG.fine("connection " + id + " has been closed"); 524 } 525 } catch (Exception e) { 526 if (LOG.isLoggable(Level.FINE)) { 527 LOG.fine("error occured by closing connection " + id + " reason: " + e.toString()); 528 } 529 } 530 531 532 if (!isDisconnect) { 533 isDisconnect = true; 534 getPreviousCallback().onDisconnect(); 535 } 536 } 537 538 539 540 void onDispatcherClose() { 541 getPreviousCallback().onConnectionAbnormalTerminated(); 542 } 543 544 private void updateInterestedSetWrite() throws ClosedConnectionException { 545 try { 546 dispatcher.updateInterestSet(this, SelectionKey.OP_WRITE); 547 } catch (IOException ioe) { 548 if (LOG.isLoggable(Level.FINE)) { 549 LOG.fine("couldn't update interested set to write data on socket. Reason: " + ioe.toString()); 550 } 551 552 try { 553 dispatcher.deregister(this); 554 } catch (Exception ignore) { } 555 556 throw new ClosedConnectionException("connection " + id + " is already closed", ioe); 557 } 558 } 559 560 private void updateInterestedSetRead() throws ClosedConnectionException { 561 try { 562 dispatcher.updateInterestSet(this, SelectionKey.OP_READ); 563 } catch (IOException ioe) { 564 if (LOG.isLoggable(Level.FINE)) { 565 LOG.fine("couldn't update interested set to read data. Reason: " + ioe.toString()); 566 } 567 568 try { 569 dispatcher.deregister(this); 570 } catch (Exception ignore) { } 571 572 throw new ClosedConnectionException("connection " + id + " is already closed", ioe); 573 } 574 } 575 576 577 578 579 private void updateInterestedSetNonen() throws ClosedConnectionException { 580 try { 581 dispatcher.updateInterestSet(this, 0); 582 } catch (IOException ioe) { 583 if (LOG.isLoggable(Level.FINE)) { 584 LOG.fine("couldn't update interested set to nonen. Reason: " + ioe.toString()); 585 } 586 587 try { 588 dispatcher.deregister(this); 589 } catch (Exception ignore) { } 590 591 throw new ClosedConnectionException("connection " + id + " is already closed", ioe); 592 } 593 } 594 595 596 597 598 601 public boolean isOpen() { 602 return channel.isOpen(); 603 } 604 605 606 607 612 public SocketChannel getChannel() { 613 return channel; 614 } 615 616 617 IDispatcher<IoSocketHandler> getDispatcher() { 618 return dispatcher; 619 } 620 621 622 @Override 623 public void suspendRead() throws IOException { 624 suspendRead = true; 625 updateInterestedSetNonen(); 626 } 627 628 629 @Override 630 public void resumeRead() throws IOException { 631 suspendRead = false; 632 633 updateInterestedSetWrite(); 637 } 638 639 640 647 private int readSocketIntoReceiveQueue() throws IOException { 648 assert (IoSocketDispatcher.isDispatcherThread()); 649 650 int read = 0; 651 lastTimeReceived = System.currentTimeMillis(); 652 653 654 if (isOpen()) { 655 656 assert (memoryManager instanceof UnsynchronizedMemoryManager); 657 658 ByteBuffer readBuffer = memoryManager.acquireMemory(MIN_READ_BUFFER_SIZE); 659 int pos = readBuffer.position(); 660 int limit = readBuffer.limit(); 661 662 try { 664 read = channel.read( readBuffer); 665 } catch (IOException ioe) { 667 readBuffer.position(pos); 668 readBuffer.limit(limit); 669 memoryManager.recycleMemory(readBuffer, MIN_READ_BUFFER_SIZE); 670 671 if (LOG.isLoggable(Level.FINE)) { 672 LOG.fine("[" + id + "] error occured while reading channel: " + ioe.toString()); 673 } 674 675 throw ioe; 676 } 677 678 679 switch (read) { 681 682 case -1: 684 memoryManager.recycleMemory(readBuffer, MIN_READ_BUFFER_SIZE); 685 if (LOG.isLoggable(Level.FINE)) { 686 LOG.fine("[" + id + "] channel has reached end-of-stream (maybe closed by peer)"); 687 } 688 ClosedConnectionException cce = new ClosedConnectionException("[" + id + "] End of stream reached"); 689 throw cce; 690 691 case 0: 693 memoryManager.recycleMemory(readBuffer, MIN_READ_BUFFER_SIZE); 694 break; 695 696 default: 698 int savePos = readBuffer.position(); 699 int saveLimit = readBuffer.limit(); 700 701 readBuffer.position(savePos - read); 702 readBuffer.limit(savePos); 703 704 ByteBuffer readData = readBuffer.slice(); 705 receiveQueue.append(readData); 706 707 708 if (readBuffer.hasRemaining()) { 709 readBuffer.position(savePos); 710 readBuffer.limit(saveLimit); 711 memoryManager.recycleMemory(readBuffer, MIN_READ_BUFFER_SIZE); 712 } 713 if (LOG.isLoggable(Level.FINE)) { 714 LOG.fine("[" + id + "] received (" + (readData.limit() - readData.position()) + " bytes, total " + (receivedBytes + read) + " bytes): " + DataConverter.toTextOrHexString(new ByteBuffer [] {readData.duplicate() }, "UTF-8", 500)); 715 } 716 break; 717 } 718 } 719 720 721 receivedBytes += read; 722 723 return read; 724 } 725 726 727 730 private void checkPreallocatedReadMemory() { 731 assert (IoSocketDispatcher.isDispatcherThread()); 732 733 memoryManager.preallocate(MIN_READ_BUFFER_SIZE); 734 } 735 736 742 @SuppressWarnings ("unchecked") 743 private void writeSendQueueDataToSocket() throws IOException { 744 assert (IoSocketDispatcher.isDispatcherThread()); 745 746 747 757 758 if (isOpen() && !sendQueue.isEmpty()) { 759 760 762 ByteBuffer buffer = null; 764 765 do { 766 buffer = sendQueue.removeFirst(); 767 768 if (buffer != null) { 769 int writeSize = buffer.remaining(); 770 771 if (writeSize > 0) { 772 if (LOG.isLoggable(Level.FINE)) { 773 if (LOG.isLoggable(Level.FINE)) { 774 LOG.fine("[" + id + "] sending (" + writeSize + " bytes): " + DataConverter.toTextOrHexString(buffer.duplicate(), "UTF-8", 500)); 775 } 776 } 777 778 int written = channel.write(buffer); 780 sendBytes += written; 781 782 783 if (written != writeSize) { 785 if (LOG.isLoggable(Level.FINE)) { 786 LOG.fine("[" + id + "] " + written + " of " + (writeSize - written) + " bytes has been sent. initiate sending of the remaining (total sent " + sendBytes + " bytes)"); 787 } 788 789 sendQueue.addFirst(buffer); 790 updateInterestedSetWrite(); 791 break; 792 } 793 } 794 } 795 796 } while (buffer != null); 797 } 798 } 799 800 801 804 @Override 805 public final InetAddress getLocalAddress() { 806 return channel.socket().getLocalAddress(); 807 } 808 809 810 813 @Override 814 public final int getLocalPort() { 815 return channel.socket().getLocalPort(); 816 } 817 818 819 822 @Override 823 public final InetAddress getRemoteAddress() { 824 return channel.socket().getInetAddress(); 825 } 826 827 828 831 @Override 832 public final int getRemotePort() { 833 return channel.socket().getPort(); 834 } 835 836 837 840 public void flushOutgoing() { 841 842 } 843 844 845 848 @Override 849 public String toString() { 850 try { 851 return "(" + channel.socket().getInetAddress().toString() + ":" + channel.socket().getPort() 852 + " -> " + channel.socket().getLocalAddress().toString() + ":" + channel.socket().getLocalPort() + ")" 853 + " received=" + DataConverter.toFormatedBytesSize(receivedBytes) 854 + ", sent=" + DataConverter.toFormatedBytesSize(sendBytes) 855 + ", age=" + DataConverter.toFormatedDuration(System.currentTimeMillis() - openTime) 856 + ", lastReceived=" + DataConverter.toFormatedDate(lastTimeReceived) 857 + ", sendQueueSize=" + DataConverter.toFormatedBytesSize(sendQueue.getSize()) 858 + " [" + id + "]"; 859 } catch (Exception e) { 860 return super.toString(); 861 } 862 } 863 } | Popular Tags |