KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > quickserver > net > server > impl > NonBlockingClientHandler


1 /*
2  * This file is part of the QuickServer library
3  * Copyright (C) QuickServer.org
4  *
5  * Use, modification, copying and distribution of this software is subject to
6  * the terms and conditions of the GNU Lesser General Public License.
7  * You should have received a copy of the GNU LGP License along with this
8  * library; if not, you can download a copy from <http://www.quickserver.org/>.
9  *
10  * For questions, suggestions, bug-reports, enhancement-requests etc.
11  * visit http://www.quickserver.org
12  *
13  */

14
15 package org.quickserver.net.server.impl;
16
17 import org.quickserver.net.server.*;
18 import org.quickserver.net.*;
19 import org.quickserver.util.*;
20 import org.quickserver.util.io.*;
21
22 import java.io.*;
23 import java.net.*;
24 import java.util.*;
25 import java.util.logging.*;
26
27 import java.nio.*;
28 import java.nio.channels.*;
29
30 public class NonBlockingClientHandler extends BasicClientHandler {
// Class-level logger used for internal (framework) diagnostics of this handler.
31     private static final Logger logger = Logger.getLogger(NonBlockingClientHandler.class.getName());
32
// Optional callback; processWrite() invokes it when writeAllByteBuffer() reports true.
33     protected ClientWriteHandler clientWriteHandler; //v1.4.5
34
// NIO channel of the client; assigned via setSocketChannel() and cleared in clean().
private SocketChannel socketChannel;
35
// Buffers filled by processRead(); handed to the ByteBufferInputStream in updateInputOutputStreams().
36     protected ArrayList readByteBuffer = new ArrayList();
// Buffers handed to the ByteBufferOutputStream in updateInputOutputStreams().
37     protected ArrayList writeByteBuffer = new ArrayList();
38
// Cached key of socketChannel; looked up lazily by getSelectionKey(), cancelled in clean().
39     protected SelectionKey selectionKey;
40
// Incremented on entry to run() and decremented in returnThread().
41     protected volatile int threadAccessCount = 0;
// Set by checkReturnClientHandler(); once true, run() returns immediately.
42     protected volatile boolean willReturn;
// Set by closeConnection() while it waits for pending output to be written.
43     protected volatile boolean waitingForFinalWrite;
44
// Upper bound checked in run(); -1 disables the check.
45     private static int maxThreadAccessCount = 3; //one for each event ACCEPT, WRITE, READ
46
// Flags exposed through the static accessors below; their consumers are outside this view.
private static boolean wakeupSelectorAfterRegisterWrite = true;
47     private static boolean wakeupSelectorAfterRegisterRead = true;
48
49     /**
50      * Sets the flag to wakeup Selector After RegisterForWrite is called.
51      * @since 1.4.7
52      */

53     public static void setWakeupSelectorAfterRegisterWrite(boolean flag) {
54         wakeupSelectorAfterRegisterWrite = flag;
55     }
56     /**
57      * Returns wakeupSelectorAfterRegisterWrite the flag that controls if wakeup is called on Selector
58      * after RegisterForWrite is called.
59      * @since 1.4.7
60      */

61     public static boolean getWakeupSelectorAfterRegisterWrite() {
62         return wakeupSelectorAfterRegisterWrite;
63     }
64     
65     /**
66      * Sets the flag to wakeup Selector After RegisterForRead is called.
67      * @since 1.4.7
68      */

69     public static void setWakeupSelectorAfterRegisterRead(boolean flag) {
70         wakeupSelectorAfterRegisterRead = flag;
71     }
72     /**
73      * Returns wakeupSelectorAfterRegisterRead the flag that controls if wakeup is called on Selector
74      * after RegisterForRead is called.
75      * @since 1.4.7
76      */

77     public static boolean getWakeupSelectorAfterRegisterRead() {
78         return wakeupSelectorAfterRegisterRead;
79     }
80
81     /**
82      * Sets the maximum count of thread allowed to run objects of this class at a time.
83      * @since 1.4.7
84      */

85     public static void setMaxThreadAccessCount(int count) {
86         if(count<3 && count!=-1) throw new IllegalArgumentException JavaDoc("Value should be >=3 or -1");
87         maxThreadAccessCount = count;
88     }
89     /**
90      * Returns the maximum count of thread allowed to run objects of this class at a time.
91      * @since 1.4.7
92      */

93     public static int getMaxThreadAccessCount() {
94         return maxThreadAccessCount;
95     }
96
97     //v1.4.7
98
// Output stream created in updateInputOutputStreams() over writeByteBuffer;
// closed and set to null again in clean().
private ByteBufferOutputStream byteBufferOutputStream;
99
/**
 * Creates a handler, forwarding the instance count to the
 * BasicClientHandler constructor.
 * @param instanceCount value passed through to the superclass
 */
public NonBlockingClientHandler(int instanceCount) {
    super(instanceCount);
}
103
/**
 * Creates a handler using the no-argument BasicClientHandler constructor.
 */
public NonBlockingClientHandler() {
    super();
}
107
108     public void clean() {
109         logger.finest("Starting clean - "+getName());
110         if(threadAccessCount!=0) {
111             logger.warning("Thread Access Count was not 0!: "+threadAccessCount);
112             if(Assertion.isEnabled()) {
113                 assertionSystemExit();
114             }
115             threadAccessCount = 0;
116         }
117                 
118         while(readByteBuffer.isEmpty()==false) {
119             try {
120                 getServer().getByteBufferPool().returnObject(
121                     readByteBuffer.remove(0));
122             } catch(Exception JavaDoc er) {
123                 appLogger.warning("Error in returning read ByteBuffer to pool: "+er);
124                 break;
125             }
126         }
127
128         while(writeByteBuffer.isEmpty()==false) {
129             try {
130                 getServer().getByteBufferPool().returnObject(
131                     writeByteBuffer.remove(0));
132             } catch(Exception JavaDoc er) {
133                 appLogger.warning("Error in returning write ByteBuffer to pool: "+er);
134                 break;
135             }
136         }
137
138         if(selectionKey!=null) {
139             selectionKey.cancel();
140             selectionKey.selector().wakeup();
141             selectionKey = null;
142         }
143         willReturn = false;
144         waitingForFinalWrite = false;
145         socketChannel = null;
146         if(byteBufferOutputStream!=null) {
147             byteBufferOutputStream.close();
148         }
149
150         super.clean();
151
152         clientWriteHandler = null;//1.4.5
153
byteBufferOutputStream = null;
154
155         logger.finest("Finished clean - "+getName());
156     }
157
158     protected void finalize() throws Throwable JavaDoc {
159         clean();
160         super.finalize();
161     }
162
163     public void handleClient(TheClient theClient) {
164         super.handleClient(theClient);
165         setClientWriteHandler(theClient.getClientWriteHandler()); //v1.4.5
166
setSocketChannel(theClient.getSocketChannel());//1.4.5
167
}
168
169     protected void setInputStream(InputStream in) throws IOException {
170         this.in = in;
171         if(getDataMode(DataType.IN) == DataMode.STRING) {
172             b_in = null;
173             o_in = null;
174             bufferedReader = null;
175         } else if(getDataMode(DataType.IN) == DataMode.OBJECT) {
176             b_in = null;
177             bufferedReader = null;
178             o_in = new ObjectInputStream(in);
179         } else if(getDataMode(DataType.IN) == DataMode.BYTE ||
180                 getDataMode(DataType.IN) == DataMode.BINARY) {
181             o_in = null;
182             bufferedReader = null;
183             b_in = null;
184         }
185     }
186
187     public BufferedReader getBufferedReader() {
188         throw new IllegalStateException JavaDoc("Access to BufferedReader in not allowed in Non-Blocking mode!");
189     }
190
/**
 * Closes the connection to the client.
 * If the selection key is still valid and the connection is not marked
 * lost, the method first waits (see waitTillFullyWritten()) for pending
 * output before closing the SocketChannel; otherwise the connection is
 * flagged as closed right away. IOException and NullPointerException
 * raised during the shutdown are logged, not propagated.
 */
191     public void closeConnection() {
192         synchronized(this) {
            // Nothing to do when already closed or when a close is already in progress.
193             if(connection==false) return;
194             if(waitingForFinalWrite) return;
            // Key usable and link not lost: defer the close until output is written.
195             if(getSelectionKey()!=null && getSelectionKey().isValid() && lost == false) {
196                 waitingForFinalWrite = true;
197             } else {
198                 connection = false;
199             }
200         }
201
202         try {
203             if(getSocketChannel()!=null && socket!=null) {
204                 if(waitingForFinalWrite) {
205                     try {
206                         waitTillFullyWritten();
207                     } catch(Exception JavaDoc error) {
208                         logger.warning("Error in waitingForFinalWrite : "+error);
209                         if(logger.isLoggable(Level.FINE)) {
210                             logger.fine("StackTrace:\n"+MyString.getStackTrace(error));
211                         }
212                     } finally {
                        // Whatever the outcome of the wait, the connection is now closed.
                        // NOTE(review): waitingForFinalWrite is not reset here; clean() resets it.
213                         connection = false;
214                         byteBufferOutputStream.forceNotify();
215                         getSelectionKey().cancel();
216                     }
217                 }//end of waitingForFinalWrite
218

219                 
220                 synchronized(this) {
                    // The close/lost notification is skipped while a MAX_CON event is registered.
221                     if(hasEvent(ClientEvent.MAX_CON)==false) {
222                         notifyCloseOrLost();
223                     }
224                     if(getSocketChannel().isOpen()) {
225                         logger.finest("Closing SocketChannel");
226                         getSocketChannel().close();
227                     }
228                 }
229             }
230             if(getServer()!=null) {
231                 getServer().getSelector().wakeup();
232             }
233         } catch(IOException e) {
234             logger.warning("Error in closeConnection : "+e);
235             if(logger.isLoggable(Level.FINE)) {
236                 logger.fine("StackTrace:\n"+MyString.getStackTrace(e));
237             }
238         } catch(NullPointerException JavaDoc npe) {
            // NOTE(review): presumably guards against fields nulled concurrently by clean() - confirm.
239             logger.fine("NullPointerException: "+npe);
240             if(logger.isLoggable(Level.FINE)) {
241                 logger.fine("StackTrace:\n"+MyString.getStackTrace(npe));
242             }
243         }
244     }
245     
246     /**
247      * waitTillFullyWritten
248      * @since 1.4.7
249      */

250     public void waitTillFullyWritten() {
251         Object JavaDoc waitLock = new Object JavaDoc();
252         if(byteBufferOutputStream.isDataAvailableForWrite(waitLock)) {
253             if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
254                 logger.finest("Waiting "+getName());
255             }
256             try {
257                 synchronized(waitLock) {
258                     waitLock.wait(1000*60*2);//2 min max
259
}
260             } catch(InterruptedException JavaDoc ie) {
261                 logger.warning("Error: "+ie);
262             }
263             if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
264                 logger.finest("Done. "+getName());
265             }
266         }
267     }
268
/**
 * Processes one pending ClientEvent for this client on the calling thread.
 * The first entry of unprocessedClientEvents is removed and dispatched:
 * ACCEPT registers for read, notifies gotConnected and handles
 * authorisation; READ goes to processRead(); WRITE goes to processWrite().
 * When those paths report that the thread can be released, the method
 * returns and the connection is kept. Otherwise (error, lost or closed
 * connection) the connection is closed, the socket and channel are
 * released, and this handler is returned to its pool if no other thread
 * has done so already.
 */
