1 22 package org.jboss.mq.server; 23 24 import java.lang.ref.SoftReference ; 25 26 import javax.jms.DeliveryMode ; 27 import javax.jms.JMSException ; 28 29 import org.jboss.logging.Logger; 30 import org.jboss.mq.DurableSubscriptionID; 31 import org.jboss.mq.SpyMessage; 32 33 46 public class MessageReference implements Comparable 47 { 48 static Logger log = Logger.getLogger(MessageReference.class); 49 50 53 public static final int NOT_STORED = 1; 54 55 58 public static final int STORED = 2; 59 60 68 public static final int REMOVED = 3; 69 70 public long referenceId; 71 public SpyMessage hardReference; 72 73 public byte jmsPriority; 76 public long messageId; 77 public int jmsDeliveryMode; 78 public long messageScheduledDelivery; 79 public long messageExpiration; 80 81 public boolean redelivered; 82 public long redeliveryDelay; 83 public int redeliveryCount; 84 85 public BasicQueue queue; 87 public MessageCache messageCache; 88 public SoftReference softReference; 89 public DurableSubscriptionID durableSubscriberID; 90 public int stored; 91 transient public Object persistData; 92 93 MessageReference() 94 { 95 } 96 97 void init(MessageCache messageCache, long referenceId, SpyMessage message, BasicQueue queue, DurableSubscriptionID id) throws JMSException 99 { 100 this.messageCache = messageCache; 101 this.hardReference = message; 102 this.referenceId = referenceId; 103 this.jmsPriority = (byte) message.getJMSPriority(); 104 this.messageId = message.header.messageId; 105 this.stored = NOT_STORED; 106 107 this.jmsDeliveryMode = message.header.jmsDeliveryMode; 108 if (message.propertyExists(SpyMessage.PROPERTY_SCHEDULED_DELIVERY)) 109 this.messageScheduledDelivery = message.getLongProperty(SpyMessage.PROPERTY_SCHEDULED_DELIVERY); 110 if (message.propertyExists(SpyMessage.PROPERTY_REDELIVERY_DELAY)) 111 this.redeliveryDelay = message.getLongProperty(SpyMessage.PROPERTY_REDELIVERY_DELAY); 112 else if (queue.parameters.redeliveryDelay > 0) 115 this.redeliveryDelay = queue.parameters.redeliveryDelay; 116 117 if (queue.parameters.redeliveryLimit > -1 && !message.propertyExists(SpyMessage.PROPERTY_REDELIVERY_LIMIT)) 120 message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_LIMIT, 121 new Integer (queue.parameters.redeliveryLimit)); 122 123 this.messageExpiration = message.getJMSExpiration(); 124 this.redelivered = message.getJMSRedelivered(); 125 if (message.propertyExists(SpyMessage.PROPERTY_REDELIVERY_COUNT)) 126 this.redeliveryCount = message.getIntProperty(SpyMessage.PROPERTY_REDELIVERY_COUNT); 127 128 this.durableSubscriberID = id; 129 130 this.queue = queue; 131 this.persistData = null; 132 } 133 134 void reset() 135 { 136 if (softReference != null && softReference.get() != null) 138 messageCache.softRefCacheSize--; 139 this.messageCache = null; 140 this.hardReference = null; 141 this.softReference = null; 142 this.queue = null; 143 } 144 145 public SpyMessage getMessageForDelivery() throws JMSException 146 { 147 SpyMessage message = getMessage(); 148 if (queue.parameters.lateClone) 149 { 150 message = message.myClone(); 151 message.header.durableSubscriberID = durableSubscriberID; 152 message.header.jmsRedelivered = redelivered; 153 message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer (redeliveryCount)); 154 } 155 return message; 156 } 157 158 public SpyMessage getMessage() throws JMSException 159 { 160 SpyMessage result = null; 161 162 synchronized (this) 163 { 164 if (hardReference == null) 165 { 166 makeHard(); 167 result = hardReference; 168 messageCache.messageReferenceUsedEvent(this, false); 169 } 170 else 171 { 172 result = hardReference; 173 messageCache.cacheHits++; 174 messageCache.messageReferenceUsedEvent(this, true); 175 } 176 return result; 177 } 178 } 179 180 183 public void redelivered() throws JMSException 184 { 185 this.redelivered = true; 186 187 if (redeliveryDelay != 0) 188 { 189 log.trace("message has redelivery delay"); 190 messageScheduledDelivery = System.currentTimeMillis() + redeliveryDelay; 191 } 192 193 ++redeliveryCount; 194 195 if (isLateClone() == false) 196 { 197 SpyMessage message = getMessage(); 198 message.setJMSRedelivered(redelivered); 199 200 message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer (redeliveryCount)); 201 } 202 } 203 204 207 public boolean isExpired() 208 { 209 if (messageExpiration == 0) 210 return false; 211 long ts = System.currentTimeMillis(); 212 return messageExpiration < ts; 213 } 214 215 219 public boolean isPersistent() 220 { 221 return queue instanceof PersistentQueue && jmsDeliveryMode == DeliveryMode.PERSISTENT; 222 } 223 224 229 public boolean inMemory() 230 { 231 return queue.parameters.inMemory; 232 } 233 234 237 public String getPersistentKey() 238 { 239 return queue.getDescription(); 240 } 241 242 245 public boolean isLateClone() 246 { 247 return queue.parameters.lateClone; 248 } 249 250 257 public SpyMessage.Header getHeaders() throws JMSException 258 { 259 return getMessage().header; 260 } 261 262 void clear() throws JMSException 263 { 264 synchronized (this) 265 { 266 if (stored == STORED) 267 messageCache.removeFromStorage(this); 268 stored = MessageReference.REMOVED; 269 } 270 } 271 272 public void invalidate() throws JMSException 273 { 274 synchronized (this) 275 { 276 if (stored == STORED) 277 { 278 if (hardReference == null) 279 { 280 makeHard(); 281 messageCache.messageReferenceUsedEvent(this, false); 282 } 283 messageCache.removeFromStorage(this); 284 } 285 } 286 } 287 288 public void removeDelayed() throws JMSException 289 { 290 messageCache.removeDelayed(this); 291 } 292 293 void makeSoft() throws JMSException 294 { 295 boolean trace = log.isTraceEnabled(); 296 synchronized (this) 297 { 298 if (stored == REMOVED) 300 throw new JMSException ("CACHE ERROR: makeSoft() on a removed message " + this); 301 302 if (softReference != null) 304 { 305 if (stored == NOT_STORED) 307 throw new JMSException ("CACHE ERROR: soft reference to unstored message " + this); 308 return; 309 } 310 311 if (stored == NOT_STORED) 312 messageCache.saveToStorage(this, hardReference); 313 314 if (stored != STORED) 317 { 318 if (trace) 319 log.trace("saveToStorage rejected by cache " + toString()); 320 return; 321 } 322 323 if (messageCache.getMakeSoftReferences()) 324 softReference = new SoftReference (hardReference, messageCache.referenceQueue); 325 326 messageCache.soften(this); 328 hardReference = null; 329 } 330 } 331 332 336 public void setStored(int stored) 337 { 338 this.stored = stored; 339 } 340 341 void makeHard() throws JMSException 342 { 343 synchronized (this) 344 { 345 if (stored == REMOVED) 347 throw new JMSException ("CACHE ERROR: makeHard() on a removed message " + this); 348 349 if (hardReference != null) 351 return; 352 353 if (softReference != null) 355 hardReference = (SpyMessage) softReference.get(); 356 357 if (hardReference == null) 359 { 360 hardReference = messageCache.loadFromStorage(this); 362 messageCache.cacheMisses++; 363 } 364 else 365 { 366 messageCache.cacheHits++; 367 } 368 369 if (softReference != null && softReference.get() != null) 371 messageCache.softRefCacheSize--; 372 softReference = null; 373 } 374 } 375 376 public boolean equals(Object o) 377 { 378 try 379 { 380 return referenceId == ((MessageReference) o).referenceId; 381 } 382 catch (Throwable e) 383 { 384 return false; 385 } 386 } 387 388 394 public int compareTo(Object o) 395 { 396 MessageReference sm = (MessageReference) o; 397 if (jmsPriority > sm.jmsPriority) 398 { 399 return -1; 400 } 401 if (jmsPriority < sm.jmsPriority) 402 { 403 return 1; 404 } 405 return (int) (messageId - sm.messageId); 406 } 407 408 411 public String toString() 412 { 413 StringBuffer buffer = new StringBuffer (100); 414 if (messageCache == null) 415 buffer.append(" NOT IN CACHE hashCode=").append(hashCode()); 416 417 else 418 { 419 buffer.append(referenceId); 420 buffer.append(" msg=").append(messageId); 421 if (hardReference != null) 422 buffer.append(" hard"); 423 if (softReference != null) 424 buffer.append(" soft"); 425 switch (stored) 426 { 427 case NOT_STORED : 428 buffer.append(" NOT_STORED"); 429 break; 430 case STORED : 431 buffer.append(" STORED"); 432 break; 433 case REMOVED : 434 buffer.append(" REMOVED"); 435 break; 436 } 437 switch (jmsDeliveryMode) 438 { 439 case DeliveryMode.NON_PERSISTENT : 440 buffer.append(" NON_PERSISTENT"); 441 break; 442 case DeliveryMode.PERSISTENT : 443 buffer.append(" PERSISTENT"); 444 break; 445 } 446 if (persistData != null) 447 buffer.append(" persistData=").append(persistData); 448 if (queue != null) 449 buffer.append(" queue=").append(queue.getDescription()); 450 else 451 buffer.append(" NO_QUEUE"); 452 buffer.append(" priority=").append(jmsPriority); 453 buffer.append(" lateClone=").append(isLateClone()); 454 buffer.append(" hashCode=").append(hashCode()); 455 } 456 return buffer.toString(); 457 } 458 } 459 | Popular Tags |