package org.quickserver.net.server.impl;

import org.quickserver.net.server.*;
import org.quickserver.net.*;
import org.quickserver.util.*;
import org.quickserver.util.io.*;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.logging.*;

import java.nio.*;
import java.nio.channels.*;

/**
 * Client handler used when the server runs with non-blocking (NIO) sockets.
 * <p>
 * Incoming bytes are read from the {@link SocketChannel} into pooled
 * {@link ByteBuffer}s collected in {@link #readByteBuffer}; outgoing data is
 * staged in {@link #writeByteBuffer} through a {@link ByteBufferOutputStream}.
 * {@link #run()} is invoked once per pending {@link ClientEvent}; the number
 * of threads currently inside the handler is tracked in
 * {@link #threadAccessCount}.
 */
public class NonBlockingClientHandler extends BasicClientHandler {
    private static final Logger logger = Logger.getLogger(NonBlockingClientHandler.class.getName());

    /** Handler notified from {@link #processWrite()} once all pending buffers were written. */
    protected ClientWriteHandler clientWriteHandler;
    private SocketChannel socketChannel;

    /** Pooled buffers filled by {@link #processRead()}; backs the handler's input stream. */
    protected ArrayList readByteBuffer = new ArrayList();
    /** Pooled buffers backing the {@link ByteBufferOutputStream}. */
    protected ArrayList writeByteBuffer = new ArrayList();

    protected SelectionKey selectionKey;

    /** Number of threads currently accounted as working inside this handler. */
    protected volatile int threadAccessCount = 0;
    /** Set once the handler has been chosen for return, see {@link #checkReturnClientHandler()}. */
    protected volatile boolean willReturn;
    /** Set by {@link #closeConnection()} while it waits for pending output to be written. */
    protected volatile boolean waitingForFinalWrite;

    private static int maxThreadAccessCount = 3;
    private static boolean wakeupSelectorAfterRegisterWrite = true;
    private static boolean wakeupSelectorAfterRegisterRead = true;

    /**
     * Sets whether {@link #registerWrite()} wakes up the server's selector
     * after adding OP_WRITE to the interest ops of an existing key.
     */
    public static void setWakeupSelectorAfterRegisterWrite(boolean flag) {
        wakeupSelectorAfterRegisterWrite = flag;
    }

    /**
     * Returns whether the selector is woken up after OP_WRITE is added to
     * the interest ops.
     */
    public static boolean getWakeupSelectorAfterRegisterWrite() {
        return wakeupSelectorAfterRegisterWrite;
    }

    /**
     * Sets whether {@link #registerForRead()} wakes up the server's selector
     * after adding OP_READ to the interest ops of an existing key.
     */
    public static void setWakeupSelectorAfterRegisterRead(boolean flag) {
        wakeupSelectorAfterRegisterRead = flag;
    }

    /**
     * Returns whether the selector is woken up after OP_READ is added to
     * the interest ops.
     */
    public static boolean getWakeupSelectorAfterRegisterRead() {
        return wakeupSelectorAfterRegisterRead;
    }

    /**
     * Sets the limit checked by {@link #run()} against
     * {@link #threadAccessCount}.
     *
     * @param count a value of at least 3, or -1 to disable the check
     * @throws IllegalArgumentException if count is below 3 and not -1
     */
    public static void setMaxThreadAccessCount(int count) {
        if(count<3 && count!=-1) {
            throw new IllegalArgumentException("Value should be >=3 or -1");
        }
        maxThreadAccessCount = count;
    }

    /**
     * Returns the limit checked by {@link #run()}; -1 means no limit.
     */
    public static int getMaxThreadAccessCount() {
        return maxThreadAccessCount;
    }

    private ByteBufferOutputStream byteBufferOutputStream;

    public NonBlockingClientHandler(int instanceCount) {
        super(instanceCount);
    }

    public NonBlockingClientHandler() {
        super();
    }

