KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > BrokerService


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker;
19
20 import java.io.File JavaDoc;
21 import java.io.IOException JavaDoc;
22 import java.io.Serializable JavaDoc;
23 import java.net.URI JavaDoc;
24 import java.net.URISyntaxException JavaDoc;
25 import java.net.UnknownHostException JavaDoc;
26 import java.util.ArrayList JavaDoc;
27 import java.util.HashMap JavaDoc;
28 import java.util.Iterator JavaDoc;
29 import java.util.List JavaDoc;
30 import java.util.Map JavaDoc;
31 import java.util.Set JavaDoc;
32 import java.util.concurrent.CopyOnWriteArrayList JavaDoc;
33 import java.util.concurrent.CountDownLatch JavaDoc;
34 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
35 import javax.management.MBeanServer JavaDoc;
36 import javax.management.MalformedObjectNameException JavaDoc;
37 import javax.management.ObjectName JavaDoc;
38 import org.apache.activemq.ActiveMQConnectionMetaData;
39 import org.apache.activemq.Service;
40 import org.apache.activemq.advisory.AdvisoryBroker;
41 import org.apache.activemq.broker.ft.MasterConnector;
42 import org.apache.activemq.broker.jmx.BrokerView;
43 import org.apache.activemq.broker.jmx.ConnectorView;
44 import org.apache.activemq.broker.jmx.ConnectorViewMBean;
45 import org.apache.activemq.broker.jmx.FTConnectorView;
46 import org.apache.activemq.broker.jmx.JmsConnectorView;
47 import org.apache.activemq.broker.jmx.ManagedRegionBroker;
48 import org.apache.activemq.broker.jmx.ManagementContext;
49 import org.apache.activemq.broker.jmx.NetworkConnectorView;
50 import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
51 import org.apache.activemq.broker.jmx.ProxyConnectorView;
52 import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
53 import org.apache.activemq.broker.region.DestinationFactory;
54 import org.apache.activemq.broker.region.DestinationFactoryImpl;
55 import org.apache.activemq.broker.region.DestinationInterceptor;
56 import org.apache.activemq.broker.region.RegionBroker;
57 import org.apache.activemq.broker.region.policy.PolicyMap;
58 import org.apache.activemq.broker.region.virtual.VirtualDestination;
59 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
60 import org.apache.activemq.broker.region.virtual.VirtualTopic;
61 import org.apache.activemq.command.ActiveMQDestination;
62 import org.apache.activemq.command.BrokerId;
63 import org.apache.activemq.kaha.Store;
64 import org.apache.activemq.kaha.StoreFactory;
65 import org.apache.activemq.memory.UsageManager;
66 import org.apache.activemq.network.ConnectionFilter;
67 import org.apache.activemq.network.DiscoveryNetworkConnector;
68 import org.apache.activemq.network.NetworkConnector;
69 import org.apache.activemq.network.jms.JmsConnector;
70 import org.apache.activemq.proxy.ProxyConnector;
71 import org.apache.activemq.security.MessageAuthorizationPolicy;
72 import org.apache.activemq.security.SecurityContext;
73 import org.apache.activemq.store.PersistenceAdapter;
74 import org.apache.activemq.store.PersistenceAdapterFactory;
75 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
76 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
77 import org.apache.activemq.thread.TaskRunnerFactory;
78 import org.apache.activemq.transport.TransportFactory;
79 import org.apache.activemq.transport.TransportServer;
80 import org.apache.activemq.transport.vm.VMTransportFactory;
81 import org.apache.activemq.util.IOExceptionSupport;
82 import org.apache.activemq.util.JMXSupport;
83 import org.apache.activemq.util.ServiceStopper;
84 import org.apache.activemq.util.URISupport;
85 import org.apache.activemq.util.IOHelper;
86 import org.apache.commons.logging.Log;
87 import org.apache.commons.logging.LogFactory;
88
89 /**
90  * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a number of transport
91  * connectors, network connectors and a bunch of properties which can be used to
92  * configure the broker as it is lazily created.
93  *
94  * @version $Revision: 1.1 $
95  */

