KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > messagemgr > ConsumerManager


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: ConsumerManager.java,v 1.2 2005/03/18 03:58:39 tanderson Exp $
44  */

45 package org.exolab.jms.messagemgr;
46
47 import java.sql.Connection JavaDoc;
48 import java.util.ArrayList JavaDoc;
49 import java.util.HashMap JavaDoc;
50 import java.util.Iterator JavaDoc;
51 import java.util.LinkedList JavaDoc;
52 import java.util.List JavaDoc;
53 import javax.jms.InvalidDestinationException JavaDoc;
54 import javax.jms.InvalidSelectorException JavaDoc;
55 import javax.jms.JMSException JavaDoc;
56
57 import org.apache.commons.logging.Log;
58 import org.apache.commons.logging.LogFactory;
59
60 import org.exolab.jms.client.JmsDestination;
61 import org.exolab.jms.client.JmsQueue;
62 import org.exolab.jms.client.JmsTopic;
63 import org.exolab.jms.persistence.DatabaseService;
64 import org.exolab.jms.persistence.PersistenceAdapter;
65 import org.exolab.jms.persistence.SQLHelper;
66 import org.exolab.jms.scheduler.Scheduler;
67 import org.exolab.jms.server.JmsServerSession;
68 import org.exolab.jms.service.ServiceException;
69
70
71 /**
72  * The consumer manager is responsible for creating and managing the lifecycle
73  * of consumers. The consumer manager maintains a list of all active consumers.
74  *
75  * @author <a HREF="mailto:jima@comware.com.au">Jim Alateras</a>
76  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
77  * @version $Revision: 1.2 $ $Date: 2005/03/18 03:58:39 $
78  */

