KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > server > MessageReference


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.mq.server;
23
24 import java.lang.ref.SoftReference JavaDoc;
25
26 import javax.jms.DeliveryMode JavaDoc;
27 import javax.jms.JMSException JavaDoc;
28
29 import org.jboss.logging.Logger;
30 import org.jboss.mq.DurableSubscriptionID;
31 import org.jboss.mq.SpyMessage;
32
33 /**
34  * This class holds a reference to an actual Message. Where it is actually
35  * at may vary. The reference it holds may be a:
36  * <ul>
37  * <li>Hard Reference - The message is consider recently used and should not be paged out
38  * <li>Soft Reference - The message is consider old and CAN be removed from memory by the GC
39  * <li>No Reference - The message was removed from memory by the GC, but we can load it from a file.
40  * </ul>
41  *
42  * @author <a HREF="mailto:hiram.chirino@jboss.org">Hiram Chirino</a>
43  * @author <a HREF="mailto:pra@tim.se">Peter Antman</a>
44  * @version $Revision: 37459 $
45  */

46 public class MessageReference implements Comparable JavaDoc
47 {
48    static Logger log = Logger.getLogger(MessageReference.class);
49
50    /**
51     * The message is not persisted
52     */

53    public static final int NOT_STORED = 1;
54
55    /**
56     * The message is persisted
57     */

58    public static final int STORED = 2;
59
60    /**
61     * It was a persistent message for a joint
62     * cache store/persistent manager.
63     * This states guards against double
64     * removal from the cache while keeping
65     * error checking for incorrect double removal
66     * No message should be at this state for very long
67     */

68    public static final int REMOVED = 3;
69
70    public long referenceId;
71    public SpyMessage hardReference;
72
73    // These fields are copied over from the messae itself..
74
// they are used too often to not have them handy.
75
public byte jmsPriority;
76    public long messageId;
77    public int jmsDeliveryMode;
78    public long messageScheduledDelivery;
79    public long messageExpiration;
80
81    public boolean redelivered;
82    public long redeliveryDelay;
83    public int redeliveryCount;
84
85    // Data used on the server
86
public BasicQueue queue;
87    public MessageCache messageCache;
88    public SoftReference JavaDoc softReference;
89    public DurableSubscriptionID durableSubscriberID;
90    public int stored;
91    transient public Object JavaDoc persistData;
92
93    MessageReference()
94    {
95    }
96
97    //init and reset methods for use by object pool
98
void init(MessageCache messageCache, long referenceId, SpyMessage message, BasicQueue queue, DurableSubscriptionID id) throws JMSException JavaDoc
99    {
100       this.messageCache = messageCache;
101       this.hardReference = message;
102       this.referenceId = referenceId;
103       this.jmsPriority = (byte) message.getJMSPriority();
104       this.messageId = message.header.messageId;
105       this.stored = NOT_STORED;
106
107       this.jmsDeliveryMode = message.header.jmsDeliveryMode;
108       if (message.propertyExists(SpyMessage.PROPERTY_SCHEDULED_DELIVERY))
109          this.messageScheduledDelivery = message.getLongProperty(SpyMessage.PROPERTY_SCHEDULED_DELIVERY);
110       if (message.propertyExists(SpyMessage.PROPERTY_REDELIVERY_DELAY))
111          this.redeliveryDelay = message.getLongProperty(SpyMessage.PROPERTY_REDELIVERY_DELAY);
112       // If redelivery delay is not specified, then use the value specified
113
// in queue configuration.
114
else if (queue.parameters.redeliveryDelay > 0)
115          this.redeliveryDelay = queue.parameters.redeliveryDelay;
116
117       // If redelivery limit is not specified, then use the value specified
118
// in queue configuration.
119
if (queue.parameters.redeliveryLimit > -1 && !message.propertyExists(SpyMessage.PROPERTY_REDELIVERY_LIMIT))
120          message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_LIMIT,
121                      new Integer JavaDoc(queue.parameters.redeliveryLimit));
122
123       this.messageExpiration = message.getJMSExpiration();
124       this.redelivered = message.getJMSRedelivered();
125       if (message.propertyExists(SpyMessage.PROPERTY_REDELIVERY_COUNT))
126          this.redeliveryCount = message.getIntProperty(SpyMessage.PROPERTY_REDELIVERY_COUNT);
127
128       this.durableSubscriberID = id;
129
130       this.queue = queue;
131       this.persistData = null;
132    }
133
134    void reset()
135    {
136       //clear refs so gc can collect unused objects
137
if (softReference != null && softReference.get() != null)
138          messageCache.softRefCacheSize--;
139       this.messageCache = null;
140       this.hardReference = null;
141       this.softReference = null;
142       this.queue = null;
143    }
144
145    public SpyMessage getMessageForDelivery() throws JMSException JavaDoc
146    {
147       SpyMessage message = getMessage();
148       if (queue.parameters.lateClone)
149       {
150          message = message.myClone();
151          message.header.durableSubscriberID = durableSubscriberID;
152          message.header.jmsRedelivered = redelivered;
153          message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer JavaDoc(redeliveryCount));
154       }
155       return message;
156    }
157
158    public SpyMessage getMessage() throws JMSException JavaDoc
159    {
160       SpyMessage result = null;
161
162       synchronized (this)
163       {
164          if (hardReference == null)
165          {
166             makeHard();
167             result = hardReference;
168             messageCache.messageReferenceUsedEvent(this, false);
169          }
170          else
171          {
172             result = hardReference;
173             messageCache.cacheHits++;
174             messageCache.messageReferenceUsedEvent(this, true);
175          }
176          return result;
177       }
178    }
179
180    /**
181     * The message is being redelivered
182     */

