package org.apache.activemq.command;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.*;

import javax.jms.*;
import java.io.*;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

/**
 * A {@link BytesMessage} whose body is a stream of uninterpreted bytes.
 * <p>
 * While the body is writable, values are appended through a
 * {@link DataOutputStream} that buffers into a {@link ByteArrayOutputStream}.
 * {@link #reset()}, {@link #onSend()} and {@link #copy()} flush that buffer
 * into the message content. Once the body is read-only, values are read back
 * through a {@link DataInputStream} over the stored content.
 * <p>
 * When the owning connection reports {@code isUseCompression()}, the body is
 * deflated on the way out and is prefixed with a 4-byte integer holding the
 * uncompressed length, which is what {@link #getBodyLength()} reports after
 * the message has been stored.
 */
public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessage {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BYTES_MESSAGE;

    /** Typed writer over {@link #bytesOut}; {@code null} until the first write. */
    transient protected DataOutputStream dataOut;
    /** Buffer receiving the (possibly compressed) body while writing. */
    transient protected ByteArrayOutputStream bytesOut;
    /** Typed reader over the stored content; {@code null} until the first read. */
    transient protected DataInputStream dataIn;
    /** Uncompressed body length in bytes. */
    transient protected int length;

    /**
     * Creates a copy of this message. Pending written bytes are first flushed
     * into the content so that the copy sees them.
     *
     * @return a new {@code ActiveMQBytesMessage} populated from this one
     */
    public Message copy() {
        ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
        copy(copy);
        return copy;
    }

    /**
     * Copies this message's state into {@code copy}. The stream fields are
     * cleared on the target so it lazily opens its own streams.
     */
    private void copy(ActiveMQBytesMessage copy) {
        storeContent();
        super.copy(copy);
        copy.dataOut = null;
        copy.bytesOut = null;
        copy.dataIn = null;
    }

    /**
     * Flushes any pending written bytes into the message content after the
     * superclass send hook has run.
     *
     * @throws JMSException if the superclass hook fails
     */
    public void onSend() throws JMSException {
        super.onSend();
        storeContent();
    }

    /**
     * Closes the writer (if one is open) and moves the buffered bytes into
     * the message content. For compressed bodies the reserved 4-byte header
     * is filled in with the uncompressed length.
     *
     * @throws RuntimeException wrapping the {@link IOException} if closing or
     *                          writing the header fails
     */
    private void storeContent() {
        if (dataOut == null) {
            return;
        }
        try {
            dataOut.close();
            ByteSequence bs = bytesOut.toByteSequence();
            if (compressed) {
                // writeIntBig is called with the sequence itself, so save the
                // offset and put it back after the length header is written.
                int pos = bs.offset;
                ByteSequenceData.writeIntBig(bs, length);
                bs.offset = pos;
            }
            setContent(bs);
            bytesOut = null;
            dataOut = null;
        } catch (IOException ioe) {
            throw new RuntimeException(ioe.getMessage(), ioe);
        }
    }

    /**
     * @return {@link #DATA_STRUCTURE_TYPE}
     */
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * @return the fixed MIME type string for bytes messages
     */
    public String getJMSXMimeType() {
        return "jms/bytes-message";
    }

    /**
     * Clears the message body and discards any open read or write streams.
     *
     * @throws JMSException if the superclass fails to clear the body
     */
    public void clearBody() throws JMSException {
        super.clearBody();
        this.dataOut = null;
        this.dataIn = null;
        this.bytesOut = null;
    }

    /**
     * Returns the number of bytes in the (uncompressed) message body.
     *
     * @return the body length in bytes
     * @throws JMSException if the body is still write-only or cannot be opened
     */
    public long getBodyLength() throws JMSException {
        initializeReading();
        return length;
    }

    /**
     * Reads a {@code boolean} from the body stream.
     *
     * @return the value read
     * @throws JMSException if the body is write-only, the end of the body is
     *                      reached, or the read fails
     */
    public boolean readBoolean() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readBoolean();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a signed 8-bit value from the body stream.
     *
     * @return the value read
     * @throws JMSException if the body is write-only, the end of the body is
     *                      reached, or the read fails
     */
    public byte readByte() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readByte();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads an unsigned 8-bit value from the body stream.
     *
     * @return the value read, widened to {@code int}
     * @throws JMSException if the body is write-only, the end of the body is
     *                      reached, or the read fails
     */
    public int readUnsignedByte() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readUnsignedByte();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a signed 16-bit value from the body stream.
     *
     * @return the value read
     * @throws JMSException if the body is write-only, the end of the body is
     *                      reached, or the read fails
     */
    public short readShort() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readShort();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads an unsigned 16-bit value from the body stream.
     *
     * @return the value read, widened to {@code int}
     * @throws JMSException if the body is write-only, the end of the body is
     *                      reached, or the read fails
     */
    public int readUnsignedShort() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readUnsignedShort();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a {@code char} from the body stream.
     *
     * @return the value read
     * @throws JMSException if the body is write-only, the end of the body is
     *                      reached, or the read fails
     */
    public char readChar() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readChar();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a signed 32-bit value from the body stream.
     *
     * @return the value read
     * @throws JMSException if the body is write-only, the end of the body is
     *                      reached, or the read fails
     */
    public int readInt() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readInt();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a signed 64-bit value from the body stream.
     *
     * @return the value read
     * @throws JMSException if the body is write-only, the end of the body is
     *                      reached, or the read fails
     */
    public long readLong() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readLong();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a {@code float} from the body stream.
     *
     * @return the value read
     * @throws JMSException if the body is write-only, the end of the body is
     *                      reached, or the read fails
     */
    public float readFloat() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readFloat();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a {@code double} from the body stream.
     *
     * @return the value read
     * @throws JMSException if the body is write-only, the end of the body is
     *                      reached, or the read fails
     */
    public double readDouble() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readDouble();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads a string written in the {@link DataInputStream#readUTF()} format
     * from the body stream.
     *
     * @return the string read
     * @throws JMSException if the body is write-only, the end of the body is
     *                      reached, or the read fails
     */
    public String readUTF() throws JMSException {
        initializeReading();
        try {
            return this.dataIn.readUTF();
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Reads up to {@code value.length} bytes from the body stream into
     * {@code value}.
     *
     * @param value the destination buffer
     * @return the number of bytes read, or {@code -1} if nothing could be read
     *         because the end of the body had already been reached
     * @throws JMSException if the body is write-only or the read fails
     */
    public int readBytes(byte[] value) throws JMSException {
        return readBytes(value, value.length);
    }

    /**
     * Reads up to {@code length} bytes from the body stream into the start of
     * {@code value}, looping until either {@code length} bytes have been read
     * or the stream is exhausted.
     *
     * @param value  the destination buffer
     * @param length the maximum number of bytes to read; must be between
     *               {@code 0} and {@code value.length} inclusive
     * @return the number of bytes read, or {@code -1} if {@code length > 0}
     *         and nothing could be read because the end of the body had
     *         already been reached
     * @throws JMSException              if the body is write-only or the read
     *                                   fails
     * @throws IndexOutOfBoundsException if {@code length} is negative or
     *                                   greater than {@code value.length}
     */
    public int readBytes(byte[] value, int length) throws JMSException {
        initializeReading();
        // The JMS contract requires an explicit failure here; without this
        // check a negative length silently returned 0 and an oversized length
        // was only rejected if the underlying stream happened to notice.
        if (length < 0 || length > value.length) {
            throw new IndexOutOfBoundsException(
                    "length must not be negative or larger than the size of the provided array");
        }
        try {
            int n = 0;
            while (n < length) {
                int count = this.dataIn.read(value, n, length - n);
                if (count < 0) {
                    break;
                }
                n += count;
            }
            if (n == 0 && length > 0) {
                // Nothing read although bytes were requested: end of body.
                n = -1;
            }
            return n;
        } catch (EOFException e) {
            throw JMSExceptionSupport.createMessageEOFException(e);
        } catch (IOException e) {
            throw JMSExceptionSupport.createMessageFormatException(e);
        }
    }

    /**
     * Appends a {@code boolean} to the body stream.
     *
     * @param value the value to write
     * @throws JMSException if the body is not writable or the write fails
     */
    public void writeBoolean(boolean value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeBoolean(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Appends a single byte to the body stream.
     *
     * @param value the value to write
     * @throws JMSException if the body is not writable or the write fails
     */
    public void writeByte(byte value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeByte(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Appends a {@code short} to the body stream.
     *
     * @param value the value to write
     * @throws JMSException if the body is not writable or the write fails
     */
    public void writeShort(short value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeShort(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Appends a {@code char} to the body stream.
     *
     * @param value the value to write
     * @throws JMSException if the body is not writable or the write fails
     */
    public void writeChar(char value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeChar(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Appends an {@code int} to the body stream.
     *
     * @param value the value to write
     * @throws JMSException if the body is not writable or the write fails
     */
    public void writeInt(int value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeInt(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Appends a {@code long} to the body stream.
     *
     * @param value the value to write
     * @throws JMSException if the body is not writable or the write fails
     */
    public void writeLong(long value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeLong(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Appends a {@code float} to the body stream.
     *
     * @param value the value to write
     * @throws JMSException if the body is not writable or the write fails
     */
    public void writeFloat(float value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeFloat(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Appends a {@code double} to the body stream.
     *
     * @param value the value to write
     * @throws JMSException if the body is not writable or the write fails
     */
    public void writeDouble(double value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeDouble(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Appends a string to the body stream in the
     * {@link DataOutputStream#writeUTF(String)} format.
     *
     * @param value the string to write
     * @throws JMSException if the body is not writable or the write fails
     */
    public void writeUTF(String value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.writeUTF(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Appends the whole byte array to the body stream.
     *
     * @param value the bytes to write
     * @throws JMSException if the body is not writable or the write fails
     */
    public void writeBytes(byte[] value) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.write(value);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Appends a slice of a byte array to the body stream.
     *
     * @param value  the source array
     * @param offset index of the first byte to write
     * @param length number of bytes to write
     * @throws JMSException if the body is not writable or the write fails
     */
    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
        initializeWriting();
        try {
            this.dataOut.write(value, offset, length);
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        }
    }

    /**
     * Appends an object to the body stream by dispatching on its runtime type
     * to the matching typed writer. Supported types are the boxed primitives,
     * {@link String} (written via {@link #writeUTF(String)}) and
     * {@code byte[]}.
     *
     * @param value the object to write; must not be {@code null}
     * @throws NullPointerException   if {@code value} is {@code null}
     * @throws MessageFormatException if {@code value} is of an unsupported type
     * @throws JMSException           if the body is not writable or the write
     *                                fails
     */
    public void writeObject(Object value) throws JMSException {
        if (value == null) {
            throw new NullPointerException();
        }
        // Checked up front so that a non-writable body is reported before an
        // unsupported-type error.
        initializeWriting();
        if (value instanceof Boolean) {
            writeBoolean(((Boolean) value).booleanValue());
        } else if (value instanceof Character) {
            writeChar(((Character) value).charValue());
        } else if (value instanceof Byte) {
            writeByte(((Byte) value).byteValue());
        } else if (value instanceof Short) {
            writeShort(((Short) value).shortValue());
        } else if (value instanceof Integer) {
            writeInt(((Integer) value).intValue());
        } else if (value instanceof Long) {
            writeLong(((Long) value).longValue());
        } else if (value instanceof Float) {
            writeFloat(((Float) value).floatValue());
        } else if (value instanceof Double) {
            writeDouble(((Double) value).doubleValue());
        } else if (value instanceof String) {
            writeUTF(value.toString());
        } else if (value instanceof byte[]) {
            writeBytes((byte[]) value);
        } else {
            throw new MessageFormatException("Cannot write non-primitive type:" + value.getClass());
        }
    }

    /**
     * Puts the body into read-only mode: pending written bytes are stored as
     * the content, all streams are dropped, and the next read starts from the
     * beginning of the body.
     *
     * @throws JMSException declared by the JMS interface
     */
    public void reset() throws JMSException {
        storeContent();
        this.bytesOut = null;
        this.dataIn = null;
        this.dataOut = null;
        setReadOnlyBody(true);
    }

    /**
     * Lazily opens the output stream used by the {@code write*} methods.
     * When the connection asks for compression, 4 bytes are reserved at the
     * front of the buffer for the uncompressed length, and the data is routed
     * through a deflating stream that also counts the bytes written.
     *
     * @throws JMSException if {@code checkReadOnlyBody()} rejects the write or
     *                      the length placeholder cannot be written
     */
    private void initializeWriting() throws JMSException {
        checkReadOnlyBody();
        if (this.dataOut == null) {
            this.bytesOut = new ByteArrayOutputStream();
            OutputStream os = bytesOut;
            ActiveMQConnection connection = getConnection();
            if (connection != null && connection.isUseCompression()) {
                try {
                    // Reserve space for the uncompressed length; it is filled
                    // in by storeContent() once the final length is known.
                    os.write(new byte[4]);
                } catch (IOException e) {
                    throw JMSExceptionSupport.create(e);
                }
                length = 0;
                compressed = true;
                final Deflater deflater = new Deflater(Deflater.BEST_SPEED);
                os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) {
                    public void write(byte[] arg0) throws IOException {
                        length += arg0.length;
                        out.write(arg0);
                    }

                    public void write(byte[] arg0, int arg1, int arg2) throws IOException {
                        length += arg2;
                        out.write(arg0, arg1, arg2);
                    }

                    public void write(int arg0) throws IOException {
                        length++;
                        out.write(arg0);
                    }

                    public void close() throws IOException {
                        try {
                            super.close();
                        } finally {
                            // DeflaterOutputStream only ends a deflater it
                            // created itself; release ours explicitly so its
                            // native buffers do not wait for finalization.
                            deflater.end();
                        }
                    }
                };
            }
            this.dataOut = new DataOutputStream(os);
        }
    }

    /**
     * Ensures the body is in read mode.
     *
     * @throws MessageNotReadableException if the body is still writable
     */
    protected void checkWriteOnlyBody() throws MessageNotReadableException {
        if (!readOnlyBody) {
            throw new MessageNotReadableException("Message body is write-only");
        }
    }

    /**
     * Lazily opens the input stream used by the {@code read*} methods and
     * records the body length. For compressed content the leading 4-byte
     * integer is consumed as the uncompressed length and the remainder is
     * read through an inflating stream.
     *
     * @throws JMSException if the body is write-only or the length header
     *                      cannot be read
     */
    private void initializeReading() throws JMSException {
        checkWriteOnlyBody();
        if (dataIn == null) {
            ByteSequence data = getContent();
            if (data == null) {
                data = new ByteSequence(new byte[]{}, 0, 0);
            }
            InputStream is = new ByteArrayInputStream(data);
            if (isCompressed()) {
                try {
                    // The first 4 bytes carry the real (uncompressed) length.
                    DataInputStream dis = new DataInputStream(is);
                    length = dis.readInt();
                    dis.close();
                } catch (IOException e) {
                    throw JMSExceptionSupport.create(e);
                }
                is = new InflaterInputStream(is);
            } else {
                length = data.getLength();
            }
            dataIn = new DataInputStream(is);
        }
    }

    /**
     * Sets a message property after making sure the write stream has been
     * initialized.
     *
     * @param name  the property name
     * @param value the property value
     * @throws JMSException if the body is not writable or the superclass
     *                      rejects the property
     */
    public void setObjectProperty(String name, Object value) throws JMSException {
        initializeWriting();
        super.setObjectProperty(name, value);
    }

    /**
     * @return the superclass description followed by the current stream fields
     */
    public String toString() {
        return super.toString() + " ActiveMQBytesMessage{ " +
                "bytesOut = " + bytesOut +
                ", dataOut = " + dataOut +
                ", dataIn = " + dataIn +
                " }";
    }
}