package org.apache.commons.net.telnet;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;

/**
 * Input stream that strips Telnet protocol sequences out of the raw byte
 * stream and hands option negotiation, suboptions and AYT responses to the
 * owning {@link TelnetClient}.
 * <p>
 * Decoded data bytes are placed in a circular queue. When a reader thread was
 * requested and started via {@link #_start()}, that thread fills the queue and
 * {@link #read()} drains it; otherwise {@link #read()} decodes bytes itself on
 * the calling thread. All queue state is guarded by the monitor of
 * <code>__queue</code>.
 */
final class TelnetInputStream extends BufferedInputStream implements Runnable
{
    static final int _STATE_DATA = 0, _STATE_IAC = 1, _STATE_WILL = 2,
                     _STATE_WONT = 3, _STATE_DO = 4, _STATE_DONT = 5,
                     _STATE_SB = 6, _STATE_SE = 7, _STATE_CR = 8, _STATE_IAC_SB = 9;

    private boolean __hasReachedEOF;
    // Polled by run() outside any monitor, so it must be volatile.
    private volatile boolean __isClosed;
    private boolean __readIsWaiting;
    private int __receiveState, __queueHead, __queueTail, __bytesAvailable;
    private int[] __queue;
    private TelnetClient __client;
    private Thread __thread;
    private IOException __ioException;

    // Bytes collected between IAC SB and IAC SE.
    private int __suboption[] = new int[256];
    private int __suboption_count = 0;

    // True while a reader thread is responsible for filling the queue.
    // Written by _start(), close() and run() from different threads.
    private volatile boolean __threaded;

    /**
     * Creates the stream.
     *
     * @param input        the raw stream to decode
     * @param client       the client notified of negotiation events
     * @param readerThread if true a reader thread is created (but not
     *                     started; see {@link #_start()})
     */
    TelnetInputStream(InputStream input, TelnetClient client,
                      boolean readerThread)
    {
        super(input);
        __client = client;
        __receiveState = _STATE_DATA;
        __isClosed = true;
        __hasReachedEOF = false;
        // One slot is always left unused, so the usable capacity is 2048.
        __queue = new int[2049];
        __queueHead = 0;
        __queueTail = 0;
        __bytesAvailable = 0;
        __ioException = null;
        __readIsWaiting = false;
        __threaded = false;
        if (readerThread)
            __thread = new Thread(this);
        else
            __thread = null;
    }

    /**
     * Creates the stream with a reader thread.
     *
     * @param input  the raw stream to decode
     * @param client the client notified of negotiation events
     */
    TelnetInputStream(InputStream input, TelnetClient client)
    {
        this(input, client, true);
    }

    /**
     * Starts the reader thread, if one was requested at construction time.
     * Does nothing otherwise.
     */
    void _start()
    {
        if (__thread == null)
            return;

        int priority;
        __isClosed = false;
        priority = Thread.currentThread().getPriority() + 1;
        if (priority > Thread.MAX_PRIORITY)
            priority = Thread.MAX_PRIORITY;
        __thread.setPriority(priority);
        __thread.setDaemon(true);
        // Publish the flag before start(): Thread.start() happens-before every
        // action of the started thread, so the reader can never observe a
        // stale "false" (and a fast-terminating reader's "false" is not
        // overwritten afterwards).
        __threaded = true;
        __thread.start();
    }

    /**
     * Appends a byte to the suboption buffer. Bytes beyond the buffer's
     * capacity are discarded so an over-long suboption sent by the peer
     * cannot cause an ArrayIndexOutOfBoundsException.
     */
    private void __storeSuboptionByte(int ch)
    {
        if (__suboption_count < __suboption.length)
        {
            __suboption[__suboption_count++] = ch;
        }
    }