183    public void redelivered() throws JMSException JavaDoc
184    {
185       this.redelivered = true;
186
187       if (redeliveryDelay != 0)
188       {
189          log.trace("message has redelivery delay");
190          messageScheduledDelivery = System.currentTimeMillis() + redeliveryDelay;
191       }
192
193       ++redeliveryCount;
194       
195       if (isLateClone() == false)
196       {
197          SpyMessage message = getMessage();
198          message.setJMSRedelivered(redelivered);
199
200          message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer JavaDoc(redeliveryCount));
201       }
202    }
203
204    /**
205     * Returns true if this message reference has expired.
206     */

207    public boolean isExpired()
208    {
209       if (messageExpiration == 0)
210          return false;
211       long ts = System.currentTimeMillis();
212       return messageExpiration < ts;
213    }
214
215    /**
216     * Determines whether the message is persistent in the sense
217     * that it survives a crash
218     */

219    public boolean isPersistent()
220    {
221       return queue instanceof PersistentQueue && jmsDeliveryMode == DeliveryMode.PERSISTENT;
222    }
223
224    /**
225     * Are we entirely in memory?
226     *
227     * @return true when in memory, false otherwise
228     */

229    public boolean inMemory()
230    {
231       return queue.parameters.inMemory;
232    }
233    
234    /**
235     * Determines the persistent for storing the message
236     */

237    public String JavaDoc getPersistentKey()
238    {
239       return queue.getDescription();
240    }
241
242    /**
243     * Are we late cloning messages?
244     */

245    public boolean isLateClone()
246    {
247       return queue.parameters.lateClone;
248    }
249
250    /**
251     * We could optimize caching by keeping the headers but not the body.
252     * The server will uses the headers more often than the body and the
253     * headers take up much message memory than the body
254     *
255     * For now just return the message.
256     */

257    public SpyMessage.Header getHeaders() throws JMSException JavaDoc
258    {
259       return getMessage().header;
260    }
261
262    void clear() throws JMSException JavaDoc
263    {
264       synchronized (this)
265       {
266          if (stored == STORED)
267             messageCache.removeFromStorage(this);
268          stored = MessageReference.REMOVED;
269       }
270    }
271
272    public void invalidate() throws JMSException JavaDoc
273    {
274       synchronized (this)
275       {
276          if (stored == STORED)
277          {
278             if (hardReference == null)
279             {
280                makeHard();
281                messageCache.messageReferenceUsedEvent(this, false);
282             }
283             messageCache.removeFromStorage(this);
284          }
285       }
286    }
287
288    public void removeDelayed() throws JMSException JavaDoc
289    {
290       messageCache.removeDelayed(this);
291    }
292
293    void makeSoft() throws JMSException JavaDoc
294    {
295       boolean trace = log.isTraceEnabled();
296       synchronized (this)
297       {
298          // Attempt to soften a removed message
299
if (stored == REMOVED)
300             throw new JMSException JavaDoc("CACHE ERROR: makeSoft() on a removed message " + this);
301
302          // It is already soft
303
if (softReference != null)
304          {
305             // Sanity check
306
if (stored == NOT_STORED)
307                throw new JMSException JavaDoc("CACHE ERROR: soft reference to unstored message " + this);
308             return;
309          }
310
311          if (stored == NOT_STORED)
312             messageCache.saveToStorage(this, hardReference);
313
314          // HACK: allow the jdbc2 driver to reject saveToStorage for persistent messages
315
// it is just about to persist the message when it gets some cpu
316
if (stored != STORED)
317          {
318             if (trace)
319                log.trace("saveToStorage rejected by cache " + toString());
320             return;
321          }
322
323          if (messageCache.getMakeSoftReferences())
324             softReference = new SoftReference JavaDoc(hardReference, messageCache.referenceQueue);
325
326          // We don't need the hard ref anymore..
327
messageCache.soften(this);
328          hardReference = null;
329       }
330    }
331
332    /**
333     * Called from A PeristenceManager/CacheStore,
334     * to let us know that this message is already stored on disk.
335     */

