KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > command > ActiveMQStreamMessage


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.apache.activemq.command;
20
21 import java.io.BufferedInputStream JavaDoc;
22 import java.io.DataInputStream JavaDoc;
23 import java.io.DataOutputStream JavaDoc;
24 import java.io.EOFException JavaDoc;
25 import java.io.IOException JavaDoc;
26 import java.io.InputStream JavaDoc;
27 import java.io.OutputStream JavaDoc;
28 import java.util.zip.DeflaterOutputStream JavaDoc;
29 import java.util.zip.InflaterInputStream JavaDoc;
30
31 import javax.jms.JMSException JavaDoc;
32 import javax.jms.MessageEOFException JavaDoc;
33 import javax.jms.MessageFormatException JavaDoc;
34 import javax.jms.MessageNotReadableException JavaDoc;
35 import javax.jms.MessageNotWriteableException JavaDoc;
36 import javax.jms.StreamMessage JavaDoc;
37
38 import org.apache.activemq.ActiveMQConnection;
39 import org.apache.activemq.util.ByteArrayInputStream;
40 import org.apache.activemq.util.ByteArrayOutputStream;
41 import org.apache.activemq.util.ByteSequence;
42 import org.apache.activemq.util.JMSExceptionSupport;
43 import org.apache.activemq.util.MarshallingSupport;
44
45 /**
46  * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive
47  * types in the Java programming language. It is filled and read sequentially.
48  * It inherits from the <CODE>Message</CODE> interface and adds a stream
49  * message body. Its methods are based largely on those found in <CODE>java.io.DataInputStream</CODE>
50  * and <CODE>java.io.DataOutputStream</CODE>. <p/>
51  * <P>
52  * The primitive types can be read or written explicitly using methods for each
53  * type. They may also be read or written generically as objects. For instance,
54  * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to <CODE>StreamMessage.writeObject(new
55  * Integer(6))</CODE>. Both forms are provided, because the explicit form is
56  * convenient for static programming, and the object form is needed when types
57  * are not known at compile time. <p/>
58  * <P>
59  * When the message is first created, and when <CODE>clearBody</CODE> is
60  * called, the body of the message is in write-only mode. After the first call
61  * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
62  * After a message has been sent, the client that sent it can retain and modify
63  * it without affecting the message that has been sent. The same message object
64  * can be sent multiple times. When a message has been received, the provider
65  * has called <CODE>reset</CODE> so that the message body is in read-only mode
66  * for the client. <p/>
67  * <P>
68  * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
69  * message body is cleared and the message body is in write-only mode. <p/>
70  * <P>
71  * If a client attempts to read a message in write-only mode, a <CODE>MessageNotReadableException</CODE>
72  * is thrown. <p/>
73  * <P>
74  * If a client attempts to write a message in read-only mode, a <CODE>MessageNotWriteableException</CODE>
75  * is thrown. <p/>
76  * <P>
77  * <CODE>StreamMessage</CODE> objects support the following conversion table.
78  * The marked cases must be supported. The unmarked cases must throw a <CODE>JMSException</CODE>.
79  * The <CODE>String</CODE>-to-primitive conversions may throw a runtime
80  * exception if the primitive's <CODE>valueOf()</CODE> method does not accept
81  * it as a valid <CODE>String</CODE> representation of the primitive. <p/>
82  * <P>
83  * A value written as the row type can be read as the column type. <p/>
84  *
 * <PRE>
 * |        | boolean byte short char int long float double String byte[]
 * |----------------------------------------------------------------------
 * |boolean |    X                                            X
 * |byte    |          X     X         X   X                  X
 * |short   |                X         X   X                  X
 * |char    |                     X                           X
 * |int     |                          X   X                  X
 * |long    |                              X                  X
 * |float   |                                    X     X      X
 * |double  |                                          X      X
 * |String  |    X     X     X         X   X     X     X      X
 * |byte[]  |                                                        X
 * |----------------------------------------------------------------------
 * </PRE>
92  *
93  * <p/>
94  * <P>
95  * Attempting to read a null value as a primitive type must be treated as
96  * calling the primitive's corresponding <code>valueOf(String)</code>
97  * conversion method with a null value. Since <code>char</code> does not
98  * support a <code>String</code> conversion, attempting to read a null value
99  * as a <code>char</code> must throw a <code>NullPointerException</code>.
100  *
101  * @openwire:marshaller code="27"
102  * @see javax.jms.Session#createStreamMessage()
103  * @see javax.jms.BytesMessage
104  * @see javax.jms.MapMessage
105  * @see javax.jms.Message
106  * @see javax.jms.ObjectMessage
107  * @see javax.jms.TextMessage
108  */