    /**
     * Resets this handler: returns every queued read/write buffer to the
     * server's ByteBuffer pool, cancels the selection key, closes the
     * output stream wrapper and clears the per-connection fields before and
     * after delegating to the superclass.
     */
    public void clean() {
        logger.finest("Starting clean - "+getName());
        if(threadAccessCount!=0) {
            logger.warning("Thread Access Count was not 0!: "+threadAccessCount);
            if(Assertion.isEnabled()) {
                assertionSystemExit();
            }
            threadAccessCount = 0;
        }

        // Give pooled read buffers back; stop at the first pool error.
        while(readByteBuffer.isEmpty()==false) {
            try {
                getServer().getByteBufferPool().returnObject(
                    readByteBuffer.remove(0));
            } catch(Exception er) {
                appLogger.warning("Error in returning read ByteBuffer to pool: "+er);
                break;
            }
        }

        // Same for pooled write buffers.
        while(writeByteBuffer.isEmpty()==false) {
            try {
                getServer().getByteBufferPool().returnObject(
                    writeByteBuffer.remove(0));
            } catch(Exception er) {
                appLogger.warning("Error in returning write ByteBuffer to pool: "+er);
                break;
            }
        }

        if(selectionKey!=null) {
            selectionKey.cancel();
            selectionKey.selector().wakeup();
            selectionKey = null;
        }
        willReturn = false;
        waitingForFinalWrite = false;
        socketChannel = null;
        if(byteBufferOutputStream!=null) {
            byteBufferOutputStream.close();
        }

        super.clean();

        clientWriteHandler = null;
        byteBufferOutputStream = null;

        logger.finest("Finished clean - "+getName());
    }

    /**
     * Runs {@link #clean()} before the object is finalized.
     */
    protected void finalize() throws Throwable {
        clean();
        super.finalize();
    }

    /**
     * Binds this handler to the given client, additionally taking over its
     * write handler and socket channel.
     */
    public void handleClient(TheClient theClient) {
        super.handleClient(theClient);
        setClientWriteHandler(theClient.getClientWriteHandler());
        setSocketChannel(theClient.getSocketChannel());
    }

    /**
     * Stores the raw input stream and resets the derived stream fields
     * according to the current incoming data mode. Only OBJECT mode creates
     * a wrapper (an {@link ObjectInputStream}); all other modes leave the
     * derived readers/streams {@code null}.
     *
     * @throws IOException if the ObjectInputStream can not be created
     */
    protected void setInputStream(InputStream in) throws IOException {
        this.in = in;
        if(getDataMode(DataType.IN) == DataMode.STRING) {
            b_in = null;
            o_in = null;
            bufferedReader = null;
        } else if(getDataMode(DataType.IN) == DataMode.OBJECT) {
            b_in = null;
            bufferedReader = null;
            o_in = new ObjectInputStream(in);
        } else if(getDataMode(DataType.IN) == DataMode.BYTE ||
                getDataMode(DataType.IN) == DataMode.BINARY) {
            o_in = null;
            bufferedReader = null;
            b_in = null;
        }
    }

    /**
     * Not supported by this handler.
     *
     * @throws IllegalStateException always
     */
    public BufferedReader getBufferedReader() {
        throw new IllegalStateException("Access to BufferedReader in not allowed in Non-Blocking mode!");
    }

    /**
     * Closes the connection to the client.
     * <p>
     * When the selection key is still valid and the connection is not
     * marked lost, the call first waits (see {@link #waitTillFullyWritten()})
     * for pending output, then cancels the key. Afterwards the close/lost
     * notification is sent (unless a MAX_CON event is pending), the channel
     * is closed and the server's selector is woken up. IOExceptions and
     * NullPointerExceptions raised on the way are logged, not propagated.
     */
    public void closeConnection() {
        synchronized(this) {
            if(connection==false) return;
            if(waitingForFinalWrite) return;
            if(getSelectionKey()!=null && getSelectionKey().isValid() && lost == false) {
                waitingForFinalWrite = true;
            } else {
                connection = false;
            }
        }

        try {
            if(getSocketChannel()!=null && socket!=null) {
                if(waitingForFinalWrite) {
                    try {
                        waitTillFullyWritten();
                    } catch(Exception error) {
                        logger.warning("Error in waitingForFinalWrite : "+error);
                        if(logger.isLoggable(Level.FINE)) {
                            logger.fine("StackTrace:\n"+MyString.getStackTrace(error));
                        }
                    } finally {
                        connection = false;
                        byteBufferOutputStream.forceNotify();
                        getSelectionKey().cancel();
                    }
                }

                synchronized(this) {
                    if(hasEvent(ClientEvent.MAX_CON)==false) {
                        notifyCloseOrLost();
                    }
                    if(getSocketChannel().isOpen()) {
                        logger.finest("Closing SocketChannel");
                        getSocketChannel().close();
                    }
                }
            }
            if(getServer()!=null) {
                getServer().getSelector().wakeup();
            }
        } catch(IOException e) {
            logger.warning("Error in closeConnection : "+e);
            if(logger.isLoggable(Level.FINE)) {
                logger.fine("StackTrace:\n"+MyString.getStackTrace(e));
            }
        } catch(NullPointerException npe) {
            logger.fine("NullPointerException: "+npe);
            if(logger.isLoggable(Level.FINE)) {
                logger.fine("StackTrace:\n"+MyString.getStackTrace(npe));
            }
        }
    }