96 public class BrokerService implements Service, Serializable JavaDoc {
97
98    
99
100     private static final Log log = LogFactory.getLog(BrokerService.class);
101     private static final long serialVersionUID = 7353129142305630237L;
102     public static final String JavaDoc DEFAULT_PORT = "61616";
103     static final String JavaDoc DEFAULT_BROKER_NAME = "localhost";
104     public static final String JavaDoc LOCAL_HOST_NAME;
105
106     private boolean useJmx = true;
107     private boolean enableStatistics = true;
108     private boolean persistent = true;
109     private boolean populateJMSXUserID = false;
110     private boolean useShutdownHook = true;
111     private boolean useLoggingForShutdownErrors = false;
112     private boolean shutdownOnMasterFailure = false;
113     private String JavaDoc brokerName = DEFAULT_BROKER_NAME;
114     private File JavaDoc dataDirectoryFile;
115     private File JavaDoc tmpDataDirectory;
116     private Broker broker;
117     private BrokerView adminView;
118     private ManagementContext managementContext;
119     private ObjectName JavaDoc brokerObjectName;
120     private TaskRunnerFactory taskRunnerFactory;
121     private TaskRunnerFactory persistenceTaskRunnerFactory;
122     private UsageManager usageManager;
123     private UsageManager producerUsageManager;
124     private UsageManager consumerUsageManager;
125     private PersistenceAdapter persistenceAdapter;
126     private PersistenceAdapterFactory persistenceFactory;
127     private DestinationFactory destinationFactory;
128     private MessageAuthorizationPolicy messageAuthorizationPolicy;
129     private List JavaDoc transportConnectors = new CopyOnWriteArrayList JavaDoc();
130     private List JavaDoc networkConnectors = new CopyOnWriteArrayList JavaDoc();
131     private List JavaDoc proxyConnectors = new CopyOnWriteArrayList JavaDoc();
132     private List JavaDoc registeredMBeanNames = new CopyOnWriteArrayList JavaDoc();
133     private List JavaDoc jmsConnectors = new CopyOnWriteArrayList JavaDoc();
134     private Service[] services;
135     private MasterConnector masterConnector;
136     private String JavaDoc masterConnectorURI;
137     private transient Thread JavaDoc shutdownHook;
138     private String JavaDoc[] transportConnectorURIs;
139     private String JavaDoc[] networkConnectorURIs;
140     private String JavaDoc[] proxyConnectorURIs;
141     private JmsConnector[] jmsBridgeConnectors; //these are Jms to Jms bridges to other jms messaging systems
142
private boolean deleteAllMessagesOnStartup;
143     private boolean advisorySupport = true;
144     private URI JavaDoc vmConnectorURI;
145     private PolicyMap destinationPolicy;
146     private AtomicBoolean JavaDoc started = new AtomicBoolean JavaDoc(false);
147     private AtomicBoolean JavaDoc stopped = new AtomicBoolean JavaDoc(false);
148     private BrokerPlugin[] plugins;
149     private boolean keepDurableSubsActive=true;
150     private boolean useVirtualTopics=true;
151     private BrokerId brokerId;
152     private DestinationInterceptor[] destinationInterceptors;
153     private ActiveMQDestination[] destinations;
154     private Store tempDataStore;
155     private int persistenceThreadPriority = Thread.MAX_PRIORITY;
156     private boolean useLocalHostBrokerName = false;
157     private CountDownLatch JavaDoc stoppedLatch = new CountDownLatch JavaDoc(1);
158
// Resolves the local host name once, when the class is loaded. If the lookup
// fails, LOCAL_HOST_NAME falls back to "localhost" and the failure (including
// its cause) is logged.
static {
    String resolvedHostName = "localhost";
    try {
        resolvedHostName = java.net.InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
        // Pass the exception so the reason for the failed lookup is not lost.
        log.error("Failed to resolve localhost", e);
    }
    LOCAL_HOST_NAME = resolvedHostName;
}
168
169     @Override JavaDoc
170     public String JavaDoc toString() {
171         return "BrokerService[" + getBrokerName() + "]";
172     }
173
174     /**
175      * Adds a new transport connector for the given bind address
176      *
177      * @return the newly created and added transport connector
178      * @throws Exception
179      */

180     public TransportConnector addConnector(String JavaDoc bindAddress) throws Exception JavaDoc {
181         return addConnector(new URI JavaDoc(bindAddress));
182     }
183
184     /**
185      * Adds a new transport connector for the given bind address
186      *
187      * @return the newly created and added transport connector
188      * @throws Exception
189      */

190     public TransportConnector addConnector(URI JavaDoc bindAddress) throws Exception JavaDoc {
191         return addConnector(createTransportConnector(getBroker(), bindAddress));
192     }
193
194     /**
195      * Adds a new transport connector for the given TransportServer transport
196      *
197      * @return the newly created and added transport connector
198      * @throws Exception
199      */

200     public TransportConnector addConnector(TransportServer transport) throws Exception JavaDoc {
201         return addConnector(new TransportConnector(getBroker(), transport));
202     }
203
204     /**
205      * Adds a new transport connector
206      *
207      * @return the transport connector
208      * @throws Exception
209      */

210     public TransportConnector addConnector(TransportConnector connector) throws Exception JavaDoc {
211         
212         transportConnectors.add(connector);
213
214         return connector;
215     }
216
217
218     /**
219      * Stops and removes a transport connector from the broker.
220      *
221      * @param connector
222      * @return true if the connector has been previously added to the broker
223      * @throws Exception
224      */

225     public boolean removeConnector(TransportConnector connector) throws Exception JavaDoc {
226         boolean rc = transportConnectors.remove(connector);
227         if( rc ) {
228            unregisterConnectorMBean(connector);
229         }
230         return rc;
231         
232     }
233
234     /**
235      * Adds a new network connector using the given discovery address
236      *
237      * @return the newly created and added network connector
238      * @throws Exception
239      */

240     public NetworkConnector addNetworkConnector(String JavaDoc discoveryAddress) throws Exception JavaDoc {
241         return addNetworkConnector(new URI JavaDoc(discoveryAddress));
242     }
243     
244     /**
245      * Adds a new proxy connector using the given bind address
246      *
247      * @return the newly created and added network connector
248      * @throws Exception
249      */

250     public ProxyConnector addProxyConnector(String JavaDoc bindAddress) throws Exception JavaDoc {
251         return addProxyConnector(new URI JavaDoc(bindAddress));
252     }
253
254     /**
255      * Adds a new network connector using the given discovery address
256      *
257      * @return the newly created and added network connector
258      * @throws Exception
259      */

260     public NetworkConnector addNetworkConnector(URI JavaDoc discoveryAddress) throws Exception JavaDoc{
261         NetworkConnector connector=new DiscoveryNetworkConnector(discoveryAddress);
262         return addNetworkConnector(connector);
263     }
264
265     /**
266      * Adds a new proxy connector using the given bind address
267      *
268      * @return the newly created and added network connector
269      * @throws Exception
270      */

271     public ProxyConnector addProxyConnector(URI JavaDoc bindAddress) throws Exception JavaDoc{
272         ProxyConnector connector=new ProxyConnector();
273         connector.setBind(bindAddress);
274         connector.setRemote(new URI JavaDoc("fanout:multicast://default"));
275         return addProxyConnector(connector);
276     }
277
278     /**
279      * Adds a new network connector to connect this broker to a federated
280      * network
281      */

282     public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception JavaDoc {
283         URI JavaDoc uri = getVmConnectorURI();
284         HashMap JavaDoc map = new HashMap JavaDoc(URISupport.parseParamters(uri));
285         map.put("network", "true");
286         uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
287         connector.setLocalUri(uri);
288         
289         // Set a connection filter so that the connector does not establish loop back connections.
290
connector.setConnectionFilter(new ConnectionFilter() {
291             public boolean connectTo(URI JavaDoc location) {
292                 List JavaDoc transportConnectors = getTransportConnectors();
293                 for (Iterator JavaDoc iter = transportConnectors.iterator(); iter.hasNext();) {
294                     try {
295                         TransportConnector tc = (TransportConnector) iter.next();
296                         if( location.equals(tc.getConnectUri()) ) {
297                             return false;
298                         }
299                     } catch (Throwable JavaDoc e) {
300                     }
301                 }
302                 return true;
303             }
304         });
305         
306         networkConnectors.add(connector);
307         if (isUseJmx()) {
308             registerNetworkConnectorMBean(connector);
309         }
310         return connector;
311     }
312     
313     /**
314      * Removes the given network connector without stopping it.
315      * The caller should call {@link NetworkConnector#stop()} to close the connector
316      */

317     public boolean removeNetworkConnector(NetworkConnector connector) {
318         boolean answer = networkConnectors.remove(connector);
319         if (answer) {
320             unregisterNetworkConnectorMBean(connector);
321         }
322         return answer;
323     }
324     
325     public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception JavaDoc {
326         URI JavaDoc uri = getVmConnectorURI();
327         connector.setLocalUri(uri);
328         proxyConnectors.add(connector);
329         if (isUseJmx()) {
330             registerProxyConnectorMBean(connector);
331         }
332         return connector;
333     }
334     
335     public JmsConnector addJmsConnector(JmsConnector connector) throws Exception JavaDoc{
336         connector.setBrokerService(this);
337         jmsConnectors.add(connector);
338         if (isUseJmx()) {
339             registerJmsConnectorMBean(connector);
340         }
341         return connector;
342     }
343     
344     public JmsConnector removeJmsConnector(JmsConnector connector){
345         if (jmsConnectors.remove(connector)){
346             return connector;
347         }
348         return null;
349     }
350     
351     /**
352      * @return Returns the masterConnectorURI.
353      */

354     public String JavaDoc getMasterConnectorURI(){
355         return masterConnectorURI;
356     }
357
358     /**
359      * @param masterConnectorURI The masterConnectorURI to set.
360      */

361     public void setMasterConnectorURI(String JavaDoc masterConnectorURI){
362         this.masterConnectorURI=masterConnectorURI;
363     }
364
365     /**
366      * @return true if this Broker is a slave to a Master
367      */

368     public boolean isSlave(){
369         return masterConnector != null && masterConnector.isSlave();
370     }
371     
372     public void masterFailed(){
373         if (shutdownOnMasterFailure){
374             log.fatal("The Master has failed ... shutting down");
375             try {
376             stop();
377             }catch(Exception JavaDoc e){
378                 log.error("Failed to stop for master failure",e);
379             }
380         }else {
381             log.warn("Master Failed - starting all connectors");
382             try{
383                 startAllConnectors();
384             }catch(Exception JavaDoc e){
385                log.error("Failed to startAllConnectors");
386             }
387         }
388     }
389     
390     public boolean isStarted() {
391         return started.get();
392     }
393     
394     // Service interface
395
// -------------------------------------------------------------------------
396
public void start() throws Exception JavaDoc {
397         if (! started.compareAndSet(false, true)) {
398             // lets just ignore redundant start() calls
399
// as its way too easy to not be completely sure if start() has been
400
// called or not with the gazillion of different configuration mechanisms
401

402             //throw new IllegalStateException("Allready started.");
403
return;
404         }
405         
406         try {
407             processHelperProperties();
408
409             BrokerRegistry.getInstance().bind(getBrokerName(), this);
410
411             startDestinations();
412             
413             addShutdownHook();
414             log.info("Using Persistence Adapter: " + getPersistenceAdapter());
415             if (deleteAllMessagesOnStartup) {
416                 deleteAllMessages();
417             }
418
419             if (isUseJmx()) {
420                 getManagementContext().start();
421             }
422
423             getBroker().start();
424
425             /*
426             if(isUseJmx()){
427                 // yes - this is order dependent!
428                 // register all destination in persistence store including inactive destinations as mbeans
429                 this.startDestinationsInPersistenceStore(broker);
430             }
431             */

432             startAllConnectors();
433             
434             if (isUseJmx() && masterConnector != null) {
435                 registerFTConnectorMBean(masterConnector);
436             }
437      
438             brokerId = broker.getBrokerId();
439             log.info("ActiveMQ JMS Message Broker (" + getBrokerName()+", "+brokerId+") started");
440         }
441         catch (Exception JavaDoc e) {
442             log.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
443             throw e;
444         }
445     }
446
447     
448     public void stop() throws Exception JavaDoc{
449         if(!started.compareAndSet(true,false)){
450             return;
451         }
452         log.info("ActiveMQ Message Broker ("+getBrokerName()+", "+brokerId+") is shutting down");
453         removeShutdownHook();
454         ServiceStopper stopper=new ServiceStopper();
455         if(services!=null){
456             for(int i=0;i<services.length;i++){
457                 Service service=services[i];
458                 stopper.stop(service);
459             }
460         }
461         stopAllConnectors(stopper);
462         stopper.stop(persistenceAdapter);
463         if(broker!=null){
464             stopper.stop(broker);
465         }
466         if(tempDataStore!=null){
467             tempDataStore.close();
468         }
469         if(isUseJmx()){
470             MBeanServer JavaDoc mbeanServer=getManagementContext().getMBeanServer();
471             if(mbeanServer!=null){
472                 for(Iterator JavaDoc iter=registeredMBeanNames.iterator();iter.hasNext();){
473                     ObjectName JavaDoc name=(ObjectName JavaDoc)iter.next();
474                     try{
475                         mbeanServer.unregisterMBean(name);
476                     }catch(Exception JavaDoc e){
477                         stopper.onException(mbeanServer,e);
478                     }
479                 }
480             }
481             stopper.stop(getManagementContext());
482         }
483         // remove any VMTransports connected
484
// this has to be done after services are stopped,
485
// to avoid timimg issue with discovery (spinning up a new instance)
486
BrokerRegistry.getInstance().unbind(getBrokerName());
487         VMTransportFactory.stopped(getBrokerName());
488         stopped.set(true);
489         stoppedLatch.countDown();
490
491         log.info("ActiveMQ JMS Message Broker ("+getBrokerName()+", "+brokerId+") stopped");
492         stopper.throwFirstException();
493     }
494
495     /**
496      * A helper method to block the caller thread until the broker has been stopped
497      */

498     public void waitUntilStopped() {
499         while (!stopped.get()) {
500             try {
501                 stoppedLatch.await();
502             }
503             catch (InterruptedException JavaDoc e) {
504                 // ignore
505
}
506         }
507     }
508
509
510     // Properties
511
// -------------------------------------------------------------------------
512

513     /**
514      * Returns the message broker
515      */

516     public Broker getBroker() throws Exception JavaDoc {
517         if (broker == null) {
518             log.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker ("
519                     + getBrokerName() + ") is starting");
520             log.info("For help or more information please see: http://activemq.apache.org/");
521             broker = createBroker();
522         }
523         return broker;
524     }
525
526     
527     /**
528      * Returns the administration view of the broker; used to create and destroy resources such as queues and topics.
529      *
530      * Note this method returns null if JMX is disabled.
531      */

532     public BrokerView getAdminView() throws Exception JavaDoc {
533         if (adminView == null) {
534             // force lazy creation
535
getBroker();
536         }
537         return adminView;
538     }
539
540     public void setAdminView(BrokerView adminView) {
541         this.adminView = adminView;
542     }
543
544     public String JavaDoc getBrokerName() {
545         return brokerName;
546     }
547
548     /**
549      * Sets the name of this broker; which must be unique in the network
550      */

551     public void setBrokerName(String JavaDoc brokerName) {
552         if (brokerName == null) {
553             throw new NullPointerException JavaDoc("The broker name cannot be null");
554         }
555         brokerName = brokerName.trim();
556         this.brokerName = brokerName;
557     }
558
559     public PersistenceAdapterFactory getPersistenceFactory() {
560         if (persistenceFactory == null) {
561             persistenceFactory = createPersistenceFactory();
562         }
563         return persistenceFactory;
564     }
565
566     public File JavaDoc getDataDirectoryFile() {
567         if (dataDirectoryFile == null) {
568             dataDirectoryFile = new File JavaDoc(IOHelper.getDefaultDataDirectory());
569         }
570         return dataDirectoryFile;
571     }
572
573     public File JavaDoc getBrokerDataDirectory() {
574         String JavaDoc brokerDir = getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_");
575         return new File JavaDoc(getDataDirectoryFile(), brokerDir);
576     }
577
578
579     /**
580      * Sets the directory in which the data files will be stored by default for
581      * the JDBC and Journal persistence adaptors.
582      *
583      * @param dataDirectory
584      * the directory to store data files
585      */

586     public void setDataDirectory(String JavaDoc dataDirectory) {
587         setDataDirectoryFile(new File JavaDoc(dataDirectory));
588     }
589     
590     /**
591      * Sets the directory in which the data files will be stored by default for
592      * the JDBC and Journal persistence adaptors.
593      *
594      * @param dataDirectoryFile
595      * the directory to store data files
596      */

597     public void setDataDirectoryFile(File JavaDoc dataDirectoryFile) {
598         this.dataDirectoryFile = dataDirectoryFile;
599     }
600
601     /**
602      * @return the tmpDataDirectory
603      */

604     public File JavaDoc getTmpDataDirectory(){
605         if (tmpDataDirectory == null) {
606             tmpDataDirectory = new File JavaDoc(getBrokerDataDirectory(), "tmp_storage");
607         }
608         return tmpDataDirectory;
609     }
610
611     /**
612      * @param tmpDataDirectory the tmpDataDirectory to set
613      */

614     public void setTmpDataDirectory(File JavaDoc tmpDataDirectory){
615         this.tmpDataDirectory=tmpDataDirectory;
616     }
617     
618
619     public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
620         this.persistenceFactory = persistenceFactory;
621     }
622
623     public void setDestinationFactory(DestinationFactory destinationFactory) {
624         this.destinationFactory = destinationFactory;
625     }
626
627     public boolean isPersistent() {
628         return persistent;
629     }
630
631     /**
632      * Sets whether or not persistence is enabled or disabled.
633      */