109 public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage JavaDoc {
110
111     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE;
112
113     transient protected DataOutputStream JavaDoc dataOut;
114     transient protected ByteArrayOutputStream bytesOut;
115     transient protected DataInputStream JavaDoc dataIn;
116     transient protected int remainingBytes = -1;
117
118     public Message copy() {
119         ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
120         copy(copy);
121         return copy;
122     }
123
124     private void copy(ActiveMQStreamMessage copy) {
125         storeContent();
126         super.copy(copy);
127         copy.dataOut = null;
128         copy.bytesOut = null;
129         copy.dataIn = null;
130     }
131
132     public void onSend() throws JMSException JavaDoc {
133         super.onSend();
134         storeContent();
135     }
136
137     private void storeContent() {
138         if (dataOut != null) {
139             try {
140                 dataOut.close();
141                 setContent(bytesOut.toByteSequence());
142                 bytesOut = null;
143                 dataOut = null;
144             } catch (IOException JavaDoc ioe) {
145                 throw new RuntimeException JavaDoc(ioe);
146             }
147         }
148     }
149
150     public byte getDataStructureType() {
151         return DATA_STRUCTURE_TYPE;
152     }
153
154     public String JavaDoc getJMSXMimeType() {
155         return "jms/stream-message";
156     }
157
158
159     /**
160      * Clears out the message body. Clearing a message's body does not clear its
161      * header values or property entries. <p/>
162      * <P>
163      * If this message body was read-only, calling this method leaves the
164      * message body in the same state as an empty body in a newly created
165      * message.
166      *
167      * @throws JMSException
168      * if the JMS provider fails to clear the message body due to
169      * some internal error.
170      */

171
172     public void clearBody() throws JMSException JavaDoc {
173         super.clearBody();
174         this.dataOut = null;
175         this.dataIn = null;
176         this.bytesOut = null;
177         this.remainingBytes=-1;
178     }
179
180     /**
181      * Reads a <code>boolean</code> from the stream message.
182      *
183      * @return the <code>boolean</code> value read
184      * @throws JMSException
185      * if the JMS provider fails to read the message due to some
186      * internal error.
187      * @throws MessageEOFException
188      * if unexpected end of message stream has been reached.
189      * @throws MessageFormatException
190      * if this type conversion is invalid.
191      * @throws MessageNotReadableException
192      * if the message is in write-only mode.
193      */

194
195     public boolean readBoolean() throws JMSException JavaDoc {
196         initializeReading();
197         try {
198
199             this.dataIn.mark(10);
200             int type = this.dataIn.read();
201             if (type == -1) {
202                 throw new MessageEOFException JavaDoc("reached end of data");
203             }
204             if (type == MarshallingSupport.BOOLEAN_TYPE) {
205                 return this.dataIn.readBoolean();
206             }
207             if (type == MarshallingSupport.STRING_TYPE) {
208                 return Boolean.valueOf(this.dataIn.readUTF()).booleanValue();
209             }
210             if (type == MarshallingSupport.NULL) {
211                 this.dataIn.reset();
212                 throw new NullPointerException JavaDoc("Cannot convert NULL value to boolean.");
213             } else {
214                 this.dataIn.reset();
215                 throw new MessageFormatException JavaDoc(" not a boolean type");
216             }
217         } catch (EOFException JavaDoc e) {
218             throw JMSExceptionSupport.createMessageEOFException(e);
219         } catch (IOException JavaDoc e) {
220             throw JMSExceptionSupport.createMessageFormatException(e);
221         }
222     }
223
224     /**
225      * Reads a <code>byte</code> value from the stream message.
226      *
227      * @return the next byte from the stream message as a 8-bit
228      * <code>byte</code>
229      * @throws JMSException
230      * if the JMS provider fails to read the message due to some
231      * internal error.
232      * @throws MessageEOFException
233      * if unexpected end of message stream has been reached.
234      * @throws MessageFormatException
235      * if this type conversion is invalid.
236      * @throws MessageNotReadableException
237      * if the message is in write-only mode.
238      */

239
240     public byte readByte() throws JMSException JavaDoc {
241         initializeReading();
242         try {
243
244             this.dataIn.mark(10);
245             int type = this.dataIn.read();
246             if (type == -1) {
247                 throw new MessageEOFException JavaDoc("reached end of data");
248             }
249             if (type == MarshallingSupport.BYTE_TYPE) {
250                 return this.dataIn.readByte();
251             }
252             if (type == MarshallingSupport.STRING_TYPE) {
253                 return Byte.valueOf(this.dataIn.readUTF()).byteValue();
254             }
255             if (type == MarshallingSupport.NULL) {
256                 this.dataIn.reset();
257                 throw new NullPointerException JavaDoc("Cannot convert NULL value to byte.");
258             } else {
259                 this.dataIn.reset();
260                 throw new MessageFormatException JavaDoc(" not a byte type");
261             }
262         } catch (NumberFormatException JavaDoc mfe) {
263             try {
264                 this.dataIn.reset();
265             } catch (IOException JavaDoc ioe) {
266                 throw JMSExceptionSupport.create(ioe);
267             }
268             throw mfe;
269
270         } catch (EOFException JavaDoc e) {
271             throw JMSExceptionSupport.createMessageEOFException(e);
272         } catch (IOException JavaDoc e) {
273             throw JMSExceptionSupport.createMessageFormatException(e);
274         }
275     }
276
277     /**
278      * Reads a 16-bit integer from the stream message.
279      *
280      * @return a 16-bit integer from the stream message
281      * @throws JMSException
282      * if the JMS provider fails to read the message due to some
283      * internal error.
284      * @throws MessageEOFException
285      * if unexpected end of message stream has been reached.
286      * @throws MessageFormatException
287      * if this type conversion is invalid.
288      * @throws MessageNotReadableException
289      * if the message is in write-only mode.
290      */

291
292     public short readShort() throws JMSException JavaDoc {
293         initializeReading();
294         try {
295
296             this.dataIn.mark(17);
297             int type = this.dataIn.read();
298             if (type == -1) {
299                 throw new MessageEOFException JavaDoc("reached end of data");
300             }
301             if (type == MarshallingSupport.SHORT_TYPE) {
302                 return this.dataIn.readShort();
303             }
304             if (type == MarshallingSupport.BYTE_TYPE) {
305                 return this.dataIn.readByte();
306             }
307             if (type == MarshallingSupport.STRING_TYPE) {
308                 return Short.valueOf(this.dataIn.readUTF()).shortValue();
309             }
310             if (type == MarshallingSupport.NULL) {
311                 this.dataIn.reset();
312                 throw new NullPointerException JavaDoc("Cannot convert NULL value to short.");
313             } else {
314                 this.dataIn.reset();
315                 throw new MessageFormatException JavaDoc(" not a short type");
316             }
317         } catch (NumberFormatException JavaDoc mfe) {
318             try {
319                 this.dataIn.reset();
320             } catch (IOException JavaDoc ioe) {
321                 throw JMSExceptionSupport.create(ioe);
322             }
323             throw mfe;
324
325         } catch (EOFException JavaDoc e) {
326             throw JMSExceptionSupport.createMessageEOFException(e);
327         } catch (IOException JavaDoc e) {
328             throw JMSExceptionSupport.createMessageFormatException(e);
329         }
330
331     }
332
333     /**
334      * Reads a Unicode character value from the stream message.
335      *
336      * @return a Unicode character from the stream message
337      * @throws JMSException
338      * if the JMS provider fails to read the message due to some
339      * internal error.
340      * @throws MessageEOFException
341      * if unexpected end of message stream has been reached.
342      * @throws MessageFormatException
343      * if this type conversion is invalid
344      * @throws MessageNotReadableException
345      * if the message is in write-only mode.
346      */

347
348     public char readChar() throws JMSException JavaDoc {
349         initializeReading();
350         try {
351
352             this.dataIn.mark(17);
353             int type = this.dataIn.read();
354             if (type == -1) {
355                 throw new MessageEOFException JavaDoc("reached end of data");
356             }
357             if (type == MarshallingSupport.CHAR_TYPE) {
358                 return this.dataIn.readChar();
359             }
360             if (type == MarshallingSupport.NULL) {
361                 this.dataIn.reset();
362                 throw new NullPointerException JavaDoc("Cannot convert NULL value to char.");
363             } else {
364                 this.dataIn.reset();
365                 throw new MessageFormatException JavaDoc(" not a char type");
366             }
367         } catch (NumberFormatException JavaDoc mfe) {
368             try {
369                 this.dataIn.reset();
370             } catch (IOException JavaDoc ioe) {
371                 throw JMSExceptionSupport.create(ioe);
372             }
373             throw mfe;
374
375         } catch (EOFException JavaDoc e) {
376             throw JMSExceptionSupport.createMessageEOFException(e);
377         } catch (IOException JavaDoc e) {
378             throw JMSExceptionSupport.createMessageFormatException(e);
379         }
380     }
381
382     /**
383      * Reads a 32-bit integer from the stream message.
384      *
385      * @return a 32-bit integer value from the stream message, interpreted as an
386      * <code>int</code>
387      * @throws JMSException
388      * if the JMS provider fails to read the message due to some
389      * internal error.
390      * @throws MessageEOFException
391      * if unexpected end of message stream has been reached.
392      * @throws MessageFormatException
393      * if this type conversion is invalid.
394      * @throws MessageNotReadableException
395      * if the message is in write-only mode.
396      */

397
398     public int readInt() throws JMSException JavaDoc {
399         initializeReading();
400         try {
401
402             this.dataIn.mark(33);
403             int type = this.dataIn.read();
404             if (type == -1) {
405                 throw new MessageEOFException JavaDoc("reached end of data");
406             }
407             if (type == MarshallingSupport.INTEGER_TYPE) {
408                 return this.dataIn.readInt();
409             }
410             if (type == MarshallingSupport.SHORT_TYPE) {
411                 return this.dataIn.readShort();
412             }
413             if (type == MarshallingSupport.BYTE_TYPE) {
414                 return this.dataIn.readByte();
415             }
416             if (type == MarshallingSupport.STRING_TYPE) {
417                 return Integer.valueOf(this.dataIn.readUTF()).intValue();
418             }
419             if (type == MarshallingSupport.NULL) {
420                 this.dataIn.reset();
421                 throw new NullPointerException JavaDoc("Cannot convert NULL value to int.");
422             } else {
423                 this.dataIn.reset();
424                 throw new MessageFormatException JavaDoc(" not an int type");
425             }
426         } catch (NumberFormatException JavaDoc mfe) {
427             try {
428                 this.dataIn.reset();
429             } catch (IOException JavaDoc ioe) {
430                 throw JMSExceptionSupport.create(ioe);
431             }
432             throw mfe;
433
434         } catch (EOFException JavaDoc e) {
435             throw JMSExceptionSupport.createMessageEOFException(e);
436         } catch (IOException JavaDoc e) {
437             throw JMSExceptionSupport.createMessageFormatException(e);
438         }
439     }
440
441     /**
442      * Reads a 64-bit integer from the stream message.
443      *
444      * @return a 64-bit integer value from the stream message, interpreted as a
445      * <code>long</code>
446      * @throws JMSException
447      * if the JMS provider fails to read the message due to some
448      * internal error.
449      * @throws MessageEOFException
450      * if unexpected end of message stream has been reached.
451      * @throws MessageFormatException
452      * if this type conversion is invalid.
453      * @throws MessageNotReadableException
454      * if the message is in write-only mode.
455      */

456
457     public long readLong() throws JMSException JavaDoc {
458         initializeReading();
459         try {
460
461             this.dataIn.mark(65);
462             int type = this.dataIn.read();
463             if (type == -1) {
464                 throw new MessageEOFException JavaDoc("reached end of data");
465             }
466             if (type == MarshallingSupport.LONG_TYPE) {
467                 return this.dataIn.readLong();
468             }
469             if (type == MarshallingSupport.INTEGER_TYPE) {
470                 return this.dataIn.readInt();
471             }
472             if (type == MarshallingSupport.SHORT_TYPE) {
473                 return this.dataIn.readShort();
474             }
475             if (type == MarshallingSupport.BYTE_TYPE) {
476                 return this.dataIn.readByte();
477             }
478             if (type == MarshallingSupport.STRING_TYPE) {
479                 return Long.valueOf(this.dataIn.readUTF()).longValue();
480             }
481             if (type == MarshallingSupport.NULL) {
482                 this.dataIn.reset();
483                 throw new NullPointerException JavaDoc("Cannot convert NULL value to long.");
484             } else {
485                 this.dataIn.reset();
486                 throw new MessageFormatException JavaDoc(" not a long type");
487             }
488         } catch (NumberFormatException JavaDoc mfe) {
489             try {
490                 this.dataIn.reset();
491             } catch (IOException JavaDoc ioe) {
492                 throw JMSExceptionSupport.create(ioe);
493             }
494             throw mfe;
495
496         } catch (EOFException JavaDoc e) {
497             throw JMSExceptionSupport.createMessageEOFException(e);
498         } catch (IOException JavaDoc e) {
499             throw JMSExceptionSupport.createMessageFormatException(e);
500         }
501     }
502
503     /**
504      * Reads a <code>float</code> from the stream message.
505      *
506      * @return a <code>float</code> value from the stream message
507      * @throws JMSException
508      * if the JMS provider fails to read the message due to some
509      * internal error.
510      * @throws MessageEOFException
511      * if unexpected end of message stream has been reached.
512      * @throws MessageFormatException
513      * if this type conversion is invalid.
514      * @throws MessageNotReadableException
515      * if the message is in write-only mode.
516      */

517
518     public float readFloat() throws JMSException JavaDoc {
519         initializeReading();
520         try {
521             this.dataIn.mark(33);
522             int type = this.dataIn.read();
523             if (type == -1) {
524                 throw new MessageEOFException JavaDoc("reached end of data");
525             }
526             if (type == MarshallingSupport.FLOAT_TYPE) {
527                 return this.dataIn.readFloat();
528             }
529             if (type == MarshallingSupport.STRING_TYPE) {
530                 return Float.valueOf(this.dataIn.readUTF()).floatValue();
531             }
532             if (type == MarshallingSupport.NULL) {
533                 this.dataIn.reset();
534                 throw new NullPointerException JavaDoc("Cannot convert NULL value to float.");
535             } else {
536                 this.dataIn.reset();
537                 throw new MessageFormatException JavaDoc(" not a float type");
538             }
539         } catch (NumberFormatException JavaDoc mfe) {
540             try {
541                 this.dataIn.reset();
542             } catch (IOException JavaDoc ioe) {
543                 throw JMSExceptionSupport.create(ioe);
544             }
545             throw mfe;
546
547         } catch (EOFException JavaDoc e) {
548             throw JMSExceptionSupport.createMessageEOFException(e);
549         } catch (IOException JavaDoc e) {
550             throw JMSExceptionSupport.createMessageFormatException(e);
551         }
552     }
553
554     /**
555      * Reads a <code>double</code> from the stream message.
556      *
557      * @return a <code>double</code> value from the stream message
558      * @throws JMSException
559      * if the JMS provider fails to read the message due to some
560      * internal error.
561      * @throws MessageEOFException
562      * if unexpected end of message stream has been reached.
563      * @throws MessageFormatException
564      * if this type conversion is invalid.
565      * @throws MessageNotReadableException
566      * if the message is in write-only mode.
567      */

568
569     public double readDouble() throws JMSException JavaDoc {
570         initializeReading();
571         try {
572
573             this.dataIn.mark(65);
574             int type = this.dataIn.read();
575             if (type == -1) {
576                 throw new MessageEOFException JavaDoc("reached end of data");
577             }
578             if (type == MarshallingSupport.DOUBLE_TYPE) {
579                 return this.dataIn.readDouble();
580             }
581             if (type == MarshallingSupport.FLOAT_TYPE) {
582                 return this.dataIn.readFloat();
583             }
584             if (type == MarshallingSupport.STRING_TYPE) {
585                 return Double.valueOf(this.dataIn.readUTF()).doubleValue();
586             }
587             if (type == MarshallingSupport.NULL) {
588                 this.dataIn.reset();
589                 throw new NullPointerException JavaDoc("Cannot convert NULL value to double.");
590             } else {
591                 this.dataIn.reset();
592                 throw new MessageFormatException JavaDoc(" not a double type");
593             }
594         } catch (NumberFormatException JavaDoc mfe) {
595             try {
596                 this.dataIn.reset();
597             } catch (IOException JavaDoc ioe) {
598                 throw JMSExceptionSupport.create(ioe);
599             }
600             throw mfe;
601
602         } catch (EOFException JavaDoc e) {
603             throw JMSExceptionSupport.createMessageEOFException(e);
604         } catch (IOException JavaDoc e) {
605             throw JMSExceptionSupport.createMessageFormatException(e);
606         }
607     }
608
609     /**
610      * Reads a <CODE>String</CODE> from the stream message.
611      *
612      * @return a Unicode string from the stream message
613      * @throws JMSException
614      * if the JMS provider fails to read the message due to some
615      * internal error.
616      * @throws MessageEOFException
617      * if unexpected end of message stream has been reached.
618      * @throws MessageFormatException
619      * if this type conversion is invalid.
620      * @throws MessageNotReadableException
621      * if the message is in write-only mode.
622      */

623
624     public String JavaDoc readString() throws JMSException JavaDoc {
625         initializeReading();
626         try {
627
628             this.dataIn.mark(65);
629             int type = this.dataIn.read();
630             if (type == -1) {
631                 throw new MessageEOFException JavaDoc("reached end of data");
632             }
633             if (type == MarshallingSupport.NULL) {
634                 return null;
635             }
636             if (type == MarshallingSupport.BIG_STRING_TYPE) {
637                 return MarshallingSupport.readUTF8(dataIn);
638             }
639             if (type == MarshallingSupport.STRING_TYPE) {
640                 return this.dataIn.readUTF();
641             }
642             if (type == MarshallingSupport.LONG_TYPE) {
643                 return new Long JavaDoc(this.dataIn.readLong()).toString();
644             }
645             if (type == MarshallingSupport.INTEGER_TYPE) {
646                 return new Integer JavaDoc(this.dataIn.readInt()).toString();
647             }
648             if (type == MarshallingSupport.SHORT_TYPE) {
649                 return new Short JavaDoc(this.dataIn.readShort()).toString();
650             }
651             if (type == MarshallingSupport.BYTE_TYPE) {
652                 return new Byte JavaDoc(this.dataIn.readByte()).toString();
653             }
654             if (type == MarshallingSupport.FLOAT_TYPE) {
655                 return new Float JavaDoc(this.dataIn.readFloat()).toString();
656             }
657             if (type == MarshallingSupport.DOUBLE_TYPE) {
658                 return new Double JavaDoc(this.dataIn.readDouble()).toString();
659             }
660             if (type == MarshallingSupport.BOOLEAN_TYPE) {
661                 return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString();
662             }
663             if (type == MarshallingSupport.CHAR_TYPE) {
664                 return new Character JavaDoc(this.dataIn.readChar()).toString();
665             } else {
666                 this.dataIn.reset();
667                 throw new MessageFormatException JavaDoc(" not a String type");
668             }
669         } catch (NumberFormatException JavaDoc mfe) {
670             try {
671                 this.dataIn.reset();
672             } catch (IOException JavaDoc ioe) {
673                 throw JMSExceptionSupport.create(ioe);
674             }
675             throw mfe;
676
677         } catch (EOFException JavaDoc e) {
678             throw JMSExceptionSupport.createMessageEOFException(e);
679         } catch (IOException JavaDoc e) {
680             throw JMSExceptionSupport.createMessageFormatException(e);
681         }
682     }
683
684     /**
685      * Reads a byte array field from the stream message into the specified
686      * <CODE>byte[]</CODE> object (the read buffer). <p/>
687      * <P>
688      * To read the field value, <CODE>readBytes</CODE> should be successively
689      * called until it returns a value less than the length of the read buffer.
690      * The value of the bytes in the buffer following the last byte read is
691      * undefined. <p/>
692      * <P>
693      * If <CODE>readBytes</CODE> returns a value equal to the length of the
694      * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there
695      * are no more bytes to be read, this call returns -1. <p/>
696      * <P>
697      * If the byte array field value is null, <CODE>readBytes</CODE> returns
698      * -1. <p/>
699      * <P>
700      * If the byte array field value is empty, <CODE>readBytes</CODE> returns
701      * 0. <p/>
702      * <P>
703      * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE>
704      * field value has been made, the full value of the field must be read
705      * before it is valid to read the next field. An attempt to read the next
706      * field before that has been done will throw a <CODE>MessageFormatException</CODE>.
707      * <p/>
708      * <P>
709      * To read the byte field value into a new <CODE>byte[]</CODE> object, use
710      * the <CODE>readObject</CODE> method.
711      *
712      * @param value
713      * the buffer into which the data is read
714      * @return the total number of bytes read into the buffer, or -1 if there is
715      * no more data because the end of the byte field has been reached
716      * @throws JMSException
717      * if the JMS provider fails to read the message due to some
718      * internal error.
719      * @throws MessageEOFException
720      * if unexpected end of message stream has been reached.
721      * @throws MessageFormatException
722      * if this type conversion is invalid.
723      * @throws MessageNotReadableException
724      * if the message is in write-only mode.
725      * @see #readObject()
726      */

727
728     public int readBytes(byte[] value) throws JMSException JavaDoc {
729
730         initializeReading();
731         try {
732             if (value == null) {
733                 throw new NullPointerException JavaDoc();
734             }
735
736             if( remainingBytes == -1 ) {
737                 this.dataIn.mark(value.length + 1);
738                 int type = this.dataIn.read();
739                 if (type == -1) {
740                     throw new MessageEOFException JavaDoc("reached end of data");
741                 }
742                 if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
743                     throw new MessageFormatException JavaDoc("Not a byte array");
744                 }
745                 remainingBytes = this.dataIn.readInt();
746             } else if ( remainingBytes == 0 ) {
747                 remainingBytes = -1;
748                 return -1;
749             }
750
751             if (value.length <= remainingBytes) {
752                 // small buffer
753
remainingBytes -= value.length;
754                 this.dataIn.readFully(value);
755                 return value.length;
756             } else {
757                 // big buffer
758
int rc = this.dataIn.read(value, 0, remainingBytes);
759                 remainingBytes=0;
760                 return rc;
761             }
762
763         } catch (EOFException JavaDoc e) {
764             JMSException JavaDoc jmsEx = new MessageEOFException JavaDoc(e.getMessage());
765             jmsEx.setLinkedException(e);
766             throw jmsEx;
767         } catch (IOException JavaDoc e) {
768             JMSException JavaDoc jmsEx = new MessageFormatException JavaDoc(e.getMessage());
769             jmsEx.setLinkedException(e);
770             throw jmsEx;
771         }
772     }
773
774     /**
775      * Reads an object from the stream message. <p/>
776      * <P>
777      * This method can be used to return, in objectified format, an object in
778      * the Java programming language ("Java object") that has been written to
779      * the stream with the equivalent <CODE>writeObject</CODE> method call, or
780      * its equivalent primitive <CODE>write<I>type</I></CODE> method. <p/>
781      * <P>
782      * Note that byte values are returned as <CODE>byte[]</CODE>, not <CODE>Byte[]</CODE>.
783      * <p/>
784      * <P>
785      * An attempt to call <CODE>readObject</CODE> to read a byte field value
786      * into a new <CODE>byte[]</CODE> object before the full value of the byte
787      * field has been read will throw a <CODE>MessageFormatException</CODE>.
788      *
789      * @return a Java object from the stream message, in objectified format (for
790      * example, if the object was written as an <CODE>int</CODE>, an
791      * <CODE>Integer</CODE> is returned)
792      * @throws JMSException
793      * if the JMS provider fails to read the message due to some
794      * internal error.
795      * @throws MessageEOFException
796      * if unexpected end of message stream has been reached.
797      * @throws MessageFormatException
798      * if this type conversion is invalid.
799      * @throws MessageNotReadableException
800      * if the message is in write-only mode.
801      * @see #readBytes(byte[] value)
802      */