    /**
     * If the output stream reports data still waiting to be written, blocks
     * on a private lock object for at most two minutes (or until notified /
     * interrupted). An interruption is only logged.
     */
    public void waitTillFullyWritten() {
        Object waitLock = new Object();
        if(byteBufferOutputStream.isDataAvailableForWrite(waitLock)) {
            if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
                logger.finest("Waiting "+getName());
            }
            try {
                synchronized(waitLock) {
                    waitLock.wait(1000*60*2); // 2 min max
                }
            } catch(InterruptedException ie) {
                // NOTE(review): the interrupt flag is intentionally not restored here;
                // closeConnection() callers go on to Thread.sleep() in
                // returnClientHandler(), which would then fail immediately.
                logger.warning("Error: "+ie);
            }
            if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
                logger.finest("Done. "+getName());
            }
        }
    }

    /**
     * Processes the next unprocessed {@link ClientEvent} of this handler.
     * <p>
     * ACCEPT registers the channel for reading, fires the connected event
     * and runs authorisation; READ and WRITE are delegated to
     * {@link #processRead()} / {@link #processWrite()}. When one of these
     * reports that the thread was returned, the method exits and the
     * connection stays open. In every other case (errors, closed or lost
     * connection) the method falls through to the shutdown sequence: notify,
     * close the connection, cancel the key, close socket and channel, return
     * client data and finally return this handler.
     */
    public void run() {
        if(unprocessedClientEvents.size()==0) {
            logger.finest("No unprocessed ClientEvents!");
            return;
        }

        synchronized(this) {
            if(willReturn) {
                return;
            } else {
                threadAccessCount++;
            }
        }

        ClientEvent currentEvent = (ClientEvent) unprocessedClientEvents.remove(0);

        if(logger.isLoggable(Level.FINEST)) {
            StringBuffer sb = new StringBuffer();
            sb.append("Running ").append(getName());
            sb.append(" using ");
            sb.append(Thread.currentThread().getName());
            sb.append(" for ");

            synchronized(clientEvents) {
                if(clientEvents.size()>1) {
                    sb.append(currentEvent+", Current Events - "+clientEvents);
                } else {
                    sb.append(currentEvent);
                }
            }
            logger.finest(sb.toString());
        }

        if(currentEvent==null) {
            // NOTE(review): returns with threadAccessCount still incremented - confirm intended.
            threadEvent.set(null);
            return;
        } else {
            threadEvent.set(currentEvent);
        }

        try {
            if(maxThreadAccessCount!=-1 && threadAccessCount>maxThreadAccessCount) {
                logger.warning("ThreadAccessCount can't go beyond "+maxThreadAccessCount+": "+threadAccessCount);
                if(Assertion.isEnabled()) {
                    throw new AssertionError("ThreadAccessCount can't go beyond "+maxThreadAccessCount+": "+threadAccessCount);
                }
                // NOTE(review): returns without returnThread(), so the counter stays raised - confirm intended.
                return;
            }

            if(socket==null)
                throw new SocketException("Socket was null!");

            if(getThreadEvent()==ClientEvent.ACCEPT ||
                    getThreadEvent()==ClientEvent.MAX_CON) {
                prepareForRun();
                Assertion.affirm(willReturn==false, "WillReturn has to be false!: "+willReturn);
            }

            if(getThreadEvent()==ClientEvent.MAX_CON) {
                processMaxConnection(currentEvent);
            }

            try {
                if(getThreadEvent()==ClientEvent.ACCEPT) {
                    registerForRead();
                    clientEventHandler.gotConnected(this);

                    if(authorised == false) {
                        if(clientAuthenticationHandler==null && authenticator == null) {
                            authorised = true;
                            logger.finest("No Authenticator "+getName()+" so return thread.");
                        } else {
                            if(clientAuthenticationHandler!=null) {
                                AuthStatus authStatus = null;
                                // Keep asking until the handler stops reporting FAILURE.
                                do {
                                    authStatus = processAuthorisation();
                                } while(authStatus==AuthStatus.FAILURE);

                                if(authStatus==AuthStatus.SUCCESS)
                                    authorised = true;
                            } else {
                                processAuthorisation();
                            }
                            if(authorised)
                                logger.finest("Authentication done "+getName()+", so return thread.");
                            else
                                logger.finest("askAuthentication() done "+getName()+", so return thread.");
                        }
                    }
                    returnThread();
                    return;
                }

                if(connection && getThreadEvent()==ClientEvent.READ) {
                    if(processRead()) return;
                }

                if(connection && getThreadEvent()==ClientEvent.WRITE) {
                    if(processWrite()) return;
                }

            } catch(SocketException e) {
                appLogger.finest("SocketException - Client [" +
                    getHostAddress() +"]: " + e.getMessage());
                lost = true;
            } catch(AppException e) {
                appLogger.finest("AppException "+Thread.currentThread().getName()+": "
                    + e.getMessage());
            } catch(javax.net.ssl.SSLException e) {
                lost = true;
                if(Assertion.isEnabled()) {
                    appLogger.info("SSLException - Client ["+getHostAddress()
                        +"] "+Thread.currentThread().getName()+": " + e);
                } else {
                    appLogger.warning("SSLException - Client ["+
                        getHostAddress()+"]: "+e);
                }
            } catch(ConnectionLostException e) {
                lost = true;
                if(e.getMessage()!=null)
                    appLogger.finest("Connection lost " +
                        Thread.currentThread().getName()+": " + e.getMessage());
                else
                    appLogger.finest("Connection lost "+Thread.currentThread().getName());
            } catch(ClosedChannelException e) {
                lost = true;
                appLogger.finest("Channel closed "+Thread.currentThread().getName()+": " + e);
            } catch(IOException e) {
                lost = true;
                appLogger.fine("IOError "+Thread.currentThread().getName()+": " + e);
            } catch(AssertionError er) {
                logger.warning("[AssertionError] "+getName()+" "+er);
                if(logger.isLoggable(Level.FINEST)) {
                    logger.finest("StackTrace "+Thread.currentThread().getName()+": "+MyString.getStackTrace(er));
                }
                assertionSystemExit();
            } catch(Error er) {
                logger.warning("[Error] "+er);
                if(logger.isLoggable(Level.FINEST)) {
                    logger.finest("StackTrace "+Thread.currentThread().getName()+": "+MyString.getStackTrace(er));
                }
                if(Assertion.isEnabled()) {
                    assertionSystemExit();
                }
                lost = true;
            } catch(RuntimeException re) {
                logger.warning("[RuntimeException] "+MyString.getStackTrace(re));
                if(Assertion.isEnabled()) {
                    assertionSystemExit();
                }
                lost = true;
            }

            // Reached only when the connection is going down.
            if(getThreadEvent()!=ClientEvent.MAX_CON) {
                notifyCloseOrLost();
            }

            if(connection) {
                logger.finest(Thread.currentThread().getName()+" calling closeConnection()");
                closeConnection();
            }

            if(connection==true && lost==true && waitingForFinalWrite) {
                byteBufferOutputStream.forceNotify();
            }
        } catch(javax.net.ssl.SSLException se) {
            logger.warning("SSLException "+Thread.currentThread().getName()+" - " + se);
        } catch(IOException ie) {
            logger.warning("IOError "+Thread.currentThread().getName()+" - Closing Client : " + ie);
        } catch(RuntimeException re) {
            logger.warning("[RuntimeException] "+getName()+" "+Thread.currentThread().getName()+" - "+MyString.getStackTrace(re));
            if(Assertion.isEnabled()) {
                assertionSystemExit();
            }
        } catch(Exception e) {
            logger.warning("Error "+Thread.currentThread().getName()+" - Event:"+getThreadEvent()+" - Socket:"+socket+" : "+e);
            logger.fine("StackTrace: "+getName()+"\n"+MyString.getStackTrace(e));
            if(Assertion.isEnabled()) {
                assertionSystemExit();
            }
        } catch(Error e) {
            logger.warning("Error "+Thread.currentThread().getName()+" - Event:"+getThreadEvent()+" - Socket:"+socket+" : "+e);
            logger.fine("StackTrace: "+getName()+"\n"+MyString.getStackTrace(e));
            if(Assertion.isEnabled()) {
                assertionSystemExit();
            }
        }

        synchronized(this) {
            try {
                if(getSelectionKey()!=null && getSelectionKey().isValid()) {
                    logger.finest("Canceling SelectionKey");
                    getSelectionKey().cancel();
                }

                if(socket!=null && socket.isClosed()==false) {
                    logger.finest("Closing Socket");
                    socket.close();
                }

                if(getSocketChannel()!=null && getSocketChannel().isOpen()) {
                    logger.finest("Closing SocketChannel");
                    socketChannel.close();
                }
            } catch(Exception re) {
                logger.warning("Error closing Socket/Channel: " +re);
            }
        }

        willClean = true;
        returnClientData();

        boolean returnClientHandler = false;
        synchronized(lockObj) {
            returnThread();
            returnClientHandler = checkReturnClientHandler();
        }

        if(returnClientHandler) {
            returnClientHandler();
        }
    }

    /**
     * Marks the handler as being returned.
     *
     * @return {@code true} for the first caller (who must then return the
     *         handler), {@code false} if it was already marked
     */
    protected boolean checkReturnClientHandler() {
        if(willReturn==false) {
            willReturn = true;
            return true;
        }
        return false;
    }

    /**
     * Drains the socket channel into pooled buffers, wakes up readers
     * waiting on the input stream and dispatches the received data.
     *
     * @return {@code true} if the connection is still up and the thread has
     *         been returned (the channel is re-registered for reading);
     *         {@code false} if the connection is gone and the caller must
     *         release all resources
     * @throws Exception whatever the channel read, the buffer pool or the
     *         data handlers raise
     */
    private boolean processRead() throws Exception, AppException {
        int count = 0;
        int fullCount = 0;
        ByteBuffer buffer = (ByteBuffer)
            getServer().getByteBufferPool().borrowObject();

        while(true) {
            try {
                // Reset per iteration: if read() throws, the finally block must not
                // judge by the previous successful read, or the freshly borrowed
                // (still unqueued) buffer would never go back to the pool.
                count = 0;
                count = getSocketChannel().read(buffer);
                if(count<=0) {
                    // Nothing read into this buffer; hand it straight back.
                    getServer().getByteBufferPool().returnObject(buffer);
                    buffer = null;
                    break;
                } else {
                    fullCount += count;
                }

                buffer.flip(); // make the received bytes readable
                readByteBuffer.add(buffer);

                buffer = (ByteBuffer)
                    getServer().getByteBufferPool().borrowObject();
            } catch(Exception error) {
                logger.finest("Error in data read: "+error);
                lost = true;
                synchronized(getInputStream()) {
                    getInputStream().notifyAll();
                }
                throw error;
            } finally {
                // count>0 here means the buffer was queued in readByteBuffer
                // and is no longer ours to return.
                if(buffer!=null && count<=0) {
                    getServer().getByteBufferPool().returnObject(buffer);
                    buffer = null;
                }
            }
        }

        if(count<0) {
            logger.finest("SocketChannel read was "+count+"!");
            lost = true;
            synchronized(getInputStream()) {
                getInputStream().notifyAll();
            }
        } else {
            logger.finest(fullCount+" bytes read");
            if(fullCount!=0) {
                updateLastCommunicationTime();
                synchronized(getInputStream()) {
                    getInputStream().notify(); // wake a reader waiting for data
                }
                if(hasEvent(ClientEvent.ACCEPT) == false) {
                    processGotDataInBuffers();
                }
            }

            // Data may still be pending; retry until it is handed over.
            while(getInputStream().available()>0) {
                logger.finest("Sending again for processing...");
                if(hasEvent(ClientEvent.ACCEPT) == false) {
                    processGotDataInBuffers();
                    break;
                } else {
                    // ACCEPT is still being processed by another thread.
                    synchronized(getInputStream()) {
                        getInputStream().notifyAll();
                    }
                    Thread.sleep(100);
                }
            }

            if(connection) {
                registerForRead();
                returnThread();
                return true;
            }
        }
        logger.finest("We don't have connection, lets return all resources.");
        return false;
    }

    /**
     * Writes all pending buffers to the channel. If data remains, OP_WRITE
     * is registered again; otherwise the {@link ClientWriteHandler}, if any,
     * is invoked.
     *
     * @return {@code true} if the connection is still up and the thread has
     *         been returned; {@code false} if the caller must release all
     *         resources
     */
    private boolean processWrite() throws IOException {
        updateLastCommunicationTime();

        boolean flag = byteBufferOutputStream.writeAllByteBuffer();

        if(flag==false) {
            registerWrite();
        } else if(clientWriteHandler!=null) {
            clientWriteHandler.handleWrite(this);
        }

        if(connection) {
            returnThread();
            return true;
        } else {
            logger.finest("We don't have connection, lets return all resources.");
        }
        return false;
    }

    /**
     * Accounts for the current thread leaving the handler: decrements
     * {@link #threadAccessCount} and removes the thread's event.
     */
    protected void returnThread() {
        threadAccessCount--;
        Assertion.affirm(threadAccessCount>=0, "ThreadAccessCount went less the 0! Value: "+threadAccessCount);
        removeEvent((ClientEvent)threadEvent.get());
    }

    /**
     * Waits (polling every 60 ms, at most 100 rounds) until no other thread
     * is inside this handler, then delegates to the superclass to return
     * the handler. After 100 rounds the counter is forced to 0.
     */
    protected void returnClientHandler() {
        logger.finest(getName());
        try {
            for(int i=0;threadAccessCount!=0;i++) {
                if(i==100) {
                    logger.warning("ClientHandler must have got into a loop waiting for thread to free up! ThreadAccessCount="+threadAccessCount);
                    threadAccessCount = 0;
                    if(Assertion.isEnabled()) {
                        assertionSystemExit();
                    } else {
                        break;
                    }
                }
                if(threadAccessCount<=0) break;

                logger.finest("Waiting for other thread of "+getName()+" to finish");
                Thread.sleep(60);
            }
        } catch(InterruptedException ie) {
            // NOTE(review): interruption ends the wait early and is only logged.
            appLogger.warning("InterruptedException: "+ie);
        }
        super.returnClientHandler();
    }

    /**
     * Switches the data mode for the given direction; a no-op when the mode
     * is already active.
     *
     * @throws IOException if a stream can not be flushed or created
     */
    public void setDataMode(DataMode dataMode, DataType dataType)
            throws IOException {
        if(getDataMode(dataType)==dataMode) return;

        appLogger.fine("Setting Type:"+dataType+", Mode:"+dataMode);
        super.checkDataModeSet(dataMode, dataType);

        setDataModeNonBlocking(dataMode, dataType);
    }

    /**
     * Applies a data mode change to the stream fields.
     *
     * @throws IllegalArgumentException for an unknown mode, for an unknown
     *         data type in BYTE/BINARY mode, or when OBJECT mode is
     *         requested for incoming data
     * @throws IOException if an object stream can not be flushed or created
     */
    private void setDataModeNonBlocking(DataMode dataMode, DataType dataType)
            throws IOException {
        logger.finest("ENTER");
        if(dataMode == DataMode.STRING) {
            if(dataType == DataType.OUT) {
                if(dataModeOUT == DataMode.BYTE || dataModeOUT == DataMode.BINARY) {
                    dataModeOUT = dataMode;
                } else if(dataModeOUT == DataMode.OBJECT) {
                    dataModeOUT = dataMode;
                    o_out.flush(); o_out = null;
                    b_out = new BufferedOutputStream(out);
                } else {
                    Assertion.affirm(false, "Unknown DataType.OUT DataMode - "+dataModeOUT);
                }
                Assertion.affirm(b_out!=null, "BufferedOutputStream is still null!");
                Assertion.affirm(o_out==null, "ObjectOutputStream is still not null!");
            } else if(dataType == DataType.IN) {
                dataModeIN = dataMode;

                if(o_in!=null) {
                    if(o_in.available()!=0)
                        logger.warning("Data looks to be present in ObjectInputStream");
                    o_in = null;
                }
                b_in = null;
                bufferedReader = null;
                Assertion.affirm(in!=null, "InputStream is still null!");
                Assertion.affirm(b_in==null, "BufferedInputStream is still not null!");
                Assertion.affirm(bufferedReader==null, "BufferedReader is still not null!");
            }
        } else if(dataMode == DataMode.OBJECT) {
            if(dataType == DataType.IN) {
                throw new IllegalArgumentException("Can't set DataType.IN mode to OBJECT when blocking mode is set as false!");
            }

            if(dataType == DataType.OUT) {
                dataModeOUT = dataMode;
                b_out = null;
                o_out = new ObjectOutputStream(out);
                Assertion.affirm(o_out!=null, "ObjectOutputStream is still null!");
            }
        } else if(dataMode == DataMode.BYTE || dataMode == DataMode.BINARY) {
            if(dataType == DataType.OUT) {
                if(dataModeOUT == DataMode.STRING ||
                        dataModeOUT == DataMode.BYTE ||
                        dataModeOUT == DataMode.BINARY) {
                    dataModeOUT = dataMode;
                } else if(dataModeOUT == DataMode.OBJECT) {
                    dataModeOUT = dataMode;
                    // Flush before dropping the object stream, as the STRING
                    // switch above does, so buffered object data is not lost.
                    if(o_out!=null) {
                        o_out.flush();
                    }
                    o_out = null;
                    b_out = new BufferedOutputStream(out);
                } else {
                    Assertion.affirm(false, "Unknown DataType.OUT - DataMode: "+dataModeOUT);
                }
                Assertion.affirm(b_out!=null, "BufferedOutputStream is still null!");
            } else if(dataType == DataType.IN) {
                dataModeIN = dataMode;
                o_in = null;
                bufferedReader = null;
                b_in = null;
                Assertion.affirm(in!=null, "InputStream is still null!");
            } else {
                throw new IllegalArgumentException("Unknown DataType : "+dataType);
            }
        } else {
            throw new IllegalArgumentException("Unknown DataMode : "+dataMode);
        }
    }

    /**
     * Reads from this handler's own input stream.
     */
    protected byte[] readInputStream() throws IOException {
        return readInputStream(getInputStream());
    }

    /**
     * Creates the ByteBuffer backed input and output streams over
     * {@link #readByteBuffer} / {@link #writeByteBuffer} and installs them.
     */
    public void updateInputOutputStreams() throws IOException {
        byteBufferOutputStream = new ByteBufferOutputStream(writeByteBuffer, this);
        setInputStream( new ByteBufferInputStream(readByteBuffer, this, getCharset()) );
        setOutputStream(byteBufferOutputStream);
    }

    public void setSocketChannel(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }
    public SocketChannel getSocketChannel() {
        return socketChannel;
    }

    public void setSelectionKey(SelectionKey selectionKey) {
        this.selectionKey = selectionKey;
    }
    /**
     * Returns the selection key; when none is cached it is looked up from
     * the socket channel's registration with the server's selector (which
     * may yield {@code null}).
     */
    public SelectionKey getSelectionKey() {
        if(selectionKey==null)
            selectionKey = getSocketChannel().keyFor(getServer().getSelector());
        return selectionKey;
    }

    /**
     * Dispatches data available on the input stream to the authentication
     * handler (while not authorised) or to the command/binary handler,
     * according to the current incoming data mode. Repeats until the input
     * stream reports nothing available, the connection is closed, or - in
     * STRING mode - no complete line is ready and nothing more is buffered.
     *
     * @throws IllegalStateException if the incoming data mode is none of
     *         STRING, OBJECT, BYTE or BINARY
     */
    private void processGotDataInBuffers() throws AppException,
            ConnectionLostException, ClassNotFoundException, IOException {
        if(getInputStream().available()==0) return;

        logger.finest("Trying to process got data.. DataMode.IN="+dataModeIN);
        AuthStatus authStatus = null;

        String rec = null;
        byte[] recByte = null;

        boolean timeToCheckForNewLineMiss = false;

        do {
            if(dataModeIN == DataMode.STRING) {
                ByteBufferInputStream bbin = (ByteBufferInputStream)
                    getInputStream();
                timeToCheckForNewLineMiss = true;

                while(bbin.isLineReady()) {

                    rec = bbin.readLine();
                    if(rec==null) {
                        lost = true;
                        return;
                    }
                    if(getCommunicationLogging() && authorised == true) {
                        appLogger.fine("Got STRING ["+getHostAddress()+"] : "+
                            rec);
                    }

                    if(authorised == false)
                        authStatus = clientAuthenticationHandler.handleAuthentication(this, rec);
                    else
                        clientCommandHandler.handleCommand(this, rec);

                    if(isClosed()==true) return;

                    while(authStatus==AuthStatus.FAILURE)
                        authStatus = processAuthorisation();

                    if(authStatus==AuthStatus.SUCCESS)
                        authorised = true;

                    // A handler may have switched the mode; leave the line loop.
                    if(dataModeIN != DataMode.STRING) {
                        break;
                    }

                    timeToCheckForNewLineMiss = false;
                }

                // No full line was ready and nothing more is buffered:
                // wait for the next read instead of spinning in the outer loop.
                if(timeToCheckForNewLineMiss && bbin.availableOnlyInByteBuffer()==0) {
                    return;
                } else {
                    timeToCheckForNewLineMiss = false;
                }
            }

            while(dataModeIN == DataMode.BYTE && getInputStream().available()!=0) {
                rec = readBytes();
                if(rec==null) {
                    lost = true;
                    return;
                }
                if(getCommunicationLogging() && authorised == true) {
                    appLogger.fine("Got BYTE ["+getHostAddress()+"] : "+rec);
                }

                if(authorised == false)
                    authStatus = clientAuthenticationHandler.handleAuthentication(this, rec);
                else
                    clientCommandHandler.handleCommand(this, rec);

                if(isClosed()==true) return;

                while(authStatus==AuthStatus.FAILURE)
                    authStatus = processAuthorisation();

                if(authStatus==AuthStatus.SUCCESS)
                    authorised = true;
            }

            while(dataModeIN == DataMode.BINARY && getInputStream().available()!=0) {
                recByte = readBinary();
                if(recByte==null) {
                    lost = true;
                    return;
                }
                if(getCommunicationLogging() && authorised == true) {
                    appLogger.fine("Got BINARY ["+getHostAddress()+"] : "+
                        MyString.getMemInfo(recByte.length));
                }

                if(authorised == false)
                    authStatus = clientAuthenticationHandler.handleAuthentication(this, recByte);
                else
                    clientBinaryHandler.handleBinary(this, recByte);

                if(isClosed()==true) return;

                while(authStatus==AuthStatus.FAILURE)
                    authStatus = processAuthorisation();

                if(authStatus==AuthStatus.SUCCESS)
                    authorised = true;
            }

            if(dataModeIN != DataMode.STRING && dataModeIN != DataMode.OBJECT
                    && dataModeIN != DataMode.BYTE && dataModeIN != DataMode.BINARY) {
                throw new IllegalStateException("Incoming DataMode is not supported : "+dataModeIN);
            }
        } while(getInputStream().available()!=0);
    }

    /**
     * Makes sure OP_READ is part of the channel's interest ops: registers
     * the channel with the server if it has no key yet, otherwise adds
     * OP_READ to the existing valid key (optionally waking the selector).
     *
     * @throws IOException if the selection key is invalid or cancelled
     */
    public void registerForRead()
            throws IOException, ClosedChannelException {
        try {
            if(getSelectionKey()==null) {
                boolean flag = getServer().registerChannel(getSocketChannel(),
                    SelectionKey.OP_READ, this);
                if(flag) {
                    logger.finest("Adding OP_READ as interest Ops for "+getName());
                } else if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
                    logger.finest("OP_READ is already present in interest Ops for "+getName());
                }
            } else if(getSelectionKey().isValid()) {
                if((getSelectionKey().interestOps() & SelectionKey.OP_READ) == 0 ) {
                    logger.finest("Adding OP_READ to interest Ops for "+getName());
                    removeEvent(ClientEvent.READ);
                    getSelectionKey().interestOps(getSelectionKey().interestOps()
                        | SelectionKey.OP_READ);
                    if(wakeupSelectorAfterRegisterRead) {
                        getServer().getSelector().wakeup();
                    }
                } else {
                    if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
                        logger.finest("OP_READ is already present in interest Ops for "+getName());
                    }
                }
            } else {
                throw new IOException("SelectionKey is invalid!");
            }
        } catch(CancelledKeyException e) {
            throw new IOException("SelectionKey is cancelled!");
        }
    }

    /**
     * Public entry point for requesting write notifications.
     *
     * @throws IllegalStateException if a blocking-mode event is pending or
     *         no {@link ClientWriteHandler} has been set
     * @throws IOException if the selection key is invalid or cancelled
     */
    public void registerForWrite()
            throws IOException, ClosedChannelException {
        if(hasEvent(ClientEvent.RUN_BLOCKING) || hasEvent(ClientEvent.MAX_CON_BLOCKING)) {
            throw new IllegalStateException("This method is only allowed under Non-Blocking mode.");
        }

        if(clientWriteHandler==null) {
            throw new IllegalStateException("ClientWriteHandler has not been set!");
        }
        registerWrite();
    }

    /**
     * Makes sure OP_WRITE is part of the channel's interest ops: registers
     * the channel with the server if it has no key yet, otherwise adds
     * OP_WRITE to the existing valid key (optionally waking the selector).
     *
     * @throws IOException if the selection key is invalid or cancelled
     */
    public void registerWrite() throws IOException {
        try {
            if(getSelectionKey()==null) {
                boolean flag = getServer().registerChannel(getSocketChannel(),
                    SelectionKey.OP_WRITE, this);
                if(flag) {
                    logger.finest("Adding OP_WRITE as interest Ops for "+getName());
                } else if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
                    logger.finest("OP_WRITE is already present in interest Ops for "+getName());
                }
            } else if(getSelectionKey().isValid()) {
                if((getSelectionKey().interestOps() & SelectionKey.OP_WRITE) == 0 ) {
                    logger.finest("Adding OP_WRITE to interest Ops for "+getName());
                    removeEvent(ClientEvent.WRITE);
                    getSelectionKey().interestOps(getSelectionKey().interestOps()
                        | SelectionKey.OP_WRITE);
                    if(wakeupSelectorAfterRegisterWrite) {
                        getServer().getSelector().wakeup();
                    }
                } else {
                    if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
                        logger.finest("OP_WRITE is already present in interest Ops for "+getName());
                    }
                }
            } else {
                throw new IOException("SelectionKey is invalid!");
            }
        } catch(CancelledKeyException e) {
            throw new IOException("SelectionKey is cancelled!");
        }
    }

    protected void setClientWriteHandler(ClientWriteHandler handler) {
        clientWriteHandler=handler;
    }

    /**
     * Returns the number of threads currently accounted as working inside
     * this handler.
     */
    public int getThreadAccessCount() {
        return threadAccessCount;
    }
}