1 18 package org.apache.activemq.broker; 19 20 import java.io.File ; 21 import java.io.IOException ; 22 import java.io.Serializable ; 23 import java.net.URI ; 24 import java.net.URISyntaxException ; 25 import java.net.UnknownHostException ; 26 import java.util.ArrayList ; 27 import java.util.HashMap ; 28 import java.util.Iterator ; 29 import java.util.List ; 30 import java.util.Map ; 31 import java.util.Set ; 32 import java.util.concurrent.CopyOnWriteArrayList ; 33 import java.util.concurrent.CountDownLatch ; 34 import java.util.concurrent.atomic.AtomicBoolean ; 35 import javax.management.MBeanServer ; 36 import javax.management.MalformedObjectNameException ; 37 import javax.management.ObjectName ; 38 import org.apache.activemq.ActiveMQConnectionMetaData; 39 import org.apache.activemq.Service; 40 import org.apache.activemq.advisory.AdvisoryBroker; 41 import org.apache.activemq.broker.ft.MasterConnector; 42 import org.apache.activemq.broker.jmx.BrokerView; 43 import org.apache.activemq.broker.jmx.ConnectorView; 44 import org.apache.activemq.broker.jmx.ConnectorViewMBean; 45 import org.apache.activemq.broker.jmx.FTConnectorView; 46 import org.apache.activemq.broker.jmx.JmsConnectorView; 47 import org.apache.activemq.broker.jmx.ManagedRegionBroker; 48 import org.apache.activemq.broker.jmx.ManagementContext; 49 import org.apache.activemq.broker.jmx.NetworkConnectorView; 50 import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; 51 import org.apache.activemq.broker.jmx.ProxyConnectorView; 52 import org.apache.activemq.broker.region.CompositeDestinationInterceptor; 53 import org.apache.activemq.broker.region.DestinationFactory; 54 import org.apache.activemq.broker.region.DestinationFactoryImpl; 55 import org.apache.activemq.broker.region.DestinationInterceptor; 56 import org.apache.activemq.broker.region.RegionBroker; 57 import org.apache.activemq.broker.region.policy.PolicyMap; 58 import org.apache.activemq.broker.region.virtual.VirtualDestination; 59 import 
org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; 60 import org.apache.activemq.broker.region.virtual.VirtualTopic; 61 import org.apache.activemq.command.ActiveMQDestination; 62 import org.apache.activemq.command.BrokerId; 63 import org.apache.activemq.kaha.Store; 64 import org.apache.activemq.kaha.StoreFactory; 65 import org.apache.activemq.memory.UsageManager; 66 import org.apache.activemq.network.ConnectionFilter; 67 import org.apache.activemq.network.DiscoveryNetworkConnector; 68 import org.apache.activemq.network.NetworkConnector; 69 import org.apache.activemq.network.jms.JmsConnector; 70 import org.apache.activemq.proxy.ProxyConnector; 71 import org.apache.activemq.security.MessageAuthorizationPolicy; 72 import org.apache.activemq.security.SecurityContext; 73 import org.apache.activemq.store.PersistenceAdapter; 74 import org.apache.activemq.store.PersistenceAdapterFactory; 75 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; 76 import org.apache.activemq.store.memory.MemoryPersistenceAdapter; 77 import org.apache.activemq.thread.TaskRunnerFactory; 78 import org.apache.activemq.transport.TransportFactory; 79 import org.apache.activemq.transport.TransportServer; 80 import org.apache.activemq.transport.vm.VMTransportFactory; 81 import org.apache.activemq.util.IOExceptionSupport; 82 import org.apache.activemq.util.JMXSupport; 83 import org.apache.activemq.util.ServiceStopper; 84 import org.apache.activemq.util.URISupport; 85 import org.apache.activemq.util.IOHelper; 86 import org.apache.commons.logging.Log; 87 import org.apache.commons.logging.LogFactory; 88 89 96 public class BrokerService implements Service, Serializable { 97 98 99 100 private static final Log log = LogFactory.getLog(BrokerService.class); 101 private static final long serialVersionUID = 7353129142305630237L; 102 public static final String DEFAULT_PORT = "61616"; 103 static final String DEFAULT_BROKER_NAME = "localhost"; 104 public static final String 
LOCAL_HOST_NAME; 105 106 private boolean useJmx = true; 107 private boolean enableStatistics = true; 108 private boolean persistent = true; 109 private boolean populateJMSXUserID = false; 110 private boolean useShutdownHook = true; 111 private boolean useLoggingForShutdownErrors = false; 112 private boolean shutdownOnMasterFailure = false; 113 private String brokerName = DEFAULT_BROKER_NAME; 114 private File dataDirectoryFile; 115 private File tmpDataDirectory; 116 private Broker broker; 117 private BrokerView adminView; 118 private ManagementContext managementContext; 119 private ObjectName brokerObjectName; 120 private TaskRunnerFactory taskRunnerFactory; 121 private TaskRunnerFactory persistenceTaskRunnerFactory; 122 private UsageManager usageManager; 123 private UsageManager producerUsageManager; 124 private UsageManager consumerUsageManager; 125 private PersistenceAdapter persistenceAdapter; 126 private PersistenceAdapterFactory persistenceFactory; 127 private DestinationFactory destinationFactory; 128 private MessageAuthorizationPolicy messageAuthorizationPolicy; 129 private List transportConnectors = new CopyOnWriteArrayList (); 130 private List networkConnectors = new CopyOnWriteArrayList (); 131 private List proxyConnectors = new CopyOnWriteArrayList (); 132 private List registeredMBeanNames = new CopyOnWriteArrayList (); 133 private List jmsConnectors = new CopyOnWriteArrayList (); 134 private Service[] services; 135 private MasterConnector masterConnector; 136 private String masterConnectorURI; 137 private transient Thread shutdownHook; 138 private String [] transportConnectorURIs; 139 private String [] networkConnectorURIs; 140 private String [] proxyConnectorURIs; 141 private JmsConnector[] jmsBridgeConnectors; private boolean deleteAllMessagesOnStartup; 143 private boolean advisorySupport = true; 144 private URI vmConnectorURI; 145 private PolicyMap destinationPolicy; 146 private AtomicBoolean started = new AtomicBoolean (false); 147 private 
AtomicBoolean stopped = new AtomicBoolean (false); 148 private BrokerPlugin[] plugins; 149 private boolean keepDurableSubsActive=true; 150 private boolean useVirtualTopics=true; 151 private BrokerId brokerId; 152 private DestinationInterceptor[] destinationInterceptors; 153 private ActiveMQDestination[] destinations; 154 private Store tempDataStore; 155 private int persistenceThreadPriority = Thread.MAX_PRIORITY; 156 private boolean useLocalHostBrokerName = false; 157 private CountDownLatch stoppedLatch = new CountDownLatch (1); 158 159 static{ 160 String localHostName = "localhost"; 161 try{ 162 localHostName=java.net.InetAddress.getLocalHost().getHostName(); 163 }catch(UnknownHostException e){ 164 log.error("Failed to resolve localhost"); 165 } 166 LOCAL_HOST_NAME = localHostName; 167 } 168 169 @Override 170 public String toString() { 171 return "BrokerService[" + getBrokerName() + "]"; 172 } 173 174 180 public TransportConnector addConnector(String bindAddress) throws Exception { 181 return addConnector(new URI (bindAddress)); 182 } 183 184 190 public TransportConnector addConnector(URI bindAddress) throws Exception { 191 return addConnector(createTransportConnector(getBroker(), bindAddress)); 192 } 193 194 200 public TransportConnector addConnector(TransportServer transport) throws Exception { 201 return addConnector(new TransportConnector(getBroker(), transport)); 202 } 203 204 210 public TransportConnector addConnector(TransportConnector connector) throws Exception { 211 212 transportConnectors.add(connector); 213 214 return connector; 215 } 216 217 218 225 public boolean removeConnector(TransportConnector connector) throws Exception { 226 boolean rc = transportConnectors.remove(connector); 227 if( rc ) { 228 unregisterConnectorMBean(connector); 229 } 230 return rc; 231 232 } 233 234 240 public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { 241 return addNetworkConnector(new URI (discoveryAddress)); 242 } 243 244 250 public 
ProxyConnector addProxyConnector(String bindAddress) throws Exception { 251 return addProxyConnector(new URI (bindAddress)); 252 } 253 254 260 public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { 261 NetworkConnector connector=new DiscoveryNetworkConnector(discoveryAddress); 262 return addNetworkConnector(connector); 263 } 264 265 271 public ProxyConnector addProxyConnector(URI bindAddress) throws Exception { 272 ProxyConnector connector=new ProxyConnector(); 273 connector.setBind(bindAddress); 274 connector.setRemote(new URI ("fanout:multicast://default")); 275 return addProxyConnector(connector); 276 } 277 278 282 public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { 283 URI uri = getVmConnectorURI(); 284 HashMap map = new HashMap (URISupport.parseParamters(uri)); 285 map.put("network", "true"); 286 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 287 connector.setLocalUri(uri); 288 289 connector.setConnectionFilter(new ConnectionFilter() { 291 public boolean connectTo(URI location) { 292 List transportConnectors = getTransportConnectors(); 293 for (Iterator iter = transportConnectors.iterator(); iter.hasNext();) { 294 try { 295 TransportConnector tc = (TransportConnector) iter.next(); 296 if( location.equals(tc.getConnectUri()) ) { 297 return false; 298 } 299 } catch (Throwable e) { 300 } 301 } 302 return true; 303 } 304 }); 305 306 networkConnectors.add(connector); 307 if (isUseJmx()) { 308 registerNetworkConnectorMBean(connector); 309 } 310 return connector; 311 } 312 313 317 public boolean removeNetworkConnector(NetworkConnector connector) { 318 boolean answer = networkConnectors.remove(connector); 319 if (answer) { 320 unregisterNetworkConnectorMBean(connector); 321 } 322 return answer; 323 } 324 325 public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception { 326 URI uri = getVmConnectorURI(); 327 connector.setLocalUri(uri); 328 
proxyConnectors.add(connector); 329 if (isUseJmx()) { 330 registerProxyConnectorMBean(connector); 331 } 332 return connector; 333 } 334 335 public JmsConnector addJmsConnector(JmsConnector connector) throws Exception { 336 connector.setBrokerService(this); 337 jmsConnectors.add(connector); 338 if (isUseJmx()) { 339 registerJmsConnectorMBean(connector); 340 } 341 return connector; 342 } 343 344 public JmsConnector removeJmsConnector(JmsConnector connector){ 345 if (jmsConnectors.remove(connector)){ 346 return connector; 347 } 348 return null; 349 } 350 351 354 public String getMasterConnectorURI(){ 355 return masterConnectorURI; 356 } 357 358 361 public void setMasterConnectorURI(String masterConnectorURI){ 362 this.masterConnectorURI=masterConnectorURI; 363 } 364 365 368 public boolean isSlave(){ 369 return masterConnector != null && masterConnector.isSlave(); 370 } 371 372 public void masterFailed(){ 373 if (shutdownOnMasterFailure){ 374 log.fatal("The Master has failed ... shutting down"); 375 try { 376 stop(); 377 }catch(Exception e){ 378 log.error("Failed to stop for master failure",e); 379 } 380 }else { 381 log.warn("Master Failed - starting all connectors"); 382 try{ 383 startAllConnectors(); 384 }catch(Exception e){ 385 log.error("Failed to startAllConnectors"); 386 } 387 } 388 } 389 390 public boolean isStarted() { 391 return started.get(); 392 } 393 394 public void start() throws Exception { 397 if (! 
started.compareAndSet(false, true)) { 398 402 return; 404 } 405 406 try { 407 processHelperProperties(); 408 409 BrokerRegistry.getInstance().bind(getBrokerName(), this); 410 411 startDestinations(); 412 413 addShutdownHook(); 414 log.info("Using Persistence Adapter: " + getPersistenceAdapter()); 415 if (deleteAllMessagesOnStartup) { 416 deleteAllMessages(); 417 } 418 419 if (isUseJmx()) { 420 getManagementContext().start(); 421 } 422 423 getBroker().start(); 424 425 432 startAllConnectors(); 433 434 if (isUseJmx() && masterConnector != null) { 435 registerFTConnectorMBean(masterConnector); 436 } 437 438 brokerId = broker.getBrokerId(); 439 log.info("ActiveMQ JMS Message Broker (" + getBrokerName()+", "+brokerId+") started"); 440 } 441 catch (Exception e) { 442 log.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e); 443 throw e; 444 } 445 } 446 447 448 public void stop() throws Exception { 449 if(!started.compareAndSet(true,false)){ 450 return; 451 } 452 log.info("ActiveMQ Message Broker ("+getBrokerName()+", "+brokerId+") is shutting down"); 453 removeShutdownHook(); 454 ServiceStopper stopper=new ServiceStopper(); 455 if(services!=null){ 456 for(int i=0;i<services.length;i++){ 457 Service service=services[i]; 458 stopper.stop(service); 459 } 460 } 461 stopAllConnectors(stopper); 462 stopper.stop(persistenceAdapter); 463 if(broker!=null){ 464 stopper.stop(broker); 465 } 466 if(tempDataStore!=null){ 467 tempDataStore.close(); 468 } 469 if(isUseJmx()){ 470 MBeanServer mbeanServer=getManagementContext().getMBeanServer(); 471 if(mbeanServer!=null){ 472 for(Iterator iter=registeredMBeanNames.iterator();iter.hasNext();){ 473 ObjectName name=(ObjectName )iter.next(); 474 try{ 475 mbeanServer.unregisterMBean(name); 476 }catch(Exception e){ 477 stopper.onException(mbeanServer,e); 478 } 479 } 480 } 481 stopper.stop(getManagementContext()); 482 } 483 BrokerRegistry.getInstance().unbind(getBrokerName()); 487 VMTransportFactory.stopped(getBrokerName()); 488 
stopped.set(true); 489 stoppedLatch.countDown(); 490 491 log.info("ActiveMQ JMS Message Broker ("+getBrokerName()+", "+brokerId+") stopped"); 492 stopper.throwFirstException(); 493 } 494 495 498 public void waitUntilStopped() { 499 while (!stopped.get()) { 500 try { 501 stoppedLatch.await(); 502 } 503 catch (InterruptedException e) { 504 } 506 } 507 } 508 509 510 513 516 public Broker getBroker() throws Exception { 517 if (broker == null) { 518 log.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker (" 519 + getBrokerName() + ") is starting"); 520 log.info("For help or more information please see: http://activemq.apache.org/"); 521 broker = createBroker(); 522 } 523 return broker; 524 } 525 526 527 532 public BrokerView getAdminView() throws Exception { 533 if (adminView == null) { 534 getBroker(); 536 } 537 return adminView; 538 } 539 540 public void setAdminView(BrokerView adminView) { 541 this.adminView = adminView; 542 } 543 544 public String getBrokerName() { 545 return brokerName; 546 } 547 548 551 public void setBrokerName(String brokerName) { 552 if (brokerName == null) { 553 throw new NullPointerException ("The broker name cannot be null"); 554 } 555 brokerName = brokerName.trim(); 556 this.brokerName = brokerName; 557 } 558 559 public PersistenceAdapterFactory getPersistenceFactory() { 560 if (persistenceFactory == null) { 561 persistenceFactory = createPersistenceFactory(); 562 } 563 return persistenceFactory; 564 } 565 566 public File getDataDirectoryFile() { 567 if (dataDirectoryFile == null) { 568 dataDirectoryFile = new File (IOHelper.getDefaultDataDirectory()); 569 } 570 return dataDirectoryFile; 571 } 572 573 public File getBrokerDataDirectory() { 574 String brokerDir = getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"); 575 return new File (getDataDirectoryFile(), brokerDir); 576 } 577 578 579 586 public void setDataDirectory(String dataDirectory) { 587 setDataDirectoryFile(new File (dataDirectory)); 588 } 
589 590 597 public void setDataDirectoryFile(File dataDirectoryFile) { 598 this.dataDirectoryFile = dataDirectoryFile; 599 } 600 601 604 public File getTmpDataDirectory(){ 605 if (tmpDataDirectory == null) { 606 tmpDataDirectory = new File (getBrokerDataDirectory(), "tmp_storage"); 607 } 608 return tmpDataDirectory; 609 } 610 611 614 public void setTmpDataDirectory(File tmpDataDirectory){ 615 this.tmpDataDirectory=tmpDataDirectory; 616 } 617 618 619 public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { 620 this.persistenceFactory = persistenceFactory; 621 } 622 623 public void setDestinationFactory(DestinationFactory destinationFactory) { 624 this.destinationFactory = destinationFactory; 625 } 626 627 public boolean isPersistent() { 628 return persistent; 629 } 630 631 634 public void setPersistent(boolean persistent) { 635 this.persistent = persistent; 636 } 637 638 public boolean isPopulateJMSXUserID() { 639 return populateJMSXUserID; 640 } 641 642 645 public void setPopulateJMSXUserID(boolean populateJMSXUserID) { 646 this.populateJMSXUserID = populateJMSXUserID; 647 } 648 649 public UsageManager getMemoryManager() { 650 if (usageManager == null) { 651 usageManager = new UsageManager("Main"); 652 usageManager.setLimit(1024 * 1024 * 64); } 655 return usageManager; 656 } 657 658 659 public void setMemoryManager(UsageManager memoryManager) { 660 this.usageManager = memoryManager; 661 } 662 663 666 public UsageManager getConsumerUsageManager(){ 667 if (consumerUsageManager==null) { 668 consumerUsageManager = new UsageManager(getMemoryManager(),"Consumer",0.5f); 669 } 670 return consumerUsageManager; 671 } 672 673 674 677 public void setConsumerUsageManager(UsageManager consumerUsageManager){ 678 this.consumerUsageManager=consumerUsageManager; 679 } 680 681 682 685 public UsageManager getProducerUsageManager(){ 686 if (producerUsageManager==null) { 687 producerUsageManager = new UsageManager(getMemoryManager(),"Producer",0.45f); 688 } 689 
return producerUsageManager; 690 } 691 692 695 public void setProducerUsageManager(UsageManager producerUsageManager){ 696 this.producerUsageManager=producerUsageManager; 697 } 698 699 700 public PersistenceAdapter getPersistenceAdapter() throws IOException { 701 if (persistenceAdapter == null) { 702 persistenceAdapter = createPersistenceAdapter(); 703 configureService(persistenceAdapter); 704 } 705 return persistenceAdapter; 706 } 707 708 711 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) { 712 this.persistenceAdapter = persistenceAdapter; 713 } 714 715 public TaskRunnerFactory getTaskRunnerFactory() { 716 if (taskRunnerFactory == null) { 717 taskRunnerFactory = new TaskRunnerFactory(); 718 } 719 return taskRunnerFactory; 720 } 721 722 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 723 this.taskRunnerFactory = taskRunnerFactory; 724 } 725 726 727 public TaskRunnerFactory getPersistenceTaskRunnerFactory(){ 728 if (taskRunnerFactory == null) { 729 persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, true, 1000); 730 } 731 return persistenceTaskRunnerFactory; 732 } 733 734 735 public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory){ 736 this.persistenceTaskRunnerFactory=persistenceTaskRunnerFactory; 737 } 738 739 public boolean isUseJmx() { 740 return useJmx; 741 } 742 743 public boolean isEnableStatistics() { 744 return enableStatistics; 745 } 746 747 751 public void setEnableStatistics(boolean enableStatistics) { 752 this.enableStatistics = enableStatistics; 753 } 754 755 759 public void setUseJmx(boolean useJmx) { 760 this.useJmx = useJmx; 761 } 762 763 public ObjectName getBrokerObjectName() throws IOException { 764 if (brokerObjectName == null) { 765 brokerObjectName = createBrokerObjectName(); 766 } 767 return brokerObjectName; 768 } 769 770 773 public void setBrokerObjectName(ObjectName brokerObjectName) { 774 
this.brokerObjectName = brokerObjectName; 775 } 776 777 public ManagementContext getManagementContext() { 778 if (managementContext == null) { 779 managementContext = new ManagementContext(); 780 } 781 return managementContext; 782 } 783 784 public void setManagementContext(ManagementContext managementContext) { 785 this.managementContext = managementContext; 786 } 787 788 public String [] getNetworkConnectorURIs() { 789 return networkConnectorURIs; 790 } 791 792 public void setNetworkConnectorURIs(String [] networkConnectorURIs) { 793 this.networkConnectorURIs = networkConnectorURIs; 794 } 795 796 public String [] getTransportConnectorURIs() { 797 return transportConnectorURIs; 798 } 799 800 public void setTransportConnectorURIs(String [] transportConnectorURIs) { 801 this.transportConnectorURIs = transportConnectorURIs; 802 } 803 804 807 public JmsConnector[] getJmsBridgeConnectors(){ 808 return jmsBridgeConnectors; 809 } 810 811 814 public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors){ 815 this.jmsBridgeConnectors=jmsConnectors; 816 } 817 818 public Service[] getServices() { 819 return services; 820 } 821 822 825 public void setServices(Service[] services) { 826 this.services = services; 827 } 828 829 832 public void addService(Service service) { 833 if (services == null) { 834 services = new Service[] { service }; 835 } 836 else { 837 int length = services.length; 838 Service[] temp = new Service[length + 1]; 839 System.arraycopy(services, 1, temp, 1, length); 840 temp[length] = service; 841 services = temp; 842 } 843 } 844 845 846 public boolean isUseLoggingForShutdownErrors() { 847 return useLoggingForShutdownErrors; 848 } 849 850 854 public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) { 855 this.useLoggingForShutdownErrors = useLoggingForShutdownErrors; 856 } 857 858 public boolean isUseShutdownHook() { 859 return useShutdownHook; 860 } 861 862 867 public void setUseShutdownHook(boolean useShutdownHook) { 868 
this.useShutdownHook = useShutdownHook; 869 } 870 871 public boolean isAdvisorySupport() { 872 return advisorySupport; 873 } 874 875 878 public void setAdvisorySupport(boolean advisorySupport) { 879 this.advisorySupport = advisorySupport; 880 } 881 882 public List getTransportConnectors() { 883 return new ArrayList (transportConnectors); 884 } 885 886 892 public void setTransportConnectors(List transportConnectors) throws Exception { 893 for (Iterator iter = transportConnectors.iterator(); iter.hasNext();) { 894 TransportConnector connector = (TransportConnector) iter.next(); 895 addConnector(connector); 896 } 897 } 898 899 public List getNetworkConnectors() { 900 return new ArrayList (networkConnectors); 901 } 902 903 public List getProxyConnectors() { 904 return new ArrayList (proxyConnectors); 905 } 906 907 913 public void setNetworkConnectors(List networkConnectors) throws Exception { 914 for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) { 915 NetworkConnector connector = (NetworkConnector) iter.next(); 916 addNetworkConnector(connector); 917 } 918 } 919 920 924 public void setProxyConnectors(List proxyConnectors) throws Exception { 925 for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) { 926 ProxyConnector connector = (ProxyConnector) iter.next(); 927 addProxyConnector(connector); 928 } 929 } 930 931 public PolicyMap getDestinationPolicy() { 932 return destinationPolicy; 933 } 934 935 939 public void setDestinationPolicy(PolicyMap policyMap) { 940 this.destinationPolicy = policyMap; 941 } 942 943 public BrokerPlugin[] getPlugins() { 944 return plugins; 945 } 946 947 950 public void setPlugins(BrokerPlugin[] plugins) { 951 this.plugins = plugins; 952 } 953 954 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 955 return messageAuthorizationPolicy; 956 } 957 958 962 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 963 this.messageAuthorizationPolicy = 
messageAuthorizationPolicy; 964 } 965 966 970 public void deleteAllMessages() throws IOException { 971 getPersistenceAdapter().deleteAllMessages(); 972 } 973 974 public boolean isDeleteAllMessagesOnStartup() { 975 return deleteAllMessagesOnStartup; 976 } 977 978 982 public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) { 983 this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; 984 } 985 986 public URI getVmConnectorURI() { 987 if (vmConnectorURI == null) { 988 try { 989 vmConnectorURI = new URI ("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_")); 990 } 991 catch (URISyntaxException e) { 992 log.error("Badly formed URI from " + getBrokerName(),e); 993 } 994 } 995 return vmConnectorURI; 996 } 997 998 public void setVmConnectorURI(URI vmConnectorURI) { 999 this.vmConnectorURI = vmConnectorURI; 1000 } 1001 1002 1005 public boolean isShutdownOnMasterFailure(){ 1006 return shutdownOnMasterFailure; 1007 } 1008 1009 1012 public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure){ 1013 this.shutdownOnMasterFailure=shutdownOnMasterFailure; 1014 } 1015 1016 public boolean isKeepDurableSubsActive() { 1017 return keepDurableSubsActive; 1018 } 1019 1020 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 1021 this.keepDurableSubsActive = keepDurableSubsActive; 1022 } 1023 1024 public boolean isUseVirtualTopics() { 1025 return useVirtualTopics; 1026 } 1027 1028 1033 public void setUseVirtualTopics(boolean useVirtualTopics) { 1034 this.useVirtualTopics = useVirtualTopics; 1035 } 1036 1037 public DestinationInterceptor[] getDestinationInterceptors() { 1038 return destinationInterceptors; 1039 } 1040 1041 1044 public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) { 1045 this.destinationInterceptors = destinationInterceptors; 1046 } 1047 1048 public ActiveMQDestination[] getDestinations() { 1049 return destinations; 1050 } 1051 1052 1055 public void 
setDestinations(ActiveMQDestination[] destinations) { 1056 this.destinations = destinations; 1057 } 1058 1059 1062 public synchronized Store getTempDataStore(){ 1063 if(tempDataStore==null){ 1064 String name=getTmpDataDirectory().getPath(); 1065 try{ 1066 log.info("About to delete any non-persistent messages that may have overflowed to disk ..."); 1067 StoreFactory.delete(name); 1068 log.info("Successfully deleted temporary storage"); 1069 tempDataStore=StoreFactory.open(name,"rw"); 1070 }catch(IOException e){ 1071 throw new RuntimeException (e); 1072 } 1073 } 1074 return tempDataStore; 1075 } 1076 1077 1080 public void setTempDataStore(Store tempDataStore){ 1081 this.tempDataStore=tempDataStore; 1082 } 1083 1084 public int getPersistenceThreadPriority(){ 1085 return persistenceThreadPriority; 1086 } 1087 1088 public void setPersistenceThreadPriority(int persistenceThreadPriority){ 1089 this.persistenceThreadPriority=persistenceThreadPriority; 1090 } 1091 1092 1095 public boolean isUseLocalHostBrokerName(){ 1096 return this.useLocalHostBrokerName; 1097 } 1098 1099 1102 public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName){ 1103 this.useLocalHostBrokerName=useLocalHostBrokerName; 1104 if(useLocalHostBrokerName&&!started.get()&&brokerName==null||brokerName==DEFAULT_BROKER_NAME){ 1105 brokerName=LOCAL_HOST_NAME; 1106 } 1107 } 1108 1109 1117 protected void processHelperProperties() throws Exception { 1118 if (transportConnectorURIs != null) { 1119 for (int i = 0; i < transportConnectorURIs.length; i++) { 1120 String uri = transportConnectorURIs[i]; 1121 addConnector(uri); 1122 } 1123 } 1124 if (networkConnectorURIs != null) { 1125 for (int i = 0; i < networkConnectorURIs.length; i++) { 1126 String uri = networkConnectorURIs[i]; 1127 addNetworkConnector(uri); 1128 } 1129 } 1130 if (proxyConnectorURIs != null) { 1131 for (int i = 0; i < proxyConnectorURIs.length; i++) { 1132 String uri = proxyConnectorURIs[i]; 1133 addProxyConnector(uri); 1134 } 1135 } 
1136 1137 if (jmsBridgeConnectors != null){ 1138 for (int i = 0; i < jmsBridgeConnectors.length; i++){ 1139 addJmsConnector(jmsBridgeConnectors[i]); 1140 } 1141 } 1142 if (masterConnectorURI != null) { 1143 if (masterConnector != null) { 1144 throw new IllegalStateException ("Cannot specify masterConnectorURI when a masterConnector is already registered via the services property"); 1145 } 1146 else { 1147 addService(new MasterConnector(masterConnectorURI)); 1148 } 1149 } 1150 } 1151 1152 protected void stopAllConnectors(ServiceStopper stopper) { 1153 1154 for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) { 1155 NetworkConnector connector = (NetworkConnector) iter.next(); 1156 unregisterNetworkConnectorMBean(connector); 1157 stopper.stop(connector); 1158 } 1159 1160 for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) { 1161 ProxyConnector connector = (ProxyConnector) iter.next(); 1162 stopper.stop(connector); 1163 } 1164 1165 for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) { 1166 JmsConnector connector = (JmsConnector) iter.next(); 1167 stopper.stop(connector); 1168 } 1169 1170 for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) { 1171 TransportConnector connector = (TransportConnector) iter.next(); 1172 stopper.stop(connector); 1173 } 1174 } 1175 1176 protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { 1177 MBeanServer mbeanServer = getManagementContext().getMBeanServer(); 1178 if (mbeanServer != null) { 1179 1180 try { 1181 ObjectName objectName = createConnectorObjectName(connector); 1182 connector = connector.asManagedConnector(getManagementContext().getMBeanServer(), objectName); 1183 ConnectorViewMBean view = new ConnectorView(connector); 1184 mbeanServer.registerMBean(view, objectName); 1185 registeredMBeanNames.add(objectName); 1186 return connector; 1187 } 1188 catch (Throwable e) { 1189 throw 
IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
            }
        }
        return connector;
    }

    /**
     * Unregisters the MBean of the given transport connector. Nothing happens when JMX is
     * not in use, when there is no MBean server, or when the connector's object name is not
     * among the registered names.
     *
     * @param connector the connector whose MBean is unregistered
     * @throws IOException if the object name cannot be built or the MBean cannot be unregistered
     */
    protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
        if (isUseJmx()) {
            MBeanServer mbeanServer = getManagementContext().getMBeanServer();
            if (mbeanServer != null) {
                try {
                    ObjectName objectName = createConnectorObjectName(connector);

                    if (registeredMBeanNames.remove(objectName)) {
                        mbeanServer.unregisterMBean(objectName);
                    }
                } catch (Throwable e) {
                    // This is the unregister path; the message used to say "registered".
                    throw IOExceptionSupport.create("Transport Connector could not be unregistered from JMX: " + e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Builds the JMX object name of a transport connector from the JMX domain name, the
     * encoded broker name and the encoded connector name.
     *
     * @param connector the connector to name
     * @return the object name
     * @throws MalformedObjectNameException if the resulting name is not valid
     */
    private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
        return new ObjectName(
                managementContext.getJmxDomainName() + ":" +
                "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," +
                "Type=Connector," +
                "ConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())
        );
    }

    /**
     * Registers a {@link NetworkConnectorView} MBean for the given connector and records its
     * object name. Nothing happens when there is no MBean server.
     *
     * @param connector the connector to expose
     * @throws IOException if the MBean cannot be registered
     */
    protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
        MBeanServer mbeanServer = getManagementContext().getMBeanServer();
        if (mbeanServer != null) {
            NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
            try {
                ObjectName objectName = createNetworkConnectorObjectName(connector);
                mbeanServer.registerMBean(view, objectName);
                registeredMBeanNames.add(objectName);
            } catch (Throwable e) {
                throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
            }
        }
    }

    protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
        return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName="
                +
JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector," + "NetworkConnectorName="
                + JMXSupport.encodeObjectNamePart(connector.getName()));
    }

    /**
     * Unregisters the MBean of the given network connector. Nothing happens when JMX is not
     * in use, when there is no MBean server, or when the connector's object name is not among
     * the registered names. Failures are logged rather than thrown.
     *
     * @param connector the connector whose MBean is unregistered
     */
    protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
        if (isUseJmx()) {
            MBeanServer mbeanServer = getManagementContext().getMBeanServer();
            if (mbeanServer != null) {
                try {
                    ObjectName objectName = createNetworkConnectorObjectName(connector);
                    if (registeredMBeanNames.remove(objectName)) {
                        mbeanServer.unregisterMBean(objectName);
                    }
                } catch (Exception e) {
                    log.error("Network Connector could not be unregistered from JMX: " + e, e);
                }
            }
        }
    }

    /**
     * Registers a {@link ProxyConnectorView} MBean for the given connector and records its
     * object name. Nothing happens when there is no MBean server.
     *
     * @param connector the connector to expose
     * @throws IOException if the MBean cannot be registered
     */
    protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
        MBeanServer mbeanServer = getManagementContext().getMBeanServer();
        if (mbeanServer != null) {
            ProxyConnectorView view = new ProxyConnectorView(connector);
            try {
                ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName="
                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector," + "ProxyConnectorName="
                        + JMXSupport.encodeObjectNamePart(connector.getName()));
                mbeanServer.registerMBean(view, objectName);
                registeredMBeanNames.add(objectName);
            } catch (Throwable e) {
                // Name the component that actually failed; the message used to blame the broker.
                throw IOExceptionSupport.create("Proxy Connector could not be registered in JMX: " + e.getMessage(), e);
            }
        }
    }

    protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
        MBeanServer mbeanServer = getManagementContext().getMBeanServer();
        if (mbeanServer != null) {
            FTConnectorView view = new FTConnectorView(connector);
            try {
                ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName="
                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
1284 mbeanServer.registerMBean(view, objectName); 1285 registeredMBeanNames.add(objectName); 1286 } 1287 catch (Throwable e) { 1288 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 1289 } 1290 } 1291 } 1292 1293 protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { 1294 MBeanServer mbeanServer = getManagementContext().getMBeanServer(); 1295 if (mbeanServer != null) { 1296 JmsConnectorView view = new JmsConnectorView(connector); 1297 try { 1298 ObjectName objectName = new ObjectName (managementContext.getJmxDomainName() + ":" + "BrokerName=" 1299 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector," + "JmsConnectorName=" 1300 + JMXSupport.encodeObjectNamePart(connector.getName())); 1301 mbeanServer.registerMBean(view, objectName); 1302 registeredMBeanNames.add(objectName); 1303 } 1304 catch (Throwable e) { 1305 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 1306 } 1307 } 1308 } 1309 1310 1318 protected Broker createBroker() throws Exception { 1319 Broker regionBroker = createRegionBroker(); 1320 Broker broker = addInterceptors(regionBroker); 1321 1322 broker = new MutableBrokerFilter(broker) { 1324 public void stop() throws Exception { 1325 super.stop(); 1326 setNext(new ErrorBroker("Broker has been stopped: "+this) { 1327 public void stop() throws Exception { 1329 } 1330 }); 1331 } 1332 }; 1333 1334 RegionBroker rBroker = (RegionBroker) regionBroker; 1335 rBroker.getDestinationStatistics().setEnabled(enableStatistics); 1336 1337 1338 1339 if (isUseJmx()) { 1340 ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker; 1341 managedBroker.setContextBroker(broker); 1342 adminView = new BrokerView(this, managedBroker); 1343 MBeanServer mbeanServer = getManagementContext().getMBeanServer(); 1344 if (mbeanServer != null) { 1345 ObjectName objectName = getBrokerObjectName(); 1346 
mbeanServer.registerMBean(adminView, objectName); 1347 registeredMBeanNames.add(objectName); 1348 } 1349 } 1350 1351 1352 return broker; 1353 1354 } 1355 1356 1362 protected Broker createRegionBroker() throws Exception { 1363 getPersistenceAdapter().setUsageManager(getProducerUsageManager()); 1366 getPersistenceAdapter().setBrokerName(getBrokerName()); 1367 if(this.deleteAllMessagesOnStartup){ 1368 getPersistenceAdapter().deleteAllMessages(); 1369 } 1370 getPersistenceAdapter().start(); 1371 1372 DestinationInterceptor destinationInterceptor = null; 1373 if (destinationInterceptors != null) { 1374 destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors); 1375 } 1376 else { 1377 destinationInterceptor = createDefaultDestinationInterceptor(); 1378 } 1379 RegionBroker regionBroker = null; 1380 if (destinationFactory == null) { 1381 destinationFactory = new DestinationFactoryImpl(getProducerUsageManager(), getTaskRunnerFactory(), getPersistenceAdapter()); 1382 } 1383 if (isUseJmx()) { 1384 MBeanServer mbeanServer = getManagementContext().getMBeanServer(); 1385 regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerUsageManager(), 1386 destinationFactory, destinationInterceptor); 1387 } 1388 else { 1389 regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getConsumerUsageManager(), destinationFactory, destinationInterceptor); 1390 } 1391 destinationFactory.setRegionBroker(regionBroker); 1392 1393 regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); 1394 regionBroker.setBrokerName(getBrokerName()); 1395 return regionBroker; 1396 } 1397 1398 1401 protected DestinationInterceptor createDefaultDestinationInterceptor() { 1402 if (! 
isUseVirtualTopics()) { 1403 return null; 1404 } 1405 VirtualDestinationInterceptor answer = new VirtualDestinationInterceptor(); 1406 VirtualTopic virtualTopic = new VirtualTopic(); 1407 virtualTopic.setName("VirtualTopic.>"); 1408 VirtualDestination[] virtualDestinations = { virtualTopic }; 1409 answer.setVirtualDestinations(virtualDestinations); 1410 return answer; 1411 } 1412 1413 1418 protected Broker addInterceptors(Broker broker) throws Exception { 1419 broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); 1420 if (isAdvisorySupport()) { 1421 broker = new AdvisoryBroker(broker); 1422 } 1423 broker = new CompositeDestinationBroker(broker); 1424 if (isPopulateJMSXUserID()) { 1425 broker = new UserIDBroker(broker); 1426 } 1427 if (plugins != null) { 1428 for (int i = 0; i < plugins.length; i++) { 1429 BrokerPlugin plugin = plugins[i]; 1430 broker = plugin.installPlugin(broker); 1431 } 1432 } 1433 return broker; 1434 } 1435 1436 protected PersistenceAdapter createPersistenceAdapter() throws IOException { 1437 if (isPersistent()) { 1438 return getPersistenceFactory().createPersistenceAdapter(); 1439 } 1440 else { 1441 return new MemoryPersistenceAdapter(); 1442 } 1443 } 1444 1445 protected AMQPersistenceAdapterFactory createPersistenceFactory() { 1446 AMQPersistenceAdapterFactory factory = new AMQPersistenceAdapterFactory(); 1447 factory.setDataDirectory(getBrokerDataDirectory()); 1448 factory.setTaskRunnerFactory(getPersistenceTaskRunnerFactory()); 1449 factory.setBrokerName(getBrokerName()); 1450 return factory; 1451 } 1452 1453 protected ObjectName createBrokerObjectName() throws IOException { 1454 try { 1455 return new ObjectName ( 1456 getManagementContext().getJmxDomainName()+":"+ 1457 "BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+ 1458 "Type=Broker" 1459 ); 1460 } 1461 catch (Throwable e) { 1462 throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e); 1463 } 1464 } 1465 1466 
protected TransportConnector createTransportConnector(Broker broker, URI brokerURI) throws Exception { 1467 TransportServer transport = TransportFactory.bind(getBrokerName(),brokerURI); 1468 return new TransportConnector(broker, transport); 1469 } 1470 1471 1474 protected Object getPort(Map options) { 1475 Object port = options.get("port"); 1476 if (port == null) { 1477 port = DEFAULT_PORT; 1478 log.warn("No port specified so defaulting to: " + port); 1479 } 1480 return port; 1481 } 1482 1483 protected void addShutdownHook() { 1484 if (useShutdownHook) { 1485 shutdownHook = new Thread ("ActiveMQ ShutdownHook") { 1486 public void run() { 1487 containerShutdown(); 1488 } 1489 }; 1490 Runtime.getRuntime().addShutdownHook(shutdownHook); 1491 } 1492 } 1493 1494 protected void removeShutdownHook() { 1495 if (shutdownHook != null) { 1496 try { 1497 Runtime.getRuntime().removeShutdownHook(shutdownHook); 1498 } 1499 catch (Exception e) { 1500 log.debug("Caught exception, must be shutting down: " + e); 1501 } 1502 } 1503 } 1504 1505 1508 protected void containerShutdown() { 1509 try { 1510 stop(); 1511 } 1512 catch (IOException e) { 1513 Throwable linkedException = e.getCause(); 1514 if (linkedException != null) { 1515 logError("Failed to shut down: " + e + ". 
Reason: " + linkedException, linkedException); 1516 } 1517 else { 1518 logError("Failed to shut down: " + e, e); 1519 } 1520 if (!useLoggingForShutdownErrors) { 1521 e.printStackTrace(System.err); 1522 } 1523 } 1524 catch (Exception e) { 1525 logError("Failed to shut down: " + e, e); 1526 } 1527 } 1528 1529 protected void logError(String message, Throwable e) { 1530 if (useLoggingForShutdownErrors) { 1531 log.error("Failed to shut down: " + e); 1532 } 1533 else { 1534 System.err.println("Failed to shut down: " + e); 1535 } 1536 } 1537 1538 1542 protected void startDestinations() throws Exception { 1543 if (destinations != null) { 1544 ConnectionContext adminConnectionContext = getAdminConnectionContext(); 1545 1546 for (int i = 0; i < destinations.length; i++) { 1547 ActiveMQDestination destination = destinations[i]; 1548 getBroker().addDestination(adminConnectionContext, destination); 1549 } 1550 } 1551 } 1552 1553 1557 public ConnectionContext getAdminConnectionContext() throws Exception { 1558 ConnectionContext adminConnectionContext = getBroker().getAdminConnectionContext(); 1559 if (adminConnectionContext == null) { 1560 adminConnectionContext = createAdminConnectionContext(); 1561 getBroker().setAdminConnectionContext(adminConnectionContext); 1562 } 1563 return adminConnectionContext; 1564 } 1565 1566 1571 protected ConnectionContext createAdminConnectionContext() throws Exception { 1572 ConnectionContext context = new ConnectionContext(); 1573 context.setBroker(getBroker()); 1574 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 1575 return context; 1576 } 1577 1578 1579 1580 1584 protected void startAllConnectors() throws Exception { 1585 if (!isSlave()){ 1586 1587 ArrayList al = new ArrayList (); 1588 1589 for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) { 1590 TransportConnector connector = (TransportConnector) iter.next(); 1591 al.add(startTransportConnector(connector)); 1592 } 1593 1594 if (al.size()>0) { 
1595 this.transportConnectors.clear(); 1597 setTransportConnectors(al); 1598 } 1599 1600 for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) { 1601 NetworkConnector connector = (NetworkConnector) iter.next(); 1602 connector.setLocalUri(getVmConnectorURI()); 1603 connector.setBrokerName(getBrokerName()); 1604 connector.setDurableDestinations(getBroker().getDurableDestinations()); 1605 connector.start(); 1606 } 1607 1608 for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) { 1609 ProxyConnector connector = (ProxyConnector) iter.next(); 1610 connector.start(); 1611 } 1612 1613 for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) { 1614 JmsConnector connector = (JmsConnector) iter.next(); 1615 connector.start(); 1616 } 1617 1618 if (services != null) { 1619 for (int i = 0; i < services.length; i++) { 1620 Service service = services[i]; 1621 configureService(service); 1622 service.start(); 1623 } 1624 } 1625 } 1626 } 1627 1628 protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception { 1629 connector.setBroker(getBroker()); 1630 connector.setBrokerName(getBrokerName()); 1631 connector.setTaskRunnerFactory(getTaskRunnerFactory()); 1632 MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy(); 1633 if (policy != null) { 1634 connector.setMessageAuthorizationPolicy(policy); 1635 } 1636 1637 if (isUseJmx()) { 1638 connector = registerConnectorMBean(connector); 1639 } 1640 1641 connector.getStatistics().setEnabled(enableStatistics); 1642 1643 connector.start(); 1644 1645 return connector; 1646 } 1647 1648 1651 protected void configureService(Object service) { 1652 if (service instanceof BrokerServiceAware) { 1653 BrokerServiceAware serviceAware = (BrokerServiceAware) service; 1654 serviceAware.setBrokerService(this); 1655 } 1656 if (service instanceof MasterConnector) { 1657 masterConnector = (MasterConnector) service; 1658 } 1659 } 1660 1661 1662 1665 protected void 
startDestinationsInPersistenceStore(Broker broker) throws Exception { 1666 Set destinations = destinationFactory.getDestinations(); 1667 if (destinations != null) { 1668 Iterator iter = destinations.iterator(); 1669 1670 ConnectionContext adminConnectionContext = broker.getAdminConnectionContext(); 1671 if (adminConnectionContext == null) { 1672 ConnectionContext context = new ConnectionContext(); 1673 context.setBroker(broker); 1674 adminConnectionContext = context; 1675 broker.setAdminConnectionContext(adminConnectionContext); 1676 } 1677 1678 1679 while (iter.hasNext()) { 1680 ActiveMQDestination destination = (ActiveMQDestination) iter.next(); 1681 broker.addDestination(adminConnectionContext, destination); 1682 } 1683 } 1684 } 1685 1686 1687 1688} 1689 | Popular Tags |