803
804     public Object JavaDoc readObject() throws JMSException JavaDoc {
805         initializeReading();
806         try {
807             this.dataIn.mark(65);
808             int type = this.dataIn.read();
809             if (type == -1) {
810                 throw new MessageEOFException JavaDoc("reached end of data");
811             }
812             if (type == MarshallingSupport.NULL) {
813                 return null;
814             }
815             if (type == MarshallingSupport.BIG_STRING_TYPE) {
816                 return MarshallingSupport.readUTF8(dataIn);
817             }
818             if (type == MarshallingSupport.STRING_TYPE) {
819                 return this.dataIn.readUTF();
820             }
821             if (type == MarshallingSupport.LONG_TYPE) {
822                 return new Long JavaDoc(this.dataIn.readLong());
823             }
824             if (type == MarshallingSupport.INTEGER_TYPE) {
825                 return new Integer JavaDoc(this.dataIn.readInt());
826             }
827             if (type == MarshallingSupport.SHORT_TYPE) {
828                 return new Short JavaDoc(this.dataIn.readShort());
829             }
830             if (type == MarshallingSupport.BYTE_TYPE) {
831                 return new Byte JavaDoc(this.dataIn.readByte());
832             }
833             if (type == MarshallingSupport.FLOAT_TYPE) {
834                 return new Float JavaDoc(this.dataIn.readFloat());
835             }
836             if (type == MarshallingSupport.DOUBLE_TYPE) {
837                 return new Double JavaDoc(this.dataIn.readDouble());
838             }
839             if (type == MarshallingSupport.BOOLEAN_TYPE) {
840                 return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
841             }
842             if (type == MarshallingSupport.CHAR_TYPE) {
843                 return new Character JavaDoc(this.dataIn.readChar());
844             }
845             if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
846                 int len = this.dataIn.readInt();
847                 byte[] value = new byte[len];
848                 this.dataIn.readFully(value);
849                 return value;
850             } else {
851                 this.dataIn.reset();
852                 throw new MessageFormatException JavaDoc("unknown type");
853             }
854         } catch (NumberFormatException JavaDoc mfe) {
855             try {
856                 this.dataIn.reset();
857             } catch (IOException JavaDoc ioe) {
858                 throw JMSExceptionSupport.create(ioe);
859             }
860             throw mfe;
861
862         } catch (EOFException JavaDoc e) {
863             JMSException JavaDoc jmsEx = new MessageEOFException JavaDoc(e.getMessage());
864             jmsEx.setLinkedException(e);
865             throw jmsEx;
866         } catch (IOException JavaDoc e) {
867             JMSException JavaDoc jmsEx = new MessageFormatException JavaDoc(e.getMessage());
868             jmsEx.setLinkedException(e);
869             throw jmsEx;
870         }
871     }
872
873     /**
874      * Writes a <code>boolean</code> to the stream message. The value
875      * <code>true</code> is written as the value <code>(byte)1</code>; the
876      * value <code>false</code> is written as the value <code>(byte)0</code>.
877      *
878      * @param value
879      * the <code>boolean</code> value to be written
880      * @throws JMSException
881      * if the JMS provider fails to write the message due to some
882      * internal error.
883      * @throws MessageNotWriteableException
884      * if the message is in read-only mode.
885      */

