KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > messages > Message


/*
 * JORAM: Java(TM) Open Reliable Asynchronous Messaging
 * Copyright (C) 2006 - 2007 ScalAgent Distributed Technologies
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Initial developer(s): ScalAgent Distributed Technologies
 * Contributor(s):
 */

23 package org.objectweb.joram.mom.messages;
24
25 import java.io.Serializable JavaDoc;
26 import java.io.ObjectInputStream JavaDoc;
27 import java.io.ObjectOutputStream JavaDoc;
28 import java.io.IOException JavaDoc;
29 import java.util.Vector JavaDoc;
30
31 import org.objectweb.joram.shared.excepts.*;
32
33 import fr.dyade.aaa.util.Transaction;
34 import fr.dyade.aaa.agent.AgentServer;
35
36 import org.objectweb.joram.shared.JoramTracing;
37 import org.objectweb.util.monolog.api.BasicLevel;
38
39 /**
40  * The <code>Message</code> class actually provides the transport facility
41  * for the data exchanged during MOM operations.
42  * <p>
43  * A message content is always wrapped as a bytes array, it is charaterized
44  * by properties and "header" fields.
45  */

46 public final class Message implements Serializable JavaDoc {
47   /** Arrival position of this message on its queue or proxy. */
48   transient public long order;
49
50   /**
51    * The number of acknowledgements a message still expects from its
52    * subscribers before having been fully consumed by them (field used
53    * by JMS proxies).
54    * Be careful, this field is not saved but set to 0 during message
55    * loading then claculated during the proxy initialisation.
56    */

57   public transient int acksCounter;
58   /**
59    * The number of acknowledgements a message still expects from its
60    * durable subscribers before having been fully consumed by them (field used
61    * by JMS proxies).
62    * Be careful, this field is not saved but set to 0 during message
63    * loading then claculated during the proxy initialisation.
64    */

65   public transient int durableAcksCounter;
66
67   public transient org.objectweb.joram.shared.messages.Message msg;
68
69   /**
70    * Constructs a <code>Message</code> instance.
71    */

72   public Message(org.objectweb.joram.shared.messages.Message msg) {
73     this.msg = msg;
74   }
75
76   /** Sets the message identifier. */
77   public void setIdentifier(String JavaDoc id) {
78     msg.id = id;
79   }
80
81   /** Sets the message persistence mode. */
82   public void setPersistent(boolean persistent) {
83     msg.persistent = persistent;
84   }
85
86   /**
87    * Sets the message priority.
88    *
89    * @param priority Priority value: 0 the lowest, 9 the highest, 4 normal.
90    */

91   public void setPriority(int priority) {
92     if (priority >= 0 && priority <= 9)
93       msg.priority = priority;
94   }
95
96   /**
97    * Sets the message expiration.
98    *
99    * @param expiration The expiration time.
100    */

101   public void setExpiration(long expiration) {
102     if (expiration >= 0)
103       msg.expiration = expiration;
104   }
105
106   /** Sets the message time stamp. */
107   public void setTimestamp(long timestamp) {
108     msg.timestamp = timestamp;
109   }
110
111 // /**
112
// * Sets the message destination.
113
// *
114
// * @param id The destination identifier.
115
// * @param type The type of the destination.
116
// */
117
// public void setDestination(String id, String type) {
118
// msg.setDestination(id, type);
119
// }
120

121   /** Sets the message correlation identifier. */
122   public void setCorrelationId(String JavaDoc correlationId) {
123     msg.correlationId = correlationId;
124   }
125
126 // /**
127
// * Sets the destination to which a reply should be sent.
128
// *
129
// * @param id The destination identifier.
130
// * @param type The destination type.
131
// */
132
// public void setReplyTo(String id, String type) {
133
// msg.setReplyTo(id, type);
134
// }
135

136   /** Returns the message type. */
137   public int getType() {
138     return msg.type;
139   }
140
141   /** Returns the message identifier. */
142   public String JavaDoc getIdentifier() {
143     return msg.id;
144   }
145
146   /** Returns <code>true</code> if the message is persistent. */
147   public boolean getPersistent() {
148     return msg.persistent;
149   }
150
151   /** Returns the message priority. */
152   public int getPriority() {
153     return msg.priority;
154   }
155
156   /** Returns the message expiration time. */
157   public long getExpiration() {
158     return msg.expiration;
159   }
160   
161   /** Returns the message time stamp. */
162   public long getTimestamp() {
163     return msg.timestamp;
164   }
165
166 // /** Returns the message destination identifier. */
167
// public final String getDestinationId() {
168
// return msg.toId;
169
// }
170

171 // /** Returns <code>true</code> if the destination is a queue. */
172
// public final String toType() {
173
// return msg.toType;
174
// }
175

176   /** Returns the message correlation identifier. */
177   public final String JavaDoc getCorrelationId() {
178     return msg.correlationId;
179   }
180
181 // /** Returns the destination id the reply should be sent to. */
182
// public final String getReplyToId() {
183
// return msg.replyToId;
184
// }
185

186 // /** Returns <code>true</code> if the reply to destination is a queue. */
187
// public final String replyToType() {
188
// return msg.replyToType;
189
// }
190

191   /**
192    * Sets a property value.
193    *
194    * @param name The property name.
195    * @param value The property value.
196    *
197    * @exception MessageROException
198    * If the message properties are read-only.
199    * @exception MessageValueException
200    * If the value is not a Java primitive object.
201    * @exception IllegalArgumentException
202    * If the key name is illegal (null or empty string).
203    */

204   public void setObjectProperty(String JavaDoc name, Object JavaDoc value) throws MessageException {
205     if (name == null || name.equals(""))
206       throw new IllegalArgumentException JavaDoc("Invalid property name: " + name);
207
208     if (value instanceof Boolean JavaDoc ||
209         value instanceof Number JavaDoc ||
210         value instanceof String JavaDoc) {
211       msg.setProperty(name, value);
212     } else {
213       throw new MessageValueException("Can't set non primitive Java object"
214                                       + " as a property value.");
215     }
216   }
217
218 // /**
219
// * Sets an object as the body of the message.
220
// * AF: Used to wrap addministration message !!
221
// *
222
// * @exception IOException In case of an error while setting the object.
223
// */
224
// public void setObject(Object object) throws IOException {
225
// msg.type = Message.OBJECT;
226

227 // if (object == null) {
228
// msg.body = null;
229
// } else {
230
// ByteArrayOutputStream baos = new ByteArrayOutputStream();
231
// ObjectOutputStream oos = new ObjectOutputStream(baos);
232
// oos.writeObject(object);
233
// oos.flush();
234
// msg.body = baos.toByteArray();
235
// oos.close();
236
// baos.close();
237
// }
238
// }
239

240 // /**
241
// * Returns the object body of the message.
242
// * AF: Used to wrap addministration message !!
243
// *
244
// * @exception IOException In case of an error while getting the object.
245
// * @exception ClassNotFoundException If the object class is unknown.
246
// */
247
// public Object getObject() throws Exception {
248
// // AF: May be, we should verify that it is an Object message!!
249
// if (msg.body == null) return null;
250

251 // ByteArrayInputStream bais = new ByteArrayInputStream(msg.body);
252
// ObjectInputStream ois = new ObjectInputStream(bais);
253
// Object obj = ois.readObject();
254
// ois.close();
255

256 // return obj;
257
// }
258

259   /**
260    * Returns <code>true</code> if the message is valid.
261    *
262    * @param currentTime The current time to verify the expiration time.
263    */

264   public boolean isValid(long currentTime) {
265     if (msg.expiration == 0)
266       return true;
267
268     return ((msg.expiration - currentTime) > 0);
269   }
270
271 // /** Clones the message. */
272
// AF: No longer needed, just inherit from super class.
273
// public Object clone() {
274
// if (JoramTracing.dbgMessage.isLoggable(BasicLevel.DEBUG))
275
// JoramTracing.dbgMessage.log(BasicLevel.DEBUG,
276
// "Message.clone()");
277
// Message clone = (Message) super.clone();
278
// return clone;
279
// }
280

281   /** Name used to store the message */
282   transient String JavaDoc txname = null;
283
284   public void setTxName(String JavaDoc txname) {
285     this.txname = txname;
286   }
287
288   public String JavaDoc getTxName() {
289     return txname;
290   }
291
292   public static Message load(String JavaDoc txname) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
293     if (JoramTracing.dbgMessage.isLoggable(BasicLevel.DEBUG))
294       JoramTracing.dbgMessage.log(BasicLevel.DEBUG,
295                                   "Message.load:" + txname);
296
297     Message msg = (Message) AgentServer.getTransaction().load(txname);
298     msg.txname = txname;
299
300     return msg;
301   }
302
303   public void save() {
304     if (JoramTracing.dbgMessage.isLoggable(BasicLevel.DEBUG))
305       JoramTracing.dbgMessage.log(BasicLevel.DEBUG,
306                                   "Message.save:" + txname);
307
308     if (! getPersistent()) return;
309     try {
310       AgentServer.getTransaction().save(this, txname);
311     } catch (IOException JavaDoc exc) {
312       JoramTracing.dbgMessage.log(BasicLevel.ERROR,
313                                   "Message named [" + txname + "] could not be saved", exc);
314     }
315   }
316
317   public void delete() {
318     if (JoramTracing.dbgMessage.isLoggable(BasicLevel.DEBUG))
319       JoramTracing.dbgMessage.log(BasicLevel.DEBUG,
320                                   "Message.delete:" + txname);
321
322     if (! getPersistent()) return;
323     AgentServer.getTransaction().delete(txname);
324   }
325
326   /** Loads all persisted messages. */
327   public static Vector JavaDoc loadAll(String JavaDoc msgTxname) {
328     if (JoramTracing.dbgMessage.isLoggable(BasicLevel.DEBUG))
329       JoramTracing.dbgMessage.log(BasicLevel.DEBUG,
330                                   "MessagePersistenceModule.loadAll() " + msgTxname);
331
332     Vector JavaDoc messages = new Vector JavaDoc();
333
334     // Retrieving the names of the persistence message previously saved.
335
Transaction tx = AgentServer.getTransaction();
336     String JavaDoc[] names = tx.getList(msgTxname);
337
338     // Retrieving the messages individually persisted.
339
for (int i = 0; i < names.length; i++) {
340       try {
341         Message msg = (Message) tx.load(names[i]);
342         msg.txname = names[i];
343
344         if (JoramTracing.dbgMessage.isLoggable(BasicLevel.DEBUG))
345           JoramTracing.dbgMessage.log(BasicLevel.DEBUG,
346                                       "loadAll: names[" + i + "] = " + msg);
347         messages.add(msg);
348       } catch (Exception JavaDoc exc) {
349         JoramTracing.dbgMessage.log(BasicLevel.ERROR,
350                                     "Message named [" + names[i] + "] could not be loaded", exc);
351       }
352     }
353     return messages;
354   }
355
356   /** Deletes all persisted objects. */
357   public static void deleteAll(String JavaDoc msgTxname) {
358     if (JoramTracing.dbgMessage.isLoggable(BasicLevel.DEBUG))
359       JoramTracing.dbgMessage.log(BasicLevel.DEBUG,
360                                   "MessagePersistenceModule.deleteAll() " + msgTxname);
361
362     Transaction tx = AgentServer.getTransaction();
363
364     // Retrieving the names of the persistence message previously saved.
365
String JavaDoc[] names = tx.getList(msgTxname);
366
367     // Deleting the message.
368
for (int i = 0; i < names.length; i++) {
369       tx.delete(names[i]);
370     }
371   }
372
373 // /* ***** ***** ***** ***** *****
374
// * Streamable interface
375
// * ***** ***** ***** ***** ***** */
376