336    public void setStored(int stored)
337    {
338       this.stored = stored;
339    }
340
341    void makeHard() throws JMSException JavaDoc
342    {
343       synchronized (this)
344       {
345          // Attempt to harden a removed message
346
if (stored == REMOVED)
347             throw new JMSException JavaDoc("CACHE ERROR: makeHard() on a removed message " + this);
348
349          // allready hard
350
if (hardReference != null)
351             return;
352
353          // Get the object via the softref
354
if (softReference != null)
355             hardReference = (SpyMessage) softReference.get();
356
357          // It might have been removed from the cache due to memory constraints
358
if (hardReference == null)
359          {
360             // load it from disk.
361
hardReference = messageCache.loadFromStorage(this);
362             messageCache.cacheMisses++;
363          }
364          else
365          {
366             messageCache.cacheHits++;
367          }
368
369          // Since we have hard ref, we do not need the soft one.
370
if (softReference != null && softReference.get() != null)
371             messageCache.softRefCacheSize--;
372          softReference = null;
373       }
374    }
375
376    public boolean equals(Object JavaDoc o)
377    {
378       try
379       {
380          return referenceId == ((MessageReference) o).referenceId;
381       }
382       catch (Throwable JavaDoc e)
383       {
384          return false;
385       }
386    }
387
388    /**
389     * This method allows message to be order on the server queues
390     * by priority and the order that they came in on.
391     *
392     * @see Comparable#compareTo(Object)
393     */

394    public int compareTo(Object JavaDoc o)
395    {
396       MessageReference sm = (MessageReference) o;
397       if (jmsPriority > sm.jmsPriority)
398       {
399          return -1;
400       }
401       if (jmsPriority < sm.jmsPriority)
402       {
403          return 1;
404       }
405       return (int) (messageId - sm.messageId);
406    }
407
408    /**
409     * For debugging
410     */

411    public String JavaDoc toString()
412    {
413       StringBuffer JavaDoc buffer = new StringBuffer JavaDoc(100);
414       if (messageCache == null)
415          buffer.append(" NOT IN CACHE hashCode=").append(hashCode());
416
417       else
418       {
419          buffer.append(referenceId);
420          buffer.append(" msg=").append(messageId);
421          if (hardReference != null)
422             buffer.append(" hard");
423          if (softReference != null)
424             buffer.append(" soft");
425          switch (stored)
426          {
427             case NOT_STORED :
428                buffer.append(" NOT_STORED");
429                break;
430             case STORED :
431                buffer.append(" STORED");
432                break;
433             case REMOVED :
434                buffer.append(" REMOVED");
435                break;
436          }
437          switch (jmsDeliveryMode)
438          {
439             case DeliveryMode.NON_PERSISTENT :
440                buffer.append(" NON_PERSISTENT");
441                break;
442             case DeliveryMode.PERSISTENT :
443                buffer.append(" PERSISTENT");
444                break;
445          }
446          if (persistData != null)
447             buffer.append(" persistData=").append(persistData);
448          if (queue != null)
449             buffer.append(" queue=").append(queue.getDescription());
450          else
451             buffer.append(" NO_QUEUE");
452          buffer.append(" priority=").append(jmsPriority);
453          buffer.append(" lateClone=").append(isLateClone());
454          buffer.append(" hashCode=").append(hashCode());
455       }
456       return buffer.toString();
457    }
458 }
459
Popular Tags