886
887     public void writeBoolean(boolean value) throws JMSException JavaDoc {
888         initializeWriting();
889         try {
890             MarshallingSupport.marshalBoolean(dataOut, value);
891         } catch (IOException JavaDoc ioe) {
892             throw JMSExceptionSupport.create(ioe);
893         }
894     }
895
896     /**
897      * Writes a <code>byte</code> to the stream message.
898      *
899      * @param value
900      * the <code>byte</code> value to be written
901      * @throws JMSException
902      * if the JMS provider fails to write the message due to some
903      * internal error.
904      * @throws MessageNotWriteableException
905      * if the message is in read-only mode.
906      */

907
908     public void writeByte(byte value) throws JMSException JavaDoc {
909         initializeWriting();
910         try {
911             MarshallingSupport.marshalByte(dataOut, value);
912         } catch (IOException JavaDoc ioe) {
913             throw JMSExceptionSupport.create(ioe);
914         }
915     }
916
917     /**
918      * Writes a <code>short</code> to the stream message.
919      *
920      * @param value
921      * the <code>short</code> value to be written
922      * @throws JMSException
923      * if the JMS provider fails to write the message due to some
924      * internal error.
925      * @throws MessageNotWriteableException
926      * if the message is in read-only mode.
927      */

928
929     public void writeShort(short value) throws JMSException JavaDoc {
930         initializeWriting();
931         try {
932             MarshallingSupport.marshalShort(dataOut, value);
933         } catch (IOException JavaDoc ioe) {
934             throw JMSExceptionSupport.create(ioe);
935         }
936     }
937
938     /**
939      * Writes a <code>char</code> to the stream message.
940      *
941      * @param value
942      * the <code>char</code> value to be written
943      * @throws JMSException
944      * if the JMS provider fails to write the message due to some
945      * internal error.
946      * @throws MessageNotWriteableException
947      * if the message is in read-only mode.
948      */

