KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > common > datagram > impl > MessageDatagram


1 package com.ubermq.jms.common.datagram.impl;
2
3 import java.nio.*;
4 import java.util.*;
5
6 import com.ubermq.jms.common.*;
7 import com.ubermq.jms.common.datagram.*;
8 import com.ubermq.kernel.*;
9
10 /**
11  * A complete implementation of the IMessageDatagram interface. This implementation
12  * fully supports standard and customized properties, and efficiently stores some
13  * of the standard properties in a compact bit field so parsing the properties
14  * is not required to retrieve them.
15  * <P>
16  * This is a very important class, and subclassing it to remove extraneous functionality
17  * may be a good way to boost performance and transport overhead.
18  */

19 public class MessageDatagram
20     extends AbstractDatagram
21     implements IMessageDatagram
22 {
23     private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(MessageDatagram.class);
24     
25     private String JavaDoc topicName;
26
27     // sender ID and sequence
28
private long senderId;
29     private int seq;
30
31     // original sender ID and sequence (retain original)
32
private long originalSenderId;
33     private int originalSeq;
34
35     // some properties go with every message.
36
private long various; // this is a bitfield, defined below.
37
private long timestamp;
38     private Object JavaDoc body; // the body goes here. it does not make a copy!
39

40     // all custom and standard properties not strictly required.
41
private HashMap props = null;
42
43     private static final int MAX_PROP = Integer.valueOf(
44         Configurator.getProperty(CommonConfig.DGP_MAXIMUM_PROPERTY_LENGTH, "4096")).intValue();
45
46     private ByteBuffer lazyProps;
47     private boolean propsChanged = false;
48
49     // masks and bit shifts for some STDPROP's that are stored in the
50
// class-specific section of the datagram's flags.
51
static final int TTL_SHIFT = 0;
52     static final int TTL_MASK = 0xFF; // TTL is stored as n x 100 ms, meaning we can TTL up to 25500 ms
53
static final int PRI_SHIFT = 16;
54     static final int PRI_MASK = 0xF; // PRI is stored in 4 bits, giving us 16 priority levels
55
static final int REDELIVERED_SHIFT = 20;
56     static final int REDELIVERED_MASK = 0x1; // a single bit
57
static final int MSGTYPE_SHIFT = 21;
58     static final int MSGTYPE_MASK = 0x7; // MSGTYPE gets 3 bits, that's 8 values, 0-7. use them well.
59
static final int DELIVERY_SHIFT = 24;
60     static final int DELIVERY_MASK = 0x7; // DELIVERYMODE gets 3 bits, that's 8 values, 0-7. use them well.
61

62     /**
63      * Constructs a message datagram intended for the given topic.
64      * @param topicName a topic
65      */

66     MessageDatagram(String JavaDoc topicName)
67     {
68         super(DatagramFactory.DGRAM_MSG, 0);
69         this.topicName = topicName;
70         lazyProps = ByteBuffer.allocateDirect(MAX_PROP);
71         lazyProps.clear();
72         lazyProps.flip();
73     }
74
75     /**
76      * Constructs a message datagram for reading from a buffer, or
77      * for sending to an undetermined topic.
78      */

79     public MessageDatagram()
80     {
81         super(DatagramFactory.DGRAM_MSG, 0);
82         lazyProps = ByteBuffer.allocateDirect(MAX_PROP);
83         lazyProps.clear();
84         lazyProps.flip();
85     }
86
87     public String JavaDoc getTopicName() {return topicName;}
88     public void setTopicName(String JavaDoc sz) {topicName = sz;}
89
90     /**
91      * Gets the original unique identifier for this message
92      * when it arrived at the subscriber. This is used for acknowledgement
93      * and identification.
94      * @return the original message ID, constant for the life of the datagram.
95      */

96     public MessageId getIncomingMessageId()
97     {
98         return new MessageId(originalSenderId, originalSeq);
99     }
100
101     public MessageId getMessageId()
102     {
103         return new MessageId(senderId, seq);
104     }
105
106     public long getSenderId() {return senderId;}
107     public void setSenderId(long senderId) {this.senderId = senderId;}
108     public int getSequence() {return seq;}
109     public void setSequence(int seq) {this.seq = seq;}
110
111     // get/setting std properties
112
public void clearProperties()
113     {
114         getProps().clear();
115         propsChanged = true;
116     }
117
118     public void setStandardProperty(int property, Object JavaDoc value)
119     {
120         switch(property)
121         {
122             case STDPROP_PRIORITY:
123                 setFlagBasedProperty(PRI_SHIFT, PRI_MASK, ((Number JavaDoc)value).intValue());
124                 break;
125             case STDPROP_MSGTYPE:
126                 setFlagBasedProperty(MSGTYPE_SHIFT, MSGTYPE_MASK, ((Number JavaDoc)value).intValue());
127                 break;
128             case STDPROP_TTL:
129                 setFlagBasedProperty(TTL_SHIFT, TTL_MASK, (int)Math.ceil(((Number JavaDoc)value).intValue() / 100.0));
130                 break;
131             case STDPROP_REDELIVERY:
132                 setFlagBasedProperty(REDELIVERED_SHIFT, REDELIVERED_MASK, ((Boolean JavaDoc)value).booleanValue() ? 1 : 0);
133                 break;
134             case STDPROP_TIMESTAMP:
135                 timestamp = ((Number JavaDoc)value).longValue();
136                 break;
137             case STDPROP_BODY:
138                 body = value;
139                 break;
140             case STDPROP_DELIVERYMODE:
141                 setFlagBasedProperty(DELIVERY_SHIFT, DELIVERY_MASK, ((Number JavaDoc)value).intValue());
142                 break;
143             default:
144                 getProps().put(new Integer JavaDoc(property), value);
145                 propsChanged = true;
146                 break;
147         }
148     }
149
150     /**
151      * In this implementation, some of the standard properties are stored
152      * in the datagram flags for speed, so we don't have to deserialize the
153      * properties.
154      * <P>
155      * I dare you to figure out which are stored this way. :)
156      */

157     public Object JavaDoc getStandardProperty(int property)
158     {
159         switch(property)
160         {
161             case STDPROP_PRIORITY:
162                 return new Integer JavaDoc(getFlagBasedProperty(PRI_SHIFT, PRI_MASK));
163             case STDPROP_MSGTYPE:
164                 return new Integer JavaDoc(getFlagBasedProperty(MSGTYPE_SHIFT, MSGTYPE_MASK));
165             case STDPROP_TTL:
166                 return new Integer JavaDoc(getFlagBasedProperty(TTL_SHIFT, TTL_MASK) * 100);
167             case STDPROP_REDELIVERY:
168                 return new Boolean JavaDoc(getFlagBasedProperty(REDELIVERED_SHIFT, REDELIVERED_MASK) != 0 ? true : false);
169             case STDPROP_TIMESTAMP:
170                 return new Long JavaDoc(timestamp);
171             case STDPROP_BODY:
172                 return body;
173             case STDPROP_DELIVERYMODE:
174                 return new Integer JavaDoc(getFlagBasedProperty(DELIVERY_SHIFT, DELIVERY_MASK));
175             default:
176                 return getProps().get(new Integer JavaDoc(property));
177         }
178     }
179
180     private void setFlagBasedProperty(int shift, int mask, int value)
181     {
182         various = setFlagBasedProperty(various, shift, mask, value);
183     }
184
185     static long setFlagBasedProperty(long v, int shift, int mask, int value)
186     {
187         v &= ~(mask << shift);
188         v |= (value & mask) << shift;
189         return v;
190     }
191
192     private int getFlagBasedProperty(int shift, int mask)
193     {
194         return getFlagBasedProperty(various, shift, mask);
195     }
196
197     static int getFlagBasedProperty(long v, int shift, int mask)
198     {
199         return (int)((v >> shift) & mask);
200     }
201
202     public void setCustomProperty(String JavaDoc property, Object JavaDoc value)
203     {
204         getProps().put(property, value);
205         propsChanged = true;
206     }
207
208     public Object JavaDoc getCustomProperty(String JavaDoc property)
209     {
210         return getProps().get(property);
211     }
212
213     public Collection getCustomPropertyNames()
214     {
215         List customNames = new LinkedList();
216
217         Iterator iter = getProps().keySet().iterator();
218         while (iter.hasNext())
219         {
220             Object JavaDoc element = iter.next();
221             if (element instanceof String JavaDoc)
222                 customNames.add(element);
223         }
224
225         return customNames;
226     }
227
228     public void prepareToSend(int deliveryMode,
229                               int priority,
230                               long ttl)
231     {
232         setStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE, new Integer JavaDoc(deliveryMode));
233         setStandardProperty(IMessageDatagram.STDPROP_PRIORITY, new Integer JavaDoc(priority));
234         setStandardProperty(IMessageDatagram.STDPROP_TTL, new Long JavaDoc(ttl));
235         setStandardProperty(IMessageDatagram.STDPROP_TIMESTAMP, new Long JavaDoc(System.currentTimeMillis()));
236     }
237
238     private synchronized HashMap getProps()
239     {
240         if (props != null)
241             return props;
242         else
243         {
244             try {
245                 parseProps();
246             } catch(NullPointerException JavaDoc npe)
247             {
248                 log.fatal(lazyProps.limit() + " " + lazyProps.position(), npe);
249             }
250             return props;
251         }
252     }
253
254     private void parseProps()
255     {
256         props = new HashMap();
257         parsePropsBuffer(lazyProps, props);
258         lazyProps.rewind();
259     }
260
261     /**
262      * Parses a properties buffer into a map of properties contained inside.
263      * This is exposed to other implementations to leverage the same type of
264      * functionality.
265      */

