1 package com.quikj.server.framework; 2 3 import java.util.*; 4 import java.io.*; 5 import java.net.*; 6 7 8 public class AceInputSocketStream extends AceThread implements AceCompareMessageInterface 9 { 10 11 public static final int READ = 0; 12 public static final int READLINE = 1; 13 public static final int ONE_BYTE_LENGTH = 2; 14 public static final int TWO_BYTE_LENGTH_MSB_FIRST = 3; 15 public static final int TWO_BYTE_LENGTH_LSB_FIRST = 4; 16 public static final int CUSTOM = 5; 17 public static final int MULTILINE = 6; 18 public static final int HTTP = 7; 19 21 public AceInputSocketStream(long user_parm, 22 String name, 23 AceThread cthread, 24 Socket socket, 25 byte[] discriminator, 26 int max_msg_size, 27 int mode, 28 int tag_length, 29 AceInputFilterInterface filter) 30 throws IOException, AceException 31 { 32 super(name, true); 33 34 socket.setSoTimeout(20000); 35 36 Thread parent_thread; 37 38 if (cthread == null) 39 { 40 parent_thread = Thread.currentThread(); 41 } 42 else 43 { 44 parent_thread = cthread; 45 } 46 47 if ((parent_thread instanceof AceThread) == false) 48 { 49 throw new AceException("The thread supplied as a parameter is not an AceThread"); 50 } 51 52 if ((mode != READ) && 53 (mode != READLINE) && 54 (mode != ONE_BYTE_LENGTH) && 55 (mode != TWO_BYTE_LENGTH_MSB_FIRST) && 56 (mode != TWO_BYTE_LENGTH_LSB_FIRST) && 57 (mode != CUSTOM) && 58 (mode != MULTILINE) && 59 (mode != HTTP)) 60 { 61 throw new AceException("Invalid mode : " + mode); 62 } 63 64 parentThread = (AceThread)parent_thread; 65 this.socket = socket; 66 inputStream = socket.getInputStream(); 67 protocolDiscriminator = discriminator; 68 maxMsgSize = max_msg_size; 69 operationMode = mode; 70 tagLength = tag_length; 71 inputFilter = filter; 72 userParm = user_parm; 73 } 74 75 76 public AceInputSocketStream(long user_parm, 78 String name, 79 Socket socket, 80 int max_msg_size) 81 throws IOException, AceException 82 { 83 this(user_parm, name, null, socket, null, max_msg_size, READ, 0, 
null); 84 } 85 86 public AceInputSocketStream(long user_parm, 88 String name, 89 Socket socket) 90 throws IOException, AceException 91 { 92 this(user_parm, name, null, socket, null, 0, READLINE, 0, null); 93 } 94 95 96 public AceInputSocketStream(long user_parm, 98 String name, 99 Socket socket, 100 int max_msg_size, 101 AceInputFilterInterface filter) 102 throws IOException, AceException 103 { 104 this(user_parm, name, null, socket, null, max_msg_size, CUSTOM, 0, filter); 105 } 106 107 108 public AceInputSocketStream(long user_parm, 110 String name, 111 Socket socket, 112 byte[] discriminator, 113 int max_msg_size, 114 int mode, 115 int tag_length) 116 throws IOException, AceException 117 { 118 this(user_parm, name, null, socket, discriminator, max_msg_size, mode, tag_length, null); 119 } 120 121 public AceInputSocketStream(long user_parm, 123 String name, 124 Socket socket, 125 int max_msg_size, 126 int mode, 127 int tag_length) 128 throws IOException, AceException 129 { 130 this(user_parm, name, null, socket, null, max_msg_size, mode, tag_length, null); 131 } 132 133 public AceInputSocketStream(long user_parm, 135 String name, 136 Socket socket, 137 boolean multiline) 138 throws IOException, AceException 139 { 140 this(user_parm, name, null, socket, null, 0, MULTILINE, 0, null); 141 } 142 143 public AceInputSocketStream(long user_parm, 145 String name, 146 Socket socket, 147 String http) 148 throws IOException, AceException 149 { 150 this(user_parm, name, null, socket, null, 0, HTTP, 0, null); 151 } 152 153 154 public void dispose() 155 { 156 try 157 { 158 this.interrupt(); 159 160 socket.close(); 161 socket = null; 162 } 163 catch (IOException ex) 164 { 165 System.err.println(getName() + ": AceInputSocketStream.dispose() -- " + ex.getMessage()); 166 } 167 168 flushMessage(); 169 super.dispose(); 170 } 171 172 public void run() 173 { 174 try 175 { 176 switch (operationMode) 177 { 178 case READ: 179 processRead(); 180 break; 181 182 case READLINE: 183 
processReadLine(); 184 break; 185 186 case ONE_BYTE_LENGTH: 187 case TWO_BYTE_LENGTH_MSB_FIRST: 188 case TWO_BYTE_LENGTH_LSB_FIRST: 189 processReadLength(); 190 break; 191 192 case CUSTOM: 193 processReadCustom(); 194 break; 195 196 case MULTILINE: 197 processReadMultiline(); 198 break; 199 200 case HTTP: 201 processReadHTTP(); 202 break; 203 } 204 } 205 catch (IOException ex) 206 { 207 System.err.println(operationMode + " IO Exception : " + ex.getMessage()); 208 if (isInterrupted() == false) 209 { 210 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 211 null, 212 0, 213 AceInputSocketStreamMessage.ERROR_OCCURED, 214 userParm)) 215 == false) 216 { 217 System.err.println(getName() 218 + ": AceInputSocketStream.run() -- Could not send error message to the requesting thread: " 219 + getErrorMessage()); 220 } 221 } 222 } 223 } 224 225 public boolean flushMessage() 226 { 227 return parentThread.removeMessage(new AceInputSocketStreamMessage(this, null, 0, 0, userParm), 229 this); 230 231 } 233 234 public AceMessageInterface waitInputStreamMessage() 235 { 236 Thread thr = Thread.currentThread(); 237 238 if ((thr instanceof AceThread) == false) 239 { 240 writeErrorMessage("This method is not being called from an object which is a sub-class of type AceThread"); 241 return null; 242 } 243 244 AceThread cthread = (AceThread)thr; 245 246 while (true) 247 { 248 AceMessageInterface msg_received = cthread.waitMessage(); 249 if ((msg_received instanceof AceInputSocketStreamMessage) == true) 250 { 251 if (((AceInputSocketStreamMessage)msg_received).getInputSocketStream() == this) 252 { 253 return msg_received; 254 } 255 } 256 else if ((msg_received instanceof AceSignalMessage) == true) 257 { 258 return msg_received; 259 } 260 } 261 } 262 263 264 public boolean same(AceMessageInterface obj1, AceMessageInterface obj2) 266 { 267 boolean ret = false; 268 269 if (((obj1 instanceof AceInputSocketStreamMessage) == true) && 270 ((obj2 instanceof 
AceInputSocketStreamMessage) == true)) 271 { 272 if (((AceInputSocketStreamMessage)obj1).getInputSocketStream() == 273 ((AceInputSocketStreamMessage)obj2).getInputSocketStream()) 274 { 275 ret = true; 276 } 277 } 278 279 return ret; 280 } 281 282 public static long octetsToIntMsbFirst(byte[] buffer, int offset, int length) 284 throws NumberFormatException 285 { 286 if ((length > 8) || (length < 1)) 287 { 288 throw new NumberFormatException (); 289 } 290 291 long ret = 0L; 292 int len = length; 293 for (int i = 0; i < len; i++) 294 { 295 ret |= (((buffer[offset + i]) << ((length-1) * 8)) & 296 (0xFF << ((--length) * 8))); 297 } 298 299 return ret; 300 } 301 302 public static long octetsToIntMsbLast(byte[] buffer, int offset, int length) 303 throws NumberFormatException 304 { 305 if ((length > 8) || (length < 1)) 306 { 307 throw new NumberFormatException (); 308 } 309 310 long ret = 0L; 311 312 int len = length; 313 for (int i = len - 1; i >= 0; i--) 314 { 315 ret |= (((long)buffer[offset + i]) << ((length - 1) * 8)) & 316 (0xFF << ((--length) * 8)); 317 } 318 319 return ret; 320 } 321 322 public static void longToBytesMsbFirst(long value, byte[] buffer, int offset) 323 { 324 int shift = 7; 325 for (int i = 0; i < 8; i++) 326 { 327 buffer[offset + i] = (byte)(value >>> ((shift--) * 8)); 328 } 329 } 330 331 public static void longToBytesMsbLast(long value, byte[] buffer, int offset) 332 { 333 int shift = 7; 334 for (int i = 7; i >= 0; i--) 335 { 336 buffer[offset + i] = (byte)(value >>> ((shift--) * 8)); 337 } 338 } 339 340 public static void intToBytesMsbFirst(int value, byte[] buffer, int offset) 341 { 342 int shift = 3; 343 for (int i = 0; i < 4; i++) 344 { 345 buffer[offset + i] = (byte)(value >>> ((shift--) * 8)); 346 } 347 } 348 349 public static void intToBytesMsbLast(int value, byte[] buffer, int offset) 350 { 351 int shift = 3; 352 for (int i = 3; i >= 0; i--) 353 { 354 buffer[offset + i] = (byte)(value >>> ((shift--) * 8)); 355 } 356 } 357 358 public static 
void shortToBytesMsbFirst(short value, byte[] buffer, int offset) 359 { 360 int shift = 1; 361 for (int i = 0; i < 2; i++) 362 { 363 buffer[offset + i] = (byte)(value >>> ((shift--) * 8)); 364 } 365 } 366 367 public static void shortToBytesMsbLast(short value, byte[] buffer, int offset) 368 { 369 int shift = 1; 370 for (int i = 1; i >= 0; i--) 371 { 372 buffer[offset + i] = (byte)(value >>> ((shift--) * 8)); 373 } 374 } 375 376 private void processRead() 377 throws IOException 378 { 379 byte[] read_buffer = new byte[maxMsgSize]; 380 int len = 0; 381 382 while (true) 383 { 384 while (len == 0) { 386 try 387 { 388 len = inputStream.read(read_buffer, 0, maxMsgSize); 389 } 390 catch (InterruptedIOException ex) 391 { 392 if (isInterrupted() == true) 394 { 395 return; 396 } 397 else 398 { 399 continue; 400 } 401 } 402 } 403 404 if (len == -1) { 406 if (isInterrupted() == true) { 408 return; 409 } 410 411 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 412 read_buffer, 413 0, 414 AceInputSocketStreamMessage.EOF_REACHED, 415 userParm)) 416 == false) 417 { 418 System.err.println(getName() 419 + ": AceInputSocketStream.processRead() -- Could not send EOF message to the requesting thread: " 420 + getErrorMessage()); 421 } 422 return; 423 } 424 425 426 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, read_buffer, len, 428 AceInputSocketStreamMessage.READ_COMPLETED, 429 userParm)) == false) 430 { 431 System.err.println(getName() 432 + ": AceInputSocketStream.processRead() -- Could not send read completed message to the requesting thread: " 433 + getErrorMessage()); 434 } 435 } 436 } 437 438 private void processReadLine() 439 throws IOException 440 { 441 BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); 442 String line = null; 443 444 while (true) 445 { 446 while (true) 447 { 448 try 449 { 450 line = reader.readLine(); 451 break; } 453 catch (InterruptedIOException ex) 454 { 455 if (isInterrupted() == true) 456 { 
457 return; 458 } 459 else 460 { 461 continue; 462 } 463 } 464 } 465 466 if (line == null) { 469 if (isInterrupted() == true) 470 { 471 return; 472 } 473 474 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 475 (String )null, 476 AceInputSocketStreamMessage.EOF_REACHED, 477 userParm)) 478 == false) 479 { 480 System.err.println(getName() 481 + ": AceInputSocketStream.processReadLine() -- Could not send EOF message to the requesting thread: " 482 + getErrorMessage()); 483 } 484 485 return; 486 } 487 else 488 { 489 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, line, 490 AceInputSocketStreamMessage.READ_COMPLETED, 491 userParm)) 492 == false) 493 { 494 System.err.println(getName() 495 + ": AceInputSocketStream.processReadLine() -- Could not send read completed message to the requesting thread: " 496 + getErrorMessage()); 497 } 498 } 499 } 500 } 501 502 private void processReadCustom() 503 throws IOException 504 { 505 byte[] read_buffer = new byte[maxMsgSize]; 506 int offset = 0; 507 508 while (true) 509 { 510 int read_length = inputFilter.numberOfBytesToRead(); 511 int saved_offset = offset; 512 513 while (read_length > 0) 514 { 515 int length_read = 0; 516 try 517 { 518 length_read = inputStream.read(read_buffer, offset, read_length); 519 } 520 catch (InterruptedIOException ex) 521 { 522 if (isInterrupted() == true) 523 { 524 return; 525 } 526 else 527 { 528 continue; 529 } 530 } 531 catch (IndexOutOfBoundsException ex) 532 { 533 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 534 read_buffer, 535 offset, 536 AceInputSocketStreamMessage.MESSAGE_OVERFLOW, 537 userParm)) 538 == false) 539 { 540 System.err.println(getName() 541 + ": AceInputSocketStream.processReadCustom() -- Could not send overflow message to the requesting thread: " 542 + getErrorMessage()); 543 } 544 return; 545 } 546 547 if (length_read == -1) { 549 if (isInterrupted() == true) 550 { 551 return; 552 } 553 554 if (parentThread.sendMessage(new 
AceInputSocketStreamMessage(this, 555 read_buffer, 556 offset, 557 AceInputSocketStreamMessage.EOF_REACHED, 558 userParm)) 559 == false) 560 { 561 System.err.println(getName() 562 + ": AceInputSocketStream.processReadCustom() -- Could not send EOF message to the requesting thread: " 563 + getErrorMessage()); 564 } 565 return; 566 } 567 568 read_length -= length_read; 569 offset += length_read; 570 } 571 572 int ret = inputFilter.processMessage(read_buffer, saved_offset, 574 offset - saved_offset); 575 switch (ret) 576 { 577 case AceInputFilterInterface.CONTINUE_RECEIVING: 578 break; 579 580 case AceInputFilterInterface.SEND_MESSAGE: 581 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 582 read_buffer, 583 offset, 584 AceInputSocketStreamMessage.READ_COMPLETED, 585 userParm)) 586 == false) 587 { 588 System.err.println(getName() 589 + ": AceInputSocketStream.processReadCustom() -- Could not send read completed message to the requesting thread: " 590 + getErrorMessage()); 591 } 592 593 offset = 0; 594 read_buffer = new byte[maxMsgSize]; 595 break; 596 597 case AceInputFilterInterface.RESET_BUFFER: 598 offset = 0; 599 read_buffer = new byte[maxMsgSize]; 600 break; 601 602 default: if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 605 read_buffer, 606 offset, 607 AceInputSocketStreamMessage.ERROR_OCCURED, 608 userParm)) 609 == false) 610 { 611 System.err.println(getName() 612 + ": AceInputSocketStream.processReadCustom() -- Could not send error message to the requesting thread: " 613 + getErrorMessage()); 614 } 615 616 offset = 0; 617 read_buffer = new byte[maxMsgSize]; 618 break; 619 } 620 } 621 } 622 623 private void processReadLength() 624 throws IOException 625 { 626 int state; 627 int length_to_read = 0; 628 if (protocolDiscriminator == null) 629 { 630 if (tagLength == 0) 631 { 632 state = STATE_COLLECTING_LENGTH; 633 634 switch (operationMode) 635 { 636 case ONE_BYTE_LENGTH: 637 length_to_read = 1; 638 break; 639 640 case 
TWO_BYTE_LENGTH_MSB_FIRST: 641 case TWO_BYTE_LENGTH_LSB_FIRST: 642 length_to_read = 2; 643 break; 644 } 645 } 646 else 647 { 648 state = STATE_COLLECTING_HEADER; 649 length_to_read = tagLength; 650 } 651 } 652 else 653 { 654 state = STATE_COLLECTING_DISCRIMINATOR; 655 length_to_read = protocolDiscriminator.length; 656 } 657 658 int init_state = state; 660 int init_length_to_read = length_to_read; 661 662 int offset = 0; 663 byte[] read_buffer = new byte[maxMsgSize]; 664 665 while (true) 666 { 667 int segment_offset = offset; 668 669 while (length_to_read > 0) 670 { 671 int length_read = 0; 672 try 673 { 674 length_read = inputStream.read(read_buffer, offset, length_to_read); 675 } 676 catch (InterruptedIOException ex) 677 { 678 if (isInterrupted() == true) 679 { 680 return; 681 } 682 else 683 { 684 continue; 685 } 686 } 687 688 if (length_read == -1) { 690 if (isInterrupted() == true) 691 { 692 return; 693 } 694 695 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 696 read_buffer, 697 offset, 698 AceInputSocketStreamMessage.EOF_REACHED, 699 userParm)) 700 == false) 701 { 702 System.err.println(getName() 703 + ": AceInputSocketStream.processReadLength() -- Could not send EOF message to the requesting thread: " 704 + getErrorMessage()); 705 } 706 707 return; 708 } 709 710 offset += length_to_read; 711 length_to_read -= length_read; 712 } 713 714 switch (state) 715 { 716 case STATE_COLLECTING_DISCRIMINATOR: 717 int i; 719 for (i = 0; i < protocolDiscriminator.length; i++) 720 { 721 if (read_buffer[segment_offset + i] != protocolDiscriminator[i]) 722 { 723 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 724 read_buffer, 725 offset, 726 AceInputSocketStreamMessage.DISCRIMINATOR_MISMATCH, 727 userParm)) 728 == false) 729 { 730 System.err.println(getName() 731 + ": AceInputSocketStream.processReadLength() -- Could not send discriminator mismatch message to the requesting thread: " 732 + getErrorMessage()); 733 } 734 return; 735 } 736 } 
737 738 if (i == protocolDiscriminator.length) { 740 if (tagLength == 0) 741 { 742 state = STATE_COLLECTING_LENGTH; 743 744 switch (operationMode) 745 { 746 case ONE_BYTE_LENGTH: 747 length_to_read = 1; 748 break; 749 750 case TWO_BYTE_LENGTH_MSB_FIRST: 751 case TWO_BYTE_LENGTH_LSB_FIRST: 752 length_to_read = 2; 753 break; 754 } 755 } 756 else 757 { 758 state = STATE_COLLECTING_HEADER; 759 length_to_read = tagLength; 760 } 761 } 762 break; 763 764 case STATE_COLLECTING_HEADER: 765 767 state = STATE_COLLECTING_LENGTH; 768 769 switch (operationMode) 770 { 771 case ONE_BYTE_LENGTH: 772 length_to_read = 1; 773 break; 774 775 case TWO_BYTE_LENGTH_MSB_FIRST: 776 case TWO_BYTE_LENGTH_LSB_FIRST: 777 length_to_read = 2; 778 break; 779 } 780 break; 781 782 783 case STATE_COLLECTING_LENGTH: 784 switch (operationMode) 786 { 787 case ONE_BYTE_LENGTH: 788 length_to_read = read_buffer[segment_offset]; 789 break; 790 791 case TWO_BYTE_LENGTH_MSB_FIRST: 792 length_to_read = (int)octetsToIntMsbFirst(read_buffer, segment_offset, 2); 793 break; 794 795 case TWO_BYTE_LENGTH_LSB_FIRST: 796 length_to_read = (int)octetsToIntMsbLast(read_buffer, segment_offset, 2); 797 break; 798 } 799 800 if (offset + length_to_read > maxMsgSize) 801 { 802 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 803 read_buffer, 804 offset, 805 AceInputSocketStreamMessage.MESSAGE_OVERFLOW, 806 userParm)) 807 == false) 808 { 809 System.err.println(getName() 810 + ": AceInputSocketStream.processReadLength() -- Could not send overflow message to the requesting thread: " 811 + getErrorMessage()); 812 } 813 return; 814 } 815 else 816 { 817 state = STATE_COLLECTING_DATA; 818 } 819 break; 820 821 case STATE_COLLECTING_DATA: 822 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 823 read_buffer, 824 offset, 825 AceInputSocketStreamMessage.READ_COMPLETED, 826 userParm)) 827 == false) 828 { 829 System.err.println(getName() 830 + ": AceInputSocketStream.processReadLength() -- Could not send 
read completed message to the requesting thread: " 831 + getErrorMessage()); 832 } 833 834 state = init_state; 836 length_to_read = init_length_to_read; 837 offset = 0; 838 read_buffer = new byte[maxMsgSize]; 839 break; 840 } 841 842 } 843 } 844 845 private void processReadMultiline() 846 throws IOException 847 { 848 bReader = new BufferedReader(new InputStreamReader(inputStream)); 849 850 try 851 { 852 do 853 { 854 StringBuffer strbuf = new StringBuffer (); 855 while (true) 856 { 857 String line = null; 858 859 try 860 { 861 line = bReader.readLine(); 862 } 863 catch (InterruptedIOException ex) 864 { 865 if (isInterrupted() == true) 866 { 867 return; 868 } 869 else 870 { 871 continue; 872 } 873 } 874 875 if (line == null) { 877 if (isInterrupted() == true) 878 { 879 return; 880 } 881 882 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 883 strbuf.toString(), 884 AceInputSocketStreamMessage.EOF_REACHED, 885 userParm)) 886 == false) 887 { 888 System.err.println(getName() 889 + ": AceInputSocketStream.processReadMultiline() -- Could not send EOF message to the requesting thread: " 890 + getErrorMessage()); 891 } 892 893 return; 894 } 895 896 if (line.startsWith(".") == true) 897 { 898 break; 899 } 900 else if (line.startsWith(" .") == true) 901 { 902 strbuf.append(line.substring(1) + '\n'); 903 } 904 else 905 { 906 strbuf.append(line + '\n'); 907 } 908 } 909 910 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 911 strbuf.toString(), 912 AceInputSocketStreamMessage.READ_COMPLETED, 913 userParm)) == false) 914 { 915 System.err.println(getName() 916 + ": AceInputSocketStream.processReadMultiline() -- Could not send read completed message to the requesting thread: " 917 + getErrorMessage()); 918 } 919 920 } 921 while (true); 922 } 923 catch (IOException ex) 924 { 925 if (isInterrupted() == false) 926 { 927 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 928 false, 929 null, 930 null, 931 null, 932 null, 933 null, 934 
0, 935 AceInputSocketStreamMessage.EOF_REACHED, 936 userParm)) 937 == false) 938 { 939 System.err.println(getName() 940 + ": AceInputSocketStream.processReadMultiline() -- Could not send EOF message to the requesting thread: " 941 + getErrorMessage()); 942 } 943 } 944 return; 945 } 946 } 947 948 949 private void processReadHTTP() 950 throws IOException 951 { 952 BufferedReader bReader = new BufferedReader(new InputStreamReader(inputStream)); 953 do 955 { 956 boolean to_cont = true; 957 958 boolean simple_req = false; 959 String method = null; 960 String url = null; 961 Vector header_fields = new Vector(); 962 int content_length = -1; 963 char[] body = null; 964 String line = null; 965 String http_version = null; 966 boolean req_message; 967 int version = 0; 968 String http_status = null; 969 String http_reason = ""; 970 971 975 while (true) 976 { 977 to_cont = true; 978 979 try 980 { 981 line = bReader.readLine(); 982 } 983 catch (InterruptedIOException ex) 984 { 985 if (isInterrupted() == true) 987 { 988 return; 990 } 991 else 992 { 993 continue; 994 } 995 } 996 997 if (isInterrupted() == true) 998 { 999 return; 1001 } 1002 1003 if (line == null) { 1005 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 1007 false, 1008 null, 1009 null, 1010 null, 1011 null, 1012 null, 1013 0, 1014 AceInputSocketStreamMessage.EOF_REACHED, 1015 userParm)) 1016 == false) 1017 { 1018 System.err.println(getName() 1019 + ": AceInputSocketStream.processReadHTTP() -- Could not send EOF message to the requesting thread: " 1020 + getErrorMessage()); 1021 } 1022 1023 return; 1024 } 1025 1026 if (line.length() > 0) { 1029 break; 1030 } 1031 } 1032 1033 1034 StringTokenizer strtok = new StringTokenizer(line, " "); 1035 int num_fields = strtok.countTokens(); 1036 1037 if (num_fields < 2) { 1039 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 1041 false, 1042 null, 1043 null, 1044 null, 1045 null, 1046 null, 1047 0, 1048 
AceInputSocketStreamMessage.ERROR_OCCURED, 1049 userParm)) 1050 == false) 1051 { 1052 System.err.println(getName() 1053 + ": AceInputSocketStream.processReadHTTP() -- Could not send error message to the requesting thread: " 1054 + getErrorMessage()); 1055 } 1056 1057 return; 1058 } 1059 1060 1061 String first_req_field = strtok.nextToken(); 1062 1063 if (first_req_field.toUpperCase().startsWith("HTTP") == true) 1064 { 1065 req_message = false; 1066 } 1067 else 1068 { 1069 req_message = true; 1070 } 1071 1072 1073 if (req_message == true) 1074 { 1075 if (num_fields == 2) 1076 { 1077 method = first_req_field; 1078 if (method.toUpperCase().equals("GET") == true) { 1080 simple_req = true; 1081 url = strtok.nextToken(); 1082 1083 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 1085 true, 1086 null, 1087 url, 1088 null, 1089 null, 1090 null, 1091 0, 1092 AceInputSocketStreamMessage.READ_COMPLETED, 1093 userParm)) 1094 == false) 1095 { 1096 System.err.println(getName() 1097 + ": AceInputSocketStream.processReadHTTP() -- Could not send read completed message to the requesting thread: " 1098 + getErrorMessage()); 1099 } 1100 1101 to_cont = false; 1102 } 1103 else { 1105 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 1107 false, 1108 null, 1109 null, 1110 null, 1111 null, 1112 null, 1113 0, 1114 AceInputSocketStreamMessage.ERROR_OCCURED, 1115 userParm)) 1116 == false) 1117 { 1118 System.err.println(getName() 1119 + ": AceInputSocketStream.processReadHTTP() -- Could not send error message to the requesting thread: " 1120 + getErrorMessage()); 1121 } 1122 1123 return; 1124 } 1125 } 1126 else { 1128 method = first_req_field; 1129 url = strtok.nextToken(); 1130 String version_s = strtok.nextToken(); 1131 1132 StringTokenizer tok = new StringTokenizer(version_s, "/"); 1134 if (tok.countTokens() < 2) 1135 { 1136 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 1138 false, 1139 null, 1140 null, 1141 null, 1142 null, 1143 
null, 1144 0, 1145 AceInputSocketStreamMessage.ERROR_OCCURED, 1146 userParm)) 1147 == false) 1148 { 1149 System.err.println(getName() 1150 + ": AceInputSocketStream.processReadHTTP() -- Could not send error message to the requesting thread: " 1151 + getErrorMessage()); 1152 } 1153 1154 return; 1155 } 1156 1157 String protocol = tok.nextToken(); 1158 if (protocol.toUpperCase().equals("HTTP") == false) 1159 { 1160 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 1162 false, 1163 null, 1164 null, 1165 null, 1166 null, 1167 null, 1168 0, 1169 AceInputSocketStreamMessage.ERROR_OCCURED, 1170 userParm)) 1171 == false) 1172 { 1173 System.err.println(getName() 1174 + ": AceInputSocketStream.processReadHTTP() -- Could not send error message to the requesting thread: " 1175 + getErrorMessage()); 1176 } 1177 return; 1178 } 1179 1180 http_version = tok.nextToken(); 1181 } 1182 } else { 1185 String version_s = first_req_field; 1186 1187 StringTokenizer tok = new StringTokenizer(version_s, "/"); 1189 if (tok.countTokens() < 2) 1190 { 1191 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 1193 false, 1194 null, 1195 null, 1196 null, 1197 null, 1198 null, 1199 0, 1200 AceInputSocketStreamMessage.ERROR_OCCURED, 1201 userParm)) 1202 == false) 1203 { 1204 System.err.println(getName() 1205 + ": AceInputSocketStream.processReadHTTP() -- Could not send error message to the requesting thread: " 1206 + getErrorMessage()); 1207 } 1208 1209 return; 1210 } 1211 1212 String protocol = tok.nextToken(); 1213 if (protocol.toUpperCase().equals("HTTP") == false) 1214 { 1215 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 1217 false, 1218 null, 1219 null, 1220 null, 1221 null, 1222 null, 1223 0, 1224 AceInputSocketStreamMessage.ERROR_OCCURED, 1225 userParm)) 1226 == false) 1227 { 1228 System.err.println(getName() 1229 + ": AceInputSocketStream.processReadHTTP() -- Could not send error message to the requesting thread: " 1230 + getErrorMessage()); 
1231 } 1232 return; 1233 } 1234 1235 http_version = tok.nextToken(); 1236 1237 http_status = strtok.nextToken(); 1239 1240 try 1242 { 1243 version = Integer.parseInt(http_status); 1244 } 1245 catch (NumberFormatException ex) 1246 { 1247 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 1249 false, 1250 null, 1251 null, 1252 null, 1253 null, 1254 null, 1255 0, 1256 AceInputSocketStreamMessage.ERROR_OCCURED, 1257 userParm)) 1258 == false) 1259 { 1260 System.err.println(getName() 1261 + ": AceInputSocketStream.processReadHTTP() -- Could not send error message to the requesting thread: " 1262 + getErrorMessage()); 1263 } 1264 return; 1265 } 1266 1267 if (num_fields > 2) 1269 { 1270 StringBuffer strbuf = new StringBuffer (); 1271 1272 for (int i = 2; i < num_fields; i++) 1273 { 1274 strbuf.append(strtok.nextToken()); 1275 1276 if (i < num_fields - 1) 1277 { 1278 strbuf.append(" "); 1279 } 1280 } 1281 http_reason = strbuf.toString(); 1282 } 1283 1284 } 1286 1287 if (to_cont == true) 1291 { 1292 StringBuffer header_line = new StringBuffer (); 1293 1294 while (true) 1295 { 1296 try 1297 { 1298 line = bReader.readLine(); 1299 } 1300 catch (InterruptedIOException ex) 1301 { 1302 if (isInterrupted() == true) 1304 { 1305 return; 1307 } 1308 else 1309 { 1310 continue; 1311 } 1312 } 1313 1314 if (isInterrupted() == true) 1315 { 1316 return; 1318 } 1319 1320 if (line == null) { 1322 AceInputSocketStreamMessage ais = null; 1324 1325 if (req_message == true) 1326 { 1327 ais = new AceInputSocketStreamMessage(this, 1328 false, 1329 method, 1330 url, 1331 http_version, 1332 header_fields, 1333 null, 1334 0, 1335 AceInputSocketStreamMessage.EOF_REACHED, 1336 userParm); 1337 } 1338 else 1339 { 1340 ais = new AceInputSocketStreamMessage(this, 1341 http_version, 1342 http_status, 1343 http_reason, 1344 header_fields, 1345 null, 1346 0, 1347 AceInputSocketStreamMessage.EOF_REACHED, 1348 userParm); 1349 } 1350 1351 1352 if (parentThread.sendMessage(ais) == false) 1353 { 
1354 System.err.println(getName() 1355 + ": AceInputSocketStream.processReadHTTP() -- Could not send read completed message to the requesting thread: " 1356 + getErrorMessage()); 1357 } 1358 1359 return; 1360 } 1361 1362 if (line.length() <= 0) { 1365 if (header_line.length() > 0) 1367 { 1368 String header = header_line.toString(); 1370 1371 StringTokenizer tok = new StringTokenizer(header, ":"); 1372 1373 if (tok.countTokens() < 2) 1374 { 1375 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 1377 false, 1378 null, 1379 null, 1380 null, 1381 null, 1382 null, 1383 0, 1384 AceInputSocketStreamMessage.ERROR_OCCURED, 1385 userParm)) 1386 == false) 1387 { 1388 System.err.println(getName() 1389 + ": AceInputSocketStream.processReadHTTP() -- Could not send error message to the requesting thread: " 1390 + getErrorMessage()); 1391 } 1392 1393 1394 return; 1395 } 1396 1397 String key = tok.nextToken().trim(); 1398 StringBuffer value_buf = new StringBuffer (); 1399 boolean ft = true; 1400 while (tok.hasMoreTokens() == true) 1401 { 1402 if (ft == false) 1403 { 1404 value_buf.append(":"); 1405 } 1406 value_buf.append(tok.nextToken()); 1407 ft = false; 1408 } 1409 String value = value_buf.toString().trim(); 1410 1411 if (key.toUpperCase().equals("CONTENT-LENGTH") == true) 1412 { 1413 try 1414 { 1415 content_length = Integer.parseInt(value); 1416 } 1417 catch (NumberFormatException ex) 1418 { 1419 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 1421 false, 1422 null, 1423 null, 1424 null, 1425 null, 1426 null, 1427 0, 1428 AceInputSocketStreamMessage.ERROR_OCCURED, 1429 userParm)) 1430 == false) 1431 { 1432 System.err.println(getName() 1433 + ": AceInputSocketStream.processReadHTTP() -- Could not send error message to the requesting thread: " 1434 + getErrorMessage()); 1435 } 1436 1437 return; 1438 } 1439 } 1440 else 1441 { 1442 header_fields.addElement(new AceHTTPHeader(key, value)); 1444 } 1445 } 1446 1447 break; 1449 } 1450 else if 
((line.startsWith(" ") == true) || 1451 (line.startsWith("\t") == true)) { 1453 header_line.append(line); 1454 } 1455 else { 1457 if (header_line.length() > 0) 1459 { 1460 String header = header_line.toString(); 1462 1463 StringTokenizer tok = new StringTokenizer(header, ":"); 1464 1465 if (tok.countTokens() < 2) 1466 { 1467 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 1469 false, 1470 null, 1471 null, 1472 null, 1473 null, 1474 null, 1475 0, 1476 AceInputSocketStreamMessage.ERROR_OCCURED, 1477 userParm)) 1478 == false) 1479 { 1480 System.err.println(getName() 1481 + ": AceInputSocketStream.processReadHTTP() -- Could not send error message to the requesting thread: " 1482 + getErrorMessage()); 1483 } 1484 1485 return; 1486 } 1487 1488 String key = tok.nextToken().trim(); 1489 1490 StringBuffer value_buf = new StringBuffer (); 1491 boolean ft = true; 1492 while (tok.hasMoreTokens() == true) 1493 { 1494 if (ft == false) 1495 { 1496 value_buf.append(":"); 1497 } 1498 value_buf.append(tok.nextToken()); 1499 ft = false; 1500 } 1501 String value = value_buf.toString().trim(); 1502 1503 if (key.toUpperCase().equals("CONTENT-LENGTH") == true) 1504 { 1505 try 1506 { 1507 content_length = Integer.parseInt(value); 1508 } 1509 catch (NumberFormatException ex) 1510 { 1511 if (parentThread.sendMessage(new AceInputSocketStreamMessage(this, 1513 false, 1514 null, 1515 null, 1516 null, 1517 null, 1518 null, 1519 0, 1520 AceInputSocketStreamMessage.ERROR_OCCURED, 1521 userParm)) 1522 == false) 1523 { 1524 System.err.println(getName() 1525 + ": AceInputSocketStream.processReadHTTP() -- Could not send error message to the requesting thread: " 1526 + getErrorMessage()); 1527 } 1528 1529 return; 1530 } 1531 } 1532 else 1533 { 1534 header_fields.addElement(new AceHTTPHeader(key, value)); 1536 } 1537 } 1538 1539 header_line = new StringBuffer (line); 1541 } 1542 } 1543 1544 1545 if (content_length == -1) { 1547 if (req_message == true) { 1549 content_length = 0; 1550 
} 1551 else { 1553 if ((version < 199) || 1555 (version == 204) || 1556 (version == 304)) 1557 { 1558 content_length = 0; 1559 } 1560 } 1561 } 1562 1563 1564 if (content_length == 0) 1565 { 1566 AceInputSocketStreamMessage ais = null; 1568 1569 if (req_message == true) 1570 { 1571 ais = new AceInputSocketStreamMessage(this, 1572 false, 1573 method, 1574 url, 1575 http_version, 1576 header_fields, 1577 null, 1578 0, 1579 AceInputSocketStreamMessage.READ_COMPLETED, 1580 userParm); 1581 } 1582 else 1583 { 1584 ais = new AceInputSocketStreamMessage(this, 1585 http_version, 1586 http_status, 1587 http_reason, 1588 header_fields, 1589 null, 1590 0, 1591 AceInputSocketStreamMessage.READ_COMPLETED, 1592 userParm); 1593 } 1594 1595 if (parentThread.sendMessage(ais) == false) 1596 { 1597 System.err.println(getName() 1598 + ": AceInputSocketStream.processReadHTTP() -- Could not send read completed message to the requesting thread: " 1599 + getErrorMessage()); 1600 } 1601 1602 to_cont = false; 1603 } 1604 } 1605 1606 1607 if (to_cont == true) 1611 { 1612 int length_read = 0; 1613 StringBuffer body_buffer = new StringBuffer (); 1614 1615 int num_to_receive = 0; 1616 if (content_length >= 0) 1617 { 1618 body = new char[content_length]; 1619 num_to_receive = content_length; 1620 } 1621 else 1622 { 1623 body = new char[1000]; 1624 num_to_receive = body.length; 1625 } 1626 1627 while (true) 1628 { 1629 int len_rcvd = 0; 1630 1631 try 1632 { 1633 len_rcvd = bReader.read(body, 0, num_to_receive); 1635 } 1639 catch (InterruptedIOException ex) 1640 { 1641 if (isInterrupted() == true) 1643 { 1644 return; 1646 } 1647 else 1648 { 1649 continue; 1650 } 1651 } 1652 1653 if (isInterrupted() == true) 1654 { 1655 return; 1657 } 1658 1659 if (len_rcvd == -1) { 1661 AceInputSocketStreamMessage ais = null; 1663 if (req_message == true) 1664 { 1665 ais = new AceInputSocketStreamMessage(this, 1666 false, 1667 method, 1668 url, 1669 http_version, 1670 header_fields, 1671 
body_buffer.toString().toCharArray(), 1672 length_read, 1673 AceInputSocketStreamMessage.EOF_REACHED, 1674 userParm); 1675 } 1676 else 1677 { 1678 ais = new AceInputSocketStreamMessage(this, 1679 http_version, 1680 http_status, 1681 http_reason, 1682 header_fields, 1683 body_buffer.toString().toCharArray(), 1684 length_read, 1685 AceInputSocketStreamMessage.EOF_REACHED, 1686 userParm); 1687 } 1688 1689 if (parentThread.sendMessage(ais) == false) 1690 { 1691 System.err.println(getName() 1692 + ": AceInputSocketStream.processReadHTTP() -- Could not send EOF message to the requesting thread: " 1693 + getErrorMessage()); 1694 } 1695 1696 return; 1697 } 1698 1699 body_buffer.append(body, 0, len_rcvd); 1701 1702 length_read += len_rcvd; 1703 1704 if (content_length == -1) { 1706 continue; } 1708 1709 num_to_receive -= len_rcvd; 1710 1711 if (length_read >= content_length) 1712 { 1713 AceInputSocketStreamMessage ais = null; 1715 1716 1719 if (req_message == true) 1720 { 1721 ais = new AceInputSocketStreamMessage(this, 1722 false, 1723 method, 1724 url, 1725 http_version, 1726 header_fields, 1727 body_buffer.toString().toCharArray(), 1728 length_read, 1729 AceInputSocketStreamMessage.READ_COMPLETED, 1730 userParm); 1731 } 1732 else 1733 { 1734 ais = new AceInputSocketStreamMessage(this, 1735 http_version, 1736 http_status, 1737 http_reason, 1738 header_fields, 1739 body_buffer.toString().toCharArray(), 1740 length_read, 1741 AceInputSocketStreamMessage.READ_COMPLETED, 1742 userParm); 1743 } 1744 if (parentThread.sendMessage(ais) == false) 1745 { 1746 System.err.println(getName() 1747 + ": AceInputSocketStream.processReadHTTP() -- Could not send read completed message to the requesting thread: " 1748 + getErrorMessage()); 1749 } 1750 1751 break; 1752 } 1753 } 1754 } 1755 1756 } 1757 while (true); 1758 } 1759 1760 private static final int STATE_COLLECTING_DISCRIMINATOR = 0; 1761 private static final int STATE_COLLECTING_HEADER = 1; 1762 private static final int 
STATE_COLLECTING_LENGTH = 2;   // NOTE(review): STATE_* look like parser states; their use is not visible in this part of the file
    private static final int STATE_COLLECTING_DATA = 3;

    // Thread that receives the AceInputSocketStreamMessage notifications (via sendMessage()).
    private AceThread parentThread;
    // Socket being read; closed and set to null by dispose().
    private Socket socket;
    // Raw input stream obtained from the socket in the constructor.
    private InputStream inputStream = null;
    // Character reader the HTTP body loop reads from; its initialization is not visible here.
    private BufferedReader bReader = null;

    // Constructor parameters, stored as supplied.
    private byte[] protocolDiscriminator;
    private int maxMsgSize;
    // One of the public mode constants (READ, READLINE, ..., HTTP); validated in the constructor.
    private int operationMode;
    private int tagLength;
    private AceInputFilterInterface inputFilter;
    // Caller-supplied value that is passed back in every message sent to the parent thread.
    private long userParm;
    // NOTE(review): usage not visible in this part of the file.
    private int readOffset = 0;

    /**
     * Manual test driver.
     *
     * Starts a helper thread that listens on TCP port 5000, accepts a single
     * connection, wraps it in an AceInputSocketStream and prints every message
     * received until a message with a status other than READ_COMPLETED, a
     * signal, or an unexpected message arrives.
     *
     * The process exits with status 0 once the helper thread has finished and
     * with status 1 on an unknown option, an IOException while creating the
     * helper thread, or an interruption while waiting for it.
     *
     * @param args optional; args[0] selects the stream mode, either
     *             "readline" (the default) or "multiline"
     */
    public static void main(String [] args)
    {
        class MyParentClass extends AceThread
        {
            private boolean multiline;

            public MyParentClass(boolean multiline)
            throws IOException
            {
                super();
                this.multiline = multiline;
            }

            public void run()
            {
                ServerSocket ssock = null;
                Socket sock = null;

                try
                {
                    ssock = new ServerSocket(5000);
                    System.out.println("Waiting for connection...");
                    sock = ssock.accept();
                    System.out.println("Connected");

                    AceInputSocketStream ais;
                    if (multiline == true)
                    {
                        ais = new AceInputSocketStream(1L, "AceInputSocketStream",
                                                       sock,
                                                       multiline);
                    }
                    else
                    {
                        ais = new AceInputSocketStream(1L, "AceInputSocketStream",
                                                       sock);
                    }

                    ais.start();

                    while (true)
                    {
                        AceMessageInterface msg = ais.waitInputStreamMessage();

                        if ((msg instanceof AceInputSocketStreamMessage) == true)
                        {
                            System.out.print("A message is received with status = ");
                            AceInputSocketStreamMessage is_msg = (AceInputSocketStreamMessage)msg;
                            System.out.println(is_msg.getStatus());

                            if (is_msg.getStatus() == AceInputSocketStreamMessage.READ_COMPLETED)
                            {
                                System.out.println("Received message with user parm : "
                                                   + is_msg.getUserParm()
                                                   + '\n'
                                                   + is_msg.getLines());
                            }
                            else
                            {
                                // Any other status (error, EOF, ...) ends the session.
                                break;
                            }
                        }
                        else if ((msg instanceof AceSignalMessage) == true)
                        {
                            AceSignalMessage signal = (AceSignalMessage)msg;

                            System.out.println("Received signal : "
                                               + signal.getSignalId());
                            break;
                        }
                        else
                        {
                            System.err.println("Unexpected message : "
                                               + msg.messageType()
                                               + " received");
                            break;
                        }
                    }
                }
                catch (IOException ex1)
                {
                    System.err.println("IOException : " + ex1.getMessage());
                }
                catch (AceException ex2)
                {
                    System.err.println("AceException : " + ex2.getMessage());
                }
                finally
                {
                    // Close each socket independently so that a failure to close
                    // the connection cannot leave the listening socket open, and
                    // so that cleanup also happens on unchecked exceptions.
                    if (sock != null)
                    {
                        try
                        {
                            sock.close();
                        }
                        catch (IOException ex4)
                        {
                            System.err.println("IOException : " + ex4.getMessage());
                        }
                    }

                    if (ssock != null)
                    {
                        try
                        {
                            ssock.close();
                        }
                        catch (IOException ex4)
                        {
                            System.err.println("IOException : " + ex4.getMessage());
                        }
                    }
                }
            }
        }

        try
        {
            String dflt = "readline";

            if (args.length > 0)
            {
                dflt = args[0];
            }

            AceThread pobj = null;
            if (dflt.equals("readline") == true)
            {
                pobj = new MyParentClass(false);
            }
            else if (dflt.equals("multiline") == true)
            {
                pobj = new MyParentClass(true);
            }
            else
            {
                System.err.println("Unknown option : " + dflt);
                System.exit(1);
                return; // not reached; makes it explicit that pobj is never used while null
            }

            pobj.start();
            pobj.join();
            System.exit(0);
        }
        catch (IOException ex1)
        {
            System.err.println("IOException in main " + ex1.getMessage());
            // A failure must not be reported as success.
            System.exit(1);
        }
        catch (InterruptedException ex2)
        {
            System.err.println("InterruptedException in main " + ex2.getMessage());
            System.exit(1);
        }
    }
}