KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > messagemgr > DestinationManager


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: DestinationManager.java,v 1.2 2005/03/18 03:58:39 tanderson Exp $
44  */

45 package org.exolab.jms.messagemgr;
46
47 import java.sql.Connection JavaDoc;
48 import java.sql.SQLException JavaDoc;
49 import java.util.Collections JavaDoc;
50 import java.util.Enumeration JavaDoc;
51 import java.util.HashMap JavaDoc;
52 import java.util.Iterator JavaDoc;
53 import java.util.LinkedList JavaDoc;
54 import java.util.Map JavaDoc;
55 import java.util.Vector JavaDoc;
56 import javax.jms.InvalidDestinationException JavaDoc;
57 import javax.jms.JMSException JavaDoc;
58 import javax.naming.Context JavaDoc;
59 import javax.naming.NamingException JavaDoc;
60
61 import org.apache.commons.logging.Log;
62 import org.apache.commons.logging.LogFactory;
63
64 import org.exolab.jms.client.JmsDestination;
65 import org.exolab.jms.client.JmsQueue;
66 import org.exolab.jms.client.JmsTopic;
67 import org.exolab.jms.config.AdministeredDestinations;
68 import org.exolab.jms.config.AdministeredQueue;
69 import org.exolab.jms.config.AdministeredTopic;
70 import org.exolab.jms.config.ConfigurationManager;
71 import org.exolab.jms.config.Subscriber;
72 import org.exolab.jms.gc.GarbageCollectable;
73 import org.exolab.jms.gc.GarbageCollectionService;
74 import org.exolab.jms.message.MessageImpl;
75 import org.exolab.jms.persistence.DatabaseService;
76 import org.exolab.jms.persistence.PersistenceAdapter;
77 import org.exolab.jms.persistence.PersistenceException;
78 import org.exolab.jms.persistence.SQLHelper;
79 import org.exolab.jms.server.NamingHelper;
80 import org.exolab.jms.service.ServiceException;
81
82
83 /**
84  * The destination manager is responsible for creating and managing the
85  * lifecycle of {@link DestinationCache} objects. The destination manager is
86  * also responsible for managing messages, that are received by the message
87  * manager, which do not have any registered {@link DestinationCache}
88  *
89  * @author <a HREF="mailto:jima@comware.com.au">Jim Alateras</a>
90  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
91  * @version $Revision: 1.2 $ $Date: 2005/03/18 03:58:39 $
92  */