266     static void parsePropsBuffer(ByteBuffer lazyProps,
267                                  Map props)
268     {
269         lazyProps.rewind();
270         while(lazyProps.remaining() > 0)
271         {
272             boolean std = (lazyProps.get() != 0);
273             if (std)
274             {
275                 int prop = lazyProps.getInt();
276                 Object JavaDoc value = readTypedData(lazyProps);
277
278                 props.put(new Integer JavaDoc(prop), value);
279             }
280             else
281             {
282                 String JavaDoc prop = readPascalString(lazyProps);
283                 Object JavaDoc value = readTypedData(lazyProps);
284
285                 props.put(prop, value);
286             }
287         }
288     }
289
290     private void rebuildProps()
291     {
292         lazyProps.clear();
293         for(Iterator i = props.keySet().iterator();i.hasNext();)
294         {
295             Object JavaDoc key = i.next();
296             Object JavaDoc value = props.get(key);
297
298             if (key instanceof Integer JavaDoc)
299             {
300                 lazyProps.put((byte)0x1);
301                 lazyProps.putInt( ((Integer JavaDoc)key).intValue() );
302                 writeTypedData(value, lazyProps);
303             }
304             else
305             {
306                 lazyProps.put((byte)0x0);
307                 writePascalString(key.toString(), lazyProps);
308                 writeTypedData(value, lazyProps);
309             }
310         }
311
312         lazyProps.flip();
313         propsChanged = false;
314     }
315
316     /**
317      * the MSG datagram format is:
318      *
319      * <PRE>
320      * ------
321      * senderId LONG
322      * ------
323      * seq INT
324      * ------
325      * various LONG
326      * ------
327      * timestamp LONG
328      * ------
329      * props-length INT
330      * ------
331      * topic-name pascal string
332      * ------
333      * props props-length
334      * ------
335      * body n
336      * ------
337      * </PRE>
338      */

