1 18 package org.apache.activemq.transport.fanout; 19 20 import java.io.IOException ; 21 import java.net.URI ; 22 import java.net.URISyntaxException ; 23 import java.util.HashMap ; 24 import java.util.Map ; 25 26 import org.apache.activemq.transport.MutexTransport; 27 import org.apache.activemq.transport.ResponseCorrelator; 28 import org.apache.activemq.transport.Transport; 29 import org.apache.activemq.transport.TransportFactory; 30 import org.apache.activemq.transport.TransportServer; 31 import org.apache.activemq.transport.discovery.DiscoveryAgent; 32 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; 33 import org.apache.activemq.transport.discovery.DiscoveryTransport; 34 import org.apache.activemq.util.IntrospectionSupport; 35 import org.apache.activemq.util.URISupport; 36 import org.apache.activemq.util.URISupport.CompositeData; 37 38 public class FanoutTransportFactory extends TransportFactory { 39 40 public Transport doConnect(URI location) throws IOException { 41 try { 42 Transport transport = createTransport(location); 43 transport = new MutexTransport(transport); 44 transport = new ResponseCorrelator(transport); 45 return transport; 46 } catch (URISyntaxException e) { 47 throw new IOException ("Invalid location: "+location); 48 } 49 } 50 51 public Transport doCompositeConnect(URI location) throws IOException { 52 try { 53 return createTransport(location); 54 } catch (URISyntaxException e) { 55 throw new IOException ("Invalid location: "+location); 56 } 57 } 58 59 65 public Transport createTransport(URI location) throws IOException , URISyntaxException { 66 67 CompositeData compositData = URISupport.parseComposite(location); 68 Map parameters = new HashMap (compositData.getParameters()); 69 DiscoveryTransport transport = new DiscoveryTransport(createTransport(parameters)); 70 71 DiscoveryAgent discoveryAgent = DiscoveryAgentFactory.createDiscoveryAgent(compositData.getComponents()[0]); 72 transport.setDiscoveryAgent(discoveryAgent); 73 74 return transport; 75 76 } 77 78 public FanoutTransport createTransport(Map parameters) throws IOException { 79 FanoutTransport transport = new FanoutTransport(); 80 IntrospectionSupport.setProperties(transport, parameters); 81 return transport; 82 } 83 84 public TransportServer doBind(String brokerId,URI location) throws IOException { 85 throw new IOException ("Invalid server URI: "+location); 86 } 87 88 } 89 | Popular Tags |