1 18 package org.apache.activemq.transport; 19 20 import java.io.IOException ; 21 import java.net.MalformedURLException ; 22 import java.net.URI ; 23 import java.net.URISyntaxException ; 24 import java.net.UnknownHostException ; 25 import java.util.HashMap ; 26 import java.util.Map ; 27 28 import org.apache.activemq.util.FactoryFinder; 29 import org.apache.activemq.util.IOExceptionSupport; 30 import org.apache.activemq.util.IntrospectionSupport; 31 import org.apache.activemq.util.URISupport; 32 import org.apache.activemq.wireformat.WireFormat; 33 import org.apache.activemq.wireformat.WireFormatFactory; 34 35 import java.util.concurrent.ConcurrentHashMap ; 36 import java.util.concurrent.Executor ; 37 38 public abstract class TransportFactory { 39 40 public abstract TransportServer doBind(String brokerId, URI location) throws IOException ; 41 42 public Transport doConnect(URI location, Executor ex) throws Exception { 43 return doConnect(location); 44 } 45 46 public Transport doCompositeConnect(URI location, Executor ex) throws Exception { 47 return doCompositeConnect(location); 48 } 49 50 static final private FactoryFinder transportFactoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/transport/"); 51 static final private FactoryFinder wireFormatFactoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/"); 52 53 static final private ConcurrentHashMap transportFactorys = new ConcurrentHashMap (); 54 55 62 public static Transport connect(URI location) throws Exception { 63 TransportFactory tf = findTransportFactory(location); 64 return tf.doConnect(location); 65 } 66 67 75 public static Transport connect(URI location, Executor ex) throws Exception { 76 TransportFactory tf = findTransportFactory(location); 77 return tf.doConnect(location, ex); 78 } 79 80 88 public static Transport compositeConnect(URI location) throws Exception { 89 TransportFactory tf = findTransportFactory(location); 90 return tf.doCompositeConnect(location); 91 } 92 93 102 public static Transport compositeConnect(URI location, Executor ex) throws Exception { 103 TransportFactory tf = findTransportFactory(location); 104 return tf.doCompositeConnect(location, ex); 105 } 106 107 public static TransportServer bind(String brokerId, URI location) throws IOException { 108 TransportFactory tf = findTransportFactory(location); 109 return tf.doBind(brokerId, location); 110 } 111 112 public Transport doConnect(URI location) throws Exception { 113 try { 114 Map options = new HashMap (URISupport.parseParamters(location)); 115 WireFormat wf = createWireFormat(options); 116 Transport transport = createTransport(location, wf); 117 Transport rc = configure(transport, wf, options); 118 if (!options.isEmpty()) { 119 throw new IllegalArgumentException ("Invalid connect parameters: " + options); 120 } 121 return rc; 122 } 123 catch (URISyntaxException e) { 124 throw IOExceptionSupport.create(e); 125 } 126 } 127 128 public Transport doCompositeConnect(URI location) throws Exception { 129 try { 130 Map options = new HashMap (URISupport.parseParamters(location)); 131 WireFormat wf = createWireFormat(options); 132 Transport transport = createTransport(location, wf); 133 Transport rc = compositeConfigure(transport, wf, options); 134 if (!options.isEmpty()) { 135 throw new IllegalArgumentException ("Invalid connect parameters: " + options); 136 } 137 return rc; 138 139 } 140 catch (URISyntaxException e) { 141 throw IOExceptionSupport.create(e); 142 } 143 } 144 145 150 protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException , UnknownHostException , IOException { 151 throw new IOException ("createTransport() method not implemented!"); 152 } 153 154 159 private static TransportFactory findTransportFactory(URI location) throws IOException { 160 String scheme = location.getScheme(); 161 if( scheme == null ) 162 throw new IOException ("Transport not scheme specified: [" + location + "]"); 163 TransportFactory tf = (TransportFactory) transportFactorys.get(scheme); 164 if (tf == null) { 165 try { 167 tf = (TransportFactory) transportFactoryFinder.newInstance(scheme); 168 transportFactorys.put(scheme, tf); 169 } 170 catch (Throwable e) { 171 throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e); 172 } 173 } 174 return tf; 175 } 176 177 protected WireFormat createWireFormat(Map options) throws IOException { 178 WireFormatFactory factory = createWireFormatFactory(options); 179 WireFormat format = factory.createWireFormat(); 180 return format; 181 } 182 183 protected WireFormatFactory createWireFormatFactory(Map options) throws IOException { 184 String wireFormat = (String ) options.get("wireFormat"); 185 if (wireFormat == null) 186 wireFormat = getDefaultWireFormatType(); 187 188 try { 189 WireFormatFactory wff = (WireFormatFactory) wireFormatFactoryFinder.newInstance(wireFormat); 190 IntrospectionSupport.setProperties(wff, options, "wireFormat."); 191 return wff; 192 } 193 catch (Throwable e) { 194 throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e); 195 } 196 } 197 198 protected String getDefaultWireFormatType() { 199 return "default"; 200 } 201 202 212 public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception { 213 transport = compositeConfigure(transport, wf, options); 214 215 transport = new MutexTransport(transport); 216 transport = new ResponseCorrelator(transport); 217 218 return transport; 219 } 220 221 233 public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { 234 transport = compositeConfigure(transport, format, options); 235 transport = new MutexTransport(transport); 236 return transport; 237 } 238 239 248 public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { 249 IntrospectionSupport.setProperties(transport, options); 250 return transport; 251 } 252 253 } 254 | Popular Tags |