634     public void setPersistent(boolean persistent) {
635         this.persistent = persistent;
636     }
637
638     public boolean isPopulateJMSXUserID() {
639         return populateJMSXUserID;
640     }
641
642     /**
643      * Sets whether or not the broker should populate the JMSXUserID header.
644      */

645     public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
646         this.populateJMSXUserID = populateJMSXUserID;
647     }
648
649     public UsageManager getMemoryManager() {
650         if (usageManager == null) {
651             usageManager = new UsageManager("Main");
652             usageManager.setLimit(1024 * 1024 * 64); // Default to 64 Meg
653
// limit
654
}
655         return usageManager;
656     }
657        
658
659     public void setMemoryManager(UsageManager memoryManager) {
660         this.usageManager = memoryManager;
661     }
662     
663     /**
664      * @return the consumerUsageManager
665      */

666     public UsageManager getConsumerUsageManager(){
667         if (consumerUsageManager==null) {
668             consumerUsageManager = new UsageManager(getMemoryManager(),"Consumer",0.5f);
669         }
670         return consumerUsageManager;
671     }
672
673     
674     /**
675      * @param consumerUsageManager the consumerUsageManager to set
676      */

677     public void setConsumerUsageManager(UsageManager consumerUsageManager){
678         this.consumerUsageManager=consumerUsageManager;
679     }
680
681     
682     /**
683      * @return the producerUsageManager
684      */

