KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > vm > VMTransportFactory


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.vm;
19
20 import java.io.IOException JavaDoc;
21 import java.net.URI JavaDoc;
22 import java.net.URISyntaxException JavaDoc;
23 import java.util.HashMap JavaDoc;
24 import java.util.Map JavaDoc;
25 import org.apache.activemq.broker.BrokerFactory;
26 import org.apache.activemq.broker.BrokerRegistry;
27 import org.apache.activemq.broker.BrokerService;
28 import org.apache.activemq.broker.TransportConnector;
29 import org.apache.activemq.broker.BrokerFactoryHandler;
30 import org.apache.activemq.transport.MarshallingTransportFilter;
31 import org.apache.activemq.transport.Transport;
32 import org.apache.activemq.transport.TransportFactory;
33 import org.apache.activemq.transport.TransportServer;
34 import org.apache.activemq.util.IOExceptionSupport;
35 import org.apache.activemq.util.IntrospectionSupport;
36 import org.apache.activemq.util.ServiceSupport;
37 import org.apache.activemq.util.URISupport;
38 import org.apache.activemq.util.URISupport.CompositeData;
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import java.util.concurrent.ConcurrentHashMap JavaDoc;
42
43
44 public class VMTransportFactory extends TransportFactory{
45     private static final Log log = LogFactory.getLog(VMTransportFactory.class);
46     final public static ConcurrentHashMap JavaDoc brokers=new ConcurrentHashMap JavaDoc();
47     final public static ConcurrentHashMap JavaDoc connectors=new ConcurrentHashMap JavaDoc();
48     final public static ConcurrentHashMap JavaDoc servers=new ConcurrentHashMap JavaDoc();
49     BrokerFactoryHandler brokerFactoryHandler;
50
51     public Transport doConnect(URI JavaDoc location) throws Exception JavaDoc{
52         return VMTransportServer.configure(doCompositeConnect(location));
53     }
54
55     public Transport doCompositeConnect(URI JavaDoc location) throws Exception JavaDoc{
56         URI JavaDoc brokerURI;
57         String JavaDoc host;
58         Map JavaDoc options;
59         boolean create=true;
60         CompositeData data=URISupport.parseComposite(location);
61         if(data.getComponents().length==1&&"broker".equals(data.getComponents()[0].getScheme())){
62             brokerURI=data.getComponents()[0];
63             CompositeData brokerData=URISupport.parseComposite(brokerURI);
64             host=(String JavaDoc) brokerData.getParameters().get("brokerName");
65             if(host==null)
66                 host="localhost";
67             if(brokerData.getPath()!=null)
68                 host=data.getPath();
69             options=data.getParameters();
70             location=new URI JavaDoc("vm://"+host);
71         }else{
72             // If using the less complex vm://localhost?broker.persistent=true form
73
try{
74                 host=location.getHost();
75                 options=URISupport.parseParamters(location);
76                 String JavaDoc config=(String JavaDoc) options.remove("brokerConfig");
77                 if(config!=null){
78                     brokerURI=new URI JavaDoc(config);
79                 }else{
80                     Map JavaDoc brokerOptions=IntrospectionSupport.extractProperties(options,"broker.");
81                     brokerURI=new URI JavaDoc("broker://()/"+host+"?"+URISupport.createQueryString(brokerOptions));
82                 }
83                 if( "false".equals(options.remove("create")) ) {
84                     create=false;
85                 }
86             }catch(URISyntaxException JavaDoc e1){
87                 throw IOExceptionSupport.create(e1);
88             }
89             location=new URI JavaDoc("vm://"+host);
90         }
91         if (host == null) {
92             host = "localhost";
93         }
94         VMTransportServer server=(VMTransportServer) servers.get(host);
95         // validate the broker is still active
96
if(!validateBroker(host)||server==null){
97             BrokerService broker=null;
98             // Synchronize on the registry so that multiple concurrent threads
99
// doing this do not think that the broker has not been created and cause multiple
100
// brokers to be started.
101
synchronized( BrokerRegistry.getInstance().getRegistryMutext() ) {
102                 broker=BrokerRegistry.getInstance().lookup(host);
103                 if(broker==null){
104                     if( !create ) {
105                         throw new IOException JavaDoc("Broker named '"+host+"' does not exist.");
106                     }
107                     try{
108                         if(brokerFactoryHandler!=null){
109                             broker=brokerFactoryHandler.createBroker(brokerURI);
110                         }else{
111                             broker=BrokerFactory.createBroker(brokerURI);
112                         }
113                         broker.start();
114                     }catch(URISyntaxException JavaDoc e){
115                         throw IOExceptionSupport.create(e);
116                     }
117                     brokers.put(host,broker);
118                 }
119                 
120                 server=(VMTransportServer) servers.get(host);
121                 if(server==null){
122                     server=(VMTransportServer) bind(location,true);
123                     TransportConnector connector=new TransportConnector(broker.getBroker(),server);
124                     connector.setUri(location);
125                     connector.setTaskRunnerFactory( broker.getTaskRunnerFactory() );
126                     connector.start();
127                     connectors.put(host,connector);
128                 }
129                 
130             }
131         }
132         
133         VMTransport vmtransport=server.connect();
134         IntrospectionSupport.setProperties(vmtransport,options);
135         Transport transport=vmtransport;
136         if(vmtransport.isMarshal()){
137             HashMap JavaDoc optionsCopy=new HashMap JavaDoc(options);
138             transport=new MarshallingTransportFilter(transport,createWireFormat(options),createWireFormat(optionsCopy));
139         }
140         if(!options.isEmpty()){
141             throw new IllegalArgumentException JavaDoc("Invalid connect parameters: "+options);
142         }
143         return transport;
144     }
145
146     public TransportServer doBind(String JavaDoc brokerId,URI JavaDoc location) throws IOException JavaDoc{
147         return bind(location,false);
148     }
149
150     /**
151      * @param location
152      * @return the TransportServer
153      * @throws IOException
154      */

155     private TransportServer bind(URI JavaDoc location,boolean dispose) throws IOException JavaDoc{
156         String JavaDoc host=location.getHost();
157         log.debug("binding to broker: " + host);
158         VMTransportServer server=new VMTransportServer(location,dispose);
159         Object JavaDoc currentBoundValue=servers.get(host);
160         if(currentBoundValue!=null){
161             throw new IOException JavaDoc("VMTransportServer already bound at: "+location);
162         }
163         servers.put(host,server);
164         return server;
165     }
166
167     public static void stopped(VMTransportServer server){
168         String JavaDoc host=server.getBindURI().getHost();
169         servers.remove(host);
170         TransportConnector connector=(TransportConnector) connectors.remove(host);
171         if(connector!=null){
172             log.debug("Shutting down VM connectors for broker: " +host);
173             ServiceSupport.dispose(connector);
174             BrokerService broker=(BrokerService) brokers.remove(host);
175             if(broker!=null){
176                 ServiceSupport.dispose(broker);
177             }
178         }
179     }
180
181     public static void stopped(String JavaDoc host){
182         servers.remove(host);
183         TransportConnector connector=(TransportConnector) connectors.remove(host);
184         if(connector!=null){
185             log.debug("Shutting down VM connectors for broker: " +host);
186             ServiceSupport.dispose(connector);
187             BrokerService broker=(BrokerService) brokers.remove(host);
188             if(broker!=null){
189                 ServiceSupport.dispose(broker);
190             }
191         }
192     }
193
194     public BrokerFactoryHandler getBrokerFactoryHandler(){
195         return brokerFactoryHandler;
196     }
197
198     public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler){
199         this.brokerFactoryHandler=brokerFactoryHandler;
200     }
201
202     private boolean validateBroker(String JavaDoc host){
203         boolean result=true;
204         if(brokers.containsKey(host)||servers.containsKey(host)||connectors.containsKey(host)){
205             // check the broker is still in the BrokerRegistry
206
TransportConnector connector=(TransportConnector) connectors.get(host);
207             if(BrokerRegistry.getInstance().lookup(host)==null||(connector!=null&&connector.getBroker().isStopped())){
208                 result=false;
209                 // clean-up
210
brokers.remove(host);
211                 servers.remove(host);
212                 if(connector!=null){
213                     connectors.remove(host);
214                     if(connector!=null){
215                         ServiceSupport.dispose(connector);
216                     }
217                 }
218             }
219         }
220         return result;
221     }
222 }
223
Popular Tags