KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > commons > net > telnet > TelnetInputStream


1 /*
2  * Copyright 2003-2004 The Apache Software Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16 package org.apache.commons.net.telnet;
17
18 import java.io.BufferedInputStream JavaDoc;
19 import java.io.IOException JavaDoc;
20 import java.io.InputStream JavaDoc;
21 import java.io.InterruptedIOException JavaDoc;
22
23 /***
24  *
25  * <p>
26  *
27  * <p>
28  * <p>
29  * @author Daniel F. Savarese
30  * @author Bruno D'Avanzo
31  ***/

32
33
34 final class TelnetInputStream extends BufferedInputStream JavaDoc implements Runnable JavaDoc
35 {
36     static final int _STATE_DATA = 0, _STATE_IAC = 1, _STATE_WILL = 2,
37                      _STATE_WONT = 3, _STATE_DO = 4, _STATE_DONT = 5,
38                      _STATE_SB = 6, _STATE_SE = 7, _STATE_CR = 8, _STATE_IAC_SB = 9;
39
40     private boolean __hasReachedEOF, __isClosed;
41     private boolean __readIsWaiting;
42     private int __receiveState, __queueHead, __queueTail, __bytesAvailable;
43     private int[] __queue;
44     private TelnetClient __client;
45     private Thread JavaDoc __thread;
46     private IOException JavaDoc __ioException;
47
48     /* TERMINAL-TYPE option (start)*/
49     private int __suboption[] = new int[256];
50     private int __suboption_count = 0;
51     /* TERMINAL-TYPE option (end)*/
52
53     private boolean __threaded;
54
55     TelnetInputStream(InputStream JavaDoc input, TelnetClient client,
56                       boolean readerThread)
57     {
58         super(input);
59         __client = client;
60         __receiveState = _STATE_DATA;
61         __isClosed = true;
62         __hasReachedEOF = false;
63         // Make it 2049, because when full, one slot will go unused, and we
64
// want a 2048 byte buffer just to have a round number (base 2 that is)
65
__queue = new int[2049];
66         __queueHead = 0;
67         __queueTail = 0;
68         __bytesAvailable = 0;
69         __ioException = null;
70         __readIsWaiting = false;
71         __threaded = false;
72         if(readerThread)
73             __thread = new Thread JavaDoc(this);
74         else
75             __thread = null;
76     }
77
78     TelnetInputStream(InputStream JavaDoc input, TelnetClient client) {
79         this(input, client, true);
80     }
81
82     void _start()
83     {
84         if(__thread == null)
85             return;
86
87         int priority;
88         __isClosed = false;
89         // Need to set a higher priority in case JVM does not use pre-emptive
90
// threads. This should prevent scheduler induced deadlock (rather than
91
// deadlock caused by a bug in this code).
92
priority = Thread.currentThread().getPriority() + 1;
93         if (priority > Thread.MAX_PRIORITY)
94             priority = Thread.MAX_PRIORITY;
95         __thread.setPriority(priority);
96         __thread.setDaemon(true);
97         __thread.start();
98         __threaded = true;
99     }
100
101
102     // synchronized(__client) critical sections are to protect against
103
// TelnetOutputStream writing through the telnet client at same time
104
// as a processDo/Will/etc. command invoked from TelnetInputStream
105
// tries to write.
106
private int __read() throws IOException JavaDoc
107     {
108         int ch;
109
110 _loop:
111         while (true)
112         {
113             // Exit only when we reach end of stream.
114
if ((ch = super.read()) < 0)
115                 return -1;
116
117             ch = (ch & 0xff);
118
119             /* Code Section added for supporting AYT (start)*/
120             synchronized (__client)
121             {
122                 __client._processAYTResponse();
123             }
124             /* Code Section added for supporting AYT (end)*/
125
126             /* Code Section added for supporting spystreams (start)*/
127             __client._spyRead(ch);
128             /* Code Section added for supporting spystreams (end)*/
129
130 _mainSwitch:
131             switch (__receiveState)
132             {
133
134             case _STATE_CR:
135                 if (ch == '\0')
136                 {
137                     // Strip null
138
continue;
139                 }
140                 // How do we handle newline after cr?
141
// else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
142

143                 // Handle as normal data by falling through to _STATE_DATA case
144

145             case _STATE_DATA:
146                 if (ch == TelnetCommand.IAC)
147                 {
148                     __receiveState = _STATE_IAC;
149                     continue;
150                 }
151
152
153                 if (ch == '\r')
154                 {
155                     synchronized (__client)
156                     {
157                         if (__client._requestedDont(TelnetOption.BINARY))
158                             __receiveState = _STATE_CR;
159                         else
160                             __receiveState = _STATE_DATA;
161                     }
162                 }
163                 else
164                     __receiveState = _STATE_DATA;
165                 break;
166
167             case _STATE_IAC:
168                 switch (ch)
169                 {
170                 case TelnetCommand.WILL:
171                     __receiveState = _STATE_WILL;
172                     continue;
173                 case TelnetCommand.WONT:
174                     __receiveState = _STATE_WONT;
175                     continue;
176                 case TelnetCommand.DO:
177                     __receiveState = _STATE_DO;
178                     continue;
179                 case TelnetCommand.DONT:
180                     __receiveState = _STATE_DONT;
181                     continue;
182                 /* TERMINAL-TYPE option (start)*/
183                 case TelnetCommand.SB:
184                     __suboption_count = 0;
185                     __receiveState = _STATE_SB;
186                     continue;
187                 /* TERMINAL-TYPE option (end)*/
188                 case TelnetCommand.IAC:
189                     __receiveState = _STATE_DATA;
190                     break;
191                 default:
192                     break;
193                 }
194                 __receiveState = _STATE_DATA;
195                 continue;
196             case _STATE_WILL:
197                 synchronized (__client)
198                 {
199                     __client._processWill(ch);
200                     __client._flushOutputStream();
201                 }
202                 __receiveState = _STATE_DATA;
203                 continue;
204             case _STATE_WONT:
205                 synchronized (__client)
206                 {
207                     __client._processWont(ch);
208                     __client._flushOutputStream();
209                 }
210                 __receiveState = _STATE_DATA;
211                 continue;
212             case _STATE_DO:
213                 synchronized (__client)
214                 {
215                     __client._processDo(ch);
216                     __client._flushOutputStream();
217                 }
218                 __receiveState = _STATE_DATA;
219                 continue;
220             case _STATE_DONT:
221                 synchronized (__client)
222                 {
223                     __client._processDont(ch);
224                     __client._flushOutputStream();
225                 }
226                 __receiveState = _STATE_DATA;
227                 continue;
228             /* TERMINAL-TYPE option (start)*/
229             case _STATE_SB:
230                 switch (ch)
231                 {
232                 case TelnetCommand.IAC:
233                     __receiveState = _STATE_IAC_SB;
234                     continue;
235                 default:
236                     // store suboption char
237
__suboption[__suboption_count++] = ch;
238                     break;
239                 }
240                 __receiveState = _STATE_SB;
241                 continue;
242             case _STATE_IAC_SB:
243                 switch (ch)
244                 {
245                 case TelnetCommand.SE:
246                     synchronized (__client)
247                     {
248                         __client._processSuboption(__suboption, __suboption_count);
249                         __client._flushOutputStream();
250                     }
251                     __receiveState = _STATE_DATA;
252                     continue;
253                 default:
254                     __receiveState = _STATE_SB;
255                     break;
256                 }
257                 __receiveState = _STATE_DATA;
258                 continue;
259             /* TERMINAL-TYPE option (end)*/
260             }
261
262             break;
263         }
264
265         return ch;
266     }
267
268     // synchronized(__client) critical sections are to protect against
269
// TelnetOutputStream writing through the telnet client at same time
270
// as a processDo/Will/etc. command invoked from TelnetInputStream
271
// tries to write.
272
private void __processChar(int ch) throws InterruptedException JavaDoc
273     {
274         // Critical section because we're altering __bytesAvailable,
275
// __queueTail, and the contents of _queue.
276
synchronized (__queue)
277         {
278             while (__bytesAvailable >= __queue.length - 1)
279             {
280                 if(__threaded)
281                 {
282                     __queue.notify();
283                     try
284                     {
285                         __queue.wait();
286                     }
287                     catch (InterruptedException JavaDoc e)
288                     {
289                         throw e;
290                     }
291                 }
292             }
293
294             // Need to do this in case we're not full, but block on a read
295
if (__readIsWaiting && __threaded)
296             {
297                 __queue.notify();
298             }
299
300             __queue[__queueTail] = ch;
301             ++__bytesAvailable;
302
303             if (++__queueTail >= __queue.length)
304                 __queueTail = 0;
305         }
306     }
307
308     public int read() throws IOException JavaDoc
309     {
310         // Critical section because we're altering __bytesAvailable,
311
// __queueHead, and the contents of _queue in addition to
312
// testing value of __hasReachedEOF.
313
synchronized (__queue)
314         {
315
316             while (true)
317             {
318                 if (__ioException != null)
319                 {
320                     IOException JavaDoc e;
321                     e = __ioException;
322                     __ioException = null;
323                     throw e;
324                 }
325
326                 if (__bytesAvailable == 0)
327                 {
328                     // Return -1 if at end of file
329
if (__hasReachedEOF)
330                         return -1;
331
332                     // Otherwise, we have to wait for queue to get something
333
if(__threaded)
334                     {
335                         __queue.notify();
336                         try
337                         {
338                             __readIsWaiting = true;
339                             __queue.wait();
340                             __readIsWaiting = false;
341                         }
342                         catch (InterruptedException JavaDoc e)
343                         {
344                             throw new IOException JavaDoc("Fatal thread interruption during read.");
345                         }
346                     }
347                     else
348                     {
349                         //__alreadyread = false;
350
__readIsWaiting = true;
351                         int ch;
352
353                         do
354                         {
355                             try
356                             {
357                                 if ((ch = __read()) < 0)
358                                     if(ch != -2)
359                                         return (ch);
360                             }
361                             catch (InterruptedIOException JavaDoc e)
362                             {
363                                 synchronized (__queue)
364                                 {
365                                     __ioException = e;
366                                     __queue.notifyAll();
367                                     try
368                                     {
369                                         __queue.wait(100);
370                                     }
371                                     catch (InterruptedException JavaDoc interrupted)
372                                     {
373                                     }
374                                 }
375                                 return (-1);
376                             }
377
378
379                             try
380                             {
381                                 if(ch != -2)
382                                 {
383                                     __processChar(ch);
384                                 }
385                             }
386                             catch (InterruptedException JavaDoc e)
387                             {
388                                 if (__isClosed)
389                                     return (-1);
390                             }
391                         }
392                         while (super.available() > 0);
393
394                         __readIsWaiting = false;
395                     }
396                     continue;
397                 }
398                 else
399                 {
400                     int ch;
401
402                     ch = __queue[__queueHead];
403
404                     if (++__queueHead >= __queue.length)
405                         __queueHead = 0;
406
407                     --__bytesAvailable;
408
409             // Need to explicitly notify() so available() works properly
410
if(__bytesAvailable == 0 && __threaded) {
411                 __queue.notify();
412             }
413             
414                     return ch;
415                 }
416             }
417         }
418     }
419
420
421     /***
422      * Reads the next number of bytes from the stream into an array and
423      * returns the number of bytes read. Returns -1 if the end of the
424      * stream has been reached.
425      * <p>
426      * @param buffer The byte array in which to store the data.
427      * @return The number of bytes read. Returns -1 if the
428      * end of the message has been reached.
429      * @exception IOException If an error occurs in reading the underlying
430      * stream.
431      ***/

432     public int read(byte buffer[]) throws IOException JavaDoc
433     {
434         return read(buffer, 0, buffer.length);
435     }
436
437
438     /***
439      * Reads the next number of bytes from the stream into an array and returns
440      * the number of bytes read. Returns -1 if the end of the
441      * message has been reached. The characters are stored in the array
442      * starting from the given offset and up to the length specified.
443      * <p>
444      * @param buffer The byte array in which to store the data.
445      * @param offset The offset into the array at which to start storing data.
446      * @param length The number of bytes to read.
447      * @return The number of bytes read. Returns -1 if the
448      * end of the stream has been reached.
449      * @exception IOException If an error occurs while reading the underlying
450      * stream.
451      ***/

452     public int read(byte buffer[], int offset, int length) throws IOException JavaDoc
453     {
454         int ch, off;
455
456         if (length < 1)
457             return 0;
458
459         // Critical section because run() may change __bytesAvailable
460
synchronized (__queue)
461         {
462             if (length > __bytesAvailable)
463                 length = __bytesAvailable;
464         }
465
466         if ((ch = read()) == -1)
467             return -1;
468
469         off = offset;
470
471         do
472         {
473             buffer[offset++] = (byte)ch;
474         }
475         while (--length > 0 && (ch = read()) != -1);
476
477         //__client._spyRead(buffer, off, offset - off);
478
return (offset - off);
479     }
480
481
482     /*** Returns false. Mark is not supported. ***/
483     public boolean markSupported()
484     {
485         return false;
486     }
487
488     public int available() throws IOException JavaDoc
489     {
490         // Critical section because run() may change __bytesAvailable
491
synchronized (__queue)
492         {
493             return __bytesAvailable;
494         }
495     }
496
497
498     // Cannot be synchronized. Will cause deadlock if run() is blocked
499
// in read because BufferedInputStream read() is synchronized.
500
public void close() throws IOException JavaDoc
501     {
502         // Completely disregard the fact thread may still be running.
503
// We can't afford to block on this close by waiting for
504
// thread to terminate because few if any JVM's will actually
505
// interrupt a system read() from the interrupt() method.
506
super.close();
507
508         synchronized (__queue)
509         {
510             __hasReachedEOF = true;
511             __isClosed = true;
512
513             if (__thread != null && __thread.isAlive())
514             {
515                 __thread.interrupt();
516             }
517
518             __queue.notifyAll();
519         }
520
521         __threaded = false;
522     }
523
524     public void run()
525     {
526         int ch;
527
528         try
529         {
530 _outerLoop:
531             while (!__isClosed)
532             {
533                 try
534                 {
535                     if ((ch = __read()) < 0)
536                         break;
537                 }
538                 catch (InterruptedIOException JavaDoc e)
539                 {
540                     synchronized (__queue)
541                     {
542                         __ioException = e;
543                         __queue.notifyAll();
544                         try
545                         {
546                             __queue.wait(100);
547                         }
548                         catch (InterruptedException JavaDoc interrupted)
549                         {
550                             if (__isClosed)
551                                 break _outerLoop;
552                         }
553                         continue;
554                     }
555                 } catch(RuntimeException JavaDoc re) {
556                     // We treat any runtime exceptions as though the
557
// stream has been closed. We close the
558
// underlying stream just to be sure.
559
super.close();
560                     // Breaking the loop has the effect of setting
561
// the state to closed at the end of the method.
562
break _outerLoop;
563                 }
564
565                 try
566                 {
567                     __processChar(ch);
568                 }
569                 catch (InterruptedException JavaDoc e)
570                 {
571                     if (__isClosed)
572                         break _outerLoop;
573                 }
574             }
575         }
576         catch (IOException JavaDoc ioe)
577         {
578             synchronized (__queue)
579             {
580                 __ioException = ioe;
581             }
582         }
583
584         synchronized (__queue)
585         {
586             __isClosed = true; // Possibly redundant
587
__hasReachedEOF = true;
588             __queue.notify();
589         }
590
591         __threaded = false;
592     }
593 }
594
595 /* Emacs configuration
596  * Local variables: **
597  * mode: java **
598  * c-basic-offset: 4 **
599  * indent-tabs-mode: nil **
600  * End: **
601  */

602
Popular Tags