685     public UsageManager getProducerUsageManager(){
686         if (producerUsageManager==null) {
687             producerUsageManager = new UsageManager(getMemoryManager(),"Producer",0.45f);
688         }
689         return producerUsageManager;
690     }
691     
692     /**
693      * @param producerUsageManager the producerUsageManager to set
694      */

695     public void setProducerUsageManager(UsageManager producerUsageManager){
696         this.producerUsageManager=producerUsageManager;
697     }
698
699    
700     public PersistenceAdapter getPersistenceAdapter() throws IOException JavaDoc {
701         if (persistenceAdapter == null) {
702             persistenceAdapter = createPersistenceAdapter();
703             configureService(persistenceAdapter);
704         }
705         return persistenceAdapter;
706     }
707
708     /**
709      * Sets the persistence adaptor implementation to use for this broker
710      */

711     public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
712         this.persistenceAdapter = persistenceAdapter;
713     }
714
715     public TaskRunnerFactory getTaskRunnerFactory() {
716         if (taskRunnerFactory == null) {
717             taskRunnerFactory = new TaskRunnerFactory();
718         }
719         return taskRunnerFactory;
720     }
721
722     public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
723         this.taskRunnerFactory = taskRunnerFactory;
724     }
725     
726     
727     public TaskRunnerFactory getPersistenceTaskRunnerFactory(){
728         if (taskRunnerFactory == null) {
729             persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, true, 1000);
730         }
731         return persistenceTaskRunnerFactory;
732     }
733
734    
735     public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory){
736         this.persistenceTaskRunnerFactory=persistenceTaskRunnerFactory;
737     }
738
739     public boolean isUseJmx() {
740         return useJmx;
741     }
742     
743     public boolean isEnableStatistics() {
744         return enableStatistics;
745     }
746
747     /**
748      * Sets whether or not the Broker's services enable statistics or
749      * not.
750      */

751     public void setEnableStatistics(boolean enableStatistics) {
752         this.enableStatistics = enableStatistics;
753     }
754     
755     /**
756      * Sets whether or not the Broker's services should be exposed into JMX or
757      * not.
758      */

759     public void setUseJmx(boolean useJmx) {
760         this.useJmx = useJmx;
761     }
762
763     public ObjectName JavaDoc getBrokerObjectName() throws IOException JavaDoc {
764         if (brokerObjectName == null) {
765             brokerObjectName = createBrokerObjectName();
766         }
767         return brokerObjectName;
768     }
769
770     /**
771      * Sets the JMX ObjectName for this broker
772      */

773     public void setBrokerObjectName(ObjectName JavaDoc brokerObjectName) {
774         this.brokerObjectName = brokerObjectName;
775     }
776
777     public ManagementContext getManagementContext() {
778         if (managementContext == null) {
779             managementContext = new ManagementContext();
780         }
781         return managementContext;
782     }
783
784     public void setManagementContext(ManagementContext managementContext) {
785         this.managementContext = managementContext;
786     }
787
788     public String JavaDoc[] getNetworkConnectorURIs() {
789         return networkConnectorURIs;
790     }
791
792     public void setNetworkConnectorURIs(String JavaDoc[] networkConnectorURIs) {
793         this.networkConnectorURIs = networkConnectorURIs;
794     }
795
796     public String JavaDoc[] getTransportConnectorURIs() {
797         return transportConnectorURIs;
798     }
799
800     public void setTransportConnectorURIs(String JavaDoc[] transportConnectorURIs) {
801         this.transportConnectorURIs = transportConnectorURIs;
802     }
803
804     /**
805      * @return Returns the jmsBridgeConnectors.
806      */

807     public JmsConnector[] getJmsBridgeConnectors(){
808         return jmsBridgeConnectors;
809     }
810
811     /**
812      * @param jmsConnectors The jmsBridgeConnectors to set.
813      */

814     public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors){
815         this.jmsBridgeConnectors=jmsConnectors;
816     }
817
818     public Service[] getServices() {
819         return services;
820     }
821
822     /**
823      * Sets the services associated with this broker such as a {@link MasterConnector}
824      */

825     public void setServices(Service[] services) {
826         this.services = services;
827     }
828
829     /**
830      * Adds a new service so that it will be started as part of the broker lifecycle
831      */

832     public void addService(Service service) {
833         if (services == null) {
834             services = new Service[] { service };
835         }
836         else {
837             int length = services.length;
838             Service[] temp = new Service[length + 1];
839             System.arraycopy(services, 1, temp, 1, length);
840             temp[length] = service;
841             services = temp;
842         }
843     }
844
845
846     public boolean isUseLoggingForShutdownErrors() {
847         return useLoggingForShutdownErrors;
848     }
849
850     /**
851      * Sets whether or not we should use commons-logging when reporting errors
852      * when shutting down the broker
853      */

854     public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
855         this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
856     }
857
858     public boolean isUseShutdownHook() {
859         return useShutdownHook;
860     }
861
862     /**
863      * Sets whether or not we should use a shutdown handler to close down the
864      * broker cleanly if the JVM is terminated. It is recommended you leave this
865      * enabled.
866      */