949
950     public void writeChar(char value) throws JMSException JavaDoc {
951         initializeWriting();
952         try {
953             MarshallingSupport.marshalChar(dataOut, value);
954         } catch (IOException JavaDoc ioe) {
955             throw JMSExceptionSupport.create(ioe);
956         }
957     }
958
959     /**
960      * Writes an <code>int</code> to the stream message.
961      *
962      * @param value
963      * the <code>int</code> value to be written
964      * @throws JMSException
965      * if the JMS provider fails to write the message due to some
966      * internal error.
967      * @throws MessageNotWriteableException
968      * if the message is in read-only mode.
969      */

970
971     public void writeInt(int value) throws JMSException JavaDoc {
972         initializeWriting();
973         try {
974             MarshallingSupport.marshalInt(dataOut, value);
975         } catch (IOException JavaDoc ioe) {
976             throw JMSExceptionSupport.create(ioe);
977         }
978     }
979
980     /**
981      * Writes a <code>long</code> to the stream message.
982      *
983      * @param value
984      * the <code>long</code> value to be written
985      * @throws JMSException
986      * if the JMS provider fails to write the message due to some
987      * internal error.
988      * @throws MessageNotWriteableException
989      * if the message is in read-only mode.
990      */

991
992     public void writeLong(long value) throws JMSException JavaDoc {
993         initializeWriting();
994         try {
995             MarshallingSupport.marshalLong(dataOut, value);
996         } catch (IOException JavaDoc ioe) {
997             throw JMSExceptionSupport.create(ioe);
998         }
999     }
1000
1001    /**
1002     * Writes a <code>float</code> to the stream message.
1003     *
1004     * @param value
1005     * the <code>float</code> value to be written
1006     * @throws JMSException
1007     * if the JMS provider fails to write the message due to some
1008     * internal error.
1009     * @throws MessageNotWriteableException
1010     * if the message is in read-only mode.
1011     */

