1 28 29 package HTTPClient; 30 31 32 import java.io.*; 33 import java.net.Socket ; 34 import java.util.Vector ; 35 import java.util.Enumeration ; 36 37 44 45 class StreamDemultiplexor implements GlobalConstants 46 { 47 48 private int Protocol; 49 50 51 private HTTPConnection Connection; 52 53 54 private ExtBufferedInputStream Stream; 55 56 57 private Socket Sock = null; 58 59 60 private ResponseHandler MarkedForClose; 61 62 63 private SocketTimeout.TimeoutEntry Timer = null; 64 65 66 private static SocketTimeout TimerThread = null; 67 68 69 private LinkedList RespHandlerList; 70 71 72 private int chunk_len; 73 74 75 private int cur_timeout = 0; 76 77 78 static 79 { 80 TimerThread = new SocketTimeout(60); 81 TimerThread.start(); 82 } 83 84 85 87 94 StreamDemultiplexor(int protocol, Socket sock, HTTPConnection connection) 95 throws IOException 96 { 97 this.Protocol = protocol; 98 this.Connection = connection; 99 RespHandlerList = new LinkedList(); 100 init(sock); 101 } 102 103 104 109 private void init(Socket sock) throws IOException 110 { 111 if (DebugDemux) 112 System.err.println("Demux: Initializing Stream Demultiplexor (" + 113 this.hashCode() + ")"); 114 115 this.Sock = sock; 116 this.Stream = new ExtBufferedInputStream(sock.getInputStream()); 117 MarkedForClose = null; 118 chunk_len = -1; 119 120 Timer = TimerThread.setTimeout(this); 122 } 123 124 125 127 130 void register(Response resp_handler, Request req) throws RetryException 131 { 132 synchronized(RespHandlerList) 133 { 134 if (Sock == null) 135 throw new RetryException(); 136 137 RespHandlerList.addToEnd( 138 new ResponseHandler(resp_handler, req, this)); 139 } 140 } 141 142 148 RespInputStream getStream(Response resp) 149 { 150 ResponseHandler resph; 151 for (resph = (ResponseHandler) RespHandlerList.enumerate(); 152 resph != null; resph = (ResponseHandler) RespHandlerList.next()) 153 { 154 if (resph.resp == resp) break; 155 } 156 157 if (resph != null) 158 return resph.stream; 159 else 160 return null; 161 } 162 163 164 168 void restartTimer() 169 { 170 if (Timer != null) Timer.reset(); 171 } 172 173 174 177 int read(byte[] b, int off, int len, ResponseHandler resph, int timeout) 178 throws IOException 179 { 180 if (resph.exception != null) 181 throw (IOException) resph.exception.fillInStackTrace(); 182 183 if (resph.eof) 184 return -1; 185 186 187 189 ResponseHandler head; 190 while ((head = (ResponseHandler) RespHandlerList.getFirst()) != null && 191 head != resph) 192 { 193 try 194 { head.stream.readAll(timeout); } 195 catch (IOException ioe) 196 { 197 if (ioe instanceof InterruptedIOException) 198 throw ioe; 199 else 200 throw (IOException) resph.exception.fillInStackTrace(); 201 } 202 } 203 204 205 207 synchronized(this) 208 { 209 if (resph.exception != null) 210 throw (IOException) resph.exception.fillInStackTrace(); 211 212 if (DebugDemux) 213 { 214 if (resph.resp.cd_type != CD_HDRS) 215 System.err.println("Demux: Reading for stream " + 216 resph.stream.hashCode() + 217 " (" + Thread.currentThread() + ")"); 218 } 219 220 if (Timer != null) Timer.hyber(); 221 222 try 223 { 224 int rcvd = -1; 225 226 if (timeout != cur_timeout) 227 { 228 if (DebugDemux) 229 { 230 System.err.println("Demux: Setting timeout to " + 231 timeout + " ms"); 232 } 233 234 try 235 { Sock.setSoTimeout(timeout); } 236 catch (Throwable t) 237 { } 238 cur_timeout = timeout; 239 } 240 241 switch (resph.resp.cd_type) 242 { 243 case CD_HDRS: 244 rcvd = Stream.read(b, off, len); 245 if (rcvd == -1) 246 throw new EOFException("Premature EOF encountered"); 247 break; 248 249 case CD_0: 250 rcvd = -1; 251 close(resph); 252 break; 253 254 case CD_CLOSE: 255 rcvd = Stream.read(b, off, len); 256 if (rcvd == -1) 257 close(resph); 258 break; 259 260 case CD_CONTLEN: 261 int cl = resph.resp.ContentLength; 262 if (len > cl - resph.stream.count) 263 len = cl - resph.stream.count; 264 265 rcvd = Stream.read(b, off, len); 266 if (rcvd == -1) 267 throw new EOFException("Premature EOF encountered"); 268 269 if (resph.stream.count+rcvd == cl) 270 close(resph); 271 272 break; 273 274 case CD_CHUNKED: 275 if (chunk_len == -1) chunk_len = Codecs.getChunkLength(Stream); 277 278 if (chunk_len > 0) { 280 if (len > chunk_len) len = chunk_len; 281 rcvd = Stream.read(b, off, len); 282 if (rcvd == -1) 283 throw new EOFException("Premature EOF encountered"); 284 chunk_len -= rcvd; 285 if (chunk_len == 0) { 287 Stream.read(); Stream.read(); chunk_len = -1; 290 } 291 } 292 else { 294 resph.resp.readTrailers(Stream); 295 rcvd = -1; 296 close(resph); 297 chunk_len = -1; 298 } 299 break; 300 301 case CD_MP_BR: 302 byte[] endbndry = resph.getEndBoundary(Stream); 303 int[] end_cmp = resph.getEndCompiled(Stream); 304 305 rcvd = Stream.read(b, off, len); 306 if (rcvd == -1) 307 throw new EOFException("Premature EOF encountered"); 308 309 int ovf = Stream.pastEnd(endbndry, end_cmp); 310 if (ovf != -1) 311 { 312 rcvd -= ovf; 313 Stream.reset(); 314 close(resph); 315 } 316 317 break; 318 319 default: 320 throw new Error ("Internal Error in StreamDemultiplexor: " + 321 "Invalid cd_type " + resph.resp.cd_type); 322 } 323 324 restartTimer(); 325 return rcvd; 326 327 } 328 catch (InterruptedIOException ie) { 330 restartTimer(); 331 throw ie; 332 } 333 catch (IOException ioe) 334 { 335 if (DebugDemux) 336 { 337 System.err.print("Demux: (" + Thread.currentThread() + ") "); 338 ioe.printStackTrace(); 339 } 340 341 close(ioe, true); 342 throw resph.exception; } 344 catch (ParseException pe) 345 { 346 if (DebugDemux) 347 { 348 System.err.print("Demux: (" + Thread.currentThread() + ") "); 349 pe.printStackTrace(); 350 } 351 352 close(new IOException(pe.toString()), true); 353 throw resph.exception; } 355 } 356 } 357 358 362 synchronized long skip(long num, ResponseHandler resph) throws IOException 363 { 364 if (resph.exception != null) 365 throw (IOException) resph.exception.fillInStackTrace(); 366 367 if (resph.eof) 368 return 0; 369 370 byte[] dummy = new byte[(int) num]; 371 int rcvd = read(dummy, 0, (int) num, resph, 0); 372 if (rcvd == -1) 373 return 0; 374 else 375 return rcvd; 376 } 377 378 381 synchronized int available(ResponseHandler resph) throws IOException 382 { 383 int avail = Stream.available(); 384 if (resph == null) return avail; 385 386 if (resph.exception != null) 387 throw (IOException) resph.exception.fillInStackTrace(); 388 389 if (resph.eof) 390 return 0; 391 392 switch (resph.resp.cd_type) 393 { 394 case CD_0: 395 return 0; 396 case CD_HDRS: 397 return (avail > 0 ? 1 : 0); 403 case CD_CLOSE: 404 return avail; 405 case CD_CONTLEN: 406 int cl = resph.resp.ContentLength; 407 cl -= resph.stream.count; 408 return (avail < cl ? avail : cl); 409 case CD_CHUNKED: 410 return avail; case CD_MP_BR: 412 return avail; default: 414 throw new Error ("Internal Error in StreamDemultiplexor: " + 415 "Invalid cd_type " + resph.resp.cd_type); 416 } 417 418 } 419 420 421 437 synchronized void close(IOException exception, boolean was_reset) 438 { 439 if (Sock == null) return; 441 442 if (DebugDemux) 443 System.err.println("Demux: Closing all streams and socket (" + 444 this.hashCode() + ")"); 445 446 try 447 { Stream.close(); } 448 catch (IOException ioe) { } 449 try 450 { Sock.close(); } 451 catch (IOException ioe) { } 452 Sock = null; 453 454 if (Timer != null) 455 { 456 Timer.kill(); 457 Timer = null; 458 } 459 460 Connection.DemuxList.remove(this); 461 462 463 465 if (exception != null) 466 synchronized(RespHandlerList) 467 { retry_requests(exception, was_reset); } 468 } 469 470 471 481 private void retry_requests(IOException exception, boolean was_reset) 482 { 483 RetryException first = null, 484 prev = null; 485 ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate(); 486 487 while (resph != null) 488 { 489 493 if (resph.resp.got_headers) 494 { 495 resph.exception = exception; 496 } 497 else 498 { 499 RetryException tmp = new RetryException(exception.getMessage()); 500 if (first == null) first = tmp; 501 502 tmp.request = resph.request; 503 tmp.response = resph.resp; 504 tmp.exception = exception; 505 tmp.conn_reset = was_reset; 506 tmp.first = first; 507 tmp.addToListAfter(prev); 508 509 prev = tmp; 510 resph.exception = tmp; 511 } 512 513 RespHandlerList.remove(resph); 514 resph = (ResponseHandler) RespHandlerList.next(); 515 } 516 } 517 518 519 523 synchronized void close(ResponseHandler resph) 524 { 525 if (resph != (ResponseHandler) RespHandlerList.getFirst()) 526 return; 527 528 if (DebugDemux) 529 System.err.println("Demux: Closing stream " + 530 resph.stream.hashCode() + 531 " (" + Thread.currentThread() + ")"); 532 533 resph.eof = true; 534 RespHandlerList.remove(resph); 535 536 if (resph == MarkedForClose) 537 close(new IOException("Premature end of Keep-Alive"), false); 538 else 539 closeSocketIfAllStreamsClosed(); 540 } 541 542 543 565 synchronized void closeSocketIfAllStreamsClosed() 566 { 567 synchronized(RespHandlerList) 568 { 569 ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate(); 570 571 while (resph != null && resph.stream.closed) 572 { 573 if (resph == MarkedForClose) 574 { 575 ResponseHandler tmp; 577 do 578 { 579 tmp = (ResponseHandler) RespHandlerList.getFirst(); 580 RespHandlerList.remove(tmp); 581 } 582 while (tmp != resph); 583 584 close(new IOException("Premature end of Keep-Alive"), false); 586 return; 587 } 588 589 resph = (ResponseHandler) RespHandlerList.next(); 590 } 591 } 592 } 593 594 595 598 synchronized Socket getSocket() 599 { 600 if (MarkedForClose != null) 601 return null; 602 603 if (Timer != null) Timer.hyber(); 604 return Sock; 605 } 606 607 608 616 synchronized void markForClose(Response resp) 617 { 618 synchronized(RespHandlerList) 619 { 620 if (RespHandlerList.getFirst() == null) { close(new IOException("Premature end of Keep-Alive"), false); 623 return; 624 } 625 } 626 627 if (Timer != null) 628 { 629 Timer.kill(); 630 Timer = null; 631 } 632 633 ResponseHandler resph, lasth = null; 634 for (resph = (ResponseHandler) RespHandlerList.enumerate(); 635 resph != null; resph = (ResponseHandler) RespHandlerList.next()) 636 { 637 if (resph.resp == resp) { 639 MarkedForClose = resph; 640 641 if (DebugDemux) 642 System.err.println("Demux: stream " + 643 resp.inp_stream.hashCode() + 644 " marked for close (" + 645 Thread.currentThread() + ")"); 646 647 closeSocketIfAllStreamsClosed(); 648 return; 649 } 650 651 if (MarkedForClose == resph) 652 return; 654 lasth = resph; 655 } 656 657 if (lasth == null) 658 return; 659 660 MarkedForClose = lasth; closeSocketIfAllStreamsClosed(); 662 663 if (DebugDemux) 664 System.err.println("Demux: stream " + lasth.stream.hashCode() + 665 " marked for close (" + 666 Thread.currentThread() + ")"); 667 } 668 669 670 676 void abort() 677 { 678 if (DebugDemux) 679 System.err.println("Demux: Aborting socket (" + 680 this.hashCode() + ")"); 681 682 683 685 synchronized(RespHandlerList) 686 { 687 for (ResponseHandler resph = 688 (ResponseHandler) RespHandlerList.enumerate(); 689 resph != null; 690 resph = (ResponseHandler) RespHandlerList.next()) 691 { 692 if (resph.resp.http_resp != null) 693 resph.resp.http_resp.markAborted(); 694 if (resph.exception == null) 695 resph.exception = new IOException("Request aborted by user"); 696 } 697 698 699 704 if (Sock != null) 705 { 706 try 707 { 708 try 709 { Sock.setSoLinger(false, 0); } 710 catch (Throwable t) 711 { } 712 713 try 714 { Stream.close(); } 715 catch (IOException ioe) { } 716 try 717 { Sock.close(); } 718 catch (IOException ioe) { } 719 Sock = null; 720 721 if (Timer != null) 722 { 723 Timer.kill(); 724 Timer = null; 725 } 726 } 727 catch (NullPointerException npe) 728 { } 729 730 Connection.DemuxList.remove(this); 731 } 732 } 733 } 734 735 736 739 protected void finalize() throws Throwable 740 { 741 close((IOException) null, false); 742 super.finalize(); 743 } 744 745 746 750 public String toString() 751 { 752 String prot; 753 754 switch (Protocol) 755 { 756 case HTTP: 757 prot = "HTTP"; break; 758 case HTTPS: 759 prot = "HTTPS"; break; 760 case SHTTP: 761 prot = "SHTTP"; break; 762 case HTTP_NG: 763 prot = "HTTP_NG"; break; 764 default: 765 throw new Error ("HTTPClient Internal Error: invalid protocol " + 766 Protocol); 767 } 768 769 return getClass().getName() + "[Protocol=" + prot + "]"; 770 } 771 } 772 773 774 778 class SocketTimeout extends Thread implements GlobalConstants 779 { 780 787 class TimeoutEntry 788 { 789 boolean restart = false, 790 hyber = false, 791 alive = true; 792 StreamDemultiplexor demux; 793 TimeoutEntry next = null, 794 prev = null; 795 796 TimeoutEntry(StreamDemultiplexor demux) 797 { 798 this.demux = demux; 799 } 800 801 void reset() 802 { 803 hyber = false; 804 if (restart) return; 805 restart = true; 806 807 synchronized(time_list) 808 { 809 if (!alive) return; 810 811 next.prev = prev; 813 prev.next = next; 814 815 next = time_list[current]; 817 prev = time_list[current].prev; 818 prev.next = this; 819 next.prev = this; 820 } 821 } 822 823 void hyber() 824 { 825 if (alive) hyber = true; 826 } 827 828 void kill() 829 { 830 alive = false; 831 restart = false; 832 hyber = false; 833 834 synchronized(time_list) 835 { 836 if (prev == null) return; 837 next.prev = prev; 838 prev.next = next; 839 prev = null; 840 } 841 } 842 } 843 844 private TimeoutEntry[] time_list; 845 private int current; 846 847 848 SocketTimeout(int secs) 849 { 850 super("SocketTimeout"); 851 852 try { setDaemon(true); } 853 catch (SecurityException se) { } setPriority(MAX_PRIORITY); 855 856 time_list = new TimeoutEntry[secs]; 857 for (int idx=0; idx<secs; idx++) 858 { 859 time_list[idx] = new TimeoutEntry(null); 860 time_list[idx].next = time_list[idx].prev = time_list[idx]; 861 } 862 current = 0; 863 } 864 865 866 public TimeoutEntry setTimeout(StreamDemultiplexor demux) 867 { 868 TimeoutEntry entry = new TimeoutEntry(demux); 869 synchronized(time_list) 870 { 871 entry.next = time_list[current]; 872 entry.prev = time_list[current].prev; 873 entry.prev.next = entry; 874 entry.next.prev = entry; 875 } 876 877 return entry; 878 } 879 880 881 885 public void run() 886 { 887 TimeoutEntry marked = null; 888 889 while (true) 890 { 891 try { sleep(1000L); } catch (InterruptedException ie) { } 892 893 synchronized(time_list) 894 { 895 for (TimeoutEntry entry = time_list[current].next; 897 entry != time_list[current]; 898 entry = entry.next) 899 { 900 entry.restart = false; 901 } 902 903 current++; 904 if (current >= time_list.length) 905 current = 0; 906 907 for (TimeoutEntry entry = time_list[current].next; 909 entry != time_list[current]; 910 entry = entry.next) 911 { 912 if (entry.alive && !entry.hyber) 913 { 914 TimeoutEntry prev = entry.prev; 915 entry.kill(); 916 921 entry.next = marked; 922 marked = entry; 923 entry = prev; 924 } 925 } 926 } 927 928 while (marked != null) 929 { 930 marked.demux.markForClose(null); 931 marked = marked.next; 932 } 933 } 934 } 935 } 936 937 | Popular Tags |