867     public void setUseShutdownHook(boolean useShutdownHook) {
868         this.useShutdownHook = useShutdownHook;
869     }
870     
871     public boolean isAdvisorySupport() {
872         return advisorySupport;
873     }
874
875     /**
876      * Allows the support of advisory messages to be disabled for performance reasons.
877      */

878     public void setAdvisorySupport(boolean advisorySupport) {
879         this.advisorySupport = advisorySupport;
880     }
881
882     public List JavaDoc getTransportConnectors() {
883         return new ArrayList JavaDoc(transportConnectors);
884     }
885
886     /**
887      * Sets the transport connectors which this broker will listen on for new
888      * clients
889      *
890      * @org.apache.xbean.Property nestedType="org.apache.activemq.broker.TransportConnector"
891      */

892     public void setTransportConnectors(List JavaDoc transportConnectors) throws Exception JavaDoc {
893         for (Iterator JavaDoc iter = transportConnectors.iterator(); iter.hasNext();) {
894             TransportConnector connector = (TransportConnector) iter.next();
895             addConnector(connector);
896         }
897     }
898
899     public List JavaDoc getNetworkConnectors() {
900         return new ArrayList JavaDoc(networkConnectors);
901     }
902
903     public List JavaDoc getProxyConnectors() {
904         return new ArrayList JavaDoc(proxyConnectors);
905     }
906
907     /**
908      * Sets the network connectors which this broker will use to connect to
909      * other brokers in a federated network
910      *
911      * @org.apache.xbean.Property nestedType="org.apache.activemq.network.NetworkConnector"
912      */

913     public void setNetworkConnectors(List JavaDoc networkConnectors) throws Exception JavaDoc {
914         for (Iterator JavaDoc iter = networkConnectors.iterator(); iter.hasNext();) {
915             NetworkConnector connector = (NetworkConnector) iter.next();
916             addNetworkConnector(connector);
917         }
918     }
919
    /**
     * Sets the proxy connectors for this broker; each entry in the list is
     * added via addProxyConnector
     */

924     public void setProxyConnectors(List JavaDoc proxyConnectors) throws Exception JavaDoc {
925         for (Iterator JavaDoc iter = proxyConnectors.iterator(); iter.hasNext();) {
926             ProxyConnector connector = (ProxyConnector) iter.next();
927             addProxyConnector(connector);
928         }
929     }
930     
931     public PolicyMap getDestinationPolicy() {
932         return destinationPolicy;
933     }
934
935     /**
936      * Sets the destination specific policies available either for exact
937      * destinations or for wildcard areas of destinations.
938      */

939     public void setDestinationPolicy(PolicyMap policyMap) {
940         this.destinationPolicy = policyMap;
941     }
942
943     public BrokerPlugin[] getPlugins() {
944         return plugins;
945     }
946
947     /**
948      * Sets a number of broker plugins to install such as for security authentication or authorization
949      */

950     public void setPlugins(BrokerPlugin[] plugins) {
951         this.plugins = plugins;
952     }
953     
954     public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
955         return messageAuthorizationPolicy;
956     }
957
958     /**
959      * Sets the policy used to decide if the current connection is authorized to consume
960      * a given message
961      */

962     public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
963         this.messageAuthorizationPolicy = messageAuthorizationPolicy;
964     }
965
966     /**
967      * Delete all messages from the persistent store
968      * @throws IOException
969      */

970     public void deleteAllMessages() throws IOException JavaDoc{
971         getPersistenceAdapter().deleteAllMessages();
972     }
973
974     public boolean isDeleteAllMessagesOnStartup() {
975         return deleteAllMessagesOnStartup;
976     }
977
978     /**
979      * Sets whether or not all messages are deleted on startup - mostly only
980      * useful for testing.
981      */

982     public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
983         this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
984     }
985
986     public URI JavaDoc getVmConnectorURI() {
987         if (vmConnectorURI == null) {
988             try {
989                 vmConnectorURI = new URI JavaDoc("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
990             }
991             catch (URISyntaxException JavaDoc e) {
992                 log.error("Badly formed URI from " + getBrokerName(),e);
993             }
994         }
995         return vmConnectorURI;
996     }
997
998     public void setVmConnectorURI(URI JavaDoc vmConnectorURI) {
999         this.vmConnectorURI = vmConnectorURI;
1000    }
1001
1002    /**
1003     * @return Returns the shutdownOnMasterFailure.
1004     */

1005    public boolean isShutdownOnMasterFailure(){
1006        return shutdownOnMasterFailure;
1007    }
1008
1009    /**
1010     * @param shutdownOnMasterFailure The shutdownOnMasterFailure to set.
1011     */

1012    public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure){
1013        this.shutdownOnMasterFailure=shutdownOnMasterFailure;
1014    }
1015
1016    public boolean isKeepDurableSubsActive() {
1017        return keepDurableSubsActive;
1018    }
1019
1020    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1021        this.keepDurableSubsActive = keepDurableSubsActive;
1022    }
1023    
1024    public boolean isUseVirtualTopics() {
1025        return useVirtualTopics;
1026    }
1027
1028    /**
1029     * Sets whether or not
1030     * <a HREF="http://activemq.apache.org/virtual-destinations.html">Virtual Topics</a>
1031     * should be supported by default if they have not been explicitly configured.
1032     */

1033    public void setUseVirtualTopics(boolean useVirtualTopics) {
1034        this.useVirtualTopics = useVirtualTopics;
1035    }
1036    
1037    public DestinationInterceptor[] getDestinationInterceptors() {
1038        return destinationInterceptors;
1039    }
1040
1041    /**
1042     * Sets the destination interceptors to use
1043     */

1044    public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1045        this.destinationInterceptors = destinationInterceptors;
1046    }
1047    
1048    public ActiveMQDestination[] getDestinations() {
1049        return destinations;
1050    }
1051
1052    /**
1053     * Sets the destinations which should be loaded/created on startup
1054     */

1055    public void setDestinations(ActiveMQDestination[] destinations) {
1056        this.destinations = destinations;
1057    }
1058    
1059    /**
1060     * @return the tempDataStore
1061     */

1062    public synchronized Store getTempDataStore(){
1063        if(tempDataStore==null){
1064            String JavaDoc name=getTmpDataDirectory().getPath();
1065            try{
1066                log.info("About to delete any non-persistent messages that may have overflowed to disk ...");
1067                StoreFactory.delete(name);
1068                log.info("Successfully deleted temporary storage");
1069                tempDataStore=StoreFactory.open(name,"rw");
1070            }catch(IOException JavaDoc e){
1071                throw new RuntimeException JavaDoc(e);
1072            }
1073        }
1074        return tempDataStore;
1075    }
1076
1077    /**
1078     * @param tempDataStore the tempDataStore to set
1079     */

1080    public void setTempDataStore(Store tempDataStore){
1081        this.tempDataStore=tempDataStore;
1082    }
1083    
1084    public int getPersistenceThreadPriority(){
1085        return persistenceThreadPriority;
1086    }
1087
1088    public void setPersistenceThreadPriority(int persistenceThreadPriority){
1089        this.persistenceThreadPriority=persistenceThreadPriority;
1090    }
1091        
1092    /**
1093     * @return the useLocalHostBrokerName
1094     */

