1 24 package org.objectweb.joram.client.jms; 25 26 import java.io.*; 27 28 import javax.jms.JMSException ; 29 import javax.jms.MessageFormatException ; 30 import javax.jms.MessageNotWriteableException ; 31 import javax.jms.MessageNotReadableException ; 32 import javax.jms.MessageEOFException ; 33 34 37 public final class StreamMessage extends Message implements javax.jms.StreamMessage { 38 39 private transient ByteArrayOutputStream outputBuffer = null; 40 41 private transient DataOutputStream outputStream = null; 42 43 private transient DataInputStream inputStream = null; 44 45 46 private transient boolean prepared = false; 47 48 private transient int available = 0; 49 private transient boolean firstTimeBytesRead = true; 50 51 private static final int SHORT = 1; 52 private static final int CHAR = 2; 53 private static final int INT = 3; 54 private static final int LONG = 4; 55 private static final int FLOAT = 5; 56 private static final int DOUBLE = 6; 57 private static final int BOOLEAN = 7; 58 private static final int STRING = 8; 59 private static final int BYTE = 9; 60 private static final int BYTES = 10; 61 private static final int NULL = 11; 62 63 69 StreamMessage() throws JMSException { 70 super(); 71 momMsg.type = momMsg.STREAM; 72 73 outputBuffer = new ByteArrayOutputStream(); 74 outputStream = new DataOutputStream(outputBuffer); 75 available = 0; 76 firstTimeBytesRead = true; 77 } 78 79 89 StreamMessage(Session session, 90 org.objectweb.joram.shared.messages.Message momMsg) 91 throws JMSException { 92 super(session, momMsg); 93 94 try { 95 inputStream = new DataInputStream(new ByteArrayInputStream(momMsg.body)); 96 } catch (Exception exc) { 97 JMSException jE = 98 new JMSException ("Error while creating the stream facility."); 99 jE.setLinkedException(exc); 100 throw jE; 101 } 102 available = 0; 103 firstTimeBytesRead = true; 104 } 105 106 112 public void clearBody() throws JMSException { 113 try { 114 if (! RObody) { 115 outputStream.close(); 116 outputBuffer.close(); 117 } else { 118 inputStream.close(); 119 } 120 121 outputBuffer = new ByteArrayOutputStream(); 122 outputStream = new DataOutputStream(outputBuffer); 123 124 super.clearBody(); 125 prepared = false; 126 } catch (IOException ioE) { 127 JMSException jE = new JMSException ("Error while closing the stream" 128 + " facilities."); 129 jE.setLinkedException(ioE); 130 throw jE; 131 } 132 } 133 134 141 private void prepareWrite() throws JMSException 142 { 143 if (RObody) 144 throw new MessageNotWriteableException ("Can't write a value as the" 145 + " message body is read-only."); 146 if (prepared) { 147 prepared = false; 148 outputBuffer = new ByteArrayOutputStream(); 149 outputStream = new DataOutputStream(outputBuffer); 150 } 151 } 152 153 159 public void writeBoolean(boolean value) throws JMSException { 160 prepareWrite(); 161 162 try { 163 outputStream.writeByte(BOOLEAN); 164 outputStream.writeBoolean(value); 165 } catch (IOException ioE) { 166 JMSException jE = new JMSException ("Error while writing the value."); 167 jE.setLinkedException(ioE); 168 throw jE; 169 } 170 } 171 172 178 public void writeByte(byte value) throws JMSException { 179 prepareWrite(); 180 181 try { 182 outputStream.writeByte(BYTE); 183 outputStream.writeByte((int) value); 184 } catch (IOException ioE) { 185 JMSException jE = new JMSException ("Error while writing the value."); 186 jE.setLinkedException(ioE); 187 throw jE; 188 } 189 } 190 191 197 public void writeBytes(byte[] value) throws JMSException { 198 writeBytes(value, 0, value.length); 199 } 200 201 207 public void writeBytes(byte[] value, int offset, int length) throws JMSException { 208 prepareWrite(); 209 210 try { 211 outputStream.writeByte(BYTES); 212 if (value == null) { 213 outputStream.writeInt(-1); 214 return; 215 } else 216 outputStream.writeInt(length); 217 outputStream.write(value, offset, length); 218 } catch (IOException ioE) { 219 JMSException jE = new JMSException ("Error while writing the value."); 220 jE.setLinkedException(ioE); 221 throw jE; 222 } 223 } 224 225 231 public void writeChar(char value) throws JMSException { 232 prepareWrite(); 233 234 try { 235 outputStream.writeByte(CHAR); 236 outputStream.writeChar((int) value); 237 } catch (IOException ioE) { 238 JMSException jE = new JMSException ("Error while writing the value."); 239 jE.setLinkedException(ioE); 240 throw jE; 241 } 242 } 243 244 250 public void writeDouble(double value) throws JMSException { 251 prepareWrite(); 252 253 try { 254 outputStream.writeByte(DOUBLE); 255 outputStream.writeDouble(value); 256 } catch (IOException ioE) { 257 JMSException jE = new JMSException ("Error while writing the value."); 258 jE.setLinkedException(ioE); 259 throw jE; 260 } 261 } 262 263 269 public void writeFloat(float value) throws JMSException { 270 prepareWrite(); 271 272 try { 273 outputStream.writeByte(FLOAT); 274 outputStream.writeFloat(value); 275 } catch (IOException ioE) { 276 JMSException jE = new JMSException ("Error while writing the value."); 277 jE.setLinkedException(ioE); 278 throw jE; 279 } 280 } 281 282 288 public void writeInt(int value) throws JMSException { 289 prepareWrite(); 290 291 try { 292 outputStream.writeByte(INT); 293 outputStream.writeInt(value); 294 } catch (IOException ioE) { 295 JMSException jE = new JMSException ("Error while writing the value."); 296 jE.setLinkedException(ioE); 297 throw jE; 298 } 299 } 300 301 307 public void writeLong(long value) throws JMSException { 308 prepareWrite(); 309 310 try { 311 outputStream.writeByte(LONG); 312 outputStream.writeLong(value); 313 } catch (IOException ioE) { 314 JMSException jE = new JMSException ("Error while writing the value."); 315 jE.setLinkedException(ioE); 316 throw jE; 317 } 318 } 319 320 326 public void writeShort(short value) throws JMSException { 327 prepareWrite(); 328 329 try { 330 outputStream.writeByte(SHORT); 331 outputStream.writeShort((int) value); 332 } catch (IOException ioE) { 333 JMSException jE = new JMSException ("Error while writing the value."); 334 jE.setLinkedException(ioE); 335 throw jE; 336 } 337 } 338 339 345 public void writeString(String value) throws JMSException { 346 prepareWrite(); 347 348 try { 349 if (value == null) 350 outputStream.writeByte(NULL); 351 else { 352 outputStream.writeByte(STRING); 353 outputStream.writeUTF(value); 354 } 355 } catch (IOException ioE) { 356 JMSException jE = new JMSException ("Error while writing the value."); 357 jE.setLinkedException(ioE); 358 throw jE; 359 } 360 } 361 362 369 public void writeObject(Object value) throws JMSException { 370 prepareWrite(); 371 372 if (value == null) { 373 try { 374 outputStream.writeByte(NULL); 375 } catch (IOException ioE) { 376 JMSException jE = new JMSException ("Error while writing the value."); 377 jE.setLinkedException(ioE); 378 throw jE; 379 } 380 } else if (value instanceof Boolean ) { 381 writeBoolean(((Boolean ) value).booleanValue()); 382 } else if (value instanceof Character ) { 383 writeChar(((Character ) value).charValue()); 384 } else if (value instanceof Byte ) { 385 writeByte(((Byte ) value).byteValue()); 386 } else if (value instanceof Short ) { 387 writeShort(((Short ) value).shortValue()); 388 } else if (value instanceof Integer ) { 389 writeInt(((Integer ) value).intValue()); 390 } else if (value instanceof Long ) { 391 writeLong(((Long ) value).longValue()); 392 } else if (value instanceof Float ) { 393 writeFloat(((Float ) value).floatValue()); 394 } else if (value instanceof Double ) { 395 writeDouble(((Double ) value).doubleValue()); 396 } else if (value instanceof String ) { 397 writeString((String ) value); 398 } else if (value instanceof byte[]) { 399 writeBytes((byte[]) value); 400 } else 401 throw new MessageFormatException ("Can't write non Java primitive type" 402 + " as a bytes array."); 403 } 404 405 406 415 public boolean readBoolean() throws JMSException { 416 if (! RObody) 417 throw new MessageNotReadableException ("Can't read the message body as" 418 + " it is write-only."); 419 try { 420 byte type = inputStream.readByte(); 421 if (type == BOOLEAN) 422 return inputStream.readBoolean(); 423 else if (type == STRING) 424 return Boolean.valueOf(inputStream.readUTF()).booleanValue(); 425 else 426 throw new MessageFormatException ("type read: " + type + " is not a boolean or a String."); 427 } catch (EOFException e1) { 428 MessageEOFException exc = new MessageEOFException ("end of message " + e1); 429 exc.setLinkedException(e1); 430 throw exc; 431 } catch (IOException e2) { 432 JMSException exc = new JMSException ("IOException"); 433 exc.setLinkedException(e2); 434 throw exc; 435 } 436 } 437 438 447 public byte readByte() throws JMSException { 448 if (! RObody) 449 throw new MessageNotReadableException ("Can't read the message body as" 450 + " it is write-only."); 451 try { 452 inputStream.mark(inputStream.available()); 453 byte type = inputStream.readByte(); 454 if (type == BYTE) 455 return inputStream.readByte(); 456 else if (type == STRING) 457 return Byte.valueOf(inputStream.readUTF()).byteValue(); 458 else 459 throw new MessageFormatException ("type read: " + type + " is not a byte or a String."); 460 } catch (EOFException e1) { 461 MessageEOFException exc = new MessageEOFException ("end of message " + e1); 462 exc.setLinkedException(e1); 463 throw exc; 464 } catch (IOException e2) { 465 JMSException exc = new JMSException ("IOException"); 466 exc.setLinkedException(e2); 467 throw exc; 468 } catch (NumberFormatException e3) { 469 try { 470 inputStream.reset(); 471 } catch (Exception e) {} 472 throw e3; 473 } 474 } 475 476 485 public short readShort() throws JMSException { 486 if (! RObody) 487 throw new MessageNotReadableException ("Can't read the message body as" 488 + " it is write-only."); 489 try { 490 byte type = inputStream.readByte(); 491 if (type == SHORT) 492 return inputStream.readShort(); 493 else if (type == BYTE) 494 return inputStream.readByte(); 495 else if (type == STRING) 496 return Short.valueOf(inputStream.readUTF()).shortValue(); 497 else 498 throw new MessageFormatException ("type read: " + type + " is not a short, a byte or a String."); 499 } catch (EOFException e1) { 500 MessageEOFException exc = new MessageEOFException ("end of message " + e1); 501 exc.setLinkedException(e1); 502 throw exc; 503 } catch (IOException e2) { 504 JMSException exc = new JMSException ("IOException"); 505 exc.setLinkedException(e2); 506 throw exc; 507 } 508 } 509 510 519 public char readChar() throws JMSException { 520 if (! RObody) 521 throw new MessageNotReadableException ("Can't read the message body as" 522 + " it is write-only."); 523 try { 524 byte type = inputStream.readByte(); 525 if (type == CHAR) 526 return inputStream.readChar(); 527 else if (type == NULL) 530 throw new NullPointerException ("null is not a char."); 531 else 532 throw new MessageFormatException ("type read: " + type + " is not a char."); 533 } catch (EOFException e1) { 534 MessageEOFException exc = new MessageEOFException ("end of message " + e1); 535 exc.setLinkedException(e1); 536 throw exc; 537 } catch (IOException e2) { 538 JMSException exc = new JMSException ("IOException"); 539 exc.setLinkedException(e2); 540 throw exc; 541 } 542 } 543 544 553 public int readInt() throws JMSException { 554 if (! RObody) 555 throw new MessageNotReadableException ("Can't read the message body as" 556 + " it is write-only."); 557 try { 558 byte type = inputStream.readByte(); 559 if (type == INT) 560 return inputStream.readInt(); 561 else if (type == SHORT) 562 return inputStream.readShort(); 563 else if (type == BYTE) 564 return inputStream.readByte(); 565 else if (type == STRING) 566 return Integer.valueOf(inputStream.readUTF()).intValue(); 567 else 568 throw new MessageFormatException ("type read: " + type + 569 " is not a int, a short, a byte or a String."); 570 } catch (EOFException e1) { 571 MessageEOFException exc = new MessageEOFException ("end of message " + e1); 572 exc.setLinkedException(e1); 573 throw exc; 574 } catch (IOException e2) { 575 JMSException exc = new JMSException ("IOException"); 576 exc.setLinkedException(e2); 577 throw exc; 578 } 579 } 580 581 590 public long readLong() throws JMSException { 591 if (! RObody) 592 throw new MessageNotReadableException ("Can't read the message body as" 593 + " it is write-only."); 594 try { 595 byte type = inputStream.readByte(); 596 if (type == LONG) 597 return inputStream.readLong(); 598 else if (type == INT) 599 return inputStream.readInt(); 600 else if (type == SHORT) 601 return inputStream.readShort(); 602 else if (type == BYTE) 603 return inputStream.readByte(); 604 else if (type == STRING) 605 return Long.valueOf(inputStream.readUTF()).longValue(); 606 else 607 throw new MessageFormatException ("type read: " + type + 608 " is not a int, a short, a byte or a String."); 609 } catch (EOFException e1) { 610 MessageEOFException exc = new MessageEOFException ("end of message " + e1); 611 exc.setLinkedException(e1); 612 throw exc; 613 } catch (IOException e2) { 614 JMSException exc = new JMSException ("IOException"); 615 exc.setLinkedException(e2); 616 throw exc; 617 } 618 } 619 620 629 public float readFloat() throws JMSException { 630 if (! RObody) 631 throw new MessageNotReadableException ("Can't read the message body as" 632 + " it is write-only."); 633 try { 634 byte type = inputStream.readByte(); 635 if (type == FLOAT) 636 return inputStream.readFloat(); 637 else if (type == STRING) 638 return Float.valueOf(inputStream.readUTF()).floatValue(); 639 else 640 throw new MessageFormatException ("type read: " + type + " is not float or String."); 641 } catch (EOFException e1) { 642 MessageEOFException exc = new MessageEOFException ("end of message " + e1); 643 exc.setLinkedException(e1); 644 throw exc; 645 } catch (IOException e2) { 646 JMSException exc = new JMSException ("IOException"); 647 exc.setLinkedException(e2); 648 throw exc; 649 } 650 } 651 652 661 public double readDouble() throws JMSException 662 { 663 if (! RObody) 664 throw new MessageNotReadableException ("Can't read the message body as" 665 + " it is write-only."); 666 try { 667 byte type = inputStream.readByte(); 668 if (type == DOUBLE) 669 return inputStream.readDouble(); 670 else if (type == FLOAT) 671 return inputStream.readFloat(); 672 else if (type == STRING) 673 return Double.valueOf(inputStream.readUTF()).doubleValue(); 674 else 675 throw new MessageFormatException ("type read: " + type + " is not a double, a float or a String."); 676 } catch (EOFException e1) { 677 MessageEOFException exc = new MessageEOFException ("end of message " + e1); 678 exc.setLinkedException(e1); 679 throw exc; 680 } catch (IOException e2) { 681 JMSException exc = new JMSException ("IOException"); 682 exc.setLinkedException(e2); 683 throw exc; 684 } 685 } 686 687 696 public int readBytes(byte[] bytes) throws JMSException { 697 if (! RObody) 698 throw new MessageNotReadableException ("Can't read the message body as" 699 + " it is write-only."); 700 if (bytes == null) return -1; 701 702 if (bytes.length == 0) return 0; 703 704 int ret = 0; 705 try { 706 byte type; 707 int counter = 0; 708 if (firstTimeBytesRead) { 709 type = inputStream.readByte(); 710 711 if (type == BYTES) { 712 available = inputStream.readInt(); 713 714 if (available == 0 || available == -1) 715 return available; 716 717 int toread = 0; 718 if (bytes.length >= available) 719 toread = available; 720 else 721 toread = bytes.length; 722 for (int i = 0; i < toread; i ++) { 723 bytes[i] = inputStream.readByte(); 724 counter++; 725 } 726 ret = counter; 727 available = toread - counter; 728 firstTimeBytesRead = false; 729 counter = 0; 730 } else 731 throw new MessageFormatException ("type read: " + type + " is not a byte[]."); 732 } else { 733 if (available > 0) 734 type = BYTES; 735 else { 736 inputStream.mark(inputStream.available()); 737 type = inputStream.readByte(); 738 } 739 740 if (type == BYTES) { 741 if (available >= 0) 742 available = inputStream.readInt(); 743 744 if (available == 0 || available == -1) 745 return available; 746 747 int toread = 0; 748 if (bytes.length >= available) 749 toread = available; 750 else 751 toread = bytes.length; 752 for (int i = 0; i < toread; i ++) { 753 bytes[i] = inputStream.readByte(); 754 counter++; 755 } 756 ret = counter; 757 available = toread - counter; 758 firstTimeBytesRead = false; 759 counter = 0; 760 } else { 761 inputStream.reset(); 762 firstTimeBytesRead = true; 763 return -1; 764 } 765 } 766 } catch (EOFException e1) { 767 MessageEOFException exc = new MessageEOFException ("end of message " + e1); 768 exc.setLinkedException(e1); 769 throw exc; 770 } catch (IOException e2) { 771 JMSException exc = new JMSException ("IOException"); 772 exc.setLinkedException(e2); 773 throw exc; 774 } 775 776 if (ret == 0) 777 return -1; 778 return ret; 779 } 780 781 790 public String readString() throws JMSException { 791 if (! RObody) 792 throw new MessageNotReadableException ("Can't read the message body as" 793 + " it is write-only."); 794 try { 795 byte type = inputStream.readByte(); 796 if (type == STRING) 797 return inputStream.readUTF(); 798 else if (type == INT) 799 return String.valueOf(inputStream.readInt()); 800 else if (type == SHORT) 801 return String.valueOf(inputStream.readShort()); 802 else if (type == BYTE) 803 return String.valueOf(inputStream.readByte()); 804 else if (type == FLOAT) 805 return String.valueOf(inputStream.readFloat()); 806 else if (type == LONG) 807 return String.valueOf(inputStream.readLong()); 808 else if (type == DOUBLE) 809 return String.valueOf(inputStream.readDouble()); 810 else if (type == BOOLEAN) 811 return String.valueOf(inputStream.readBoolean()); 812 else if (type == CHAR) 813 return String.valueOf(inputStream.readChar()); 814 else if (type == NULL) 815 return null; 816 else 817 throw new MessageFormatException ("type read: " + type + 818 " is not a int, a short, a byte,... or a String."); 819 } catch (EOFException e1) { 820 MessageEOFException exc = new MessageEOFException ("end of message " + e1); 821 exc.setLinkedException(e1); 822 throw exc; 823 } catch (IOException e2) { 824 JMSException exc = new JMSException ("IOException"); 825 exc.setLinkedException(e2); 826 throw exc; 827 } 828 } 829 830 839 public Object readObject() throws JMSException { 840 if (! RObody) 841 throw new MessageNotReadableException ("Can't read the message body as" 842 + " it is write-only."); 843 try { 844 byte type = inputStream.readByte(); 845 846 if (type == BOOLEAN) { 847 return new Boolean (inputStream.readBoolean()); 848 } else if (type == CHAR) { 849 return new Character (inputStream.readChar()); 850 } else if (type == BYTE) { 851 return new Byte (inputStream.readByte()); 852 } else if (type == SHORT) { 853 return new Short (inputStream.readShort()); 854 } else if (type == INT) { 855 return new Integer (inputStream.readInt()); 856 } else if (type == LONG) { 857 return new Long (inputStream.readLong()); 858 } else if (type == FLOAT) { 859 return new Float (inputStream.readFloat()); 860 } else if (type == DOUBLE) { 861 return new Double (inputStream.readDouble()); 862 } else if (type == STRING) { 863 return inputStream.readUTF(); 864 } else if (type == NULL) { 865 return null; 866 } else if (type == BYTES) { 867 int available = inputStream.readInt(); 868 869 if (available == -1) return null; 870 if (available == 0) return new byte[0]; 871 872 byte[] b = new byte[available]; 873 inputStream.read(b); 874 return b; 875 } else 876 throw new MessageFormatException ("not a primitive object."); 877 } catch (EOFException e1) { 878 MessageEOFException exc = new MessageEOFException ("end of message " + e1); 879 exc.setLinkedException(e1); 880 throw exc; 881 } catch (IOException e2) { 882 JMSException exc = new JMSException ("IOException"); 883 exc.setLinkedException(e2); 884 throw exc; 885 } 886 } 887 888 889 895 public void reset() throws JMSException { 896 try { 897 if (! RObody) { 898 outputStream.flush(); 899 momMsg.body = outputBuffer.toByteArray(); 900 } else { 901 inputStream.close(); 902 } 903 904 inputStream = new DataInputStream(new ByteArrayInputStream(momMsg.body)); 905 if (inputStream != null) inputStream.reset(); 906 907 RObody = true; 908 firstTimeBytesRead = true; 909 } catch (IOException iE) { 910 JMSException jE = 911 new JMSException ("Error while manipulating the stream facilities."); 912 jE.setLinkedException(iE); 913 throw jE; 914 } 915 } 916 917 923 protected void prepare() throws JMSException { 924 super.prepare(); 925 926 try { 927 if (! RObody) { 928 outputStream.flush(); 929 momMsg.body = outputBuffer.toByteArray(); 930 prepared = true; 931 } 932 } catch (IOException exc) { 933 MessageFormatException jExc = 934 new MessageFormatException ("The message body could not be serialized."); 935 jExc.setLinkedException(exc); 936 throw jExc; 937 } 938 } 939 } 940 | Popular Tags |