1012
1013    public void writeFloat(float value) throws JMSException JavaDoc {
1014        initializeWriting();
1015        try {
1016            MarshallingSupport.marshalFloat(dataOut, value);
1017        } catch (IOException JavaDoc ioe) {
1018            throw JMSExceptionSupport.create(ioe);
1019        }
1020    }
1021
1022    /**
1023     * Writes a <code>double</code> to the stream message.
1024     *
1025     * @param value
1026     * the <code>double</code> value to be written
1027     * @throws JMSException
1028     * if the JMS provider fails to write the message due to some
1029     * internal error.
1030     * @throws MessageNotWriteableException
1031     * if the message is in read-only mode.
1032     */

1033
1034    public void writeDouble(double value) throws JMSException JavaDoc {
1035        initializeWriting();
1036        try {
1037            MarshallingSupport.marshalDouble(dataOut, value);
1038        } catch (IOException JavaDoc ioe) {
1039            throw JMSExceptionSupport.create(ioe);
1040        }
1041    }
1042
1043    /**
1044     * Writes a <code>String</code> to the stream message.
1045     *
1046     * @param value
1047     * the <code>String</code> value to be written
1048     * @throws JMSException
1049     * if the JMS provider fails to write the message due to some
1050     * internal error.
1051     * @throws MessageNotWriteableException
1052     * if the message is in read-only mode.
1053     */

