KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > messagemgr > QueueDestinationCache


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: QueueDestinationCache.java,v 1.4 2005/03/24 13:41:23 tanderson Exp $
44  */

45 package org.exolab.jms.messagemgr;
46
47 import java.sql.Connection JavaDoc;
48 import java.util.Collections JavaDoc;
49 import java.util.LinkedList JavaDoc;
50 import java.util.List JavaDoc;
51 import java.util.Iterator JavaDoc;
52 import javax.jms.JMSException JavaDoc;
53
54 import org.apache.commons.logging.Log;
55 import org.apache.commons.logging.LogFactory;
56
57 import org.exolab.jms.client.JmsDestination;
58 import org.exolab.jms.client.JmsQueue;
59 import org.exolab.jms.client.JmsTemporaryDestination;
60 import org.exolab.jms.message.MessageImpl;
61 import org.exolab.jms.persistence.DatabaseService;
62 import org.exolab.jms.persistence.PersistenceException;
63 import org.exolab.jms.selector.Selector;
64 import org.exolab.jms.server.JmsServerConnectionManager;
65
66
67 /**
68  * A {@link DestinationCache} for queues.
69  *
70  * @author <a HREF="mailto:jima@comware.com.au">Jim Alateras</a>
71  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
72  * @version $Revision: 1.4 $ $Date: 2005/03/24 13:41:23 $
73  */

