1 package com.ubermq.jms.common.datagram.impl; 2 3 import java.nio.*; 4 import java.util.*; 5 6 import com.ubermq.jms.common.*; 7 import com.ubermq.jms.common.datagram.*; 8 import com.ubermq.kernel.*; 9 10 19 public class MessageDatagram 20 extends AbstractDatagram 21 implements IMessageDatagram 22 { 23 private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(MessageDatagram.class); 24 25 private String topicName; 26 27 private long senderId; 29 private int seq; 30 31 private long originalSenderId; 33 private int originalSeq; 34 35 private long various; private long timestamp; 38 private Object body; 40 private HashMap props = null; 42 43 private static final int MAX_PROP = Integer.valueOf( 44 Configurator.getProperty(CommonConfig.DGP_MAXIMUM_PROPERTY_LENGTH, "4096")).intValue(); 45 46 private ByteBuffer lazyProps; 47 private boolean propsChanged = false; 48 49 static final int TTL_SHIFT = 0; 52 static final int TTL_MASK = 0xFF; static final int PRI_SHIFT = 16; 54 static final int PRI_MASK = 0xF; static final int REDELIVERED_SHIFT = 20; 56 static final int REDELIVERED_MASK = 0x1; static final int MSGTYPE_SHIFT = 21; 58 static final int MSGTYPE_MASK = 0x7; static final int DELIVERY_SHIFT = 24; 60 static final int DELIVERY_MASK = 0x7; 62 66 MessageDatagram(String topicName) 67 { 68 super(DatagramFactory.DGRAM_MSG, 0); 69 this.topicName = topicName; 70 lazyProps = ByteBuffer.allocateDirect(MAX_PROP); 71 lazyProps.clear(); 72 lazyProps.flip(); 73 } 74 75 79 public MessageDatagram() 80 { 81 super(DatagramFactory.DGRAM_MSG, 0); 82 lazyProps = ByteBuffer.allocateDirect(MAX_PROP); 83 lazyProps.clear(); 84 lazyProps.flip(); 85 } 86 87 public String getTopicName() {return topicName;} 88 public void setTopicName(String sz) {topicName = sz;} 89 90 96 public MessageId getIncomingMessageId() 97 { 98 return new MessageId(originalSenderId, originalSeq); 99 } 100 101 public MessageId getMessageId() 102 { 103 return new MessageId(senderId, seq); 104 } 105 106 public long getSenderId() {return senderId;} 107 public void setSenderId(long senderId) {this.senderId = senderId;} 108 public int getSequence() {return seq;} 109 public void setSequence(int seq) {this.seq = seq;} 110 111 public void clearProperties() 113 { 114 getProps().clear(); 115 propsChanged = true; 116 } 117 118 public void setStandardProperty(int property, Object value) 119 { 120 switch(property) 121 { 122 case STDPROP_PRIORITY: 123 setFlagBasedProperty(PRI_SHIFT, PRI_MASK, ((Number )value).intValue()); 124 break; 125 case STDPROP_MSGTYPE: 126 setFlagBasedProperty(MSGTYPE_SHIFT, MSGTYPE_MASK, ((Number )value).intValue()); 127 break; 128 case STDPROP_TTL: 129 setFlagBasedProperty(TTL_SHIFT, TTL_MASK, (int)Math.ceil(((Number )value).intValue() / 100.0)); 130 break; 131 case STDPROP_REDELIVERY: 132 setFlagBasedProperty(REDELIVERED_SHIFT, REDELIVERED_MASK, ((Boolean )value).booleanValue() ? 1 : 0); 133 break; 134 case STDPROP_TIMESTAMP: 135 timestamp = ((Number )value).longValue(); 136 break; 137 case STDPROP_BODY: 138 body = value; 139 break; 140 case STDPROP_DELIVERYMODE: 141 setFlagBasedProperty(DELIVERY_SHIFT, DELIVERY_MASK, ((Number )value).intValue()); 142 break; 143 default: 144 getProps().put(new Integer (property), value); 145 propsChanged = true; 146 break; 147 } 148 } 149 150 157 public Object getStandardProperty(int property) 158 { 159 switch(property) 160 { 161 case STDPROP_PRIORITY: 162 return new Integer (getFlagBasedProperty(PRI_SHIFT, PRI_MASK)); 163 case STDPROP_MSGTYPE: 164 return new Integer (getFlagBasedProperty(MSGTYPE_SHIFT, MSGTYPE_MASK)); 165 case STDPROP_TTL: 166 return new Integer (getFlagBasedProperty(TTL_SHIFT, TTL_MASK) * 100); 167 case STDPROP_REDELIVERY: 168 return new Boolean (getFlagBasedProperty(REDELIVERED_SHIFT, REDELIVERED_MASK) != 0 ? true : false); 169 case STDPROP_TIMESTAMP: 170 return new Long (timestamp); 171 case STDPROP_BODY: 172 return body; 173 case STDPROP_DELIVERYMODE: 174 return new Integer (getFlagBasedProperty(DELIVERY_SHIFT, DELIVERY_MASK)); 175 default: 176 return getProps().get(new Integer (property)); 177 } 178 } 179 180 private void setFlagBasedProperty(int shift, int mask, int value) 181 { 182 various = setFlagBasedProperty(various, shift, mask, value); 183 } 184 185 static long setFlagBasedProperty(long v, int shift, int mask, int value) 186 { 187 v &= ~(mask << shift); 188 v |= (value & mask) << shift; 189 return v; 190 } 191 192 private int getFlagBasedProperty(int shift, int mask) 193 { 194 return getFlagBasedProperty(various, shift, mask); 195 } 196 197 static int getFlagBasedProperty(long v, int shift, int mask) 198 { 199 return (int)((v >> shift) & mask); 200 } 201 202 public void setCustomProperty(String property, Object value) 203 { 204 getProps().put(property, value); 205 propsChanged = true; 206 } 207 208 public Object getCustomProperty(String property) 209 { 210 return getProps().get(property); 211 } 212 213 public Collection getCustomPropertyNames() 214 { 215 List customNames = new LinkedList(); 216 217 Iterator iter = getProps().keySet().iterator(); 218 while (iter.hasNext()) 219 { 220 Object element = iter.next(); 221 if (element instanceof String ) 222 customNames.add(element); 223 } 224 225 return customNames; 226 } 227 228 public void prepareToSend(int deliveryMode, 229 int priority, 230 long ttl) 231 { 232 setStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE, new Integer (deliveryMode)); 233 setStandardProperty(IMessageDatagram.STDPROP_PRIORITY, new Integer (priority)); 234 setStandardProperty(IMessageDatagram.STDPROP_TTL, new Long (ttl)); 235 setStandardProperty(IMessageDatagram.STDPROP_TIMESTAMP, new Long (System.currentTimeMillis())); 236 } 237 238 private synchronized HashMap getProps() 239 { 240 if (props != null) 241 return props; 242 else 243 { 244 try { 245 parseProps(); 246 } catch(NullPointerException npe) 247 { 248 log.fatal(lazyProps.limit() + " " + lazyProps.position(), npe); 249 } 250 return props; 251 } 252 } 253 254 private void parseProps() 255 { 256 props = new HashMap(); 257 parsePropsBuffer(lazyProps, props); 258 lazyProps.rewind(); 259 } 260 261 266 static void parsePropsBuffer(ByteBuffer lazyProps, 267 Map props) 268 { 269 lazyProps.rewind(); 270 while(lazyProps.remaining() > 0) 271 { 272 boolean std = (lazyProps.get() != 0); 273 if (std) 274 { 275 int prop = lazyProps.getInt(); 276 Object value = readTypedData(lazyProps); 277 278 props.put(new Integer (prop), value); 279 } 280 else 281 { 282 String prop = readPascalString(lazyProps); 283 Object value = readTypedData(lazyProps); 284 285 props.put(prop, value); 286 } 287 } 288 } 289 290 private void rebuildProps() 291 { 292 lazyProps.clear(); 293 for(Iterator i = props.keySet().iterator();i.hasNext();) 294 { 295 Object key = i.next(); 296 Object value = props.get(key); 297 298 if (key instanceof Integer ) 299 { 300 lazyProps.put((byte)0x1); 301 lazyProps.putInt( ((Integer )key).intValue() ); 302 writeTypedData(value, lazyProps); 303 } 304 else 305 { 306 lazyProps.put((byte)0x0); 307 writePascalString(key.toString(), lazyProps); 308 writeTypedData(value, lazyProps); 309 } 310 } 311 312 lazyProps.flip(); 313 propsChanged = false; 314 } 315 316 339 public void incoming(java.nio.ByteBuffer bb) 340 throws java.io.IOException 341 { 342 super.incoming(bb); 343 originalSenderId = senderId = bb.getLong(); 344 originalSeq = seq = bb.getInt(); 345 various = bb.getLong(); 346 timestamp = bb.getLong(); 347 int nprops = bb.getInt(); 348 topicName = readPascalString(bb); 349 350 props = null; 352 propsChanged = false; 353 354 ByteBuffer props = bb.slice(); 355 props.limit(nprops); 356 lazyProps.clear(); 357 if (bb.hasRemaining()) lazyProps.put(props); 358 lazyProps.flip(); 359 360 bb.position(bb.position() + nprops); 362 body = readTypedData(bb); 363 } 364 365 368 public void outgoing(java.nio.ByteBuffer bb) 369 { 370 if (propsChanged) 372 rebuildProps(); 373 374 super.outgoing(bb); 376 bb.putLong(senderId); 377 bb.putInt(seq); 378 bb.putLong(various); 379 bb.putLong(timestamp); 380 bb.putInt(lazyProps.limit()); 381 writePascalString(topicName, bb); 382 383 lazyProps.rewind(); 385 if (lazyProps.hasRemaining()) bb.put(lazyProps); 386 387 if (body != null) 389 writeTypedData(body, bb); 390 } 391 392 public String toString() 393 { 394 return super.toString() + 395 "\nMsgID:\t\t" + getMessageId() + 396 "\nSendr:\t\t" + getSenderId() + 397 "\nSeq :\t\t" + getSequence() + 398 "\nTopic:\t\t" + getTopicName() + 399 "\nProps:\t\t" + getProps().toString(); 400 } 401 402 public boolean equals(Object o) 403 { 404 if (o instanceof IMessageDatagram) 405 { 406 return getMessageId().equals(((IMessageDatagram)o).getMessageId()); 407 } 408 else 409 { 410 return false; 411 } 412 } 413 414 public int hashCode() {return getMessageId().hashCode();} 415 } 416 417 418 | Popular Tags |