1054
1055    public void writeString(String JavaDoc value) throws JMSException JavaDoc {
1056        initializeWriting();
1057        try {
1058            if (value == null) {
1059                MarshallingSupport.marshalNull(dataOut);
1060            } else {
1061                MarshallingSupport.marshalString(dataOut, value);
1062            }
1063        } catch (IOException JavaDoc ioe) {
1064            throw JMSExceptionSupport.create(ioe);
1065        }
1066    }
1067
1068    /**
1069     * Writes a byte array field to the stream message. <p/>
1070     * <P>
1071     * The byte array <code>value</code> is written to the message as a byte
1072     * array field. Consecutively written byte array fields are treated as two
1073     * distinct fields when the fields are read.
1074     *
1075     * @param value
1076     * the byte array value to be written
1077     * @throws JMSException
1078     * if the JMS provider fails to write the message due to some
1079     * internal error.
1080     * @throws MessageNotWriteableException
1081     * if the message is in read-only mode.
1082     */

1083
1084    public void writeBytes(byte[] value) throws JMSException JavaDoc {
1085        writeBytes(value, 0, value.length);
1086    }
1087
1088    /**
1089     * Writes a portion of a byte array as a byte array field to the stream
1090     * message. <p/>
1091     * <P>
1092     * The a portion of the byte array <code>value</code> is written to the
1093     * message as a byte array field. Consecutively written byte array fields
1094     * are treated as two distinct fields when the fields are read.
1095     *
1096     * @param value
1097     * the byte array value to be written
1098     * @param offset
1099     * the initial offset within the byte array
1100     * @param length
1101     * the number of bytes to use
1102     * @throws JMSException
1103     * if the JMS provider fails to write the message due to some
1104     * internal error.
1105     * @throws MessageNotWriteableException
1106     * if the message is in read-only mode.
1107     */