1095    public boolean isUseLocalHostBrokerName(){
1096        return this.useLocalHostBrokerName;
1097    }
1098
1099    /**
1100     * @param useLocalHostBrokerName the useLocalHostBrokerName to set
1101     */

1102    public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName){
1103        this.useLocalHostBrokerName=useLocalHostBrokerName;
1104        if(useLocalHostBrokerName&&!started.get()&&brokerName==null||brokerName==DEFAULT_BROKER_NAME){
1105            brokerName=LOCAL_HOST_NAME;
1106        }
1107    }
1108
1109    // Implementation methods
1110
// -------------------------------------------------------------------------
1111
/**
1112     * Handles any lazy-creation helper properties which are added to make
1113     * things easier to configure inside environments such as Spring
1114     *
1115     * @throws Exception
1116     */

1117    protected void processHelperProperties() throws Exception JavaDoc {
1118        if (transportConnectorURIs != null) {
1119            for (int i = 0; i < transportConnectorURIs.length; i++) {
1120                String JavaDoc uri = transportConnectorURIs[i];
1121                addConnector(uri);
1122            }
1123        }
1124        if (networkConnectorURIs != null) {
1125            for (int i = 0; i < networkConnectorURIs.length; i++) {
1126                String JavaDoc uri = networkConnectorURIs[i];
1127                addNetworkConnector(uri);
1128            }
1129        }
1130        if (proxyConnectorURIs != null) {
1131            for (int i = 0; i < proxyConnectorURIs.length; i++) {
1132                String JavaDoc uri = proxyConnectorURIs[i];
1133                addProxyConnector(uri);
1134            }
1135        }
1136        
1137        if (jmsBridgeConnectors != null){
1138            for (int i = 0; i < jmsBridgeConnectors.length; i++){
1139                addJmsConnector(jmsBridgeConnectors[i]);
1140            }
1141        }
1142        if (masterConnectorURI != null) {
1143            if (masterConnector != null) {
1144                throw new IllegalStateException JavaDoc("Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
1145            }
1146            else {
1147                addService(new MasterConnector(masterConnectorURI));
1148            }
1149        }
1150    }
1151
1152    protected void stopAllConnectors(ServiceStopper stopper) {
1153
1154        for (Iterator JavaDoc iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1155            NetworkConnector connector = (NetworkConnector) iter.next();
1156            unregisterNetworkConnectorMBean(connector);
1157            stopper.stop(connector);
1158        }
1159
1160        for (Iterator JavaDoc iter = getProxyConnectors().iterator(); iter.hasNext();) {
1161            ProxyConnector connector = (ProxyConnector) iter.next();
1162            stopper.stop(connector);
1163        }
1164
1165        for (Iterator JavaDoc iter = jmsConnectors.iterator(); iter.hasNext();) {
1166            JmsConnector connector = (JmsConnector) iter.next();
1167            stopper.stop(connector);
1168        }
1169
1170        for (Iterator JavaDoc iter = getTransportConnectors().iterator(); iter.hasNext();) {
1171            TransportConnector connector = (TransportConnector) iter.next();
1172            stopper.stop(connector);
1173        }
1174    }
1175
1176    protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException JavaDoc {
1177        MBeanServer JavaDoc mbeanServer = getManagementContext().getMBeanServer();
1178        if (mbeanServer != null) {
1179
1180            try {
1181                ObjectName JavaDoc objectName = createConnectorObjectName(connector);
1182                connector = connector.asManagedConnector(getManagementContext().getMBeanServer(), objectName);
1183                ConnectorViewMBean view = new ConnectorView(connector);
1184                mbeanServer.registerMBean(view, objectName);
1185                registeredMBeanNames.add(objectName);
1186                return connector;
1187            }
1188            catch (Throwable JavaDoc e) {
1189                throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1190            }
1191        }
1192        return connector;
1193    }
1194    
1195    protected void unregisterConnectorMBean(TransportConnector connector) throws IOException JavaDoc {
1196        if (isUseJmx()) {
1197            MBeanServer JavaDoc mbeanServer = getManagementContext().getMBeanServer();
1198            if (mbeanServer != null) {
1199                try {
1200                    ObjectName JavaDoc objectName = createConnectorObjectName(connector);
1201    
1202                    if( registeredMBeanNames.remove(objectName) ) {
1203                           mbeanServer.unregisterMBean(objectName);
1204                    }
1205                }
1206                catch (Throwable JavaDoc e) {
1207                    throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1208                }
1209            }
1210        }
1211    }
1212
1213    private ObjectName JavaDoc createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException JavaDoc {
1214        return new ObjectName JavaDoc(
1215                managementContext.getJmxDomainName()+":"+
1216                "BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+
1217                "Type=Connector,"+
1218                "ConnectorName="+JMXSupport.encodeObjectNamePart(connector.getName())
1219                );
1220    }
1221
1222    protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException JavaDoc {
1223        MBeanServer JavaDoc mbeanServer = getManagementContext().getMBeanServer();
1224        if (mbeanServer != null) {
1225            NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
1226            try {
1227                ObjectName JavaDoc objectName = createNetworkConnectorObjectName(connector);
1228                mbeanServer.registerMBean(view, objectName);
1229                registeredMBeanNames.add(objectName);
1230            }
1231            catch (Throwable JavaDoc e) {
1232                throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
1233            }
1234        }
1235    }
1236
1237    protected ObjectName JavaDoc createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException JavaDoc {
1238        return new ObjectName JavaDoc(managementContext.getJmxDomainName() + ":" + "BrokerName="
1239                + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector," + "NetworkConnectorName="
1240                + JMXSupport.encodeObjectNamePart(connector.getName()));
1241    }
1242
1243    protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
1244        if (isUseJmx()) {
1245            MBeanServer JavaDoc mbeanServer = getManagementContext().getMBeanServer();
1246            if (mbeanServer != null) {
1247                try {
1248                    ObjectName JavaDoc objectName = createNetworkConnectorObjectName(connector);
1249                    if (registeredMBeanNames.remove(objectName)) {
1250                        mbeanServer.unregisterMBean(objectName);
1251                    }
1252                }
1253                catch (Exception JavaDoc e) {
1254                    log.error("Network Connector could not be unregistered from JMX: " + e, e);
1255                }
1256            }
1257        }
1258    }
1259    
1260    protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException JavaDoc {
1261        MBeanServer JavaDoc mbeanServer = getManagementContext().getMBeanServer();
1262        if (mbeanServer != null) {
1263            ProxyConnectorView view = new ProxyConnectorView(connector);
1264            try {
1265                ObjectName JavaDoc objectName = new ObjectName JavaDoc(managementContext.getJmxDomainName() + ":" + "BrokerName="
1266                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector," + "ProxyConnectorName="
1267                        + JMXSupport.encodeObjectNamePart(connector.getName()));
1268                mbeanServer.registerMBean(view, objectName);
1269                registeredMBeanNames.add(objectName);
1270            }
1271            catch (Throwable JavaDoc e) {
1272                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1273            }
1274        }
1275    }
1276
1277    protected void registerFTConnectorMBean(MasterConnector connector) throws IOException JavaDoc {
1278        MBeanServer JavaDoc mbeanServer = getManagementContext().getMBeanServer();
1279        if (mbeanServer != null) {
1280            FTConnectorView view = new FTConnectorView(connector);
1281            try {
1282                ObjectName JavaDoc objectName = new ObjectName JavaDoc(managementContext.getJmxDomainName() + ":" + "BrokerName="
1283                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
1284                mbeanServer.registerMBean(view, objectName);
1285                registeredMBeanNames.add(objectName);
1286            }
1287            catch (Throwable JavaDoc e) {
1288                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1289            }
1290        }
1291    }
1292
1293    protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException JavaDoc {
1294        MBeanServer JavaDoc mbeanServer = getManagementContext().getMBeanServer();
1295        if (mbeanServer != null) {
1296            JmsConnectorView view = new JmsConnectorView(connector);
1297            try {
1298                ObjectName JavaDoc objectName = new ObjectName JavaDoc(managementContext.getJmxDomainName() + ":" + "BrokerName="
1299                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector," + "JmsConnectorName="
1300                        + JMXSupport.encodeObjectNamePart(connector.getName()));
1301                mbeanServer.registerMBean(view, objectName);
1302                registeredMBeanNames.add(objectName);
1303            }
1304            catch (Throwable JavaDoc e) {
1305                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1306            }
1307        }
1308    }
1309    
    /**
     * Factory method to create a new broker
     *
     * @throws Exception
     */

1318    protected Broker createBroker() throws Exception JavaDoc {
1319        Broker regionBroker = createRegionBroker();
1320        Broker broker = addInterceptors(regionBroker);
1321
1322        // Add a filter that will stop access to the broker once stopped
1323
broker = new MutableBrokerFilter(broker) {
1324            public void stop() throws Exception JavaDoc {
1325                super.stop();
1326                setNext(new ErrorBroker("Broker has been stopped: "+this) {
1327                    // Just ignore additional stop actions.
1328
public void stop() throws Exception JavaDoc {
1329                    }
1330                });
1331            }
1332        };
1333        
1334        RegionBroker rBroker = (RegionBroker) regionBroker;
1335        rBroker.getDestinationStatistics().setEnabled(enableStatistics);
1336        
1337        
1338
1339        if (isUseJmx()) {
1340            ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
1341            managedBroker.setContextBroker(broker);
1342            adminView = new BrokerView(this, managedBroker);
1343            MBeanServer JavaDoc mbeanServer = getManagementContext().getMBeanServer();
1344            if (mbeanServer != null) {
1345                ObjectName JavaDoc objectName = getBrokerObjectName();
1346                mbeanServer.registerMBean(adminView, objectName);
1347                registeredMBeanNames.add(objectName);
1348            }
1349        }
1350        
1351
1352        return broker;
1353
1354    }
1355
1356    /**
1357     * Factory method to create the core region broker onto which interceptors
1358     * are added
1359     *
1360     * @throws Exception
1361     */

1362    protected Broker createRegionBroker() throws Exception JavaDoc {
1363        // we must start the persistence adaptor before we can create the region
1364
// broker
1365
getPersistenceAdapter().setUsageManager(getProducerUsageManager());
1366        getPersistenceAdapter().setBrokerName(getBrokerName());
1367        if(this.deleteAllMessagesOnStartup){
1368            getPersistenceAdapter().deleteAllMessages();
1369        }
1370        getPersistenceAdapter().start();
1371        
1372        DestinationInterceptor destinationInterceptor = null;
1373        if (destinationInterceptors != null) {
1374            destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
1375        }
1376        else {
1377            destinationInterceptor = createDefaultDestinationInterceptor();
1378        }
1379    RegionBroker regionBroker = null;
1380    if (destinationFactory == null) {
1381            destinationFactory = new DestinationFactoryImpl(getProducerUsageManager(), getTaskRunnerFactory(), getPersistenceAdapter());
1382        }
1383        if (isUseJmx()) {
1384            MBeanServer JavaDoc mbeanServer = getManagementContext().getMBeanServer();
1385            regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerUsageManager(),
1386                    destinationFactory, destinationInterceptor);
1387        }
1388        else {
1389            regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getConsumerUsageManager(), destinationFactory, destinationInterceptor);
1390        }
1391        destinationFactory.setRegionBroker(regionBroker);
1392        
1393        regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
1394        regionBroker.setBrokerName(getBrokerName());
1395        return regionBroker;
1396    }
1397
1398    /**
1399     * Create the default destination interceptor
1400     */

