1 45 package org.exolab.jms.messagemgr; 46 47 import java.sql.Connection ; 48 import java.sql.SQLException ; 49 import java.util.Collections ; 50 import java.util.Enumeration ; 51 import java.util.HashMap ; 52 import java.util.Iterator ; 53 import java.util.LinkedList ; 54 import java.util.Map ; 55 import java.util.Vector ; 56 import javax.jms.InvalidDestinationException ; 57 import javax.jms.JMSException ; 58 import javax.naming.Context ; 59 import javax.naming.NamingException ; 60 61 import org.apache.commons.logging.Log; 62 import org.apache.commons.logging.LogFactory; 63 64 import org.exolab.jms.client.JmsDestination; 65 import org.exolab.jms.client.JmsQueue; 66 import org.exolab.jms.client.JmsTopic; 67 import org.exolab.jms.config.AdministeredDestinations; 68 import org.exolab.jms.config.AdministeredQueue; 69 import org.exolab.jms.config.AdministeredTopic; 70 import org.exolab.jms.config.ConfigurationManager; 71 import org.exolab.jms.config.Subscriber; 72 import org.exolab.jms.gc.GarbageCollectable; 73 import org.exolab.jms.gc.GarbageCollectionService; 74 import org.exolab.jms.message.MessageImpl; 75 import org.exolab.jms.persistence.DatabaseService; 76 import org.exolab.jms.persistence.PersistenceAdapter; 77 import org.exolab.jms.persistence.PersistenceException; 78 import org.exolab.jms.persistence.SQLHelper; 79 import org.exolab.jms.server.NamingHelper; 80 import org.exolab.jms.service.ServiceException; 81 82 83 93 public class DestinationManager 94 implements MessageManagerEventListener, GarbageCollectable { 95 96 99 private Map _caches = Collections.synchronizedMap(new HashMap ()); 100 101 104 private HashMap _destinationCache = new HashMap (); 105 106 110 private LinkedList _wildcardDestinations = new LinkedList (); 111 112 116 private LinkedList _listeners = new LinkedList (); 117 118 121 private static volatile DestinationManager _instance = null; 122 123 126 private static final Log _log = 127 LogFactory.getLog(DestinationManager.class); 128 129 134 private DestinationManager() throws ServiceException { 135 init(); 136 137 GarbageCollectionService.instance().register(this); 139 } 140 141 147 public static DestinationManager createInstance() throws ServiceException { 148 _instance = new DestinationManager(); 149 return _instance; 150 } 151 152 158 public static DestinationManager instance() { 159 return _instance; 160 } 161 162 171 public DestinationCache getDestinationCache(JmsDestination dest) 172 throws JMSException { 173 DestinationCache result; 174 try { 175 result = getDestinationCache(dest, null); 176 } catch (PersistenceException exception) { 177 String msg = "Failed to create cache for destination " 178 + dest.getName(); 179 _log.error(msg, exception); 180 throw new JMSException (msg + ": " + exception.getMessage()); 181 } 182 return result; 183 } 184 185 195 public synchronized DestinationCache getDestinationCache( 196 JmsDestination dest, Connection connection) throws JMSException , 197 PersistenceException { 198 DestinationCache result = (DestinationCache) _caches.get(dest); 199 if (result == null) { 200 if (dest instanceof JmsTopic && ((JmsTopic) dest).isWildCard()) { 201 throw new InvalidDestinationException ( 202 "Cannot cache messages for wildcarded topic: " 203 + dest.getName()); 204 } 205 206 JmsDestination existing = getDestination(dest.getName()); 207 if (existing == null) { 208 throw new InvalidDestinationException ( 209 "Destination does not exist: " + dest.getName()); 210 } 211 if (existing.getPersistent()) { 212 if (connection != null) { 213 result = createPersistentCache(existing, connection); 214 } else { 215 result = createPersistentCache(existing); 216 } 217 } else { 218 result = createNonPersistentCache(existing); 219 } 220 _caches.put(dest, result); 221 222 notifyDestinationAdded(dest, result); 225 } 226 return result; 227 } 228 229 239 protected DestinationCache createPersistentCache(JmsDestination dest, 240 Connection connection) 241 throws JMSException , PersistenceException { 242 243 DestinationCache result; 244 if (dest instanceof JmsTopic) { 245 result = new TopicDestinationCache((JmsTopic) dest, connection); 246 } else { 247 result = new QueueDestinationCache((JmsQueue) dest, connection); 248 } 249 return result; 250 } 251 252 261 protected DestinationCache createPersistentCache(JmsDestination dest) 262 throws JMSException , PersistenceException { 263 264 DestinationCache result; 265 Connection connection = null; 266 try { 267 268 connection = DatabaseService.getConnection(); 269 result = createPersistentCache(dest, connection); 270 connection.commit(); 271 } catch (JMSException exception) { 272 SQLHelper.rollback(connection); 273 throw exception; 274 } catch (SQLException exception) { 275 SQLHelper.rollback(connection); 276 throw new PersistenceException(exception); 277 } finally { 278 SQLHelper.close(connection); 279 } 280 281 return result; 282 } 283 284 292 protected DestinationCache createNonPersistentCache(JmsDestination dest) 293 throws JMSException { 294 295 DestinationCache result; 296 if (dest instanceof JmsTopic) { 297 result = new TopicDestinationCache((JmsTopic) dest); 298 } else { 299 result = new QueueDestinationCache((JmsQueue) dest); 300 } 301 302 notifyDestinationAdded(dest, result); 305 _caches.put(dest, result); 306 307 return result; 308 } 309 310 315 protected void destroyDestinationCache(DestinationCache cache) { 316 destroyDestinationCache(cache.getDestination()); 317 } 318 319 324 protected synchronized void destroyDestinationCache(JmsDestination dest) { 325 DestinationCache cache = (DestinationCache) _caches.remove(dest); 326 if (cache != null) { 327 cache.destroy(); 328 329 notifyDestinationRemoved(dest, cache); 332 } 333 } 334 335 342 public synchronized JmsDestination getDestination(String name) { 343 return (JmsDestination) _destinationCache.get(name); 344 } 345 346 352 void addDestinationEventListener(DestinationEventListener listener) { 353 synchronized (_listeners) { 354 if (!_listeners.contains(listener)) { 355 _listeners.add(listener); 356 } 357 358 } 359 } 360 361 366 void removeDestinationEventListener(DestinationEventListener listener) { 367 synchronized (_listeners) { 368 _listeners.remove(listener); 369 } 370 } 371 372 379 public synchronized void createDestination(JmsDestination destination) { 380 addToDestinationCache(destination); 381 } 382 383 391 public synchronized boolean createAdministeredDestination( 392 JmsDestination dest) throws JMSException { 393 if (_log.isDebugEnabled()) { 394 _log.debug("createAdministeredDestination(dest=" + dest + ")"); 395 } 396 397 boolean success = true; 398 boolean queue = (dest instanceof JmsQueue) ? true : false; 399 PersistenceAdapter adapter = DatabaseService.getAdapter(); 400 401 405 Connection connection = null; 406 try { 407 408 connection = DatabaseService.getConnection(); 409 410 if (!adapter.checkDestination(connection, dest.getName())) { 411 adapter.addDestination(connection, dest.getName(), queue); 412 413 dest.setPersistent(true); 414 415 addToDestinationCache(dest); 418 try { 419 ContextHelper.rebind(getContext(), dest.getName(), dest); 420 } catch (NamingException exception) { 421 String msg = "Failed to add destination " + dest.getName() 422 + " to JNDI context"; 423 _log.error(msg, exception); 424 throw new JMSException (msg + ": " + 425 exception.getMessage()); 426 } 427 } else { 428 success = false; 429 } 430 connection.commit(); 431 } catch (JMSException exception) { 432 SQLHelper.rollback(connection); 433 throw exception; 434 } catch (Exception exception) { SQLHelper.rollback(connection); 436 String msg = "Failed to create administered destination" 437 + dest.getName(); 438 _log.error(msg, exception); 439 throw new JMSException (msg + ": " + exception.getMessage()); 440 } finally { 441 SQLHelper.close(connection); 442 } 443 444 return success; 445 } 446 447 454 public synchronized void deleteAdministeredDestination(JmsDestination dest) 455 throws JMSException { 456 457 if (_log.isDebugEnabled()) { 458 _log.debug("deleteAdministeredDestination(dest=" + dest + ")"); 459 } 460 461 boolean queue = (dest instanceof JmsQueue) ? true : false; 462 ConsumerManager consumerMgr = ConsumerManager.instance(); 463 464 if (!queue) { 467 if (consumerMgr.hasActiveDurableConsumers(dest)) { 468 throw new JMSException ( 469 "Cannot delete the administered destination " 470 + dest 471 + " since there are active durable consumers."); 472 } 473 consumerMgr.removeDurableConsumers(dest); 476 } 477 478 int active = consumerMgr.getEndpointsForDest(dest).size(); 480 if (active > 0) { 481 throw new JMSException ( 482 "Cannot delete the administered destination" 483 + dest 484 + " since there are " 485 + active 486 + " active endpoints."); 487 } 488 489 try { 492 getContext().unbind(dest.getName()); 493 } catch (NamingException error) { 494 _log.error("Failed to remove destination " + dest.getName() 495 + " from JNDI", error); 496 } 497 498 Connection connection = null; 502 try { 503 connection = DatabaseService.getConnection(); 504 505 DatabaseService.getAdapter().removeDestination(connection, 506 dest.getName()); 507 destroyDestinationCache(dest); 508 removeFromDestinationCache(dest); 509 connection.commit(); 510 } catch (PersistenceException exception) { 511 SQLHelper.rollback(connection); 512 String msg = "Failed to remove destination " + dest.getName(); 513 _log.error(msg, exception); 514 throw new JMSException (msg + ":" + exception.getMessage()); 515 } catch (SQLException exception) { 516 SQLHelper.rollback(connection); 517 String msg = "Failed to remove destination " + dest.getName(); 518 _log.error(msg, exception); 519 throw new JMSException (msg + ":" + exception.getMessage()); 520 } finally { 521 SQLHelper.close(connection); 522 } 523 } 524 525 530 public void registerConfiguredAdministeredDestinations() { 531 AdministeredDestinations destinations = 532 ConfigurationManager.getConfig().getAdministeredDestinations(); 533 if (destinations != null) { 534 535 int count = destinations.getAdministeredTopicCount(); 537 for (int index = 0; index < count; index++) { 538 AdministeredTopic topic = destinations.getAdministeredTopic( 539 index); 540 541 JmsTopic destination = new JmsTopic(topic.getName()); 544 destination.setPersistent(true); 545 try { 546 547 createAdministeredDestination(destination); 548 549 int scount = topic.getSubscriberCount(); 551 ConsumerManager mgr = ConsumerManager.instance(); 552 for (int sindex = 0; sindex < scount; sindex++) { 553 Subscriber subscriber = topic.getSubscriber(sindex); 554 mgr.createDurableConsumer(destination, 555 subscriber.getName()); 556 } 557 } catch (JMSException exception) { 558 _log.error("Failed to register persistent topic " 559 + topic.getName(), exception); 560 } 561 } 562 563 count = destinations.getAdministeredQueueCount(); 566 for (int index = 0; index < count; index++) { 567 AdministeredQueue queue = destinations.getAdministeredQueue( 568 index); 569 570 JmsQueue destination = new JmsQueue(queue.getName()); 573 destination.setPersistent(true); 574 try { 575 createAdministeredDestination(destination); 576 } catch (JMSException exception) { 577 _log.error("Failed to register persistent queue " 578 + queue.getName(), exception); 579 } 580 } 581 } 582 583 } 584 585 592 public synchronized void messageAdded(JmsDestination destination, 593 MessageImpl message) 594 throws JMSException { 595 if (destination instanceof JmsTopic) { 596 if (ConsumerManager.instance().hasActiveConsumers(destination)) { 600 if (!destinationExists(destination)) { 601 createDestination(destination); 602 } 603 DestinationCache cache = getDestinationCache(destination); 604 cache.messageAdded(destination, message); 605 } 606 } else { 607 if (!destinationExists(destination)) { 610 createDestination(destination); 611 } 612 DestinationCache cache = getDestinationCache(destination); 613 cache.messageAdded(destination, message); 614 } 615 } 616 617 626 public synchronized void persistentMessageAdded(Connection connection, 627 JmsDestination destination, 628 MessageImpl message) 629 throws JMSException , PersistenceException { 630 DestinationCache cache = getDestinationCache(destination, connection); 631 cache.persistentMessageAdded(connection, destination, message); 632 } 633 634 public synchronized void collectGarbage(boolean aggressive) { 636 int gc_caches = 0; 639 int gc_destinations = 0; 640 641 Object [] caches = _caches.values().toArray(); 642 for (int index = 0; index < caches.length; index++) { 643 DestinationCache cache = (DestinationCache) caches[index]; 644 if (cache.canDestroy()) { 645 if (_log.isDebugEnabled()) { 646 _log.debug("Garbage collecting destination cache=" 647 + cache); 648 } 649 destroyDestinationCache(cache); 650 gc_caches++; 651 } else { 652 cache.collectGarbage(aggressive); 655 } 656 } 657 658 Iterator destinations = _destinationCache.values().iterator(); 661 Vector to_delete = new Vector (); 662 while (destinations.hasNext()) { 663 JmsDestination dest = (JmsDestination) destinations.next(); 664 if (!(dest.getPersistent()) && 665 (!_caches.containsKey(dest))) { 666 to_delete.add(dest); 667 gc_destinations++; 668 } 669 } 670 671 Enumeration todel = to_delete.elements(); 673 while (todel.hasMoreElements()) { 674 _destinationCache.remove( 675 ((JmsDestination) todel.nextElement()).getName()); 676 } 677 678 _log.info("DMGC Collected " + gc_caches + " caches, " + _caches.size() 680 + " remaining."); 681 _log.info("DMGC Collected " + gc_destinations + " destinations, " 682 + _destinationCache.size() + " remaining."); 683 } 684 685 696 synchronized HashMap getTopicDestinationCaches(JmsTopic topic) { 697 HashMap result = new HashMap (); 698 699 Iterator iter = _caches.keySet().iterator(); 700 while (iter.hasNext()) { 701 JmsDestination dest = (JmsDestination) iter.next(); 702 if ((dest instanceof JmsTopic) && 703 (topic.match((JmsTopic) dest))) { 704 result.put(dest, _caches.get(dest)); 705 } 706 } 707 708 return result; 709 } 710 711 714 public synchronized void destroy() { 715 Object [] dests = _caches.keySet().toArray(); 717 for (int index = 0; index < dests.length; index++) { 718 destroyDestinationCache((JmsDestination) dests[index]); 719 } 720 721 _caches.clear(); 722 _caches = null; 723 724 _destinationCache.clear(); 725 _destinationCache = null; 726 727 _listeners.clear(); 729 _listeners = null; 730 731 _instance = null; 733 } 734 735 742 public boolean isPersistent(String name) { 743 boolean result = false; 744 JmsDestination destination = 745 (JmsDestination) _destinationCache.get(name); 746 if (destination != null) { 747 result = destination.getPersistent(); 748 } 749 750 return result; 751 } 752 753 760 public boolean isPersistent(JmsDestination destination) { 761 return isPersistent(destination.getName()); 762 } 763 764 773 public boolean isMessageForAdministeredDestination(MessageImpl msg) { 774 boolean result = false; 775 try { 776 JmsDestination mdest = (JmsDestination) msg.getJMSDestination(); 777 JmsDestination dest = (JmsDestination) _destinationCache.get( 778 mdest.getName()); 779 780 if (dest != null) { 781 if (dest.getPersistent()) { 782 result = true; 783 } else if (mdest instanceof JmsTopic) { 784 Object [] dests = _wildcardDestinations.toArray(); 786 for (int index = 0; index < dests.length; index++) { 787 JmsTopic adest = (JmsTopic) dests[index]; 788 if ((adest.match((JmsTopic) mdest)) && 789 (adest.getPersistent())) { 790 result = true; 791 break; 792 } 793 } 794 795 } 796 } 797 } catch (JMSException ignore) { 798 } 799 800 return result; 801 } 802 803 809 void addToDestinationCache(JmsDestination destination) { 810 synchronized (_destinationCache) { 811 if (!_destinationCache.containsKey(destination.getName())) { 812 _destinationCache.put(destination.getName(), destination); 813 814 if (((destination instanceof JmsTopic) && 816 (((JmsTopic) destination).isWildCard()))) { 817 _wildcardDestinations.add(destination); 818 } 819 } 820 } 821 } 822 823 828 void removeFromDestinationCache(JmsDestination destination) { 829 synchronized (_destinationCache) { 830 if (_destinationCache.remove(destination.getName()) != null) { 831 832 if (((destination instanceof JmsTopic) && 835 (((JmsTopic) destination).isWildCard()))) { 836 _wildcardDestinations.remove(destination); 837 } 838 } 839 840 } 841 } 842 843 849 public boolean destinationExists(JmsDestination destination) { 850 return _destinationCache.containsKey(destination.getName()); 851 } 852 853 858 protected void init() throws ServiceException { 859 Connection connection = null; 860 try { 861 connection = DatabaseService.getConnection(); 862 863 Enumeration iter = 865 DatabaseService.getAdapter().getAllDestinations(connection); 866 connection.commit(); 867 868 while (iter.hasMoreElements()) { 869 JmsDestination dest = (JmsDestination) iter.nextElement(); 872 addToDestinationCache(dest); 873 try { 874 ContextHelper.rebind(getContext(), dest.getName(), dest); 877 } catch (NamingException error) { 878 throw new ServiceException("Failed to add destination " 879 + dest.getName() 880 + " to JNDI", error); 881 } 882 } 883 } catch (PersistenceException exception) { 884 SQLHelper.rollback(connection); 885 String msg = "Failed to initialise DestinationManager"; 886 _log.error(msg, exception); 887 throw exception; 888 } catch (SQLException exception) { 889 SQLHelper.rollback(connection); 890 String msg = "Failed to initialise DestinationManager"; 891 _log.error(msg, exception); 892 throw new ServiceException(msg, exception); 893 } finally { 894 SQLHelper.close(connection); 895 } 896 } 897 898 905 private void notifyDestinationAdded(JmsDestination dest, 906 DestinationCache cache) { 907 synchronized (_listeners) { 908 Iterator iter = _listeners.iterator(); 909 while (iter.hasNext()) { 910 ((DestinationEventListener) iter.next()).destinationAdded(dest, 911 cache); 912 } 913 } 914 } 915 916 923 private void notifyDestinationRemoved(JmsDestination dest, 924 DestinationCache cache) { 925 synchronized (_listeners) { 926 Iterator iter = _listeners.iterator(); 927 while (iter.hasNext()) { 928 ((DestinationEventListener) iter.next()).destinationRemoved( 929 dest, cache); 930 } 931 } 932 } 933 934 942 private static Context getContext() throws NamingException { 943 return NamingHelper.getInitialContext(ConfigurationManager.getConfig()); 944 } 945 946 } 947 | Popular Tags |