    /**
     * Reads from the underlying stream until a data byte is decoded,
     * consuming any Telnet command sequences on the way.
     *
     * @return the next data byte (0-255), or -1 at end of stream
     * @throws IOException if the underlying read fails
     */
    private int __read() throws IOException
    {
        int ch;

        while (true)
        {
            if ((ch = super.read()) < 0)
                return -1;

            ch = (ch & 0xff);

            synchronized (__client)
            {
                __client._processAYTResponse();
            }

            __client._spyRead(ch);

            _mainSwitch:
            switch (__receiveState)
            {

            case _STATE_CR:
                if (ch == '\0')
                {
                    // Strip the NUL that follows a CR.
                    continue;
                }
                // Anything else is handled as normal data: fall through.

            case _STATE_DATA:
                if (ch == TelnetCommand.IAC)
                {
                    __receiveState = _STATE_IAC;
                    continue;
                }

                if (ch == '\r')
                {
                    synchronized (__client)
                    {
                        if (__client._requestedDont(TelnetOption.BINARY))
                            __receiveState = _STATE_CR;
                        else
                            __receiveState = _STATE_DATA;
                    }
                }
                else
                    __receiveState = _STATE_DATA;
                break;

            case _STATE_IAC:
                switch (ch)
                {
                case TelnetCommand.WILL:
                    __receiveState = _STATE_WILL;
                    continue;
                case TelnetCommand.WONT:
                    __receiveState = _STATE_WONT;
                    continue;
                case TelnetCommand.DO:
                    __receiveState = _STATE_DO;
                    continue;
                case TelnetCommand.DONT:
                    __receiveState = _STATE_DONT;
                    continue;

                case TelnetCommand.SB:
                    __suboption_count = 0;
                    __receiveState = _STATE_SB;
                    continue;

                case TelnetCommand.IAC:
                    // IAC IAC is an escaped literal 255 data byte: leave the
                    // outer switch so it is returned to the caller instead of
                    // being swallowed.
                    __receiveState = _STATE_DATA;
                    break _mainSwitch;
                default:
                    break;
                }
                // Any other command byte is ignored.
                __receiveState = _STATE_DATA;
                continue;
            case _STATE_WILL:
                synchronized (__client)
                {
                    __client._processWill(ch);
                    __client._flushOutputStream();
                }
                __receiveState = _STATE_DATA;
                continue;
            case _STATE_WONT:
                synchronized (__client)
                {
                    __client._processWont(ch);
                    __client._flushOutputStream();
                }
                __receiveState = _STATE_DATA;
                continue;
            case _STATE_DO:
                synchronized (__client)
                {
                    __client._processDo(ch);
                    __client._flushOutputStream();
                }
                __receiveState = _STATE_DATA;
                continue;
            case _STATE_DONT:
                synchronized (__client)
                {
                    __client._processDont(ch);
                    __client._flushOutputStream();
                }
                __receiveState = _STATE_DATA;
                continue;

            case _STATE_SB:
                if (ch == TelnetCommand.IAC)
                {
                    __receiveState = _STATE_IAC_SB;
                }
                else
                {
                    __storeSuboptionByte(ch);
                    __receiveState = _STATE_SB;
                }
                continue;
            case _STATE_IAC_SB:
                switch (ch)
                {
                case TelnetCommand.SE:
                    synchronized (__client)
                    {
                        __client._processSuboption(__suboption, __suboption_count);
                        __client._flushOutputStream();
                    }
                    __receiveState = _STATE_DATA;
                    continue;
                case TelnetCommand.IAC:
                    // Doubled IAC inside a suboption is a literal 255.
                    __storeSuboptionByte(ch);
                    break;
                default:
                    // Unexpected byte after IAC inside a suboption: ignore it.
                    break;
                }
                // Still inside the subnegotiation until IAC SE arrives.
                __receiveState = _STATE_SB;
                continue;

            }

            break;
        }

        return ch;
    }

    /**
     * Appends a decoded data byte to the circular queue, waking a waiting
     * reader when running threaded.
     *
     * @param ch the data byte to enqueue
     * @throws InterruptedException  if interrupted while waiting for space
     * @throws IllegalStateException if the queue is full and no reader
     *                               thread is active to drain it
     */
    private void __processChar(int ch) throws InterruptedException
    {
        synchronized (__queue)
        {
            while (__bytesAvailable >= __queue.length - 1)
            {
                if (__threaded)
                {
                    // Queue is full: let the consumer drain it.
                    __queue.notify();
                    __queue.wait();
                }
                else
                {
                    // Nobody else can drain the queue; looping here would
                    // spin forever while holding the queue monitor.
                    throw new IllegalStateException(
                        "Queue is full! Cannot process another character.");
                }
            }

            if (__readIsWaiting && __threaded)
            {
                __queue.notify();
            }

            __queue[__queueTail] = ch;
            ++__bytesAvailable;

            if (++__queueTail >= __queue.length)
                __queueTail = 0;
        }
    }