1401    protected DestinationInterceptor createDefaultDestinationInterceptor() {
1402        if (! isUseVirtualTopics()) {
1403            return null;
1404        }
1405        VirtualDestinationInterceptor answer = new VirtualDestinationInterceptor();
1406        VirtualTopic virtualTopic = new VirtualTopic();
1407        virtualTopic.setName("VirtualTopic.>");
1408        VirtualDestination[] virtualDestinations = { virtualTopic };
1409        answer.setVirtualDestinations(virtualDestinations);
1410        return answer;
1411    }
1412
    /**
     * Strategy method to add interceptors to the broker
     *
     * @throws Exception
     */

1418    protected Broker addInterceptors(Broker broker) throws Exception JavaDoc {
1419        broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
1420        if (isAdvisorySupport()) {
1421            broker = new AdvisoryBroker(broker);
1422        }
1423        broker = new CompositeDestinationBroker(broker);
1424        if (isPopulateJMSXUserID()) {
1425            broker = new UserIDBroker(broker);
1426        }
1427        if (plugins != null) {
1428            for (int i = 0; i < plugins.length; i++) {
1429                BrokerPlugin plugin = plugins[i];
1430                broker = plugin.installPlugin(broker);
1431            }
1432        }
1433        return broker;
1434    }
1435
1436    protected PersistenceAdapter createPersistenceAdapter() throws IOException JavaDoc {
1437        if (isPersistent()) {
1438            return getPersistenceFactory().createPersistenceAdapter();
1439        }
1440        else {
1441            return new MemoryPersistenceAdapter();
1442        }
1443    }
1444
1445    protected AMQPersistenceAdapterFactory createPersistenceFactory() {
1446        AMQPersistenceAdapterFactory factory = new AMQPersistenceAdapterFactory();
1447        factory.setDataDirectory(getBrokerDataDirectory());
1448        factory.setTaskRunnerFactory(getPersistenceTaskRunnerFactory());
1449        factory.setBrokerName(getBrokerName());
1450        return factory;
1451    }
1452
1453    protected ObjectName JavaDoc createBrokerObjectName() throws IOException JavaDoc {
1454        try {
1455            return new ObjectName JavaDoc(
1456                    getManagementContext().getJmxDomainName()+":"+
1457                    "BrokerName="+JMXSupport.encodeObjectNamePart(getBrokerName())+","+
1458                    "Type=Broker"
1459                    );
1460        }
1461        catch (Throwable JavaDoc e) {
1462            throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
1463        }
1464    }
1465
1466    protected TransportConnector createTransportConnector(Broker broker, URI JavaDoc brokerURI) throws Exception JavaDoc {
1467        TransportServer transport = TransportFactory.bind(getBrokerName(),brokerURI);
1468        return new TransportConnector(broker, transport);
1469    }
1470
1471    /**
1472     * Extracts the port from the options
1473     */

