KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > shared > messages > Message


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2006 - 2007 ScalAgent Distributed Technologies
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA.
19  *
20  * Initial developer(s): ScalAgent Distributed Technologies
21  * Contributor(s):
22  */

23 package org.objectweb.joram.shared.messages;
24
25 import java.util.*;
26 import java.io.*;
27
28 import org.objectweb.joram.shared.util.Properties;
29 import org.objectweb.joram.shared.stream.Streamable;
30 import org.objectweb.joram.shared.stream.StreamUtil;
31
32 import org.objectweb.joram.shared.JoramTracing;
33 import org.objectweb.util.monolog.api.BasicLevel;
34
35 /**
36  * Implements the <code>Message</code> data structure.
37  */

38 public final class Message implements Cloneable JavaDoc, Serializable, Streamable {
39   /**
40    * Constructs a bright new <code>Message</code>.
41    */

42   public Message() {}
43
44   /**
45    * Table holding header fields that may be required by particular
46    * clients (such as JMS clients).
47    */

48   public transient Properties optionalHeader = null;
49
50   /**
51    * Returns an optional header field value.
52    *
53    * @param name The header field name.
54    */

55   public Object JavaDoc getOptionalHeader(String JavaDoc name) {
56     if (optionalHeader == null)
57       return null;
58
59     return optionalHeader.get(name);
60   }
61
62   /**
63    * Sets an optional header field value.
64    *
65    * @param name The header field name.
66    * @param value The corresponding value.
67    */

68   public void setOptionalHeader(String JavaDoc name, Object JavaDoc value) {
69     if (name == null || name.equals(""))
70       throw new IllegalArgumentException JavaDoc("Invalid header name: " + name);
71
72     if (value == null) return;
73
74     if (optionalHeader == null)
75       optionalHeader = new Properties();
76     optionalHeader.put(name, value);
77   }
78
79   /** <code>true</code> if the body is read-only. */
80   public transient byte[] body = null;
81
82   /** The message properties table. */
83   public transient Properties properties = null;
84
85   /**
86    * Returns a property as an object.
87    *
88    * @param name The property name.
89    */

90   public Object JavaDoc getProperty(String JavaDoc name) {
91     if (properties == null) return null;
92     return properties.get(name);
93   }
94
95   /**
96    * Sets a property value.
97    *
98    * @param name The property name.
99    * @param value The property value.
100    */

101   public void setProperty(String JavaDoc name, Object JavaDoc value) {
102     if (properties == null)
103       properties = new Properties();
104     properties.put(name, value);
105   }
106
107   /** The message identifier. */
108   public transient String JavaDoc id = null;
109   
110   /** <code>true</code> if the message must be persisted. */
111   public transient boolean persistent = true;
112  
113   /** A simple message carries an empty body. */
114   public static final int SIMPLE = 0;
115   /** A text message carries a String body. */
116   public static final int TEXT = 1;
117   /** An object message carries a serializable object. */
118   public static final int OBJECT = 2;
119   /** A map message carries an hashtable. */
120   public static final int MAP = 3;
121   /** A stream message carries a bytes stream. */
122   public static final int STREAM = 4;
123   /** A bytes message carries an array of bytes. */
124   public static final int BYTES = 5;
125
126   /**
127    * The client message type: SIMPLE, TEXT, OBJECT, MAP, STREAM, BYTES.
128    * By default, the message type is SIMPLE.
129    */

130   public transient int type = SIMPLE;
131
132   /**
133    * The message priority from 0 to 9, 9 being the highest.
134    * By default, the priority is 4?
135    */

136   public transient int priority = 4;
137  
138   /** The message expiration time, by default 0 for infinite time-to-live. */
139   public transient long expiration = 0;
140
141   /** The message time stamp. */
142   public transient long timestamp;
143
144   /**
145    * <code>true</code> if the message has been denied at least once by a
146    * consumer.
147    */

148   public transient boolean redelivered = false;
149
150   /** The message destination identifier. */
151   public transient String JavaDoc toId = null;
152   /** The message destination type. */
153   public transient String JavaDoc toType;
154
155   /**
156    * Sets the message destination.
157    *
158    * @param id The destination identifier.
159    * @param type The type of the destination.
160    */

161   public final void setDestination(String JavaDoc id, String JavaDoc type) {
162     toId = id;
163     toType = type;
164   }
165
166   /** Returns the message destination identifier. */
167   public final String JavaDoc getDestinationId() {
168     return toId;
169   }
170
171   /** Returns <code>true</code> if the destination is a queue. */
172   public final String JavaDoc getDestinationType() {
173     return toType;
174   }
175
176   /** The reply to destination identifier. */
177   public transient String JavaDoc replyToId = null;
178   /** <code>true</code> if the "reply to" destination is a queue. */
179   public transient String JavaDoc replyToType;
180
181   /** Returns the destination id the reply should be sent to. */
182   public final String JavaDoc getReplyToId() {
183     return replyToId;
184   }
185
186   /** Returns <code>true</code> if the reply to destination is a queue. */
187   public final String JavaDoc replyToType() {
188     return replyToType;
189   }
190
191   /**
192    * Sets the destination to which a reply should be sent.
193    *
194    * @param id The destination identifier.
195    * @param type The destination type.
196    */

197   public final void setReplyTo(String JavaDoc id, String JavaDoc type) {
198     replyToId = id;
199     replyToType = type;
200   }
201
202   /** The correlation identifier field. */
203   public transient String JavaDoc correlationId = null;
204  
205   /** <code>true</code> if the message target destination is deleted. */
206   public transient boolean deletedDest = false;
207   /** <code>true</code> if the message expired. */
208   public transient boolean expired = false;
209   /** <code>true</code> if the message could not be written on the dest. */
210   public transient boolean notWriteable = false;
211   /** <code>true</code> if the message is considered as undeliverable. */
212   public transient boolean undeliverable = false;
213   /** The number of delivery attempts for this message. */
214   public transient int deliveryCount = 0;
215
216   /**
217    * Sets a String as the body of the message.
218    */

219   public void setText(String JavaDoc text) {
220     if (text == null) {
221       body = null;
222     } else {
223       body = text.getBytes();
224     }
225   }
226
227   /**
228    * Returns the text body of the message.
229    */

230   public String JavaDoc getText() {
231     if (body == null) {
232       return null;
233     } else {
234       return new String JavaDoc(body);
235     }
236   }
237
238   /**
239    * Sets an object as the body of the message.
240    *
241    * @exception IOException In case of an error while setting the object.
242    */

243   public void setObject(Serializable object) throws IOException {
244     type = Message.OBJECT;
245
246     if (object == null) {
247       body = null;
248     } else {
249       ByteArrayOutputStream baos = new ByteArrayOutputStream();
250       ObjectOutputStream oos = new ObjectOutputStream(baos);
251       oos.writeObject(object);
252       oos.flush();
253       body = baos.toByteArray();
254       oos.close();
255       baos.close();
256     }
257   }
258
259   /**
260    * Returns the object body of the message.
261    *
262    * @exception IOException In case of an error while getting the object.
263    * @exception ClassNotFoundException If the object class is unknown.
264    */

265   public Serializable getObject() throws ClassNotFoundException JavaDoc, IOException {
266     // AF: May be, we should verify that it is an Object message!!
267
if (body == null) return null;
268
269     ByteArrayInputStream bais = null;
270     ObjectInputStream ois = null;
271     Object JavaDoc obj = null;
272
273     try {
274      bais = new ByteArrayInputStream(body);
275      ois = new ObjectInputStream(bais);
276      obj = ois.readObject();
277     } finally {
278       try {
279         ois.close();
280       } catch (Exception JavaDoc e) {}
281       try {
282         bais.close();
283       } catch (Exception JavaDoc e) {}
284     }
285
286     return (Serializable) obj;
287   }
288
289   public final String JavaDoc toString() {
290     StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
291     toString(strbuf);
292     return strbuf.toString();
293   }
294
295   public void toString(StringBuffer JavaDoc strbuf) {
296     strbuf.append('(').append(super.toString());
297     strbuf.append(",id=").append(id);
298     strbuf.append(",type=").append(type);
299     strbuf.append(",persistent=").append(persistent);
300     strbuf.append(",priority=").append(priority);
301     strbuf.append(",expiration=").append(expiration);
302     strbuf.append(",timestamp=").append(timestamp);
303     strbuf.append(",toId=").append(toId);
304     strbuf.append(",replyToId=").append(replyToId);
305     strbuf.append(",correlationId=").append(correlationId);
306     strbuf.append(')');
307   }
308
309   /** Clones the message. */
310   public Object JavaDoc clone() {
311     try {
312       Message clone = (Message) super.clone();
313       if (body != null) {
314         // AF: May be we can share the body as it should be RO.
315
clone.body = new byte[body.length];
316         System.arraycopy(body, 0, clone.body, 0, body.length);
317       }
318       if (optionalHeader != null) {
319         clone.optionalHeader = (Properties) optionalHeader.clone();
320       }
321       if (properties != null) {
322         clone.properties = (Properties) properties.clone();
323       }
324       return clone;
325     } catch (CloneNotSupportedException JavaDoc cE) {
326       // Should never happened!
327
return null;
328     }
329   }
330
331   public Hashtable soapCode() {
332     Hashtable h = new Hashtable();
333     // AF: TODO
334
return h;
335   }
336
337   public static Message soapDecode(Hashtable h) {
338     // AF: TODO
339
return null;
340   }
341
342 // public static int bodyROFlag = 0x00000001;
343
// public static int propertiesROFlag = 0x00000002;
344
public static int redeliveredFlag = 0x00000004;
345   public static int persistentFlag = 0x00000008;
346   public static int deletedDestFlag = 0x00000010;
347   public static int expiredFlag = 0x00000020;
348   public static int notWriteableFlag = 0x00000040;
349   public static int undeliverableFlag = 0x00000080;
350
351   /* ***** ***** ***** ***** *****
352    * Streamable interface
353    * ***** ***** ***** ***** ***** */

354
355   /**
356    * The object implements the writeTo method to write its contents to
357    * the output stream.
358    *
359    * @param os the stream to write the object to
360    */

361   public void writeTo(OutputStream os) throws IOException {
362     int bool = 0;
363
364     StreamUtil.writeTo(type, os);
365     StreamUtil.writeTo(body, os);
366     StreamUtil.writeTo(optionalHeader, os);
367 // bool = bool | (bodyRO?bodyROFlag:0);
368
// bool = bool | (propertiesRO?propertiesROFlag:0);
369
StreamUtil.writeTo(properties, os);
370     StreamUtil.writeTo(id, os);
371     StreamUtil.writeTo(priority, os);
372     StreamUtil.writeTo(toId, os);
373     StreamUtil.writeTo(toType, os);
374     StreamUtil.writeTo(expiration, os);
375     StreamUtil.writeTo(replyToId, os);
376     StreamUtil.writeTo(replyToType, os);
377     StreamUtil.writeTo(timestamp, os);
378     StreamUtil.writeTo(correlationId, os);
379     StreamUtil.writeTo(deliveryCount, os);
380
381     bool = bool | (redelivered?redeliveredFlag:0);
382     bool = bool | (persistent?persistentFlag:0);
383     bool = bool | (deletedDest?deletedDestFlag:0);
384     bool = bool | (expired?expiredFlag:0);
385     bool = bool | (notWriteable?notWriteableFlag:0);
386     bool = bool | (undeliverable?undeliverableFlag:0);
387     StreamUtil.writeTo(bool, os);
388   }
389
390   /**
391    * The object implements the readFrom method to restore its contents from
392    * the input stream.
393    *
394    * @param is the stream to read data from in order to restore the object
395    */

396   public void readFrom(InputStream is) throws IOException {
397     type = StreamUtil.readIntFrom(is);
398     body = StreamUtil.readByteArrayFrom(is);
399     optionalHeader = StreamUtil.readPropertiesFrom(is);
400     properties = StreamUtil.readPropertiesFrom(is);
401     id = StreamUtil.readStringFrom(is);
402     priority = StreamUtil.readIntFrom(is);
403     toId = StreamUtil.readStringFrom(is);
404     toType = StreamUtil.readStringFrom(is);
405     expiration = StreamUtil.readLongFrom(is);
406     replyToId = StreamUtil.readStringFrom(is);
407     replyToType = StreamUtil.readStringFrom(is);
408     timestamp = StreamUtil.readLongFrom(is);
409     correlationId = StreamUtil.readStringFrom(is);
410     deliveryCount = StreamUtil.readIntFrom(is);
411
412     int bool = StreamUtil.readIntFrom(is);
413 // bodyRO = ((bool & bodyROFlag) != 0);
414
// propertiesRO = ((bool & propertiesROFlag) != 0);
415
redelivered = ((bool & redeliveredFlag) != 0);
416     persistent = ((bool & persistentFlag) != 0);
417     deletedDest = ((bool & deletedDestFlag) != 0);
418     expired = ((bool & expiredFlag) != 0);
419     notWriteable = ((bool & notWriteableFlag) != 0);
420     undeliverable = ((bool & undeliverableFlag) != 0);
421   }
422
423   /**
424    * this method allows to write to the output stream a vector of message.
425    *
426    * @param messages the vector of messages
427    * @param os the stream to write the vector to
428    */

429   public static void writeVectorTo(Vector messages,
430                                    OutputStream os) throws IOException {
431     if (messages == null) {
432       StreamUtil.writeTo(-1, os);
433       JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "writeVectorTo: -1");
434     } else {
435       int size = messages.size();
436       JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "writeVectorTo: " + size);
437       StreamUtil.writeTo(size, os);
438       for (int i=0; i<size; i++) {
439         JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "writeVectorTo: msg#" + i);
440         ((Message) messages.elementAt(i)).writeTo(os);
441       }
442     }
443   }
444
445
446   /**
447    * this method allows to read from the input stream a vector of messages.
448    *
449    * @param is the stream to read data from in order to restore the vector
450    * @return the vector of messages
451    */