339     public void incoming(java.nio.ByteBuffer JavaDoc bb)
340         throws java.io.IOException JavaDoc
341     {
342         super.incoming(bb);
343         originalSenderId = senderId = bb.getLong();
344         originalSeq = seq = bb.getInt();
345         various = bb.getLong();
346         timestamp = bb.getLong();
347         int nprops = bb.getInt();
348         topicName = readPascalString(bb);
349
350         // read in props array
351
props = null;
352         propsChanged = false;
353
354         ByteBuffer props = bb.slice();
355         props.limit(nprops);
356         lazyProps.clear();
357         if (bb.hasRemaining()) lazyProps.put(props);
358         lazyProps.flip();
359
360         // read in body
361
bb.position(bb.position() + nprops);
362         body = readTypedData(bb);
363     }
364
365     /**
366      * allows the datagram to be output to a channel.
367      */

368     public void outgoing(java.nio.ByteBuffer JavaDoc bb)
369     {
370         // if the props have changed, build them.
371
if (propsChanged)
372             rebuildProps();
373
374         // now output
375
super.outgoing(bb);
376         bb.putLong(senderId);
377         bb.putInt(seq);
378         bb.putLong(various);
379         bb.putLong(timestamp);
380         bb.putInt(lazyProps.limit());
381         writePascalString(topicName, bb);
382
383         // output props
384
lazyProps.rewind();
385         if (lazyProps.hasRemaining()) bb.put(lazyProps);
386
387         // output body
388
if (body != null)
389             writeTypedData(body, bb);
390     }
391
392     public String JavaDoc toString()
393     {
394         return super.toString() +
395             "\nMsgID:\t\t" + getMessageId() +
396             "\nSendr:\t\t" + getSenderId() +
397             "\nSeq :\t\t" + getSequence() +
398             "\nTopic:\t\t" + getTopicName() +
399             "\nProps:\t\t" + getProps().toString();
400     }
401
402     public boolean equals(Object JavaDoc o)
403     {
404         if (o instanceof IMessageDatagram)
405         {
406             return getMessageId().equals(((IMessageDatagram)o).getMessageId());
407         }
408         else
409         {
410             return false;
411         }
412     }
413
414     public int hashCode() {return getMessageId().hashCode();}
415 }
416
417
418
Popular Tags