79 public class ConsumerManager {
80
81     /**
82      * Maintains a cache of all active endpoints.
83      */

84     private HashMap JavaDoc _endpoints = new HashMap JavaDoc();
85
86     /**
87      * Maintains a list of all unique consumers, durable and non-durable. Each
88      * entry has an associated {@link ConsumerEntry} record. All durable
89      * subscribers are maintained in memory until they are removed from the
90      * system entirely. All non-durable subscribers are maintained in memory
91      * until their endpoint is removed.
92      */

93     private HashMap JavaDoc _consumerCache = new HashMap JavaDoc();
94
95     /**
96      * Maintains a mapping between destinations and consumers. A destination can
97      * have more than one consumer and a consumer can also be registered to more
98      * than one destination
99      */

100     private HashMap JavaDoc _destToConsumerMap = new HashMap JavaDoc();
101
102     /**
103      * Maintains a list of wildcard subscriptions using subscription name and
104      * the JmsTopic.
105      */

106     private HashMap JavaDoc _wildcardConsumers = new HashMap JavaDoc();
107
108     /**
109      * Cache a copy of the scheduler instance
110      */

111     private Scheduler _scheduler = null;
112
113     /**
114      * The consumer Id seed to allocate to new consumers
115      */

116     private long _consumerIdSeed = 0;
117
118     /**
119      * The singleton instance of the consumer manager
120      */

121     private static ConsumerManager _instance = null;
122
123     /**
124      * The logger
125      */

126     private static final Log _log = LogFactory.getLog(ConsumerManager.class);
127
128
129     /**
130      * Create the singleton instance of the consumer manager
131      *
132      * @return the singleton instance
133      * @throws ServiceException if the service cannot be initialised
134      */

135     public static ConsumerManager createInstance() throws ServiceException {
136         _instance = new ConsumerManager();
137         return _instance;
138     }
139
140     /**
141      * Return the singleton instance of the ConsumerManager
142      *
143      * @return ConsumerManager
144      */

145     public static ConsumerManager instance() {
146         return _instance;
147     }
148
/**
 * Construct the <code>ConsumerManager</code>. Private: instances are only
 * obtained through {@link #createInstance}.
 *
 * @throws ServiceException if initialisation fails
 */
private ConsumerManager() throws ServiceException {
    super();
    init();
}
157
158     /**
159      * This method creates an actual durable consumer for the specified and
160      * caches it. It does not create and endpoint. To create the endpoint the
161      * client should call createDurableConsumerEndpoint.
162      *
163      * @param topic the topic destination
164      * @param name the consumer name
165      * @throws JMSException if it cannot be created
166      */

167     public synchronized void createDurableConsumer(JmsTopic topic, String JavaDoc name)
168             throws JMSException JavaDoc {
169
170         PersistenceAdapter adapter = DatabaseService.getAdapter();
171
172         Connection JavaDoc connection = null;
173         try {
174
175             connection = DatabaseService.getConnection();
176
177             // ensure that we are trying to create a durable consumer to an
178
// administered destination.
179
if (!adapter.checkDestination(connection, topic.getName())) {
180                 throw new JMSException JavaDoc("Cannot create durable consumer, name="
181                                        + name
182                                        + ", for non-administered topic="
183                                        + topic.getName());
184             }
185
186             if (!adapter.durableConsumerExists(connection, name)) {
187                 adapter.addDurableConsumer(connection, topic.getName(), name);
188             }
189
190             connection.commit();
191             // cache the consumer locally
192
addToConsumerCache(name, topic, true);
193         } catch (JMSException JavaDoc exception) {
194             throw exception;
195         } catch (Exception JavaDoc exception) { // PersistenceException, SQLException
196
SQLHelper.rollback(connection);
197             String JavaDoc msg = "Failed to create durable consumer, name=" + name
198                     + ", for topic=" + topic.getName();
199             _log.error(msg, exception);
200             throw new JMSException JavaDoc(msg + ": " + exception.getMessage());
201         } finally {
202             SQLHelper.close(connection);
203         }
204     }
205
206
207     /**
208      * This method will remove the durable consumer from the database and from
209      * transient memory only if it exists and is inactive. If there is an active
210      * endpoint then it cannot be deleted and an exception will be raise.
211      * <p/>
212      * If the durable consumer does not exist then an exception is also raised.
213      *
214      * @param name - the consumer name
215      * @throws JMSException - if it cannot be removed
216      */

217     public synchronized void removeDurableConsumer(String JavaDoc name)
218             throws JMSException JavaDoc {
219         if (_log.isDebugEnabled()) {
220             _log.debug("removeDurableConsumer(name=" + name + ")");
221         }
222
223         // check to see that the durable consumer exists
224
if (!durableConsumerExists(name)) {
225             throw new JMSException JavaDoc("Durable consumer " + name
226                                    + " is not defined.");
227         }
228         if (isDurableConsumerActive(name)) {
229             throw new JMSException JavaDoc("Cannot remove durable consumer=" + name
230                                    + ": consumer is active");
231         }
232
233         // remove it from the persistent store.
234
Connection JavaDoc connection = null;
235         try {
236             connection = DatabaseService.getConnection();
237
238             DatabaseService.getAdapter().removeDurableConsumer(connection,
239                                                                name);
240             // if it has been successfully removed from persistent store then
241
// clear up the transient references.
242
ConsumerEndpoint endpoint = getConsumerEndpoint(name);
243             if (endpoint != null) {
244                 deleteConsumerEndpoint(endpoint);
245             }
246             removeFromConsumerCache(name);
247             connection.commit();
248         } catch (Exception JavaDoc exception) { // PersistenceException, SQLException
249
SQLHelper.rollback(connection);
250             String JavaDoc msg = "Failed to remove durable consumer, name=" + name;
251             _log.error(msg, exception);
252             throw new JMSException JavaDoc(msg + ":" + exception.getMessage());
253         } finally {
254             SQLHelper.close(connection);
255         }
256     }
257
258     /**
259      * This method will remove all the durable consumers from the database and
260      * from transient memory whether they are active or not.
261      * <p/>
262      * If we have problems removing the durable consumers then throw the
263      * JMSException.
264      *
265      * @param topic the topic to remove consumers for
266      * @throws JMSException if the consumers cannot be removed
267      */

268     public synchronized void removeDurableConsumers(JmsDestination topic)
269             throws JMSException JavaDoc {
270
271         List JavaDoc consumers = (List JavaDoc) _destToConsumerMap.get(topic);
272         if (consumers != null) {
273             Iterator JavaDoc iterator = consumers.iterator();
274             while (iterator.hasNext()) {
275                 ConsumerEntry entry = (ConsumerEntry) iterator.next();
276                 if (entry.isDurable()) {
277                     // remove the actual durable consumer from transient and
278
// secondary memory.
279
removeDurableConsumer(entry.getName());
280                 }
281             }
282         }
283
284         // remove all consumers for the specified destination
285
removeFromConsumerCache(topic);
286     }
287
288     /**
289      * Create a transient consumer for the specified destination
290      *
291      * @param destination the destination to consume messages from
292      * @param selector the message selector. May be <code>null</code>
293      * @param noLocal if true, and the destination is a topic, inhibits the
294      * delivery of messages published by its own connection.
295      * The behavior for <code>noLocal</code> is not specified
296      * if the destination is a queue.
297      * @return a new transient consumer
298      */

299     public synchronized ConsumerEndpoint createConsumerEndpoint(
300             JmsServerSession session, JmsDestination destination,
301             String JavaDoc selector, boolean noLocal)
302             throws JMSException JavaDoc, InvalidSelectorException JavaDoc {
303
304         if (_log.isDebugEnabled()) {
305             _log.debug("createConsumerEndpoint(session=" + session
306                        + ", destination=" + destination
307                        + ", selector=" + selector
308                        + ", noLocal=" + noLocal + ")");
309         }
310
311         ConsumerEndpoint endpoint = null;
312
313         // ensure that the destination is valid before proceeding
314
checkDestination(destination);
315
316         long consumerId = getNextConsumerId();
317
318         // determine what type of consumer endpoint to create based on
319
// the destination it subscribes to.
320

321         if (destination instanceof JmsTopic) {
322             JmsTopic topic = (JmsTopic) destination;
323             endpoint
324                     = new TopicConsumerEndpoint(consumerId, session, topic,
325                                                 selector, noLocal, _scheduler);
326         } else if (destination instanceof JmsQueue) {
327             endpoint = new QueueConsumerEndpoint(consumerId, session,
328                                                  (JmsQueue) destination,
329                                                  selector, _scheduler);
330         }
331
332         if (endpoint != null) {
333             // add it to the list of managed consumers. If it has a persistent
334
// identity, use that as the key, otherwise use its transient
335
// identity.
336
Object JavaDoc key = ConsumerEntry.getConsumerKey(endpoint);
337             _endpoints.put(key, endpoint);
338             addToConsumerCache(key, destination, false);
339         }
340
341         return endpoint;
342     }
343
344     /**
345      * Create a durable consumer with the specified well-known name.
346      *
347      * @param session the owning session
348      * @param topic consumer for this topic
349      * @param name the unique subscriber name
350      * @param selector the message selector. May be <code>null</code>
351      * @param noLocal if true, and the destination is a topic, inhibits the
352      * delivery of messages published by its own connection. The
353      * behavior for <code>noLocal</code> is not specified if the
354      * destination is a queue.
355      * @return the durable consumer endpoint
356      * @throws JMSException if a durable consumer is already active with the
357      * same <code>name</code>, or the <code>topic</code>
358      * doesn't exist, or <code>selector</code> is an
359      * invalid selector
360      */

361     public synchronized DurableConsumerEndpoint createDurableConsumerEndpoint(
362             JmsServerSession session, JmsTopic topic, String JavaDoc name,
363             boolean noLocal,
364             String JavaDoc selector)
365             throws JMSException JavaDoc {
366
367         if (_log.isDebugEnabled()) {
368             _log.debug("createDurableConsumerEndpoint(session=" + session
369                        + ", topic=" + topic + ", name=" + name
370                        + ", selector=" + selector + ", noLocal=" + noLocal
371                        + ")");
372         }
373
374         // check that the durable subscriber is not already registered. If it
375
// is registered then check to see whether the client is still active.
376
DurableConsumerEndpoint endpoint =
377                 (DurableConsumerEndpoint) _endpoints.get(name);
378         if (endpoint != null) {
379             throw new JMSException JavaDoc(name + " is already registered");
380         }
381
382         // check that the destination actually exists, if the topic
383
// is not a wildcard
384
if (!topic.isWildCard() &&
385                 !DestinationManager.instance().destinationExists(topic)) {
386             throw new JMSException JavaDoc("Cannot create a durable consumer for "
387                                    + topic);
388         }
389
390         // if we get this far then we need to create the durable consumer
391
long consumerId = getNextConsumerId();
392         endpoint = new DurableConsumerEndpoint(consumerId, session, topic, name,
393                                                selector, noLocal, _scheduler);
394         _endpoints.put(endpoint.getPersistentId(), endpoint);
395
396         return endpoint;
397     }
398
399     /**
400      * Check whether there are active durable consumers for the specified
401      * destination.
402      *
403      * @param topic the destination to check
404      * @return <code>true</code> if there is at least one active consumer
405      */

406     public synchronized boolean hasActiveDurableConsumers(JmsDestination topic) {
407
408         boolean result = false;
409         List JavaDoc consumers = (List JavaDoc) _destToConsumerMap.get(topic);
410         if (consumers != null) {
411             Iterator JavaDoc iterator = consumers.iterator();
412             while (iterator.hasNext()) {
413                 ConsumerEntry entry = (ConsumerEntry) iterator.next();
414                 if (entry.isDurable()) {
415                     result = true;
416                     break;
417                 }
418             }
419         }
420
421         return result;
422     }
423
424     /**
425      * Create a browser for the specified destination and the selector. A
426      * browser is responsible for passing all messages back to the client that
427      * reside on the queue
428      *
429      * @param session the owning session
430      * @param queue the queue to browse
431      * @param selector the message selector. May be <code>null</code>
432      * @return the queue browser endpoint
433      */

434     public synchronized ConsumerEndpoint createQueueBrowserEndpoint(
435             JmsServerSession session, JmsQueue queue, String JavaDoc selector)
436             throws JMSException JavaDoc {
437
438         // ensure that the destination is valid before proceeding
439
checkDestination(queue);
440
441         long consumerId = getNextConsumerId();
442
443         ConsumerEndpoint consumer = new QueueBrowserEndpoint(consumerId, session,
444                                                              queue, selector,
445                                                              _scheduler);
446         Object JavaDoc key = ConsumerEntry.getConsumerKey(consumer);
447         _endpoints.put(key, consumer);
448         addToConsumerCache(key, queue, false);
449
450         return consumer;
451     }
452
453     /**
454      * Destroy the endpoint associated with the specified durable consumer
455      *
456      * @param name - name of the durable consumer
457      * @throws JMSException - if itt cannot complete the request
458      */

459     public synchronized void deleteDurableConsumerEndpoint(String JavaDoc name)
460             throws JMSException JavaDoc {
461
462         if (_log.isDebugEnabled()) {
463             _log.debug("deleteDurableConsumerEndpoint(name=" + name + ")");
464         }
465
466         ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
467         if (entry != null) {
468             if (entry.isDurable()) {
469                 deleteConsumerEndpoint((ConsumerEndpoint) _endpoints.get(name));
470             } else {
471                 throw new JMSException JavaDoc(name + " is not a durable subscriber");
472             }
473         } else {
474            // ignore since the consumer is not active
475
if (_log.isDebugEnabled()) {
476                 _log.debug("deleteDurableConsumerEndpoint(name=" + name
477                            + "): failed to locate consumer");
478             }
479         }
480     }
481
482     /**
483      * Destroy the specified consumer.
484      *
485      * @param consumer the consumer to destroy
486      */

487     public synchronized void deleteConsumerEndpoint(ConsumerEndpoint consumer) {
488
489         if (_log.isDebugEnabled()) {
490             _log.debug("deleteConsumerEndpoint(consumer=[Id="
491                        + consumer.getId() + ", destination="
492                        + consumer.getDestination() + ")");
493         }
494
495         Object JavaDoc key = ConsumerEntry.getConsumerKey(consumer);
496
497         // if the consumer is currently active then delete it
498
ConsumerEndpoint existing = (ConsumerEndpoint) _endpoints.get(key);
499         if (existing != null) {
500             // remove it from the list of active endpoints
501
// As a fix for bug 759752, only remove the consumer if it
502
// matches the existing one
503
if (consumer.getId() == existing.getId()) {
504                 _endpoints.remove(key);
505             } else {
506                 if (_log.isDebugEnabled()) {
507                     _log.debug("Existing endpoint doesn't match that to " +
508                                "be deleted - retaining");
509                 }
510             }
511
512             // close the endpoint
513
consumer.close();
514
515             // remove it from the consumer cache if and only if it is a
516
// non-durable subscriber
517
if (!(consumer instanceof DurableConsumerEndpoint)) {
518                 removeFromConsumerCache(key);
519             }
520         }
521     }
522
523     /**
524      * Return the consumer with the specified identity.
525      *
526      * @param consumerId the identity of the consumer
527      * @return the associated consumer, or <code>null</code> if none exists
528      */

529     public ConsumerEndpoint getConsumerEndpoint(long consumerId) {
530         return (ConsumerEndpoint) _endpoints.get(new Long JavaDoc(consumerId));
531     }
532
533     /**
534      * Return the consumer with the specified persistent identity.
535      *
536      * @param persistentId the persistent identity of the consumer
537      * @return the associated consumer, or <code>null</code> if none exists
538      */

539     public ConsumerEndpoint getConsumerEndpoint(String JavaDoc persistentId) {
540         return (ConsumerEndpoint) _endpoints.get(persistentId);
541     }
542
543     /**
544      * Check whether there is an active consumer for the specified destination
545      *
546      * @param destination - the destination to check
547      * @return boolean - true if it exists
548      */

549     public boolean hasActiveConsumers(JmsDestination destination)
550             throws JMSException JavaDoc {
551         boolean result = false;
552
553         Object JavaDoc[] endpoints = _endpoints.values().toArray();
554         for (int index = 0; index < endpoints.length; index++) {
555             ConsumerEndpoint endpoint = (ConsumerEndpoint) endpoints[index];
556             JmsDestination endpoint_dest = endpoint.getDestination();
557
558             if ((destination instanceof JmsTopic) &&
559                     (endpoint_dest instanceof JmsTopic) &&
560                     (((JmsTopic) endpoint_dest).isWildCard())) {
561                 if (((JmsTopic) endpoint_dest).match((JmsTopic) destination)) {
562                     result = true;
563                     break;
564                 }
565             } else {
566                 if (endpoint_dest.equals(destination)) {
567                     result = true;
568                     break;
569                 }
570             }
571         }
572
573         return result;
574     }
575
576     /**
577      * Check whether a particular durable consumer is active
578      *
579      * @param name - the consumer name
580      * @return boolean - true if active
581      */

582     public boolean isDurableConsumerActive(String JavaDoc name) {
583         return (_endpoints.get(name) != null);
584     }
585
586     /**
587      * Return the destination assoicated with the specified durable consumer.
588      *
589      * @param name - consumer name
590      * @return JmsDestination - the destination is it registered under or null
591      */

592     public JmsDestination getDestinationForConsumerName(String JavaDoc name) {
593         ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
594         return (entry != null) ? entry.getDestination() : null;
595     }
596
597     /**
598      * Check if the specified durable consumer exists
599      *
600      * @param name - the name of the durable consumer
601      * @return boolean - true if successful
602      */

603     public boolean durableConsumerExists(String JavaDoc name) {
604         boolean result = false;
605         ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
606         if ((entry != null) &&
607                 (entry._durable)) {
608             result = true;
609         }
610
611         return result;
612     }
613
614     /**
615      * This method will check that the name-destination pair actually are valid
616      * and exist as a DurableConsumer entity
617      *
618      * @param topic - the name of the topic
619      * @param name - the name of the durable consumer
620      * @return boolean - true if valid and false otherwise
621      */

622     public boolean validSubscription(String JavaDoc topic, String JavaDoc name) {
623
624         boolean result = false;
625         ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
626
627         if ((entry != null) &&
628                 (entry._destination != null) &&
629                 (entry._destination.getName().equals(topic))) {
630             result = true;
631         }
632
633         return result;
634     }
635
636     /**
637      * Returns inactive subscription names for a persistent topic
638      *
639      * @param topic the topic
640      * @return a list of subscription names, as <code>String</code>s
641      */

642     public synchronized List JavaDoc getInactiveSubscriptions(JmsTopic topic) {
643         List JavaDoc result = new ArrayList JavaDoc();
644         List JavaDoc consumers = (List JavaDoc) _destToConsumerMap.get(topic);
645         if (consumers != null) {
646             Iterator JavaDoc iterator = consumers.iterator();
647             while (iterator.hasNext()) {
648                 ConsumerEntry entry = (ConsumerEntry) iterator.next();
649                 if (entry.isDurable()
650                         && !_endpoints.containsKey(entry.getKey())) {
651                     result.add(entry.getName());
652                 }
653             }
654         }
655         return result;
656     }
657
658     /**
659      * Destroy this manager. This is brutal and final
660      */

661     public synchronized void destroy() {
662
663         // clean up all the destinations
664
Object JavaDoc[] endpoints = _endpoints.values().toArray();
665         for (int index = 0; index < endpoints.length; index++) {
666             deleteConsumerEndpoint((ConsumerEndpoint) endpoints[index]);
667         }
668         _endpoints.clear();
669
670         // remove cache data structures
671
_consumerCache.clear();
672         _consumerCache = null;
673         _destToConsumerMap.clear();
674         _destToConsumerMap = null;
675         _wildcardConsumers.clear();
676         _wildcardConsumers = null;
677
678         // reset the singleton
679
_instance = null;
680     }
681
682     /**
683      * Return a list of durable subscribers for the specified destination
684      *
685      * @return Vector - a vector of strings, which denote the name
686      */

687     public synchronized List JavaDoc getDurableConsumersForDest(JmsTopic dest) {
688         List JavaDoc names = new ArrayList JavaDoc();
689
690         List JavaDoc consumers = (List JavaDoc) _destToConsumerMap.get(dest);
691         if (consumers != null) {
692             Iterator JavaDoc iterator = consumers.iterator();
693             while (iterator.hasNext()) {
694                 ConsumerEntry entry = (ConsumerEntry) iterator.next();
695                 if (entry.isDurable()) {
696                     names.add(entry.getName());
697                 }
698             }
699         }
700
701         // if the destination is a topic and part is also a wildcard then
702
// check the wildcardConsumers for additional consumers
703
Iterator JavaDoc wildconsumers = _wildcardConsumers.keySet().iterator();
704         while (wildconsumers.hasNext()) {
705             ConsumerEntry entry = (ConsumerEntry) wildconsumers.next();
706             JmsDestination adest = entry.getDestination();
707             if (entry.isDurable() && adest instanceof JmsTopic &&
708                     ((JmsTopic) adest).match((JmsTopic) dest)) {
709                 names.add(entry.getName());
710             }
711         }
712
713         return names;
714     }
715
716     /**
717      * Return a list of {@link ConsumerEndpoint} objects attached to the
718      * specified destination
719      *
720      * @param dest the destination to query
721      * @return list of endpoints
722      */

723     public synchronized List JavaDoc getEndpointsForDest(JmsDestination dest) {
724         LinkedList JavaDoc endpoints = new LinkedList JavaDoc();
725         Iterator JavaDoc iter = _endpoints.values().iterator();
726
727         while (iter.hasNext()) {
728             ConsumerEndpoint endpoint = (ConsumerEndpoint) iter.next();
729             if (dest.equals(endpoint.getDestination())) {
730                 endpoints.add(endpoint);
731             }
732         }
733
734         return endpoints;
735     }
736
737     /**
738      * Add the specified consumer to the cache.
739      *
740      * @param key a key to identify the consumer
741      * @param dest the destination it is subscribed to. It can be a wildcard
742      * @param durable indicates whether it is a durable subscription
743      */

744     private synchronized void addToConsumerCache(Object JavaDoc key, JmsDestination dest,
745                                                  boolean durable) {
746         if (_log.isDebugEnabled()) {
747             _log.debug("addToConsumerCache(key=" + key + ", dest=" + dest
748                        + ", durable=" + durable + ")");
749         }
750
751         if (!_consumerCache.containsKey(key)) {
752             ConsumerEntry entry = new ConsumerEntry(key, dest, durable);
753             _consumerCache.put(key, entry);
754
755             // if the specified destination is a JmsTopic and also a wildcard
756
// then we need to add it to all matching desitnations
757
if (dest instanceof JmsTopic && ((JmsTopic) dest).isWildCard()) {
758                 // store wild card consumers in a separate array.
759
_wildcardConsumers.put(entry, dest);
760             } else {
761                 // we also need to add the reverse mapping
762
List JavaDoc consumers = (List JavaDoc) _destToConsumerMap.get(dest);
763                 if (consumers == null) {
764                     consumers = new ArrayList JavaDoc();
765                     _destToConsumerMap.put(dest, consumers);
766                 }
767
768                 // add the mapping
769
consumers.add(entry);
770             }
771         }
772     }
773
774     /**
775      * Remove the specified consumer from the cache
776      *
777      * @param key the consumer key
778      */

779     private synchronized void removeFromConsumerCache(Object JavaDoc key) {
780         if (_log.isDebugEnabled()) {
781             _log.debug("removeFromConsumerCache(key=" + key + ")");
782         }
783
784         ConsumerEntry entry = (ConsumerEntry) _consumerCache.remove(key);
785         if (entry != null) {
786             JmsDestination dest = entry.getDestination();
787
788             if (dest instanceof JmsTopic && ((JmsTopic) dest).isWildCard()) {
789                 // remove it from the wildcard cache.
790
_wildcardConsumers.remove(entry);
791             } else {
792                 // remove it from the specified destination
793
List JavaDoc consumers = (List JavaDoc) _destToConsumerMap.get(dest);
794                 if (consumers != null) {
795                     consumers.remove(entry);
796
797                     // if consumers is of size 0 then remove it
798
if (consumers.isEmpty()) {
799                         _destToConsumerMap.remove(dest);
800                     }
801                 }
802             }
803         } else {
804             if (_log.isDebugEnabled()) {
805                 _log.debug("removeFromConsumerCache(key=" + key +
806                            "): consumer not found");
807             }
808         }
809     }
810
811     /**
812      * Remove all the consumers for the specified destination from the cache.
813      *
814      * @param destination - destination to remove.
815      */

816     private synchronized void removeFromConsumerCache(
817             JmsDestination destination) {
818         if (_destToConsumerMap.containsKey(destination)) {
819             _destToConsumerMap.remove(destination);
820         }
821     }
822
823     /**
824      * Initialises the consumer manager
825      *
826      * @throws ServiceException if the manager can't be initialised
827      */

828     private void init() throws ServiceException {
829         _scheduler = Scheduler.instance();
830
831         Connection JavaDoc connection = null;
832         try {
833             connection = DatabaseService.getConnection();
834
835             PersistenceAdapter adapter = DatabaseService.getAdapter();
836             connection.commit();
837
838             // return a list of JmsDestination objects.
839
HashMap JavaDoc map = adapter.getAllDurableConsumers(connection);
840             Iterator JavaDoc iter = map.keySet().iterator();
841
842             // Create an endpoint for each durable consumer
843
while (iter.hasNext()) {
844                 // for each destination, create the destination cache
845
String JavaDoc consumer = (String JavaDoc) iter.next();
846                 String JavaDoc deststr = (String JavaDoc) map.get(consumer);
847
848                 JmsDestination dest =
849                         DestinationManager.instance().getDestination(deststr);
850                 if (dest == null) {
851                     // this maybe a wildcard subscription
852
dest = new JmsTopic(deststr);
853                     if (!((JmsTopic) dest).isWildCard()) {
854                         dest = null;
855                     }
856                 }
857
858                 if (consumer != null && dest != null &&
859                         dest instanceof JmsTopic) {
860                     // cache the consumer-destination mapping in memory.
861
addToConsumerCache(consumer, dest, true);
862                 } else {
863                     // what should we do about this stage
864
_log.error("Failure in ConsumerManager.init : " + consumer +
865                                ":" + dest);
866                 }
867             }
868         } catch (ServiceException exception) {
869             SQLHelper.rollback(connection);
870             throw exception;
871         } catch (Exception JavaDoc exception) {
872             SQLHelper.rollback(connection);
873             throw new ServiceException("Failed to initialise ConsumerManager",
874                                        exception);
875         } finally {
876             SQLHelper.close(connection);
877         }
878     }
879
880     /**
881      * Returns the next seed value to be allocated to a new consumer
882      *
883      * @return a unique identifier for a consumer
884      */

885     private synchronized long getNextConsumerId() {
886         return ++_consumerIdSeed;
887     }
888
889     /**
890      * Verifies that a destination is valid.
891      * <p/>
892      * A destination is valid if it is persistent and is registered with {@link
893      * DestinationManager}, or non-persistent. If it is non-persistent, it will
894      * be registered
895      *
896      * @param destination the destination to check
897      * @throws InvalidDestinationException if the destination is invalid
898      */

899     private void checkDestination(JmsDestination destination)
900             throws InvalidDestinationException JavaDoc {
901         final DestinationManager manager = DestinationManager.instance();
902         final String JavaDoc name = destination.getName();
903         final JmsDestination existing = manager.getDestination(name);
904
905         if (existing == null) {
906             if (destination.getPersistent()) {
907                 throw new InvalidDestinationException JavaDoc(
908                         "No persistent destination with name=" + name
909                         + " exists");
910             }
911             // non-persistent destinations can be registered dynamically
912
manager.createDestination(destination);
913         } else {
914             // make sure the supplied destination has the same properties
915
// as the existing one
916
if (existing.getPersistent() != destination.getPersistent()) {
917                 throw new InvalidDestinationException JavaDoc(
918                         "Mismatched destination properties for destination"
919                         + "with name=" + name);
920             }
921         }
922     }
923
924     /**
925      * Helper class used to maintain consumer information
926      */

927     private static final class ConsumerEntry {
928
929         /**
930          * An identifier for the consumer.
931          */

932         private final Object JavaDoc _key;
933
934         /**
935          * Indicated whether this entry is for a durable subscriber
936          */

937         private final boolean _durable;
938
939         /**
940          * The destination that the consumer is actually subscribed too
941          */

942         private final JmsDestination _destination;
943
944         /**
945          * Construct an instance of this class using the specified name and
946          * durable subscriber indicator
947          *
948          * @param key an identifier for the consumer
949          * @param destination the destination consumer is subscribed to
950          * @param durable indicates whether it is a durable subscription
951          */

952         public ConsumerEntry(Object JavaDoc key, JmsDestination destination,
953                              boolean durable) {
954             _key = key;
955             _destination = destination;
956             _durable = durable;
957         }
958
959         // override Object.equals
960
public boolean equals(Object JavaDoc obj) {
961
962             boolean result = false;
963             if ((obj != null) &&
964                     (obj instanceof ConsumerEntry) &&
965                     (((ConsumerEntry) obj)._key.equals(_key))) {
966                 result = true;
967             }
968
969             return result;
970         }
971
972         public Object JavaDoc getKey() {
973             return _key;
974         }
975
976         public String JavaDoc getName() {
977             return (_key instanceof String JavaDoc) ? (String JavaDoc) _key : null;
978         }
979
980         public JmsDestination getDestination() {
981             return _destination;
982         }
983
984         public boolean isDurable() {
985             return _durable;
986         }
987
988         /**
989          * Helper to return a key for identifying {@link ConsumerEndpoint}
990          * instances. This returns the consumers persistent identifier if it has
991          * one; if not, it returns its transient identifier.
992          *
993          * @param consumer the consumer
994          * @return a key for identifying <code>consumer</code>
995          */

996         public static Object JavaDoc getConsumerKey(ConsumerEndpoint consumer) {
997             Object JavaDoc key = null;
998             String JavaDoc id = consumer.getPersistentId();
999             if (id != null) {
1000                key = id;
1001            } else {
1002                key = new Long JavaDoc(consumer.getId());
1003            }
1004            return key;
1005        }
1006    }
1007
1008}
1009
1010
Popular Tags