269     public void run() {
270         if(unprocessedClientEvents.size()==0) {
271             logger.finest("No unprocessed ClientEvents!");
272             return;
273         }
274
        // Count this thread in, unless the handler is already on its way back to the pool.
275         synchronized(this) {
276             if(willReturn) {
277                 return;
278             } else {
279                 threadAccessCount++;
280             }
281         }
282
283         ClientEvent currentEvent = (ClientEvent) unprocessedClientEvents.remove(0);
284
285         if(logger.isLoggable(Level.FINEST)) {
286             StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
287             sb.append("Running ").append(getName());
288             sb.append(" using ");
289             sb.append(Thread.currentThread().getName());
290             sb.append(" for ");
291
292             synchronized(clientEvents) {
293                 if(clientEvents.size()>1) {
294                     sb.append(currentEvent+", Current Events - "+clientEvents);
295                 } else {
296                     sb.append(currentEvent);
297                 }
298             }
299             logger.finest(sb.toString());
300         }
301
        // NOTE(review): this return happens after threadAccessCount++ without a
        // matching returnThread() - confirm that is intended.
302         if(currentEvent==null) {
303             threadEvent.set(null);
304             return;
305         } else {
306             threadEvent.set(currentEvent);
307         }
308
309         try {
            // NOTE(review): the return below also leaves threadAccessCount incremented.
310             if(maxThreadAccessCount!=-1 && threadAccessCount>maxThreadAccessCount) {
311                 logger.warning("ThreadAccessCount can't go beyond "+maxThreadAccessCount+": "+threadAccessCount);
312                 if(Assertion.isEnabled()) {
313                     throw new AssertionError JavaDoc("ThreadAccessCount can't go beyond "+maxThreadAccessCount+": "+threadAccessCount);
314                 }
315                 return;
316             }
317
318             if(socket==null)
319                 throw new SocketException("Socket was null!");
320
321             if(getThreadEvent()==ClientEvent.ACCEPT ||
322                     getThreadEvent()==ClientEvent.MAX_CON) {
323                 prepareForRun();
324                 Assertion.affirm(willReturn==false, "WillReturn has to be false!: "+willReturn);
325             }
326
327             if(getThreadEvent()==ClientEvent.MAX_CON) {
328                 processMaxConnection(currentEvent);
329             }
330
331             try {
                // New connection: start listening for reads, notify the application, authorise.
332                 if(getThreadEvent()==ClientEvent.ACCEPT) {
333                     registerForRead();
334                     clientEventHandler.gotConnected(this);
335
336                     if(authorised == false) {
337                         if(clientAuthenticationHandler==null && authenticator == null) {
338                             authorised = true;
339                             logger.finest("No Authenticator "+getName()+" so return thread.");
340                         } else {
341                             if(clientAuthenticationHandler!=null) {
342                                 AuthStatus authStatus = null;
                                // Repeat for as long as processAuthorisation() reports FAILURE.
343                                 do {
344                                     authStatus = processAuthorisation();
345                                 } while(authStatus==AuthStatus.FAILURE);
346
347                                 if(authStatus==AuthStatus.SUCCESS)
348                                     authorised = true;
349                             } else {
350                                 processAuthorisation();
351                             }
352                             if(authorised)
353                                 logger.finest("Authentication done "+getName()+", so return thread.");
354                             else
355                                 logger.finest("askAuthentication() done "+getName()+", so return thread.");
356                         }
357                     }//end authorised
358
returnThread(); //return thread to pool
359
return;
360                 }
361                 
                // processRead()/processWrite() return true when the connection stays
                // open and the thread has already been released via returnThread().
362                 if(connection && getThreadEvent()==ClientEvent.READ) {
363                     if(processRead()) return;
364                 }
365
366                 if(connection && getThreadEvent()==ClientEvent.WRITE) {
367                     if(processWrite()) return;
368                 }
369
370             } catch(SocketException e) {
371                 appLogger.finest("SocketException - Client [" +
372                     getHostAddress() +"]: " + e.getMessage());
373                 //e.printStackTrace();
374
lost = true;
375             } catch(AppException e) {
376                 //errors from Application
377
appLogger.finest("AppException "+Thread.currentThread().getName()+": "
378                     + e.getMessage());
379             } catch(javax.net.ssl.SSLException e) {
380                 lost = true;
381                 if(Assertion.isEnabled()) {
382                     appLogger.info("SSLException - Client ["+getHostAddress()
383                         +"] "+Thread.currentThread().getName()+": " + e);
384                 } else {
385                     appLogger.warning("SSLException - Client ["+
386                         getHostAddress()+"]: "+e);
387                 }
388             } catch(ConnectionLostException e) {
389                 lost = true;
390                 if(e.getMessage()!=null)
391                     appLogger.finest("Connection lost " +
392                         Thread.currentThread().getName()+": " + e.getMessage());
393                 else
394                     appLogger.finest("Connection lost "+Thread.currentThread().getName());
395             } catch(ClosedChannelException e) {
396                 lost = true;
397                 appLogger.finest("Channel closed "+Thread.currentThread().getName()+": " + e);
398             } catch(IOException e) {
399                 lost = true;
400                 appLogger.fine("IOError "+Thread.currentThread().getName()+": " + e);
401             } catch(AssertionError JavaDoc er) {
402                 logger.warning("[AssertionError] "+getName()+" "+er);
403                 if(logger.isLoggable(Level.FINEST)) {
404                     logger.finest("StackTrace "+Thread.currentThread().getName()+": "+MyString.getStackTrace(er));
405                 }
406                 assertionSystemExit();
407             } catch(Error JavaDoc er) {
408                 logger.warning("[Error] "+er);
409                 if(logger.isLoggable(Level.FINEST)) {
410                     logger.finest("StackTrace "+Thread.currentThread().getName()+": "+MyString.getStackTrace(er));
411                 }
412                 if(Assertion.isEnabled()) {
413                     assertionSystemExit();
414                 }
415                 lost = true;
416             } catch(RuntimeException JavaDoc re) {
417                 logger.warning("[RuntimeException] "+MyString.getStackTrace(re));
418                 if(Assertion.isEnabled()) {
419                     assertionSystemExit();
420                 }
421                 lost = true;
422             }
423             
            // Reached only when the event was not fully handled above: the
            // connection is closed or lost, so notify and shut down.
424             if(getThreadEvent()!=ClientEvent.MAX_CON) {
425                 notifyCloseOrLost();
426             }
427             
428             if(connection) {
429                 logger.finest(Thread.currentThread().getName()+" calling closeConnection()");
430                 closeConnection();
431             }
432             
            // Release a thread that closeConnection() may have left waiting for the final write.
433             if(connection==true && lost==true && waitingForFinalWrite) {
434                 byteBufferOutputStream.forceNotify();
435             }
436         } catch(javax.net.ssl.SSLException se) {
437             logger.warning("SSLException "+Thread.currentThread().getName()+" - " + se);
438         } catch(IOException ie) {
439             logger.warning("IOError "+Thread.currentThread().getName()+" - Closing Client : " + ie);
440         } catch(RuntimeException JavaDoc re) {
441             logger.warning("[RuntimeException] "+getName()+" "+Thread.currentThread().getName()+" - "+MyString.getStackTrace(re));
442             if(Assertion.isEnabled()) {
443                 assertionSystemExit();
444             }
445         } catch(Exception JavaDoc e) {
446             logger.warning("Error "+Thread.currentThread().getName()+" - Event:"+getThreadEvent()+" - Socket:"+socket+" : "+e);
447             logger.fine("StackTrace: "+getName()+"\n"+MyString.getStackTrace(e));
448             if(Assertion.isEnabled()) {
449                 assertionSystemExit();
450             }
451         } catch(Error JavaDoc e) {
452             logger.warning("Error "+Thread.currentThread().getName()+" - Event:"+getThreadEvent()+" - Socket:"+socket+" : "+e);
453             logger.fine("StackTrace: "+getName()+"\n"+MyString.getStackTrace(e));
454             if(Assertion.isEnabled()) {
455                 assertionSystemExit();
456             }
457         }
458
        // Release the selection key, the socket and the channel; failures are only logged.
459         synchronized(this) {
460             try {
461                 if(getSelectionKey()!=null && getSelectionKey().isValid()) {
462                     logger.finest("Canceling SelectionKey");
463                     getSelectionKey().cancel();
464                 }
465
466                 if(socket!=null && socket.isClosed()==false) {
467                     logger.finest("Closing Socket");
468                     socket.close();
469                 }
470
471                 if(getSocketChannel()!=null && getSocketChannel().isOpen()) {
472                     logger.finest("Closing SocketChannel");
473                     socketChannel.close();
474                 }
475             } catch(Exception JavaDoc re) {
476                 logger.warning("Error closing Socket/Channel: " +re);
477             }
478         }//end synchronized
479

480         willClean = true;
481         returnClientData();
482
        // Only the first thread to flip willReturn (see checkReturnClientHandler())
        // returns the handler to its pool.
483         boolean returnClientHandler = false;
484         synchronized(lockObj) {
485             returnThread();
486             returnClientHandler = checkReturnClientHandler();
487         }
488
489         if(returnClientHandler) {
490             returnClientHandler(); //return to pool
491
}
492     }
493
494     protected boolean checkReturnClientHandler() {
495         if(willReturn==false) {
496             willReturn = true;
497             return true;
498         }
499         return false;
500     }
501
502     /**
503      * Process read
504      * @return value indicates if the thread should return form run()
505      */