1108
1109    public void writeBytes(byte[] value, int offset, int length) throws JMSException JavaDoc {
1110        initializeWriting();
1111        try {
1112            MarshallingSupport.marshalByteArray(dataOut, value, offset, length);
1113        } catch (IOException JavaDoc ioe) {
1114            throw JMSExceptionSupport.create(ioe);
1115        }
1116    }
1117
1118    /**
1119     * Writes an object to the stream message. <p/>
1120     * <P>
1121     * This method works only for the objectified primitive object types (<code>Integer</code>,
1122     * <code>Double</code>, <code>Long</code>&nbsp;...),
1123     * <code>String</code> objects, and byte arrays.
1124     *
1125     * @param value
1126     * the Java object to be written
1127     * @throws JMSException
1128     * if the JMS provider fails to write the message due to some
1129     * internal error.
1130     * @throws MessageFormatException
1131     * if the object is invalid.
1132     * @throws MessageNotWriteableException
1133     * if the message is in read-only mode.
1134     */

1135
1136    public void writeObject(Object JavaDoc value) throws JMSException JavaDoc {
1137        initializeWriting();
1138        if (value == null) {
1139            try {
1140                MarshallingSupport.marshalNull(dataOut);
1141            } catch (IOException JavaDoc ioe) {
1142                throw JMSExceptionSupport.create(ioe);
1143            }
1144        } else if (value instanceof String JavaDoc) {
1145            writeString(value.toString());
1146        } else if (value instanceof Character JavaDoc) {
1147            writeChar(((Character JavaDoc) value).charValue());
1148        } else if (value instanceof Boolean JavaDoc) {
1149            writeBoolean(((Boolean JavaDoc) value).booleanValue());
1150        } else if (value instanceof Byte JavaDoc) {
1151            writeByte(((Byte JavaDoc) value).byteValue());
1152        } else if (value instanceof Short JavaDoc) {
1153            writeShort(((Short JavaDoc) value).shortValue());
1154        } else if (value instanceof Integer JavaDoc) {
1155            writeInt(((Integer JavaDoc) value).intValue());
1156        } else if (value instanceof Float JavaDoc) {
1157            writeFloat(((Float JavaDoc) value).floatValue());
1158        } else if (value instanceof Double JavaDoc) {
1159            writeDouble(((Double JavaDoc) value).doubleValue());
1160        } else if (value instanceof byte[]) {
1161            writeBytes((byte[]) value);
1162        }
1163    }
1164
1165    /**
1166     * Puts the message body in read-only mode and repositions the stream of
1167     * bytes to the beginning.
1168     *
1169     * @throws JMSException
1170     * if an internal error occurs
1171     */

1172
1173    public void reset() throws JMSException JavaDoc {
1174        storeContent();
1175        this.bytesOut = null;
1176        this.dataIn = null;
1177        this.dataOut = null;
1178        this.remainingBytes=-1;
1179        setReadOnlyBody(true);
1180    }
1181
1182    private void initializeWriting() throws MessageNotWriteableException JavaDoc {
1183        checkReadOnlyBody();
1184        if (this.dataOut == null) {
1185            this.bytesOut = new ByteArrayOutputStream();
1186            OutputStream JavaDoc os = bytesOut;
1187            ActiveMQConnection connection = getConnection();
1188            if (connection!=null && connection.isUseCompression()) {
1189                compressed = true;
1190                os = new DeflaterOutputStream JavaDoc(os);
1191            }
1192            this.dataOut = new DataOutputStream JavaDoc(os);
1193        }
1194    }
1195
1196    protected void checkWriteOnlyBody() throws MessageNotReadableException JavaDoc {
1197        if (!readOnlyBody) {
1198            throw new MessageNotReadableException JavaDoc("Message body is write-only");
1199        }
1200    }
1201
1202    private void initializeReading() throws MessageNotReadableException JavaDoc {
1203        checkWriteOnlyBody();
1204        if (this.dataIn == null) {
1205            ByteSequence data = getContent();
1206            if (data == null)
1207                data = new ByteSequence(new byte[] {}, 0 ,0);
1208            InputStream JavaDoc is = new ByteArrayInputStream(data);
1209            if (isCompressed()) {
1210                is = new InflaterInputStream JavaDoc(is);
1211                is = new BufferedInputStream JavaDoc(is);
1212            }
1213            this.dataIn = new DataInputStream JavaDoc(is);
1214        }
1215    }
1216
1217    public String JavaDoc toString() {
1218        return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut
1219                + ", dataIn = " + dataIn + " }";
1220    }
1221}
1222
Popular Tags