452   public static Vector readVectorFrom(InputStream is) throws IOException {
453     int size = StreamUtil.readIntFrom(is);
454     JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "readVectorFrom: " + size);
455     if (size == -1) {
456       return null;
457     } else {
458       Vector messages = new Vector(size);
459       for (int i=0; i<size; i++) {
460         JoramTracing.dbgProxy.log(BasicLevel.DEBUG, "readVectorFrom: msg#" + i);
461         Message msg = new Message();
462         msg.readFrom(is);
463         messages.addElement(msg);
464       }
465       return messages;
466     }
467   }
468
469 // /** ***** ***** ***** ***** ***** ***** ***** *****
470
// * Externalizable interface
471
// * ***** ***** ***** ***** ***** ***** ***** ***** */
472

473 // public void writeExternal(ObjectOutput out) throws IOException {
474
// writeTo(out);
475
// }
476

477 // public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
478
// readFrom(in);
479
// }
480

481   /** ***** ***** ***** ***** ***** ***** ***** *****
482    * Serializable interface
483    * ***** ***** ***** ***** ***** ***** ***** ***** */

484
485   private void writeObject(ObjectOutputStream out) throws IOException {
486     writeTo(out);
487   }
488
489   private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException JavaDoc {
490     readFrom(in);
491   }
492 }
493
Popular Tags