93 public class DestinationManager
94         implements MessageManagerEventListener, GarbageCollectable {
95
96     /**
97      * This structure maintains a list of active caches.
98      */

99     private Map JavaDoc _caches = Collections.synchronizedMap(new HashMap JavaDoc());
100
101     /**
102      * The set of persistent and non-persistent destinations, keyed on name.
103      */

104     private HashMap JavaDoc _destinationCache = new HashMap JavaDoc();
105
106     /**
107      * Maintains a list of wildcard destinations, which can either be durable or
108      * transient.
109      */

110     private LinkedList JavaDoc _wildcardDestinations = new LinkedList JavaDoc();
111
112     /**
113      * Maintains a linked list of DestinationEventListener objects. These
114      * listeners will be informed when destinations are added or destroyed.
115      */

116     private LinkedList JavaDoc _listeners = new LinkedList JavaDoc();
117
118     /**
119      * Manage the singleton instance of the DestinationManager.
120      */

121     private static volatile DestinationManager _instance = null;
122
123     /**
124      * The logger.
125      */

126     private static final Log _log =
127             LogFactory.getLog(DestinationManager.class);
128
129     /**
130      * Construct a new <code>DestinationManager</code>.
131      *
132      * @throws ServiceException if the service cannot be initialised
133      */

134     private DestinationManager() throws ServiceException {
135         init();
136
137         // register with the GarbageCollectionService
138
GarbageCollectionService.instance().register(this);
139     }
140
141     /**
142      * Create the singleton instance of the destination manager.
143      *
144      * @return the singleton instance
145      * @throws ServiceException if the service cannot be initialised
146      */

147     public static DestinationManager createInstance() throws ServiceException {
148         _instance = new DestinationManager();
149         return _instance;
150     }
151
152     /**
153      * Return the singleton destination manager.
154      *
155      * @return the singleton instance, or <code>null</code> if it hasn't been
156      * initialised
157      */

158     public static DestinationManager instance() {
159         return _instance;
160     }
161
162     /**
163      * Returns the cache for the supplied destination. If the cache doesn't
164      * exist, it will be created, and any listeners notified.
165      *
166      * @param dest the destination of the cache to return
167      * @return the cache associated with <code>dest</code>
168      * @throws InvalidDestinationException if <code>dest</code> doesn't exist
169      * @throws JMSException if the cache can't be created
170      */

171     public DestinationCache getDestinationCache(JmsDestination dest)
172             throws JMSException JavaDoc {
173         DestinationCache result;
174         try {
175             result = getDestinationCache(dest, null);
176         } catch (PersistenceException exception) {
177             String JavaDoc msg = "Failed to create cache for destination "
178                     + dest.getName();
179             _log.error(msg, exception);
180             throw new JMSException JavaDoc(msg + ": " + exception.getMessage());
181         }
182         return result;
183     }
184
185     /**
186      * Returns the cache for the supplied destination. If the cache doesn't
187      * exist, it will be created, and any listeners notified.
188      *
189      * @param dest the destination of the cache to return
190      * @return the cache associated with <code>dest</code>
191      * @throws InvalidDestinationException if <code>dest</code> doesn't exist
192      * @throws JMSException if the cache can't be created
193      * @throws PersistenceException for any persistence error
194      */

195     public synchronized DestinationCache getDestinationCache(
196             JmsDestination dest, Connection JavaDoc connection) throws JMSException JavaDoc,
197             PersistenceException {
198         DestinationCache result = (DestinationCache) _caches.get(dest);
199         if (result == null) {
200             if (dest instanceof JmsTopic && ((JmsTopic) dest).isWildCard()) {
201                 throw new InvalidDestinationException JavaDoc(
202                         "Cannot cache messages for wildcarded topic: "
203                         + dest.getName());
204             }
205
206             JmsDestination existing = getDestination(dest.getName());
207             if (existing == null) {
208                 throw new InvalidDestinationException JavaDoc(
209                         "Destination does not exist: " + dest.getName());
210             }
211             if (existing.getPersistent()) {
212                 if (connection != null) {
213                     result = createPersistentCache(existing, connection);
214                 } else {
215                     result = createPersistentCache(existing);
216                 }
217             } else {
218                 result = createNonPersistentCache(existing);
219             }
220             _caches.put(dest, result);
221
222             // notify the listeners that a new destination has been added
223
// to the destination manager
224
notifyDestinationAdded(dest, result);
225         }
226         return result;
227     }
228
229     /**
230      * Create a persistent {@link DestinationCache} for the specified
231      * destination.
232      *
233      * @param dest the destination to create the cache for
234      * @param connection the database connection to use
235      * @return a new cache
236      * @throws JMSException for any JMS error
237      * @throws PersistenceException for any persistence error
238      */

239     protected DestinationCache createPersistentCache(JmsDestination dest,
240                                                      Connection JavaDoc connection)
241             throws JMSException JavaDoc, PersistenceException {
242
243         DestinationCache result;
244         if (dest instanceof JmsTopic) {
245             result = new TopicDestinationCache((JmsTopic) dest, connection);
246         } else {
247             result = new QueueDestinationCache((JmsQueue) dest, connection);
248         }
249         return result;
250     }
251
252     /**
253      * Create a persistent {@link DestinationCache} for the specified
254      * destination.
255      *
256      * @param dest the destination to create the cache for
257      * @return a new cache
258      * @throws JMSException if the cache cannot be created
259      * @throws PersistenceException for any persistence error
260      */

261     protected DestinationCache createPersistentCache(JmsDestination dest)
262             throws JMSException JavaDoc, PersistenceException {
263
264         DestinationCache result;
265         Connection JavaDoc connection = null;
266         try {
267
268             connection = DatabaseService.getConnection();
269             result = createPersistentCache(dest, connection);
270             connection.commit();
271         } catch (JMSException JavaDoc exception) {
272             SQLHelper.rollback(connection);
273             throw exception;
274         } catch (SQLException JavaDoc exception) {
275             SQLHelper.rollback(connection);
276             throw new PersistenceException(exception);
277         } finally {
278             SQLHelper.close(connection);
279         }
280
281         return result;
282     }
283
284     /**
285      * Create a non-persistent {@link DestinationCache} for the specified
286      * destination.
287      *
288      * @param dest the destination to create the cache for
289      * @return a new cache
290      * @throws JMSException if the cache cannot be created
291      */

292     protected DestinationCache createNonPersistentCache(JmsDestination dest)
293             throws JMSException JavaDoc {
294
295         DestinationCache result;
296         if (dest instanceof JmsTopic) {
297             result = new TopicDestinationCache((JmsTopic) dest);
298         } else {
299             result = new QueueDestinationCache((JmsQueue) dest);
300         }
301
302         // notify the listeners that a new destination has been added
303
// to the destination manager
304
notifyDestinationAdded(dest, result);
305         _caches.put(dest, result);
306
307         return result;
308     }
309
310     /**
311      * Delete the specfied destination.
312      *
313      * @param cache the destination to destroy
314      */

315     protected void destroyDestinationCache(DestinationCache cache) {
316         destroyDestinationCache(cache.getDestination());
317     }
318
319     /**
320      * Delete the specfied destination.
321      *
322      * @param dest the destination to destroy
323      */

324     protected synchronized void destroyDestinationCache(JmsDestination dest) {
325         DestinationCache cache = (DestinationCache) _caches.remove(dest);
326         if (cache != null) {
327             cache.destroy();
328
329             // notify the listeners that a destination has been removed from
330
// the destination manager
331
notifyDestinationRemoved(dest, cache);
332         }
333     }
334
335     /**
336      * Returns a destination given its name.
337      *
338      * @param name the name of the destination
339      * @return the destination corresponding to <code>name</code> or
340      * <code>null</code> if none exists
341      */

342     public synchronized JmsDestination getDestination(String JavaDoc name) {
343         return (JmsDestination) _destinationCache.get(name);
344     }
345
346     /**
347      * Register the specified DestinationEventListener. If the listener is
348      * already registered then do not re-register it again.
349      *
350      * @param listener the listener to add
351      */

352     void addDestinationEventListener(DestinationEventListener listener) {
353         synchronized (_listeners) {
354             if (!_listeners.contains(listener)) {
355                 _listeners.add(listener);
356             }
357
358         }
359     }
360
361     /**
362      * Remove the specified DestinationEventListener from the list.
363      *
364      * @param listener the listener to remove
365      */

366     void removeDestinationEventListener(DestinationEventListener listener) {
367         synchronized (_listeners) {
368             _listeners.remove(listener);
369         }
370     }
371
372     /**
373      * Create a non-administered destination and cache it. It does not check to
374      * see whether or not it is an administered destination this must be done
375      * by the caller
376      *
377      * @param destination - the destination to create
378      */

379     public synchronized void createDestination(JmsDestination destination) {
380         addToDestinationCache(destination);
381     }
382
383     /**
384      * Create an administered destination using the specified destination. It
385      * will create the destination in the database and register it with the jndi
386      * context.
387      *
388      * @param dest - the destination
389      * @return boolean - true if successful
390      */

391     public synchronized boolean createAdministeredDestination(
392             JmsDestination dest) throws JMSException JavaDoc {
393         if (_log.isDebugEnabled()) {
394             _log.debug("createAdministeredDestination(dest=" + dest + ")");
395         }
396
397         boolean success = true;
398         boolean queue = (dest instanceof JmsQueue) ? true : false;
399         PersistenceAdapter adapter = DatabaseService.getAdapter();
400
401         // check that the destination does not exist. If it exists then return
402
// false. If it doesn't exists the create it and bind it to the jndi
403
// context
404

405         Connection JavaDoc connection = null;
406         try {
407
408             connection = DatabaseService.getConnection();
409
410             if (!adapter.checkDestination(connection, dest.getName())) {
411                 adapter.addDestination(connection, dest.getName(), queue);
412
413                 dest.setPersistent(true);
414
415                 // destination was created in persistent store, now create it
416
// in transient memory and also bind it in the jndi context
417
addToDestinationCache(dest);
418                 try {
419                     ContextHelper.rebind(getContext(), dest.getName(), dest);
420                 } catch (NamingException JavaDoc exception) {
421                     String JavaDoc msg = "Failed to add destination " + dest.getName()
422                             + " to JNDI context";
423                     _log.error(msg, exception);
424                     throw new JMSException JavaDoc(msg + ": " +
425                                            exception.getMessage());
426                 }
427             } else {
428                 success = false;
429             }
430             connection.commit();
431         } catch (JMSException JavaDoc exception) {
432             SQLHelper.rollback(connection);
433             throw exception;
434         } catch (Exception JavaDoc exception) { // PersistenceException, SQLException
435
SQLHelper.rollback(connection);
436             String JavaDoc msg = "Failed to create administered destination"
437                     + dest.getName();
438             _log.error(msg, exception);
439             throw new JMSException JavaDoc(msg + ": " + exception.getMessage());
440         } finally {
441             SQLHelper.close(connection);
442         }
443
444         return success;
445     }
446
447     /**
448      * Remove the corresponding administered destination from persistent store,
449      * from transient memory and from the jndi context. This will also remove
450      * all durable consumers for this topic.
451      *
452      * @param dest - the destination to remove
453      */

454     public synchronized void deleteAdministeredDestination(JmsDestination dest)
455             throws JMSException JavaDoc {
456
457         if (_log.isDebugEnabled()) {
458             _log.debug("deleteAdministeredDestination(dest=" + dest + ")");
459         }
460
461         boolean queue = (dest instanceof JmsQueue) ? true : false;
462         ConsumerManager consumerMgr = ConsumerManager.instance();
463
464         // If we are dealing with a topic then first check that there are
465
// no active durable consumers for the destination
466
if (!queue) {
467             if (consumerMgr.hasActiveDurableConsumers(dest)) {
468                 throw new JMSException JavaDoc(
469                         "Cannot delete the administered destination "
470                         + dest
471                         + " since there are active durable consumers.");
472             }
473             // no active consumers. Remove all durable consumers to this
474
// destination
475
consumerMgr.removeDurableConsumers(dest);
476         }
477
478         // make sure there are not active endpoints
479
int active = consumerMgr.getEndpointsForDest(dest).size();
480         if (active > 0) {
481             throw new JMSException JavaDoc(
482                     "Cannot delete the administered destination"
483                     + dest
484                     + " since there are "
485                     + active
486                     + " active endpoints.");
487         }
488
489         // unbind it from the jndi context so that now it is unavailable
490
// to other consumers
491
try {
492             getContext().unbind(dest.getName());
493         } catch (NamingException JavaDoc error) {
494             _log.error("Failed to remove destination " + dest.getName()
495                        + " from JNDI", error);
496         }
497
498         // now that we have removed all the durable consumers we can remove
499
// the administered topic. First delete it from memory and then
500
// from the persistent store
501
Connection JavaDoc connection = null;
502         try {
503             connection = DatabaseService.getConnection();
504
505             DatabaseService.getAdapter().removeDestination(connection,
506                                                            dest.getName());
507             destroyDestinationCache(dest);
508             removeFromDestinationCache(dest);
509             connection.commit();
510         } catch (PersistenceException exception) {
511             SQLHelper.rollback(connection);
512             String JavaDoc msg = "Failed to remove destination " + dest.getName();
513             _log.error(msg, exception);
514             throw new JMSException JavaDoc(msg + ":" + exception.getMessage());
515         } catch (SQLException JavaDoc exception) {
516             SQLHelper.rollback(connection);
517             String JavaDoc msg = "Failed to remove destination " + dest.getName();
518             _log.error(msg, exception);
519             throw new JMSException JavaDoc(msg + ":" + exception.getMessage());
520         } finally {
521             SQLHelper.close(connection);
522         }
523     }
524
525     /**
526      * This method will create the administered destinations specified in the
527      * configuration. A topic may also have zero or more preconfigured durable
528      * sunbscribers. An equivalent entity for queues does not exist.
529      */

530     public void registerConfiguredAdministeredDestinations() {
531         AdministeredDestinations destinations =
532                 ConfigurationManager.getConfig().getAdministeredDestinations();
533         if (destinations != null) {
534
535             // first process the topics
536
int count = destinations.getAdministeredTopicCount();
537             for (int index = 0; index < count; index++) {
538                 AdministeredTopic topic = destinations.getAdministeredTopic(
539                         index);
540
541                 // define a persistent topic destination and then use the
542
// message manager administrator to add it
543
JmsTopic destination = new JmsTopic(topic.getName());
544                 destination.setPersistent(true);
545                 try {
546
547                     createAdministeredDestination(destination);
548
549                     // register the subscribers for each topic.
550
int scount = topic.getSubscriberCount();
551                     ConsumerManager mgr = ConsumerManager.instance();
552                     for (int sindex = 0; sindex < scount; sindex++) {
553                         Subscriber subscriber = topic.getSubscriber(sindex);
554                         mgr.createDurableConsumer(destination,
555                                                   subscriber.getName());
556                     }
557                 } catch (JMSException JavaDoc exception) {
558                     _log.error("Failed to register persistent topic "
559                                + topic.getName(), exception);
560                 }
561             }
562
563             // next process the queue destinations. QueueDestinations do not
564
// have any associated durable subscribers
565
count = destinations.getAdministeredQueueCount();
566             for (int index = 0; index < count; index++) {
567                 AdministeredQueue queue = destinations.getAdministeredQueue(
568                         index);
569
570                 // define a persistent topic destination and then use the
571
// message manager administrator to add it
572
JmsQueue destination = new JmsQueue(queue.getName());
573                 destination.setPersistent(true);
574                 try {
575                     createAdministeredDestination(destination);
576                 } catch (JMSException JavaDoc exception) {
577                     _log.error("Failed to register persistent queue "
578                                + queue.getName(), exception);
579                 }
580             }
581         }
582
583     }
584
585     /**
586      * Invoked when the {@link MessageMgr} receives a non-persistent message
587      *
588      * @param destination the message's destination
589      * @param message the message
590      * @throws JMSException if the message can't be processed
591      */

592     public synchronized void messageAdded(JmsDestination destination,
593                                           MessageImpl message)
594             throws JMSException JavaDoc {
595         if (destination instanceof JmsTopic) {
596             // check to see whether there are active consumers for the
597
// specified destination. If there are then we need to
598
// create a destination cache and pass the message to it.
599
if (ConsumerManager.instance().hasActiveConsumers(destination)) {
600                 if (!destinationExists(destination)) {
601                     createDestination(destination);
602                 }
603                 DestinationCache cache = getDestinationCache(destination);
604                 cache.messageAdded(destination, message);
605             }
606         } else {
607             // destination is a queue. Since the message is non-persistent,
608
// create the cache and pass the message to it.
609
if (!destinationExists(destination)) {
610                 createDestination(destination);
611             }
612             DestinationCache cache = getDestinationCache(destination);
613             cache.messageAdded(destination, message);
614         }
615     }
616
617     /**
618      * Invoked when the {@link MessageMgr} receives a persistent message
619      *
620      * @param connection the database connection
621      * @param destination the message's destination
622      * @param message the message
623      * @throws JMSException if the message can't be processed
624      * @throws PersistenceException if there is a persistence related problem
625      */

626     public synchronized void persistentMessageAdded(Connection JavaDoc connection,
627                                                     JmsDestination destination,
628                                                     MessageImpl message)
629             throws JMSException JavaDoc, PersistenceException {
630         DestinationCache cache = getDestinationCache(destination, connection);
631         cache.persistentMessageAdded(connection, destination, message);
632     }
633
634     // implement of GarbageCollectable.collectGarbage
635
public synchronized void collectGarbage(boolean aggressive) {
636         // before continuing we should change the priority of the thread
637
// to the lowest value.
638
int gc_caches = 0;
639         int gc_destinations = 0;
640
641         Object JavaDoc[] caches = _caches.values().toArray();
642         for (int index = 0; index < caches.length; index++) {
643             DestinationCache cache = (DestinationCache) caches[index];
644             if (cache.canDestroy()) {
645                 if (_log.isDebugEnabled()) {
646                     _log.debug("Garbage collecting destination cache="
647                                + cache);
648                 }
649                 destroyDestinationCache(cache);
650                 gc_caches++;
651             } else {
652                 // the cache is active, so issue a garbage collection
653
// request on it
654
cache.collectGarbage(aggressive);
655             }
656         }
657
658         // get rid of non-administered destinations, without
659
// associated caches.
660
Iterator JavaDoc destinations = _destinationCache.values().iterator();
661         Vector JavaDoc to_delete = new Vector JavaDoc();
662         while (destinations.hasNext()) {
663             JmsDestination dest = (JmsDestination) destinations.next();
664             if (!(dest.getPersistent()) &&
665                     (!_caches.containsKey(dest))) {
666                 to_delete.add(dest);
667                 gc_destinations++;
668             }
669         }
670
671         // now delete the actual destinations
672
Enumeration JavaDoc todel = to_delete.elements();
673         while (todel.hasMoreElements()) {
674             _destinationCache.remove(
675                     ((JmsDestination) todel.nextElement()).getName());
676         }
677
678         // log the information
679
_log.info("DMGC Collected " + gc_caches + " caches, " + _caches.size()
680                   + " remaining.");
681         _log.info("DMGC Collected " + gc_destinations + " destinations, "
682                   + _destinationCache.size() + " remaining.");
683     }
684
685     /**
686      * Return a HashMap of all destinations that match the specified topic If
687      * the topic represents a wildcard then it may match none, one or more
688      * destinations. The results are returns as a mapping of destination and the
689      * corresponding cache
690      * <p/>
691      * The topic maybe a straight topic name or a wildcard
692      *
693      * @param topic - topic to query
694      * @return HashMap
695      */

696     synchronized HashMap JavaDoc getTopicDestinationCaches(JmsTopic topic) {
697         HashMap JavaDoc result = new HashMap JavaDoc();
698
699         Iterator JavaDoc iter = _caches.keySet().iterator();
700         while (iter.hasNext()) {
701             JmsDestination dest = (JmsDestination) iter.next();
702             if ((dest instanceof JmsTopic) &&
703                     (topic.match((JmsTopic) dest))) {
704                 result.put(dest, _caches.get(dest));
705             }
706         }
707
708         return result;
709     }
710
711     /**
712      * Destroy this manager. This is brutal and final
713      */

714     public synchronized void destroy() {
715         // clean up all the destinations
716
Object JavaDoc[] dests = _caches.keySet().toArray();
717         for (int index = 0; index < dests.length; index++) {
718             destroyDestinationCache((JmsDestination) dests[index]);
719         }
720
721         _caches.clear();
722         _caches = null;
723
724         _destinationCache.clear();
725         _destinationCache = null;
726
727         // remove all the listeners
728
_listeners.clear();
729         _listeners = null;
730
731         // reset the singleton
732
_instance = null;
733     }
734
735     /**
736      * Determines if a destination is persistent
737      *
738      * @param name the name of the destination
739      * @return <code>true</code> if the named destination is persistent;
740      * <code>false</code> if it is temporary or doesn't exist
741      */

742     public boolean isPersistent(String JavaDoc name) {
743         boolean result = false;
744         JmsDestination destination =
745                 (JmsDestination) _destinationCache.get(name);
746         if (destination != null) {
747             result = destination.getPersistent();
748         }
749
750         return result;
751     }
752
753     /**
754      * Determines if a destination is persistent
755      *
756      * @param destination the destination
757      * @return <code>true</code> if the named destination is persistent;
758      * <code>false</code> if it is temporary or doesn't exist
759      */

760     public boolean isPersistent(JmsDestination destination) {
761         return isPersistent(destination.getName());
762     }
763
764     /**
765      * Test whether the specified message is for an administered destination.
766      * This would be the case if the destination is administered or if there is
767      * an administered wildcard destination that is satisfied by the message
768      * destination
769      *
770      * @param msg - the message to check
771      * @return boolean
772      */

773     public boolean isMessageForAdministeredDestination(MessageImpl msg) {
774         boolean result = false;
775         try {
776             JmsDestination mdest = (JmsDestination) msg.getJMSDestination();
777             JmsDestination dest = (JmsDestination) _destinationCache.get(
778                     mdest.getName());
779
780             if (dest != null) {
781                 if (dest.getPersistent()) {
782                     result = true;
783                 } else if (mdest instanceof JmsTopic) {
784                     // check if any of the wildcards are administered
785
Object JavaDoc[] dests = _wildcardDestinations.toArray();
786                     for (int index = 0; index < dests.length; index++) {
787                         JmsTopic adest = (JmsTopic) dests[index];
788                         if ((adest.match((JmsTopic) mdest)) &&
789                                 (adest.getPersistent())) {
790                             result = true;
791                             break;
792                         }
793                     }
794
795                 }
796             }
797         } catch (JMSException JavaDoc ignore) {
798         }
799
800         return result;
801     }
802
803     /**
804      * Add the specified entry to the destination cache, if it doesn't already
805      * exist.
806      *
807      * @param destination - destination to add
808      */

809     void addToDestinationCache(JmsDestination destination) {
810         synchronized (_destinationCache) {
811             if (!_destinationCache.containsKey(destination.getName())) {
812                 _destinationCache.put(destination.getName(), destination);
813
814                 // check whether it is a wildcard destination
815
if (((destination instanceof JmsTopic) &&
816                         (((JmsTopic) destination).isWildCard()))) {
817                     _wildcardDestinations.add(destination);
818                 }
819             }
820         }
821     }
822
823     /**
824      * Remove the specified destination from the cache
825      *
826      * @param destination - the destination to remove
827      */

828     void removeFromDestinationCache(JmsDestination destination) {
829         synchronized (_destinationCache) {
830             if (_destinationCache.remove(destination.getName()) != null) {
831
832                 // check whether we also need to delete it from the
833
// list of wildcard subscriptions
834
if (((destination instanceof JmsTopic) &&
835                         (((JmsTopic) destination).isWildCard()))) {
836                     _wildcardDestinations.remove(destination);
837                 }
838             }
839
840         }
841     }
842
843     /**
844      * Check if the specified destination exists.
845      *
846      * @param destination - the destination to check
847      * @return boolean - true if it exists and false otherwise
848      */

849     public boolean destinationExists(JmsDestination destination) {
850         return _destinationCache.containsKey(destination.getName());
851     }
852
853     /**
854      * Initialises the destination manager.
855      *
856      * @throws ServiceException if the service cannot be initialised
857      */

858     protected void init() throws ServiceException {
859         Connection JavaDoc connection = null;
860         try {
861             connection = DatabaseService.getConnection();
862
863             // return a list of JmsDestination objects.
864
Enumeration JavaDoc iter =
865                     DatabaseService.getAdapter().getAllDestinations(connection);
866             connection.commit();
867
868             while (iter.hasMoreElements()) {
869                 // add each destination to the cache and also bind
870
// it to the context
871
JmsDestination dest = (JmsDestination) iter.nextElement();
872                 addToDestinationCache(dest);
873                 try {
874                     // for each of the administered destinations rebind it to
875
// the jndi context
876
ContextHelper.rebind(getContext(), dest.getName(), dest);
877                 } catch (NamingException JavaDoc error) {
878                     throw new ServiceException("Failed to add destination "
879                                                + dest.getName()
880                                                + " to JNDI", error);
881                 }
882             }
883         } catch (PersistenceException exception) {
884             SQLHelper.rollback(connection);
885             String JavaDoc msg = "Failed to initialise DestinationManager";
886             _log.error(msg, exception);
887             throw exception;
888         } catch (SQLException JavaDoc exception) {
889             SQLHelper.rollback(connection);
890             String JavaDoc msg = "Failed to initialise DestinationManager";
891             _log.error(msg, exception);
892             throw new ServiceException(msg, exception);
893         } finally {
894             SQLHelper.close(connection);
895         }
896     }
897
898     /**
899      * Notyify the list of DestinationEventListener objects that the specified
900      * destination has been added
901      *
902      * @param dest - the destination that was added
903      * @param cache - the corresponding cache
904      */

905     private void notifyDestinationAdded(JmsDestination dest,
906                                         DestinationCache cache) {
907         synchronized (_listeners) {
908             Iterator JavaDoc iter = _listeners.iterator();
909             while (iter.hasNext()) {
910                 ((DestinationEventListener) iter.next()).destinationAdded(dest,
911                                                                           cache);
912             }
913         }
914     }
915
916     /**
917      * Notyify the list of DestinationEventListener objects that the specified
918      * destination has been removed
919      *
920      * @param dest - the destination that was removed
921      * @param cache - the corresponding cache
922      */

923     private void notifyDestinationRemoved(JmsDestination dest,
924                                           DestinationCache cache) {
925         synchronized (_listeners) {
926             Iterator JavaDoc iter = _listeners.iterator();
927             while (iter.hasNext()) {
928                 ((DestinationEventListener) iter.next()).destinationRemoved(
929                         dest, cache);
930             }
931         }
932     }
933
934     /**
935      * Return the initial context using the JndiHelper. It assumes that the
936      * configuration manager has been successfully initialized. If the context
937      * has been retrieved it is cached so subsequent gets will return the cached
938      * instance
939      *
940      * @return the root context
941      */

942     private static Context JavaDoc getContext() throws NamingException JavaDoc {
943         return NamingHelper.getInitialContext(ConfigurationManager.getConfig());
944     }
945
946 }
947
Popular Tags