KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > messagemgr > AbstractDestinationCache


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: AbstractDestinationCache.java,v 1.1 2005/03/18 03:58:38 tanderson Exp $
44  */

45 package org.exolab.jms.messagemgr;
46
47 import java.sql.Connection JavaDoc;
48 import java.util.Collections JavaDoc;
49 import java.util.HashMap JavaDoc;
50 import java.util.Map JavaDoc;
51 import javax.jms.JMSException JavaDoc;
52
53 import org.apache.commons.logging.Log;
54 import org.apache.commons.logging.LogFactory;
55
56 import org.exolab.jms.client.JmsDestination;
57 import org.exolab.jms.lease.LeaseEventListenerIfc;
58 import org.exolab.jms.lease.LeaseManager;
59 import org.exolab.jms.message.MessageImpl;
60 import org.exolab.jms.persistence.PersistenceException;
61 import org.exolab.jms.persistence.DatabaseService;
62 import org.exolab.jms.persistence.SQLHelper;
63
64
65 /**
66  * Abstract implementation of the {@link DestinationCache} interface
67  *
68  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
69  * @version $Revision: 1.1 $ $Date: 2005/03/18 03:58:38 $
70  */

71 public abstract class AbstractDestinationCache implements DestinationCache,
72         LeaseEventListenerIfc {
73
74     /**
75      * The destination to cache messages for.
76      */

77     private final JmsDestination _destination;
78
79     /**
80      * The message cache for this destination.
81      */

82     private DefaultMessageCache _cache = new DefaultMessageCache();
83
84     /**
85      * The set of consumers that have subscribed to this cache, keyed on id
86      */

87     private Map JavaDoc _consumers = Collections.synchronizedMap(new HashMap JavaDoc());
88
89     /**
90      * A map of String -> MessageLease objects, representing the active
91      * leases keyed on JMSMessageID.
92      */

93     private HashMap JavaDoc _leases = new HashMap JavaDoc();
94
95     /**
96      * The logger.
97      */

98     private static final Log _log = LogFactory.getLog(
99             AbstractDestinationCache.class);
100
101
102     /**
103      * Construct a new <code>AbstractDestinationCache</code>, for a temporary
104      * destination.
105      *
106      * @param destination the destination to cache messages for
107      */

108     public AbstractDestinationCache(JmsDestination destination) {
109         if (destination == null) {
110             throw new IllegalArgumentException JavaDoc(
111                     "Argument 'destination' is null");
112         }
113         if (destination.getPersistent()) {
114             throw new IllegalArgumentException JavaDoc(
115                     "Argument 'destination' refers to a persistent destination");
116         }
117         _destination = destination;
118
119         // register this with the message manager
120
MessageMgr.instance().addEventListener(getDestination(), this);
121     }
122
123     /**
124      * Construct a new <code>DestinationCache</code>, for a persistent
125      * destination.
126      *
127      * @param destination the destination to cache messages for
128      * @param connection the database connection to use
129      * @throws JMSException for any JMS error
130      * @throws PersistenceException if persistent messages can't be loaded from
131      * the dataabase
132      */

133     public AbstractDestinationCache(JmsDestination destination,
134                                     Connection JavaDoc connection)
135             throws JMSException JavaDoc, PersistenceException {
136         if (destination == null) {
137             throw new IllegalArgumentException JavaDoc(
138                     "Argument 'destination' is null");
139         }
140         if (!destination.getPersistent()) {
141             throw new IllegalArgumentException JavaDoc(
142                     "Argument 'destination' refers to a non-persistent destination");
143         }
144         _destination = destination;
145
146         init(connection);
147
148         MessageMgr.instance().addEventListener(getDestination(), this);
149     }
150
151     /**
152      * Returns the destination that messages are being cached for.
153      *
154      * @return the destination that messages are being cached for
155      */

156     public JmsDestination getDestination() {
157         return _destination;
158     }
159
160     /**
161      * Register a consumer with this cache.
162      *
163      * @param consumer the message consumer for this destination
164      * @return <code>true</code> if registered; otherwise <code>false</code>
165      */

166     public boolean addConsumer(ConsumerEndpoint consumer) {
167         boolean result = false;
168
169         // check to see that the consumer is actually one for this
170
// destination
171
if (consumer.getDestination().equals(getDestination())) {
172             Long JavaDoc key = new Long JavaDoc(consumer.getId());
173             if (!_consumers.containsKey(key)) {
174                 _consumers.put(key, consumer);
175                 result = true;
176             }
177         }
178
179         return result;
180     }
181
182     /**
183      * Remove the consumer for the list of registered consumers.
184      *
185      * @param consumer the consumer to remove
186      */

187     public void removeConsumer(ConsumerEndpoint consumer) {
188         Long JavaDoc key = new Long JavaDoc(consumer.getId());
189         _consumers.remove(key);
190     }
191
192     /**
193      * Returns the number of messages in the cache.
194      *
195      * @return the number of messages in the cache
196      */

197     public int getMessageCount() {
198         return _cache.getMessageCount();
199     }
200
201     /**
202      * Determines if this cache can be destroyed. This implementation returns
203      * <code>true</code> if there are no active consumers.
204      *
205      * @return <code>true</code> if the cache can be destroyed, otherwise
206      * <code>false</code>
207      */

208     public boolean canDestroy() {
209         return !hasActiveConsumers();
210     }
211
212     /**
213      * Destroy this cache
214      */

215     public synchronized void destroy() {
216         // clear the cache
217
_cache.clear();
218
219         // remove the consumers
220
_consumers.clear();
221
222         // unregister itself from the message manager
223
MessageMgr.instance().removeEventListener(getDestination(), this);
224
225         // remove the leases
226
MessageLease[] leases = null;
227         synchronized (_leases) {
228             leases = (MessageLease[]) _leases.values().toArray(
229                     new MessageLease[0]);
230             _leases.clear();
231         }
232
233         for (int i = 0; i < leases.length; ++i) {
234             MessageLease lease = leases[i];
235             LeaseManager.instance().removeLease(lease);
236         }
237     }
238
239     /**
240      * Invoked when a message lease has expired.
241      *
242      * @param object an instance of {@link MessageRef}
243      */

244     public void onLeaseExpired(Object JavaDoc object) {
245         MessageRef reference = (MessageRef) object;
246         String JavaDoc messageId = ((MessageRef) reference).getMessageId();
247         synchronized (_leases) {
248             _leases.remove(messageId);
249         }
250
251         // determine whether the message is persistent or not and take
252
// the corresponding action
253
try {
254             if (reference.isPersistent()) {
255                 Connection JavaDoc connection = null;
256                 try {
257                     connection = DatabaseService.getConnection();
258                     persistentMessageExpired(reference, connection);
259                     reference.destroy(connection);
260                     connection.commit();
261                 } catch (JMSException JavaDoc exception) {
262                     SQLHelper.rollback(connection);
263                     throw exception;
264                 } catch (Exception JavaDoc exception) {
265                     SQLHelper.rollback(connection);
266                     _log.error("Failed to expire message", exception);
267                     throw new JMSException JavaDoc(exception.getMessage());
268                 } finally {
269                     SQLHelper.close(connection);
270                 }
271
272             } else {
273                 messageExpired(reference);
274                 reference.destroy();
275             }
276         } catch (JMSException JavaDoc exception) {
277             _log.error("Failed to expire message", exception);
278         }
279     }
280
281     public void collectGarbage(boolean aggressive) {
282         if (aggressive) {
283             // clear all persistent messages in the cache
284
_cache.clearPersistentMessages();
285             if (_log.isDebugEnabled()) {
286                 _log.debug("Evicted all persistent messages from cache "
287                            + getDestination().getName());
288             }
289         }
290
291         if (_log.isDebugEnabled()) {
292             _log.debug("DESTCACHE -" + getDestination().getName()
293                        + " Messages: P[" + _cache.getPersistentCount()
294                        + "] T[" + _cache.getTransientCount() + "] Total: ["
295                        + _cache.getMessageCount() + "]");
296         }
297     }
298
299     /**
300      * Initialise the cache from the database.
301      *
302      * @param connection the database connection to use
303      * @throws JMSException for any JMS error
304      * @throws PersistenceException for any persistence error
305      */

306     protected abstract void init(Connection JavaDoc connection) throws JMSException JavaDoc,
307             PersistenceException;
308
309     /**
310      * Add a message reference and its corresponding message to the cache
311      *
312      * @param reference the reference to the message
313      * @param message the message
314      */

315     protected void addMessage(MessageRef reference, MessageImpl message) {
316         _cache.addMessage(reference, message);
317     }
318
319     /**
320      * Returns the message cache
321      *
322      * @return the message cache
323      */

324     protected DefaultMessageCache getMessageCache() {
325         return _cache;
326     }
327
328     /**
329      * Determines if there are any registered consumers
330      *
331      * @return <code>true</code> if there are registered consumers
332      */

333     protected boolean hasActiveConsumers() {
334         return !_consumers.isEmpty();
335     }
336
337     /**
338      * Returns a consumer endpoint, given its id
339      *
340      * @param consumerId the consumer identity
341      * @return the consumer corresponding to <code>id</code>, or
342      * <code>null</code> if none is registered
343      */

344     protected ConsumerEndpoint getConsumerEndpoint(long consumerId) {
345         return (ConsumerEndpoint) _consumers.get(new Long JavaDoc(consumerId));
346     }
347
348     /**
349      * Helper to return the consumers as an array
350      *
351      * @return the consumers of this cache
352      */

353     protected ConsumerEndpoint[] getConsumerArray() {
354         ConsumerEndpoint[] result =
355                 (ConsumerEndpoint[]) _consumers.values().toArray(
356                         new ConsumerEndpoint[0]);
357         return result;
358     }
359
360     /**
361      * Remove an expired non-peristent message, and notify any listeners
362      *
363      * @param reference the reference to the expired message
364      * @throws JMSException for any error
365      */

366     protected void messageExpired(MessageRef reference)
367             throws JMSException JavaDoc {
368         // notify consumers
369
String JavaDoc messageId = reference.getMessageId();
370         ConsumerEndpoint[] consumers = getConsumerArray();
371         for (int i = 0; i < consumers.length; ++i) {
372             consumers[i].messageRemoved(messageId);
373         }
374     }
375
376     /**
377      * Remove an expired persistent message, and notify any listeners.
378      *
379      * @param reference the reference to the expired message
380      * @param connection the database connection to use
381      * @throws JMSException if a listener fails to handle the expiration
382      * @throws PersistenceException if there is a persistence related problem
383      */

384     protected void persistentMessageExpired(MessageRef reference,
385                                             Connection JavaDoc connection)
386             throws JMSException JavaDoc, PersistenceException {
387         // notify consumers
388
String JavaDoc messageId = reference.getMessageId();
389         ConsumerEndpoint[] consumers = getConsumerArray();
390
391         for (int i = 0; i < consumers.length; ++i) {
392             consumers[i].persistentMessageRemoved(messageId, connection);
393         }
394     }
395
396     /**
397      * Check to see if the message has a TTL. If so then set up a lease for it.
398      * An expiry time of 0 means that the message never expires
399      *
400      * @param reference a reference to the message
401      * @param message the message
402      * @throws JMSException if the JMSExpiration property can't be accessed
403      */

404     protected void checkMessageExpiry(MessageRef reference,
405                                       MessageImpl message) throws JMSException JavaDoc {
406         checkMessageExpiry(reference, message.getJMSExpiration());
407     }
408
409     /**
410      * Check to see if the message has a TTL. If so then set up a lease for it.
411      * An expiry time of 0 means that the message never expires
412      *
413      * @param reference a reference to the message
414      * @param expiryTime the time when the message expires
415      */

416     protected void checkMessageExpiry(MessageRef reference,
417                                       long expiryTime) {
418         if (expiryTime != 0) {
419             synchronized (_leases) {
420                 // ensure that a lease for this message does not already exist.
421
if (!_leases.containsKey(reference.getMessageId())) {
422                     long duration = expiryTime - System.currentTimeMillis();
423                     if (duration <= 0) {
424                         duration = 1;
425                     }
426                     MessageLease lease = new MessageLease(reference, duration,
427                                                           this);
428                     LeaseManager.instance().addLease(lease);
429                     _leases.put(reference.getMessageId(), lease);
430                 }
431             }
432         }
433     }
434
435 }
436
Popular Tags