1474    protected Object JavaDoc getPort(Map JavaDoc options) {
1475        Object JavaDoc port = options.get("port");
1476        if (port == null) {
1477            port = DEFAULT_PORT;
1478            log.warn("No port specified so defaulting to: " + port);
1479        }
1480        return port;
1481    }
1482
1483    protected void addShutdownHook() {
1484        if (useShutdownHook) {
1485            shutdownHook = new Thread JavaDoc("ActiveMQ ShutdownHook") {
1486                public void run() {
1487                    containerShutdown();
1488                }
1489            };
1490            Runtime.getRuntime().addShutdownHook(shutdownHook);
1491        }
1492    }
1493
1494    protected void removeShutdownHook() {
1495        if (shutdownHook != null) {
1496            try {
1497                Runtime.getRuntime().removeShutdownHook(shutdownHook);
1498            }
1499            catch (Exception JavaDoc e) {
1500                log.debug("Caught exception, must be shutting down: " + e);
1501            }
1502        }
1503    }
1504
1505    /**
1506     * Causes a clean shutdown of the container when the VM is being shut down
1507     */

1508    protected void containerShutdown() {
1509        try {
1510            stop();
1511        }
1512        catch (IOException JavaDoc e) {
1513            Throwable JavaDoc linkedException = e.getCause();
1514            if (linkedException != null) {
1515                logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
1516            }
1517            else {
1518                logError("Failed to shut down: " + e, e);
1519            }
1520            if (!useLoggingForShutdownErrors) {
1521                e.printStackTrace(System.err);
1522            }
1523        }
1524        catch (Exception JavaDoc e) {
1525            logError("Failed to shut down: " + e, e);
1526        }
1527    }
1528
1529    protected void logError(String JavaDoc message, Throwable JavaDoc e) {
1530        if (useLoggingForShutdownErrors) {
1531            log.error("Failed to shut down: " + e);
1532        }
1533        else {
1534            System.err.println("Failed to shut down: " + e);
1535        }
1536    }
1537
1538    /**
1539     * Starts any configured destinations on startup
1540     *
1541     */

1542    protected void startDestinations() throws Exception JavaDoc {
1543        if (destinations != null) {
1544            ConnectionContext adminConnectionContext = getAdminConnectionContext();
1545            
1546            for (int i = 0; i < destinations.length; i++) {
1547                ActiveMQDestination destination = destinations[i];
1548                getBroker().addDestination(adminConnectionContext, destination);
1549            }
1550        }
1551    }
1552    
1553    /**
1554     * Returns the broker's administration connection context used for configuring the broker
1555     * at startup
1556     */

1557    public ConnectionContext getAdminConnectionContext() throws Exception JavaDoc {
1558        ConnectionContext adminConnectionContext = getBroker().getAdminConnectionContext();
1559        if (adminConnectionContext == null) {
1560            adminConnectionContext = createAdminConnectionContext();
1561            getBroker().setAdminConnectionContext(adminConnectionContext);
1562        }
1563        return adminConnectionContext;
1564    }
1565    
1566    /**
1567     * Factory method to create the new administration connection context object.
1568     * Note this method is here rather than inside a default broker implementation to
1569     * ensure that the broker reference inside it is the outer most interceptor
1570     */

1571    protected ConnectionContext createAdminConnectionContext() throws Exception JavaDoc {
1572        ConnectionContext context = new ConnectionContext();
1573        context.setBroker(getBroker());
1574        context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
1575        return context;
1576    }
1577
1578
1579    
1580    /**
1581     * Start all transport and network connections, proxies and bridges
1582     * @throws Exception
1583     */

1584    protected void startAllConnectors() throws Exception JavaDoc{
1585        if (!isSlave()){
1586            
1587            ArrayList JavaDoc al = new ArrayList JavaDoc();
1588
1589            for (Iterator JavaDoc iter = getTransportConnectors().iterator(); iter.hasNext();) {
1590                TransportConnector connector = (TransportConnector) iter.next();
1591                al.add(startTransportConnector(connector));
1592            }
1593 
1594            if (al.size()>0) {
1595                //let's clear the transportConnectors list and replace it with the started transportConnector instances
1596
this.transportConnectors.clear();
1597                setTransportConnectors(al);
1598            }
1599
1600            for (Iterator JavaDoc iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1601                NetworkConnector connector = (NetworkConnector) iter.next();
1602                connector.setLocalUri(getVmConnectorURI());
1603                connector.setBrokerName(getBrokerName());
1604                connector.setDurableDestinations(getBroker().getDurableDestinations());
1605                connector.start();
1606            }
1607            
1608            for (Iterator JavaDoc iter = getProxyConnectors().iterator(); iter.hasNext();) {
1609                ProxyConnector connector = (ProxyConnector) iter.next();
1610                connector.start();
1611            }
1612            
1613            for (Iterator JavaDoc iter = jmsConnectors.iterator(); iter.hasNext();) {
1614                JmsConnector connector = (JmsConnector) iter.next();
1615                connector.start();
1616            }
1617            
1618            if (services != null) {
1619                for (int i = 0; i < services.length; i++) {
1620                    Service service = services[i];
1621                    configureService(service);
1622                    service.start();
1623                }
1624            }
1625        }
1626    }
1627
1628    protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception JavaDoc {
1629        connector.setBroker(getBroker());
1630        connector.setBrokerName(getBrokerName());
1631        connector.setTaskRunnerFactory(getTaskRunnerFactory());
1632        MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
1633        if (policy != null) {
1634            connector.setMessageAuthorizationPolicy(policy);
1635        }
1636        
1637        if (isUseJmx()) {
1638            connector = registerConnectorMBean(connector);
1639        }
1640        
1641        connector.getStatistics().setEnabled(enableStatistics);
1642        
1643        connector.start();
1644        
1645        return connector;
1646    }
1647
1648    /**
1649     * Perform any custom dependency injection
1650     */

1651    protected void configureService(Object JavaDoc service) {
1652        if (service instanceof BrokerServiceAware) {
1653            BrokerServiceAware serviceAware = (BrokerServiceAware) service;
1654            serviceAware.setBrokerService(this);
1655        }
1656        if (service instanceof MasterConnector) {
1657            masterConnector = (MasterConnector) service;
1658        }
1659    }
1660
1661   
1662    /**
1663     * Starts all destiantions in persistence store. This includes all inactive destinations
1664     */

1665    protected void startDestinationsInPersistenceStore(Broker broker) throws Exception JavaDoc {
1666        Set JavaDoc destinations = destinationFactory.getDestinations();
1667        if (destinations != null) {
1668            Iterator JavaDoc iter = destinations.iterator();
1669
1670            ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
1671            if (adminConnectionContext == null) {
1672                ConnectionContext context = new ConnectionContext();
1673                context.setBroker(broker);
1674                adminConnectionContext = context;
1675                broker.setAdminConnectionContext(adminConnectionContext);
1676            }
1677
1678
1679            while (iter.hasNext()) {
1680                ActiveMQDestination destination = (ActiveMQDestination) iter.next();
1681                broker.addDestination(adminConnectionContext, destination);
1682            }
1683        }
1684    }
1685
1686    
1687   
1688}
1689
Popular Tags