1 45 package org.exolab.jms.messagemgr; 46 47 import java.sql.Connection ; 48 import java.util.ArrayList ; 49 import java.util.HashMap ; 50 import java.util.Iterator ; 51 import java.util.LinkedList ; 52 import java.util.List ; 53 import javax.jms.InvalidDestinationException ; 54 import javax.jms.InvalidSelectorException ; 55 import javax.jms.JMSException ; 56 57 import org.apache.commons.logging.Log; 58 import org.apache.commons.logging.LogFactory; 59 60 import org.exolab.jms.client.JmsDestination; 61 import org.exolab.jms.client.JmsQueue; 62 import org.exolab.jms.client.JmsTopic; 63 import org.exolab.jms.persistence.DatabaseService; 64 import org.exolab.jms.persistence.PersistenceAdapter; 65 import org.exolab.jms.persistence.SQLHelper; 66 import org.exolab.jms.scheduler.Scheduler; 67 import org.exolab.jms.server.JmsServerSession; 68 import org.exolab.jms.service.ServiceException; 69 70 71 79 public class ConsumerManager { 80 81 84 private HashMap _endpoints = new HashMap (); 85 86 93 private HashMap _consumerCache = new HashMap (); 94 95 100 private HashMap _destToConsumerMap = new HashMap (); 101 102 106 private HashMap _wildcardConsumers = new HashMap (); 107 108 111 private Scheduler _scheduler = null; 112 113 116 private long _consumerIdSeed = 0; 117 118 121 private static ConsumerManager _instance = null; 122 123 126 private static final Log _log = LogFactory.getLog(ConsumerManager.class); 127 128 129 135 public static ConsumerManager createInstance() throws ServiceException { 136 _instance = new ConsumerManager(); 137 return _instance; 138 } 139 140 145 public static ConsumerManager instance() { 146 return _instance; 147 } 148 149 154 private ConsumerManager() throws ServiceException { 155 init(); 156 } 157 158 167 public synchronized void createDurableConsumer(JmsTopic topic, String name) 168 throws JMSException { 169 170 PersistenceAdapter adapter = DatabaseService.getAdapter(); 171 172 Connection connection = null; 173 try { 174 175 connection = DatabaseService.getConnection(); 176 177 if (!adapter.checkDestination(connection, topic.getName())) { 180 throw new JMSException ("Cannot create durable consumer, name=" 181 + name 182 + ", for non-administered topic=" 183 + topic.getName()); 184 } 185 186 if (!adapter.durableConsumerExists(connection, name)) { 187 adapter.addDurableConsumer(connection, topic.getName(), name); 188 } 189 190 connection.commit(); 191 addToConsumerCache(name, topic, true); 193 } catch (JMSException exception) { 194 throw exception; 195 } catch (Exception exception) { SQLHelper.rollback(connection); 197 String msg = "Failed to create durable consumer, name=" + name 198 + ", for topic=" + topic.getName(); 199 _log.error(msg, exception); 200 throw new JMSException (msg + ": " + exception.getMessage()); 201 } finally { 202 SQLHelper.close(connection); 203 } 204 } 205 206 207 217 public synchronized void removeDurableConsumer(String name) 218 throws JMSException { 219 if (_log.isDebugEnabled()) { 220 _log.debug("removeDurableConsumer(name=" + name + ")"); 221 } 222 223 if (!durableConsumerExists(name)) { 225 throw new JMSException ("Durable consumer " + name 226 + " is not defined."); 227 } 228 if (isDurableConsumerActive(name)) { 229 throw new JMSException ("Cannot remove durable consumer=" + name 230 + ": consumer is active"); 231 } 232 233 Connection connection = null; 235 try { 236 connection = DatabaseService.getConnection(); 237 238 DatabaseService.getAdapter().removeDurableConsumer(connection, 239 name); 240 ConsumerEndpoint endpoint = getConsumerEndpoint(name); 243 if (endpoint != null) { 244 deleteConsumerEndpoint(endpoint); 245 } 246 removeFromConsumerCache(name); 247 connection.commit(); 248 } catch (Exception exception) { SQLHelper.rollback(connection); 250 String msg = "Failed to remove durable consumer, name=" + name; 251 _log.error(msg, exception); 252 throw new JMSException (msg + ":" + exception.getMessage()); 253 } finally { 254 SQLHelper.close(connection); 255 } 256 } 257 258 268 public synchronized void removeDurableConsumers(JmsDestination topic) 269 throws JMSException { 270 271 List consumers = (List ) _destToConsumerMap.get(topic); 272 if (consumers != null) { 273 Iterator iterator = consumers.iterator(); 274 while (iterator.hasNext()) { 275 ConsumerEntry entry = (ConsumerEntry) iterator.next(); 276 if (entry.isDurable()) { 277 removeDurableConsumer(entry.getName()); 280 } 281 } 282 } 283 284 removeFromConsumerCache(topic); 286 } 287 288 299 public synchronized ConsumerEndpoint createConsumerEndpoint( 300 JmsServerSession session, JmsDestination destination, 301 String selector, boolean noLocal) 302 throws JMSException , InvalidSelectorException { 303 304 if (_log.isDebugEnabled()) { 305 _log.debug("createConsumerEndpoint(session=" + session 306 + ", destination=" + destination 307 + ", selector=" + selector 308 + ", noLocal=" + noLocal + ")"); 309 } 310 311 ConsumerEndpoint endpoint = null; 312 313 checkDestination(destination); 315 316 long consumerId = getNextConsumerId(); 317 318 321 if (destination instanceof JmsTopic) { 322 JmsTopic topic = (JmsTopic) destination; 323 endpoint 324 = new TopicConsumerEndpoint(consumerId, session, topic, 325 selector, noLocal, _scheduler); 326 } else if (destination instanceof JmsQueue) { 327 endpoint = new QueueConsumerEndpoint(consumerId, session, 328 (JmsQueue) destination, 329 selector, _scheduler); 330 } 331 332 if (endpoint != null) { 333 Object key = ConsumerEntry.getConsumerKey(endpoint); 337 _endpoints.put(key, endpoint); 338 addToConsumerCache(key, destination, false); 339 } 340 341 return endpoint; 342 } 343 344 361 public synchronized DurableConsumerEndpoint createDurableConsumerEndpoint( 362 JmsServerSession session, JmsTopic topic, String name, 363 boolean noLocal, 364 String selector) 365 throws JMSException { 366 367 if (_log.isDebugEnabled()) { 368 _log.debug("createDurableConsumerEndpoint(session=" + session 369 + ", topic=" + topic + ", name=" + name 370 + ", selector=" + selector + ", noLocal=" + noLocal 371 + ")"); 372 } 373 374 DurableConsumerEndpoint endpoint = 377 (DurableConsumerEndpoint) _endpoints.get(name); 378 if (endpoint != null) { 379 throw new JMSException (name + " is already registered"); 380 } 381 382 if (!topic.isWildCard() && 385 !DestinationManager.instance().destinationExists(topic)) { 386 throw new JMSException ("Cannot create a durable consumer for " 387 + topic); 388 } 389 390 long consumerId = getNextConsumerId(); 392 endpoint = new DurableConsumerEndpoint(consumerId, session, topic, name, 393 selector, noLocal, _scheduler); 394 _endpoints.put(endpoint.getPersistentId(), endpoint); 395 396 return endpoint; 397 } 398 399 406 public synchronized boolean hasActiveDurableConsumers(JmsDestination topic) { 407 408 boolean result = false; 409 List consumers = (List ) _destToConsumerMap.get(topic); 410 if (consumers != null) { 411 Iterator iterator = consumers.iterator(); 412 while (iterator.hasNext()) { 413 ConsumerEntry entry = (ConsumerEntry) iterator.next(); 414 if (entry.isDurable()) { 415 result = true; 416 break; 417 } 418 } 419 } 420 421 return result; 422 } 423 424 434 public synchronized ConsumerEndpoint createQueueBrowserEndpoint( 435 JmsServerSession session, JmsQueue queue, String selector) 436 throws JMSException { 437 438 checkDestination(queue); 440 441 long consumerId = getNextConsumerId(); 442 443 ConsumerEndpoint consumer = new QueueBrowserEndpoint(consumerId, session, 444 queue, selector, 445 _scheduler); 446 Object key = ConsumerEntry.getConsumerKey(consumer); 447 _endpoints.put(key, consumer); 448 addToConsumerCache(key, queue, false); 449 450 return consumer; 451 } 452 453 459 public synchronized void deleteDurableConsumerEndpoint(String name) 460 throws JMSException { 461 462 if (_log.isDebugEnabled()) { 463 _log.debug("deleteDurableConsumerEndpoint(name=" + name + ")"); 464 } 465 466 ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name); 467 if (entry != null) { 468 if (entry.isDurable()) { 469 deleteConsumerEndpoint((ConsumerEndpoint) _endpoints.get(name)); 470 } else { 471 throw new JMSException (name + " is not a durable subscriber"); 472 } 473 } else { 474 if (_log.isDebugEnabled()) { 476 _log.debug("deleteDurableConsumerEndpoint(name=" + name 477 + "): failed to locate consumer"); 478 } 479 } 480 } 481 482 487 public synchronized void deleteConsumerEndpoint(ConsumerEndpoint consumer) { 488 489 if (_log.isDebugEnabled()) { 490 _log.debug("deleteConsumerEndpoint(consumer=[Id=" 491 + consumer.getId() + ", destination=" 492 + consumer.getDestination() + ")"); 493 } 494 495 Object key = ConsumerEntry.getConsumerKey(consumer); 496 497 ConsumerEndpoint existing = (ConsumerEndpoint) _endpoints.get(key); 499 if (existing != null) { 500 if (consumer.getId() == existing.getId()) { 504 _endpoints.remove(key); 505 } else { 506 if (_log.isDebugEnabled()) { 507 _log.debug("Existing endpoint doesn't match that to " + 508 "be deleted - retaining"); 509 } 510 } 511 512 consumer.close(); 514 515 if (!(consumer instanceof DurableConsumerEndpoint)) { 518 removeFromConsumerCache(key); 519 } 520 } 521 } 522 523 529 public ConsumerEndpoint getConsumerEndpoint(long consumerId) { 530 return (ConsumerEndpoint) _endpoints.get(new Long (consumerId)); 531 } 532 533 539 public ConsumerEndpoint getConsumerEndpoint(String persistentId) { 540 return (ConsumerEndpoint) _endpoints.get(persistentId); 541 } 542 543 549 public boolean hasActiveConsumers(JmsDestination destination) 550 throws JMSException { 551 boolean result = false; 552 553 Object [] endpoints = _endpoints.values().toArray(); 554 for (int index = 0; index < endpoints.length; index++) { 555 ConsumerEndpoint endpoint = (ConsumerEndpoint) endpoints[index]; 556 JmsDestination endpoint_dest = endpoint.getDestination(); 557 558 if ((destination instanceof JmsTopic) && 559 (endpoint_dest instanceof JmsTopic) && 560 (((JmsTopic) endpoint_dest).isWildCard())) { 561 if (((JmsTopic) endpoint_dest).match((JmsTopic) destination)) { 562 result = true; 563 break; 564 } 565 } else { 566 if (endpoint_dest.equals(destination)) { 567 result = true; 568 break; 569 } 570 } 571 } 572 573 return result; 574 } 575 576 582 public boolean isDurableConsumerActive(String name) { 583 return (_endpoints.get(name) != null); 584 } 585 586 592 public JmsDestination getDestinationForConsumerName(String name) { 593 ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name); 594 return (entry != null) ? entry.getDestination() : null; 595 } 596 597 603 public boolean durableConsumerExists(String name) { 604 boolean result = false; 605 ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name); 606 if ((entry != null) && 607 (entry._durable)) { 608 result = true; 609 } 610 611 return result; 612 } 613 614 622 public boolean validSubscription(String topic, String name) { 623 624 boolean result = false; 625 ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name); 626 627 if ((entry != null) && 628 (entry._destination != null) && 629 (entry._destination.getName().equals(topic))) { 630 result = true; 631 } 632 633 return result; 634 } 635 636 642 public synchronized List getInactiveSubscriptions(JmsTopic topic) { 643 List result = new ArrayList (); 644 List consumers = (List ) _destToConsumerMap.get(topic); 645 if (consumers != null) { 646 Iterator iterator = consumers.iterator(); 647 while (iterator.hasNext()) { 648 ConsumerEntry entry = (ConsumerEntry) iterator.next(); 649 if (entry.isDurable() 650 && !_endpoints.containsKey(entry.getKey())) { 651 result.add(entry.getName()); 652 } 653 } 654 } 655 return result; 656 } 657 658 661 public synchronized void destroy() { 662 663 Object [] endpoints = _endpoints.values().toArray(); 665 for (int index = 0; index < endpoints.length; index++) { 666 deleteConsumerEndpoint((ConsumerEndpoint) endpoints[index]); 667 } 668 _endpoints.clear(); 669 670 _consumerCache.clear(); 672 _consumerCache = null; 673 _destToConsumerMap.clear(); 674 _destToConsumerMap = null; 675 _wildcardConsumers.clear(); 676 _wildcardConsumers = null; 677 678 _instance = null; 680 } 681 682 687 public synchronized List getDurableConsumersForDest(JmsTopic dest) { 688 List names = new ArrayList (); 689 690 List consumers = (List ) _destToConsumerMap.get(dest); 691 if (consumers != null) { 692 Iterator iterator = consumers.iterator(); 693 while (iterator.hasNext()) { 694 ConsumerEntry entry = (ConsumerEntry) iterator.next(); 695 if (entry.isDurable()) { 696 names.add(entry.getName()); 697 } 698 } 699 } 700 701 Iterator wildconsumers = _wildcardConsumers.keySet().iterator(); 704 while (wildconsumers.hasNext()) { 705 ConsumerEntry entry = (ConsumerEntry) wildconsumers.next(); 706 JmsDestination adest = entry.getDestination(); 707 if (entry.isDurable() && adest instanceof JmsTopic && 708 ((JmsTopic) adest).match((JmsTopic) dest)) { 709 names.add(entry.getName()); 710 } 711 } 712 713 return names; 714 } 715 716 723 public synchronized List getEndpointsForDest(JmsDestination dest) { 724 LinkedList endpoints = new LinkedList (); 725 Iterator iter = _endpoints.values().iterator(); 726 727 while (iter.hasNext()) { 728 ConsumerEndpoint endpoint = (ConsumerEndpoint) iter.next(); 729 if (dest.equals(endpoint.getDestination())) { 730 endpoints.add(endpoint); 731 } 732 } 733 734 return endpoints; 735 } 736 737 744 private synchronized void addToConsumerCache(Object key, JmsDestination dest, 745 boolean durable) { 746 if (_log.isDebugEnabled()) { 747 _log.debug("addToConsumerCache(key=" + key + ", dest=" + dest 748 + ", durable=" + durable + ")"); 749 } 750 751 if (!_consumerCache.containsKey(key)) { 752 ConsumerEntry entry = new ConsumerEntry(key, dest, durable); 753 _consumerCache.put(key, entry); 754 755 if (dest instanceof JmsTopic && ((JmsTopic) dest).isWildCard()) { 758 _wildcardConsumers.put(entry, dest); 760 } else { 761 List consumers = (List ) _destToConsumerMap.get(dest); 763 if (consumers == null) { 764 consumers = new ArrayList (); 765 _destToConsumerMap.put(dest, consumers); 766 } 767 768 consumers.add(entry); 770 } 771 } 772 } 773 774 779 private synchronized void removeFromConsumerCache(Object key) { 780 if (_log.isDebugEnabled()) { 781 _log.debug("removeFromConsumerCache(key=" + key + ")"); 782 } 783 784 ConsumerEntry entry = (ConsumerEntry) _consumerCache.remove(key); 785 if (entry != null) { 786 JmsDestination dest = entry.getDestination(); 787 788 if (dest instanceof JmsTopic && ((JmsTopic) dest).isWildCard()) { 789 _wildcardConsumers.remove(entry); 791 } else { 792 List consumers = (List ) _destToConsumerMap.get(dest); 794 if (consumers != null) { 795 consumers.remove(entry); 796 797 if (consumers.isEmpty()) { 799 _destToConsumerMap.remove(dest); 800 } 801 } 802 } 803 } else { 804 if (_log.isDebugEnabled()) { 805 _log.debug("removeFromConsumerCache(key=" + key + 806 "): consumer not found"); 807 } 808 } 809 } 810 811 816 private synchronized void removeFromConsumerCache( 817 JmsDestination destination) { 818 if (_destToConsumerMap.containsKey(destination)) { 819 _destToConsumerMap.remove(destination); 820 } 821 } 822 823 828 private void init() throws ServiceException { 829 _scheduler = Scheduler.instance(); 830 831 Connection connection = null; 832 try { 833 connection = DatabaseService.getConnection(); 834 835 PersistenceAdapter adapter = DatabaseService.getAdapter(); 836 connection.commit(); 837 838 HashMap map = adapter.getAllDurableConsumers(connection); 840 Iterator iter = map.keySet().iterator(); 841 842 while (iter.hasNext()) { 844 String consumer = (String ) iter.next(); 846 String deststr = (String ) map.get(consumer); 847 848 JmsDestination dest = 849 DestinationManager.instance().getDestination(deststr); 850 if (dest == null) { 851 dest = new JmsTopic(deststr); 853 if (!((JmsTopic) dest).isWildCard()) { 854 dest = null; 855 } 856 } 857 858 if (consumer != null && dest != null && 859 dest instanceof JmsTopic) { 860 addToConsumerCache(consumer, dest, true); 862 } else { 863 _log.error("Failure in ConsumerManager.init : " + consumer + 865 ":" + dest); 866 } 867 } 868 } catch (ServiceException exception) { 869 SQLHelper.rollback(connection); 870 throw exception; 871 } catch (Exception exception) { 872 SQLHelper.rollback(connection); 873 throw new ServiceException("Failed to initialise ConsumerManager", 874 exception); 875 } finally { 876 SQLHelper.close(connection); 877 } 878 } 879 880 885 private synchronized long getNextConsumerId() { 886 return ++_consumerIdSeed; 887 } 888 889 899 private void checkDestination(JmsDestination destination) 900 throws InvalidDestinationException { 901 final DestinationManager manager = DestinationManager.instance(); 902 final String name = destination.getName(); 903 final JmsDestination existing = manager.getDestination(name); 904 905 if (existing == null) { 906 if (destination.getPersistent()) { 907 throw new InvalidDestinationException ( 908 "No persistent destination with name=" + name 909 + " exists"); 910 } 911 manager.createDestination(destination); 913 } else { 914 if (existing.getPersistent() != destination.getPersistent()) { 917 throw new InvalidDestinationException ( 918 "Mismatched destination properties for destination" 919 + "with name=" + name); 920 } 921 } 922 } 923 924 927 private static final class ConsumerEntry { 928 929 932 private final Object _key; 933 934 937 private final boolean _durable; 938 939 942 private final JmsDestination _destination; 943 944 952 public ConsumerEntry(Object key, JmsDestination destination, 953 boolean durable) { 954 _key = key; 955 _destination = destination; 956 _durable = durable; 957 } 958 959 public boolean equals(Object obj) { 961 962 boolean result = false; 963 if ((obj != null) && 964 (obj instanceof ConsumerEntry) && 965 (((ConsumerEntry) obj)._key.equals(_key))) { 966 result = true; 967 } 968 969 return result; 970 } 971 972 public Object getKey() { 973 return _key; 974 } 975 976 public String getName() { 977 return (_key instanceof String ) ? (String ) _key : null; 978 } 979 980 public JmsDestination getDestination() { 981 return _destination; 982 } 983 984 public boolean isDurable() { 985 return _durable; 986 } 987 988 996 public static Object getConsumerKey(ConsumerEndpoint consumer) { 997 Object key = null; 998 String id = consumer.getPersistentId(); 999 if (id != null) { 1000 key = id; 1001 } else { 1002 key = new Long (consumer.getId()); 1003 } 1004 return key; 1005 } 1006 } 1007 1008} 1009 1010 | Popular Tags |