    /**
     * Returns the next decoded data byte, blocking until one is available.
     * An IOException recorded by the reader thread is rethrown here once.
     *
     * @return the next data byte, or -1 at end of stream
     * @throws IOException on a pending reader error, a read failure, or if
     *                     the calling thread is interrupted while waiting
     */
    public int read() throws IOException
    {
        synchronized (__queue)
        {

            while (true)
            {
                if (__ioException != null)
                {
                    IOException e;
                    e = __ioException;
                    __ioException = null;
                    throw e;
                }

                if (__bytesAvailable == 0)
                {
                    if (__hasReachedEOF)
                        return -1;

                    if (__threaded)
                    {
                        // Wake the reader thread and wait for it to queue data.
                        __queue.notify();
                        try
                        {
                            __readIsWaiting = true;
                            __queue.wait();
                            __readIsWaiting = false;
                        }
                        catch (InterruptedException e)
                        {
                            throw new IOException("Fatal thread interruption during read.");
                        }
                    }
                    else
                    {
                        // No reader thread: decode on the calling thread.
                        __readIsWaiting = true;
                        int ch;

                        do
                        {
                            try
                            {
                                if ((ch = __read()) < 0)
                                    if (ch != -2)
                                        return (ch);
                            }
                            catch (InterruptedIOException e)
                            {
                                // The queue monitor is already held here.
                                __ioException = e;
                                __queue.notifyAll();
                                try
                                {
                                    __queue.wait(100);
                                }
                                catch (InterruptedException interrupted)
                                {
                                }
                                return (-1);
                            }

                            try
                            {
                                if (ch != -2)
                                {
                                    __processChar(ch);
                                }
                            }
                            catch (InterruptedException e)
                            {
                                if (__isClosed)
                                    return (-1);
                            }
                        }
                        // Keep decoding while more input is buffered, but stop
                        // before the queue fills: nothing can drain it until
                        // this method returns.
                        while (super.available() > 0
                               && __bytesAvailable < __queue.length - 1);

                        __readIsWaiting = false;
                    }
                    continue;
                }
                else
                {
                    int ch;

                    ch = __queue[__queueHead];

                    if (++__queueHead >= __queue.length)
                        __queueHead = 0;

                    --__bytesAvailable;

                    if (__bytesAvailable == 0 && __threaded)
                    {
                        // Queue drained: wake the reader thread.
                        __queue.notify();
                    }

                    return ch;
                }
            }
        }
    }

    /**
     * Reads into the whole of <code>buffer</code>; equivalent to
     * <code>read(buffer, 0, buffer.length)</code>.
     *
     * @param buffer destination array
     * @return the number of bytes read, or -1 at end of stream
     * @throws IOException if reading fails
     */
    public int read(byte buffer[]) throws IOException
    {
        return read(buffer, 0, buffer.length);
    }

    /**
     * Reads up to <code>length</code> bytes. Blocks for the first byte only:
     * the request is capped at the number of bytes already queued, with a
     * minimum of one.
     *
     * @param buffer destination array
     * @param offset index of the first byte written
     * @param length maximum number of bytes to read
     * @return the number of bytes read, 0 if <code>length</code> is less
     *         than 1, or -1 at end of stream
     * @throws IOException if reading fails
     */
    public int read(byte buffer[], int offset, int length) throws IOException
    {
        int ch, off;

        if (length < 1)
            return 0;

        synchronized (__queue)
        {
            if (length > __bytesAvailable)
                length = __bytesAvailable;
        }

        if ((ch = read()) == -1)
            return -1;

        off = offset;

        do
        {
            buffer[offset++] = (byte)ch;
        }
        while (--length > 0 && (ch = read()) != -1);

        return (offset - off);
    }

    /**
     * @return false; mark/reset is not supported
     */
    public boolean markSupported()
    {
        return false;
    }

    /**
     * @return the number of decoded bytes currently queued
     */
    public int available() throws IOException
    {
        synchronized (__queue)
        {
            return __bytesAvailable;
        }
    }

    /**
     * Closes the underlying stream, marks this stream closed and at EOF,
     * interrupts the reader thread if it is alive, and wakes all threads
     * waiting on the queue.
     *
     * @throws IOException if closing the underlying stream fails
     */
    public void close() throws IOException
    {
        super.close();

        synchronized (__queue)
        {
            __hasReachedEOF = true;
            __isClosed = true;

            if (__thread != null && __thread.isAlive())
            {
                __thread.interrupt();
            }

            __queue.notifyAll();
        }

        __threaded = false;
    }

    /**
     * Reader thread body: decodes bytes and queues them until the stream is
     * closed, end of stream is reached, or an error occurs. On exit the
     * stream is marked closed and at EOF.
     */
    public void run()
    {
        int ch;

        try
        {
_outerLoop:
            while (!__isClosed)
            {
                try
                {
                    if ((ch = __read()) < 0)
                        break;
                }
                catch (InterruptedIOException e)
                {
                    synchronized (__queue)
                    {
                        // Hand the exception to read() and give it a moment
                        // to pick it up before retrying.
                        __ioException = e;
                        __queue.notifyAll();
                        try
                        {
                            __queue.wait(100);
                        }
                        catch (InterruptedException interrupted)
                        {
                            if (__isClosed)
                                break _outerLoop;
                        }
                        continue;
                    }
                }
                catch (RuntimeException re)
                {
                    // Unexpected failure while decoding: shut the stream.
                    super.close();
                    break _outerLoop;
                }

                try
                {
                    __processChar(ch);
                }
                catch (InterruptedException e)
                {
                    if (__isClosed)
                        break _outerLoop;
                }
            }
        }
        catch (IOException ioe)
        {
            synchronized (__queue)
            {
                __ioException = ioe;
            }
        }

        synchronized (__queue)
        {
            __isClosed = true;
            __hasReachedEOF = true;
            __queue.notify();
        }

        __threaded = false;
    }
}