506     private boolean processRead() throws Exception JavaDoc, AppException {
507         int count = 0;
508         int fullCount = 0;
509         ByteBuffer buffer = (ByteBuffer)
510             getServer().getByteBufferPool().borrowObject();
511
512         while(true) {
513             try {
514                 count = getSocketChannel().read(buffer);
515                 if(count<=0) {
516                     //logger.finest("SocketChannel read was "+count+"!");
517
getServer().getByteBufferPool().returnObject(buffer);
518                     buffer = null;
519                     break;
520                 } else {
521                     fullCount += count;
522                 }
523
524                 buffer.flip(); // Make readable
525
readByteBuffer.add(buffer);
526
527                 buffer = (ByteBuffer)
528                     getServer().getByteBufferPool().borrowObject();
529             } catch(Exception JavaDoc error) {
530                 logger.finest("Error in data read: "+error);
531                 lost = true;
532                 synchronized(getInputStream()) {
533                     getInputStream().notifyAll();
534                 }
535                 throw error;
536             } finally {
537                 if(buffer!=null && count<=0) {
538                     getServer().getByteBufferPool().returnObject(buffer);
539                     buffer = null;
540                 }
541             }
542         }//end while
543

544         if(count<0) {
545             logger.finest("SocketChannel read was "+count+"!");
546             lost = true;
547             synchronized(getInputStream()) {
548                 getInputStream().notifyAll();
549             }
550         } else {
551             logger.finest(fullCount+" bytes read");
552             if(fullCount!=0) {
553                 updateLastCommunicationTime();
554                 synchronized(getInputStream()) {
555                     getInputStream().notify(); //if any are waiting
556
}
557                 if(hasEvent(ClientEvent.ACCEPT) == false) {
558                     processGotDataInBuffers();
559                 }
560             }
561
562             //check if any data was read but not yet processed
563
while(getInputStream().available()>0) {
564                 logger.finest("Sending again for processing...");
565                 if(hasEvent(ClientEvent.ACCEPT) == false) {
566                     processGotDataInBuffers();
567                     break;
568                 } else {
569                     synchronized(getInputStream()) {
570                         getInputStream().notifyAll();
571                     }
572                     Thread.sleep(100);
573                 }
574             }
575
576             if(connection) {
577                 registerForRead();
578                 //getSelectionKey().selector().wakeup();
579
returnThread(); //return to pool
580
return true;
581             }
582         }//end of else
583
logger.finest("We don't have connection, lets return all resources.");
584         return false;
585     }
586
587     /**
588      * Process write
589      * @return value indicates if the thread should return form run()
590      */

