1 18 package org.apache.activemq.transport.vm; 19 20 import java.io.IOException ; 21 import java.net.URI ; 22 import java.net.URISyntaxException ; 23 import java.util.HashMap ; 24 import java.util.Map ; 25 import org.apache.activemq.broker.BrokerFactory; 26 import org.apache.activemq.broker.BrokerRegistry; 27 import org.apache.activemq.broker.BrokerService; 28 import org.apache.activemq.broker.TransportConnector; 29 import org.apache.activemq.broker.BrokerFactoryHandler; 30 import org.apache.activemq.transport.MarshallingTransportFilter; 31 import org.apache.activemq.transport.Transport; 32 import org.apache.activemq.transport.TransportFactory; 33 import org.apache.activemq.transport.TransportServer; 34 import org.apache.activemq.util.IOExceptionSupport; 35 import org.apache.activemq.util.IntrospectionSupport; 36 import org.apache.activemq.util.ServiceSupport; 37 import org.apache.activemq.util.URISupport; 38 import org.apache.activemq.util.URISupport.CompositeData; 39 import org.apache.commons.logging.Log; 40 import org.apache.commons.logging.LogFactory; 41 import java.util.concurrent.ConcurrentHashMap ; 42 43 44 public class VMTransportFactory extends TransportFactory{ 45 private static final Log log = LogFactory.getLog(VMTransportFactory.class); 46 final public static ConcurrentHashMap brokers=new ConcurrentHashMap (); 47 final public static ConcurrentHashMap connectors=new ConcurrentHashMap (); 48 final public static ConcurrentHashMap servers=new ConcurrentHashMap (); 49 BrokerFactoryHandler brokerFactoryHandler; 50 51 public Transport doConnect(URI location) throws Exception { 52 return VMTransportServer.configure(doCompositeConnect(location)); 53 } 54 55 public Transport doCompositeConnect(URI location) throws Exception { 56 URI brokerURI; 57 String host; 58 Map options; 59 boolean create=true; 60 CompositeData data=URISupport.parseComposite(location); 61 if(data.getComponents().length==1&&"broker".equals(data.getComponents()[0].getScheme())){ 62 brokerURI=data.getComponents()[0]; 63 CompositeData brokerData=URISupport.parseComposite(brokerURI); 64 host=(String ) brokerData.getParameters().get("brokerName"); 65 if(host==null) 66 host="localhost"; 67 if(brokerData.getPath()!=null) 68 host=data.getPath(); 69 options=data.getParameters(); 70 location=new URI ("vm://"+host); 71 }else{ 72 try{ 74 host=location.getHost(); 75 options=URISupport.parseParamters(location); 76 String config=(String ) options.remove("brokerConfig"); 77 if(config!=null){ 78 brokerURI=new URI (config); 79 }else{ 80 Map brokerOptions=IntrospectionSupport.extractProperties(options,"broker."); 81 brokerURI=new URI ("broker://()/"+host+"?"+URISupport.createQueryString(brokerOptions)); 82 } 83 if( "false".equals(options.remove("create")) ) { 84 create=false; 85 } 86 }catch(URISyntaxException e1){ 87 throw IOExceptionSupport.create(e1); 88 } 89 location=new URI ("vm://"+host); 90 } 91 if (host == null) { 92 host = "localhost"; 93 } 94 VMTransportServer server=(VMTransportServer) servers.get(host); 95 if(!validateBroker(host)||server==null){ 97 BrokerService broker=null; 98 synchronized( BrokerRegistry.getInstance().getRegistryMutext() ) { 102 broker=BrokerRegistry.getInstance().lookup(host); 103 if(broker==null){ 104 if( !create ) { 105 throw new IOException ("Broker named '"+host+"' does not exist."); 106 } 107 try{ 108 if(brokerFactoryHandler!=null){ 109 broker=brokerFactoryHandler.createBroker(brokerURI); 110 }else{ 111 broker=BrokerFactory.createBroker(brokerURI); 112 } 113 broker.start(); 114 }catch(URISyntaxException e){ 115 throw IOExceptionSupport.create(e); 116 } 117 brokers.put(host,broker); 118 } 119 120 server=(VMTransportServer) servers.get(host); 121 if(server==null){ 122 server=(VMTransportServer) bind(location,true); 123 TransportConnector connector=new TransportConnector(broker.getBroker(),server); 124 connector.setUri(location); 125 connector.setTaskRunnerFactory( broker.getTaskRunnerFactory() ); 126 connector.start(); 127 connectors.put(host,connector); 128 } 129 130 } 131 } 132 133 VMTransport vmtransport=server.connect(); 134 IntrospectionSupport.setProperties(vmtransport,options); 135 Transport transport=vmtransport; 136 if(vmtransport.isMarshal()){ 137 HashMap optionsCopy=new HashMap (options); 138 transport=new MarshallingTransportFilter(transport,createWireFormat(options),createWireFormat(optionsCopy)); 139 } 140 if(!options.isEmpty()){ 141 throw new IllegalArgumentException ("Invalid connect parameters: "+options); 142 } 143 return transport; 144 } 145 146 public TransportServer doBind(String brokerId,URI location) throws IOException { 147 return bind(location,false); 148 } 149 150 155 private TransportServer bind(URI location,boolean dispose) throws IOException { 156 String host=location.getHost(); 157 log.debug("binding to broker: " + host); 158 VMTransportServer server=new VMTransportServer(location,dispose); 159 Object currentBoundValue=servers.get(host); 160 if(currentBoundValue!=null){ 161 throw new IOException ("VMTransportServer already bound at: "+location); 162 } 163 servers.put(host,server); 164 return server; 165 } 166 167 public static void stopped(VMTransportServer server){ 168 String host=server.getBindURI().getHost(); 169 servers.remove(host); 170 TransportConnector connector=(TransportConnector) connectors.remove(host); 171 if(connector!=null){ 172 log.debug("Shutting down VM connectors for broker: " +host); 173 ServiceSupport.dispose(connector); 174 BrokerService broker=(BrokerService) brokers.remove(host); 175 if(broker!=null){ 176 ServiceSupport.dispose(broker); 177 } 178 } 179 } 180 181 public static void stopped(String host){ 182 servers.remove(host); 183 TransportConnector connector=(TransportConnector) connectors.remove(host); 184 if(connector!=null){ 185 log.debug("Shutting down VM connectors for broker: " +host); 186 ServiceSupport.dispose(connector); 187 BrokerService broker=(BrokerService) brokers.remove(host); 188 if(broker!=null){ 189 ServiceSupport.dispose(broker); 190 } 191 } 192 } 193 194 public BrokerFactoryHandler getBrokerFactoryHandler(){ 195 return brokerFactoryHandler; 196 } 197 198 public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler){ 199 this.brokerFactoryHandler=brokerFactoryHandler; 200 } 201 202 private boolean validateBroker(String host){ 203 boolean result=true; 204 if(brokers.containsKey(host)||servers.containsKey(host)||connectors.containsKey(host)){ 205 TransportConnector connector=(TransportConnector) connectors.get(host); 207 if(BrokerRegistry.getInstance().lookup(host)==null||(connector!=null&&connector.getBroker().isStopped())){ 208 result=false; 209 brokers.remove(host); 211 servers.remove(host); 212 if(connector!=null){ 213 connectors.remove(host); 214 if(connector!=null){ 215 ServiceSupport.dispose(connector); 216 } 217 } 218 } 219 } 220 return result; 221 } 222 } 223 | Popular Tags |