74 public class QueueDestinationCache extends AbstractDestinationCache {
75
76     /**
77      * Maintains a list of {@link QueueConsumerMessageHandle} instances.
78      */

79     private MessageQueue _handles;
80
81     /**
82      * Maintains a list of queue browsers for this cache.
83      */

84     protected List JavaDoc _browsers = Collections.synchronizedList(new LinkedList JavaDoc());
85
86     /**
87      * Index of the last {@link QueueConsumerEndpoint} that received a message
88      * from this destination.
89      * If multiple consumers are attached to this queue then messages will be
90      * sent to each in a round robin fashion
91      */

92     private int _lastConsumerIndex = 0;
93
94     /**
95      * The logger.
96      */

97     private static final Log _log = LogFactory.getLog(
98             QueueDestinationCache.class);
99
100
101     /**
102      * Construct a new <code>QueueDestinationCache</code> for a non-persistent
103      * queue.
104      *
105      * @param queue the queue to cache messages for
106      */

107     public QueueDestinationCache(JmsQueue queue) {
108         super(queue);
109         _handles = new MessageQueue();
110     }
111
112     /**
113      * Construct a new <code>QueueDestinationCache</code> for a persistent
114      * queue.
115      *
116      * @param queue the queue to cache messages for
117      * @param connection the database connection
118      * @throws JMSException for any error
119      * @throws PersistenceException if initialisation fails
120      */

121     public QueueDestinationCache(JmsQueue queue, Connection JavaDoc connection)
122             throws JMSException JavaDoc, PersistenceException {
123         super(queue, connection);
124     }
125
126     /**
127      * A Queue can also hav a queue listener, which simply gets informed of all
128      * messages that arrive at this destination.
129      *
130      * @param listener - queue listener
131      */

132     public void addQueueListener(QueueBrowserEndpoint listener) {
133         // add if not present
134
if (!_browsers.contains(listener)) {
135             _browsers.add(listener);
136         }
137     }
138
139     /**
140      * Remove the queue listener associated with this cache
141      *
142      * @param listener - queue listener to remove
143      */

144     public void removeQueueListener(QueueBrowserEndpoint listener) {
145         // add if not present
146
if (_browsers.contains(listener)) {
147             _browsers.remove(listener);
148         }
149     }
150
151     /**
152      * Invoked when the {@link MessageMgr} receives a non-persistent message.
153      *
154      * @param destination the message's destination
155      * @param message the message
156      * @throws JMSException if the listener fails to handle the message
157      */

158     public void messageAdded(JmsDestination destination, MessageImpl message)
159             throws JMSException JavaDoc {
160         MessageRef reference = new CachedMessageRef(message, false,
161                                                     getMessageCache());
162         MessageHandle shared = new SharedMessageHandle(reference, message);
163         MessageHandle handle = new QueueConsumerMessageHandle(shared);
164
165         // all messages are added to this queue. Receivers will
166
// then pick messages from it as required.
167
addMessage(reference, message, handle);
168
169         // if we have any registered consumers then we need to
170
// send the message to one of them first.
171
QueueConsumerEndpoint endpoint = getEndpointForMessage(message);
172         if (endpoint != null) {
173             endpoint.messageAdded(handle, message);
174         }
175     }
176
177     /**
178      * Invoked when the {@link MessageMgr} receives a persistent message.
179      *
180      * @param connection the database connection
181      * @param destination the message's destination
182      * @param message the message
183      * @throws JMSException if the listener fails to handle the message
184      * @throws PersistenceException if there is a persistence related problem
185      */

186     public void persistentMessageAdded(Connection JavaDoc connection,
187                                        JmsDestination destination,
188                                        MessageImpl message)
189             throws JMSException JavaDoc, PersistenceException {
190         MessageRef reference = new CachedMessageRef(message, true,
191                                                     getMessageCache());
192         MessageHandle shared = new SharedMessageHandle(reference, message);
193         MessageHandle handle = new QueueConsumerMessageHandle(shared);
194         handle.add(connection);
195
196         addMessage(reference, message, handle);
197
198         // if there are any registered consumers, notify one of them that
199
// a message has arrived
200
QueueConsumerEndpoint endpoint = getEndpointForMessage(message);
201         if (endpoint != null) {
202             endpoint.persistentMessageAdded(handle, message, connection);
203         }
204     }
205
206     /**
207      * Returns the first available message matching the supplied message
208      * selector.
209      *
210      * @param selector the message selector to use. May be <code>null</code>
211      * @return handle to the first message, or <code>null</code> if there
212      * are no messages, or none matching <code>selector</code>
213      * @throws JMSException for any error
214      */

215     public synchronized MessageHandle getMessage(Selector selector)
216             throws JMSException JavaDoc {
217         QueueConsumerMessageHandle handle = null;
218         if (selector == null) {
219            // if no selector has been specified then remove and return
220
// the first message
221
handle = (QueueConsumerMessageHandle) _handles.removeFirst();
222         } else {
223             // for non null selector we must find the first matching
224
MessageHandle[] handles = _handles.toArray();
225             for (int i = 0; i < handles.length; ++i) {
226                 MessageHandle hdl = handles[i];
227                 if (selector.selects(hdl.getMessage())) {
228                     handle = (QueueConsumerMessageHandle) hdl;
229                     _handles.remove(handle);
230                     break;
231                 }
232             }
233         }
234         return handle;
235     }
236
237     /**
238      * Playback all the messages in the cache to the specified {@link
239      * QueueBrowserEndpoint}.
240      *
241      * @param browser the queue browser
242      * @throws JMSException for any error
243      */

244     public void playbackMessages(QueueBrowserEndpoint browser)
245             throws JMSException JavaDoc {
246         MessageHandle[] handles = _handles.toArray();
247         for (int i = 0; i < handles.length; ++i) {
248             MessageHandle handle = handles[i];
249             MessageImpl message = handle.getMessage();
250             if (message != null) {
251                 browser.messageAdded(handle, message);
252             }
253         }
254     }
255
256     /**
257      * Return a message handle back to the cache, to recover unsent or
258      * unacknowledged messages.
259      *
260      * @param handle the message handle to return
261      */

262     public void returnMessageHandle(MessageHandle handle) {
263         // add the message to the destination cache
264
_handles.add(handle);
265
266         // if there are registered consumers then check whether
267
// any of them have registered message listeners
268
ConsumerEndpoint[] consumers = getConsumerArray();
269         final int size = consumers.length;
270         if (size > 0) {
271             // roll over the consumer index if it is greater
272
// than the number of registered consumers
273
if ((_lastConsumerIndex + 1) > size) {
274                 _lastConsumerIndex = 0;
275             }
276
277             int index = (_lastConsumerIndex >= size) ? 0 : _lastConsumerIndex;
278
279             do {
280                 QueueConsumerEndpoint endpoint
281                         = (QueueConsumerEndpoint) consumers[index];
282
283                 if (endpoint.hasMessageListener()) {
284                     // if we find an endpoint with a listener then
285
// we should reschedule it.
286
endpoint.schedule();
287                     _lastConsumerIndex = ++index;
288                     break;
289                 } else if (endpoint.isWaitingForMessage()) {
290                     endpoint.notifyMessageAvailable();
291                     _lastConsumerIndex = ++index;
292                     break;
293                 }
294
295                 // advance to the next consumer
296
if (++index >= size) {
297                     index = 0;
298                 }
299             } while (index != _lastConsumerIndex);
300         }
301     }
302
303     /**
304      * Determines if there are any registered consumers.
305      *
306      * @return <code>true</code> if there are registered consumers
307      */

308     public boolean hasActiveConsumers() {
309         boolean active = super.hasActiveConsumers();
310         if (!active && !_browsers.isEmpty()) {
311             active = true;
312         }
313         if (_log.isDebugEnabled()) {
314             _log.debug("hasActiveConsumers()[queue=" + getDestination() + "]="
315                        + active);
316         }
317         return active;
318     }
319
320     /**
321      * Determines if this cache can be destroyed.
322      * A <code>QueueDestinationCache</code> can be destroyed if there are no
323      * active consumers and:
324      * <ul>
325      * <li>the queue is persistent and there are no messages</li>
326      * <li> the queue is temporary and the corresponding connection is closed
327      * </li>
328      * </ul>
329      *
330      * @return <code>true</code> if the cache can be destroyed, otherwise
331      * <code>false</code>
332      */

333     public boolean canDestroy() {
334         boolean destroy = false;
335         if (!hasActiveConsumers()) {
336             JmsDestination queue = getDestination();
337             if (queue.getPersistent() && getMessageCount() == 0) {
338                 destroy = true;
339             } else if (queue.isTemporaryDestination()) {
340                 // check if there is a corresponding connection. If
341
// not, it has been closed, and the cache can be removed
342
long connectionId =
343                         ((JmsTemporaryDestination) queue).getConnectionId();
344                 JmsServerConnectionManager manager =
345                         JmsServerConnectionManager.instance();
346                 if (manager.getConnection(connectionId) == null) {
347                     destroy = true;
348                 }
349             }
350         }
351         return destroy;
352     }
353
354     /**
355      * Destroy this object
356      */

357     public synchronized void destroy() {
358         super.destroy();
359         _browsers.clear();
360     }
361
362     /**
363      * Initialise the cache. This removes all the expired messages, and then
364      * retrieves all unacked messages from the database and stores them
365      * locally.
366      *
367      * @param connection the database connection
368      * @throws JMSException for any JMS error
369      * @throws PersistenceException for any persistence error
370      */

371     protected void init(Connection JavaDoc connection) throws JMSException JavaDoc, PersistenceException {
372         _handles = new MessageQueue();
373
374         JmsDestination queue = getDestination();
375         DatabaseService.getAdapter().removeExpiredMessageHandles(connection,
376                                                                  queue.getName());
377         DefaultMessageCache cache = getMessageCache();
378         List JavaDoc handles = DatabaseService.getAdapter().getMessageHandles(
379                 connection, queue, queue.getName());
380         Iterator JavaDoc iterator = handles.iterator();
381         while (iterator.hasNext()) {
382             PersistentMessageHandle handle = (PersistentMessageHandle) iterator.next();
383             String JavaDoc messageId = handle.getMessageId();
384             MessageRef reference = cache.getMessageRef(messageId);
385             if (reference == null) {
386                 reference = new CachedMessageRef(messageId, true, cache);
387             }
388             cache.addMessageRef(reference);
389             handle.reference(reference);
390             _handles.add(new QueueConsumerMessageHandle(handle));
391
392             checkMessageExpiry(reference, handle.getExpiryTime());
393         }
394     }
395
396     /**
397      * Add a message, and notify any listeners.
398      *
399      * @param reference a reference to the message
400      * @param message the message
401      * @param handle the handle to add
402      * @throws JMSException for any error
403      */

404     protected void addMessage(MessageRef reference, MessageImpl message,
405                               MessageHandle handle) throws JMSException JavaDoc {
406         addMessage(reference, message);
407         _handles.add(handle);
408
409         // notify any queue listeners that a message has arrived
410
notifyQueueListeners(handle, message);
411
412         // create a lease iff one is required
413
checkMessageExpiry(reference, message);
414     }
415
416
417     /**
418      * Notify queue browsers that a message has arrived.
419      *
420      * @param handle a handle to the message
421      * @param message the message
422      * @throws JMSException if a browser fails to handle the message
423      */

424     protected void notifyQueueListeners(MessageHandle handle,
425                                         MessageImpl message)
426             throws JMSException JavaDoc {
427         QueueBrowserEndpoint[] browsers =
428                 (QueueBrowserEndpoint[]) _browsers.toArray(
429                         new QueueBrowserEndpoint[0]);
430
431         for (int index = 0; index < browsers.length; ++index) {
432             QueueBrowserEndpoint browser = browsers[index];
433             browser.messageAdded(handle, message);
434         }
435     }
436
437     /**
438      * Remove an expired non-peristent message, and notify any listeners.
439      *
440      * @param reference the reference to the expired message
441      * @throws JMSException for any error
442      */

443     protected void messageExpired(MessageRef reference) throws JMSException JavaDoc {
444         _handles.remove(reference.getMessageId());
445         // @todo - notify browser
446
super.messageExpired(reference);
447     }
448
449     /**
450      * Remove an expired persistent message, and notify any listeners.
451      *
452      * @param reference the reference to the expired message
453      * @param connection the database connection to use
454      * @throws JMSException if a listener fails to handle the
455      * expiration
456      * @throws PersistenceException if there is a persistence related problem
457      */

458     protected void persistentMessageExpired(MessageRef reference,
459                                             Connection JavaDoc connection)
460             throws JMSException JavaDoc, PersistenceException {
461         _handles.remove(reference.getMessageId());
462         // @todo - notify browsers
463
super.messageExpired(reference);
464     }
465
466     /**
467      * Return the next QueueConsumerEndpoint that can consume the
468      * specified message or null if there is none.
469      *
470      * @param message - the message to consume
471      * @return the consumer who should receive this message, or null
472      */

473     private synchronized QueueConsumerEndpoint getEndpointForMessage(
474             MessageImpl message) {
475         QueueConsumerEndpoint result = null;
476
477         ConsumerEndpoint[] consumers = getConsumerArray();
478         final int size = consumers.length;
479         if (size > 0) {
480             // roll over the consumer index if it is greater
481
// than the number of registered consumers
482
if ((_lastConsumerIndex + 1) > size) {
483                 _lastConsumerIndex = 0;
484             }
485
486             // look over the list of consumers and return the
487
// first endpoint that can process this message
488
int index = _lastConsumerIndex;
489             do {
490                 QueueConsumerEndpoint endpoint =
491                         (QueueConsumerEndpoint) consumers[index];
492                 Selector selector = endpoint.getSelector();
493
494                 // if the endpoint has a message listener registered
495
// or the endpoint is waiting for a message and the
496
// message satisfies the selector then return it to
497
// the client.
498
if (((endpoint.hasMessageListener()) ||
499                         (endpoint.isWaitingForMessage())) &&
500                         ((selector == null) ||
501                         (selector.selects(message)))) {
502                     _lastConsumerIndex = ++index;
503                     result = endpoint;
504                     break;
505                 }
506
507                 // advance to the next consumer
508
if (++index >= size) {
509                     index = 0;
510                 }
511             } while (index != _lastConsumerIndex);
512         }
513
514         return result;
515     }
516
517 }
518
Popular Tags