591     private boolean processWrite() throws IOException {
592         updateLastCommunicationTime();
593         
594         boolean flag = byteBufferOutputStream.writeAllByteBuffer();
595         
596         if(flag==false) {
597             registerWrite();
598         } else if(/*flag==true && */clientWriteHandler!=null) {
599             clientWriteHandler.handleWrite(this);
600         }
601         
602         if(connection) {
603             returnThread(); //return to pool
604
return true;
605         } else {
606             logger.finest("We don't have connection, lets return all resources.");
607         }
608         return false;
609     }
610
611     protected void returnThread() {
612         threadAccessCount--;
613         Assertion.affirm(threadAccessCount>=0, "ThreadAccessCount went less the 0! Value: "+threadAccessCount);
614         //return is done at ClientThread end
615
removeEvent((ClientEvent)threadEvent.get());
616     }
617
618     protected void returnClientHandler() {
619         logger.finest(getName());
620         try {
621             for(int i=0;threadAccessCount!=0;i++) {
622                 if(i==100) {
623                     logger.warning("ClientHandler must have got into a loop waiting for thread to free up! ThreadAccessCount="+threadAccessCount);
624                     threadAccessCount = 0;
625                     if(Assertion.isEnabled()) {
626                         assertionSystemExit();
627                     } else {
628                         break;
629                     }
630                 }
631                 if(threadAccessCount<=0) break;
632
633                 logger.finest("Waiting for other thread of "+getName()+" to finish");
634                 Thread.sleep(60);
635             }
636         } catch(InterruptedException JavaDoc ie) {
637             appLogger.warning("InterruptedException: "+ie);
638         }
639         super.returnClientHandler();
640     }
641
642     public void setDataMode(DataMode dataMode, DataType dataType)
643             throws IOException {
644         if(getDataMode(dataType)==dataMode) return;
645
646         appLogger.fine("Setting Type:"+dataType+", Mode:"+dataMode);
647         super.checkDataModeSet(dataMode, dataType);
648
649         setDataModeNonBlocking(dataMode, dataType);
650     }
651
652     private void setDataModeNonBlocking(DataMode dataMode, DataType dataType)
653             throws IOException {
654         logger.finest("ENTER");
655         if(dataMode == DataMode.STRING) {
656             if(dataType == DataType.OUT) {
657                 if(dataModeOUT == DataMode.BYTE || dataModeOUT == DataMode.BINARY) {
658                     dataModeOUT = dataMode;
659                 } else if(dataModeOUT == DataMode.OBJECT) {
660                     dataModeOUT = dataMode;
661                     o_out.flush(); o_out = null;
662                     b_out = new BufferedOutputStream(out);
663                 } else {
664                     Assertion.affirm(false, "Unknown DataType.OUT DataMode - "+dataModeOUT);
665                 }
666                 Assertion.affirm(b_out!=null, "BufferedOutputStream is still null!");
667                 Assertion.affirm(o_out==null, "ObjectOutputStream is still not null!");
668             } else if(dataType == DataType.IN) {
669                 dataModeIN = dataMode;
670
671                 if(o_in!=null) {
672                     if(o_in.available()!=0)
673                         logger.warning("Data looks to be present in ObjectInputStream");
674                     o_in = null;
675                 }
676                 b_in = null;
677                 bufferedReader = null;
678                 //input stream will work
679
Assertion.affirm(in!=null, "InputStream is still null!");
680                 Assertion.affirm(b_in==null, "BufferedInputStream is still not null!");
681                 Assertion.affirm(bufferedReader==null, "BufferedReader is still not null!");
682             }
683         } else if(dataMode == DataMode.OBJECT) {
684             if(dataType == DataType.IN) {
685                 //we will disable this for now
686
throw new IllegalArgumentException JavaDoc("Can't set DataType.IN mode to OBJECT when blocking mode is set as false!");
687             }
688
689             if(dataType == DataType.OUT) {
690                 dataModeOUT = dataMode;
691                 b_out = null;
692                 o_out = new ObjectOutputStream(out);
693                 Assertion.affirm(o_out!=null, "ObjectOutputStream is still null!");
694             } /*else if(dataType == DataType.IN) {
695                 dataModeIN = dataMode;
696                 b_in = null;
697                 bufferedReader = null;
698                 //registerForRead();
699                 o_in = new ObjectInputStream(in); //will block
700                 Assertion.affirm(o_in!=null, "ObjectInputStream is still null!");
701             }*/

702         } else if(dataMode == DataMode.BYTE || dataMode == DataMode.BINARY) {
703             if(dataType == DataType.OUT) {
704                 if(dataModeOUT == DataMode.STRING ||
705                         dataModeOUT == DataMode.BYTE ||
706                         dataModeOUT == DataMode.BINARY) {
707                     dataModeOUT = dataMode;
708                 } else if(dataModeOUT == DataMode.OBJECT) {
709                     dataModeOUT = dataMode;
710                     
711                     o_out = null;
712                     b_out = new BufferedOutputStream(out);
713                 } else {
714                     Assertion.affirm(false, "Unknown DataType.OUT - DataMode: "+dataModeOUT);
715                 }
716                 Assertion.affirm(b_out!=null, "BufferedOutputStream is still null!");
717             } else if(dataType == DataType.IN) {
718                 dataModeIN = dataMode;
719                 o_in = null;
720                 bufferedReader = null;
721                 b_in = null;
722                 //input stream will work
723
Assertion.affirm(in!=null, "InputStream is still null!");
724             } else {
725                 throw new IllegalArgumentException JavaDoc("Unknown DataType : "+dataType);
726             }
727         } else {
728             throw new IllegalArgumentException JavaDoc("Unknown DataMode : "+dataMode);
729         }
730     }
731
732     protected byte[] readInputStream() throws IOException {
733         return readInputStream(getInputStream());
734     }
735
736     public void updateInputOutputStreams() throws IOException {
737         byteBufferOutputStream = new ByteBufferOutputStream(writeByteBuffer, this);
738         setInputStream( new ByteBufferInputStream(readByteBuffer, this, getCharset()) );
739         setOutputStream(byteBufferOutputStream);
740     }
741
742     public void setSocketChannel(SocketChannel socketChannel) {
743         this.socketChannel = socketChannel;
744     }
745     public SocketChannel getSocketChannel() {
746         return socketChannel;
747     }
748
749     public void setSelectionKey(SelectionKey selectionKey) {
750         this.selectionKey = selectionKey;
751     }
752     public SelectionKey getSelectionKey() {
753         if(selectionKey==null)
754             selectionKey = getSocketChannel().keyFor(getServer().getSelector());
755         return selectionKey;
756     }
757
758     private void processGotDataInBuffers() throws AppException,
759             ConnectionLostException, ClassNotFoundException JavaDoc, IOException {
760         if(getInputStream().available()==0) return;
761         
762         logger.finest("Trying to process got data.. DataMode.IN="+dataModeIN);
763         AuthStatus authStatus = null;
764         
765         //--For debug
766
//((ByteBufferInputStream) getInputStream()).dumpContent();
767

768         String JavaDoc temp = null;
769         String JavaDoc rec = null;
770         Object JavaDoc recObject = null;
771         byte[] recByte = null;
772
773         boolean timeToCheckForNewLineMiss = false;
774         
775         do {
776             //updateLastCommunicationTime();
777

778             if(dataModeIN == DataMode.STRING) {
779                 ByteBufferInputStream bbin = (ByteBufferInputStream)
780                     getInputStream();
781                 timeToCheckForNewLineMiss = true;
782
783                 while(bbin.isLineReady()) {
784
785                     rec = bbin.readLine();
786                     if(rec==null) {
787                         lost = true;
788                         return;
789                     }
790                     if(getCommunicationLogging() && authorised == true) {
791                         appLogger.fine("Got STRING ["+getHostAddress()+"] : "+
792                             rec);
793                     }
794                     
795                     if(authorised == false)
796                         authStatus = clientAuthenticationHandler.handleAuthentication(this, rec);
797                     else
798                         clientCommandHandler.handleCommand(this, rec);
799
800                     if(isClosed()==true) return;
801
802                     while(authStatus==AuthStatus.FAILURE)
803                         authStatus = processAuthorisation();
804
805                     if(authStatus==AuthStatus.SUCCESS)
806                         authorised = true;
807
808                     if(dataModeIN != DataMode.STRING) {
809                         break;
810                     }
811
812                     timeToCheckForNewLineMiss = false;
813                 }//end of while
814

815                 if(timeToCheckForNewLineMiss && bbin.availableOnlyInByteBuffer()==0) {
816                     return;
817                 } else {
818                     timeToCheckForNewLineMiss = false;
819                 }
820             }
821
822             //} else if(dataModeIN == DataMode.OBJECT) {
823
/*
824             while(dataModeIN == DataMode.OBJECT && o_in!=null) {
825                 recObject = o_in.readObject();
826                 if(recObject==null) {
827                     lost = true;
828                     return;
829                 }
830                 if(getCommunicationLogging() && authorised == true) {
831                     appLogger.fine("Got OBJECT ["+getHostAddress()+"] : "+
832                         recObject.toString());
833                 }
834
835
836                 if(authorised == false)
837                     authStatus = clientAuthenticationHandler.handleAuthentication(this, recObject);
838                 else
839                     clientObjectHandler.handleObject(this, recObject);
840                 
841                 if(isClosed()==true) return;
842
843                 while(authStatus==AuthStatus.FAILURE)
844                     authStatus = processAuthorisation();
845                 
846                 if(authStatus==AuthStatus.SUCCESS)
847                     authorised = true;
848             }
849             */

850             
851             //} else if(dataModeIN == DataMode.BYTE) {
852
while(dataModeIN == DataMode.BYTE && getInputStream().available()!=0) {
853                 rec = readBytes();
854                 if(rec==null) {
855                     lost = true;
856                     return;
857                 }
858                 if(getCommunicationLogging() && authorised == true) {
859                     appLogger.fine("Got BYTE ["+getHostAddress()+"] : "+rec);
860                 }
861
862                 if(authorised == false)
863                     authStatus = clientAuthenticationHandler.handleAuthentication(this, rec);
864                 else
865                     clientCommandHandler.handleCommand(this, rec);
866
867                 if(isClosed()==true) return;
868
869                 while(authStatus==AuthStatus.FAILURE)
870                     authStatus = processAuthorisation();
871
872                 if(authStatus==AuthStatus.SUCCESS)
873                     authorised = true;
874             }
875
876             //} else if(dataModeIN == DataMode.BINARY) {
877
while(dataModeIN == DataMode.BINARY && getInputStream().available()!=0) {
878                 recByte = readBinary();
879                 if(recByte==null) {
880                     lost = true;
881                     return;
882                 }
883                 if(getCommunicationLogging() && authorised == true) {
884                     appLogger.fine("Got BINARY ["+getHostAddress()+"] : "+
885                         MyString.getMemInfo(recByte.length));
886                 }
887
888                 if(authorised == false)
889                     authStatus = clientAuthenticationHandler.handleAuthentication(this, recByte);
890                 else
891                     clientBinaryHandler.handleBinary(this, recByte);
892
893                 if(isClosed()==true) return;
894
895                 while(authStatus==AuthStatus.FAILURE)
896                     authStatus = processAuthorisation();
897
898                 if(authStatus==AuthStatus.SUCCESS)
899                     authorised = true;
900             }
901
902             //} else {
903
if(dataModeIN != DataMode.STRING && dataModeIN != DataMode.OBJECT
904                 && dataModeIN != DataMode.BYTE && dataModeIN != DataMode.BINARY) {
905                 throw new IllegalStateException JavaDoc("Incoming DataMode is not supported : "+dataModeIN);
906             }
907         } while(getInputStream().available()!=0);
908     }
909
910     public void registerForRead()
911             throws IOException, ClosedChannelException {
912         try {
913             if(getSelectionKey()==null) {
914                 boolean flag = getServer().registerChannel(getSocketChannel(),
915                     SelectionKey.OP_READ, this);
916                 if(flag) {
917                     logger.finest("Adding OP_READ as interest Ops for "+getName());
918                 } else if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
919                     logger.finest("OP_READ is already present in interest Ops for "+getName());
920                 }
921             } else if(getSelectionKey().isValid()) {
922                 if((getSelectionKey().interestOps() & SelectionKey.OP_READ) == 0 ) {
923                     logger.finest("Adding OP_READ to interest Ops for "+getName());
924                     removeEvent(ClientEvent.READ);
925                     getSelectionKey().interestOps(getSelectionKey().interestOps()
926                         | SelectionKey.OP_READ);
927                     if(wakeupSelectorAfterRegisterRead) {
928                         getServer().getSelector().wakeup();
929                     }
930                 } else {
931                     if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
932                         logger.finest("OP_READ is already present in interest Ops for "+getName());
933                     }
934                 }
935             } else {
936                 throw new IOException("SelectionKey is invalid!");
937             }
938         } catch(CancelledKeyException e) {
939             throw new IOException("SelectionKey is cancelled!");
940         }
941     }
942
943     public void registerForWrite()
944             throws IOException, ClosedChannelException {
945         if(hasEvent(ClientEvent.RUN_BLOCKING) || hasEvent(ClientEvent.MAX_CON_BLOCKING)) {
946             throw new IllegalStateException JavaDoc("This method is only allowed under Non-Blocking mode.");
947         }
948
949         if(clientWriteHandler==null) {
950             throw new IllegalStateException JavaDoc("ClientWriteHandler has not been set!");
951         }
952         registerWrite();
953     }
954     
955     public void registerWrite() throws IOException {
956         try {
957             if(getSelectionKey()==null) {
958                 boolean flag = getServer().registerChannel(getSocketChannel(),
959                         SelectionKey.OP_WRITE, this);
960                 if(flag) {
961                     logger.finest("Adding OP_WRITE as interest Ops for "+getName());
962                 } else if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
963                     logger.finest("OP_WRITE is already present in interest Ops for "+getName());
964                 }
965             } else if(getSelectionKey().isValid()) {
966                 if((getSelectionKey().interestOps() & SelectionKey.OP_WRITE) == 0 ) {
967                     logger.finest("Adding OP_WRITE to interest Ops for "+getName());
968                     removeEvent(ClientEvent.WRITE);
969                     getSelectionKey().interestOps(getSelectionKey().interestOps()
970                         | SelectionKey.OP_WRITE);
971                     if(wakeupSelectorAfterRegisterWrite) {
972                         getServer().getSelector().wakeup();
973                     }
974                 } else {
975                     if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
976                         logger.finest("OP_WRITE is already present in interest Ops for "+getName());
977                     }
978                 }
979             } else {
980                 throw new IOException("SelectionKey is invalid!");
981             }
982         } catch(CancelledKeyException e) {
983             throw new IOException("SelectionKey is cancelled!");
984         }
985     }
986
987     protected void setClientWriteHandler(ClientWriteHandler handler) {
988         clientWriteHandler=handler;
989     }
990
991     /**
992      * Returns number of thread currently in this object.
993      * @since 1.4.6
994      */

995     public int getThreadAccessCount() {
996         return threadAccessCount;
997     }
998 }
999
Popular Tags