377 // /**
378
// * The object implements the writeTo method to write its contents to
379
// * the output stream.
380
// *
381
// * @param os the stream to write the object to
382
// */
383
// public void writeTo(OutputStream os) throws IOException {
384
// StreamUtil.writeTo(type, os);
385
// super.writeTo(os);
386
// }
387

388 // /**
389
// * The object implements the readFrom method to restore its contents from
390
// * the input stream.
391
// *
392
// * @param is the stream to read data from in order to restore the object
393
// */
394
// public void readMessage(InputStream is) throws IOException {
395
// int type = StreamUtil.readIntFrom(is);
396
// super.readFrom(is);
397
// }
398

399   /** ***** ***** ***** ***** ***** ***** ***** *****
400    * Serializable interface
401    * ***** ***** ***** ***** ***** ***** ***** ***** */

402
403   private void writeObject(ObjectOutputStream JavaDoc out) throws IOException JavaDoc {
404     out.writeLong(order);
405 // out.writeBoolean(denied);
406
// out.writeInt(acksCounter);
407
// out.writeInt(durableAcksCounter);
408
msg.writeTo(out);
409   }
410
411   private void readObject(ObjectInputStream JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
412     order = in.readLong();
413 // denied = in.readBoolean();
414
// acksCounter = in.readInt();
415
acksCounter = 0;
416 // durableAcksCounter = in.readInt();
417
durableAcksCounter = 0;
418
419     msg = new org.objectweb.joram.shared.messages.Message();
420     msg.readFrom(in);
421   }
422 }
423
Popular Tags