1 18 package org.apache.activemq.transport.peer; 19 20 import java.io.IOException ; 21 import java.net.URI ; 22 import java.net.URISyntaxException ; 23 import java.util.HashMap ; 24 import java.util.Map ; 25 26 import org.apache.activemq.broker.BrokerService; 27 import org.apache.activemq.broker.TransportConnector; 28 import org.apache.activemq.broker.BrokerFactoryHandler; 29 import org.apache.activemq.transport.Transport; 30 import org.apache.activemq.transport.TransportFactory; 31 import org.apache.activemq.transport.TransportServer; 32 import org.apache.activemq.transport.vm.VMTransportFactory; 33 import org.apache.activemq.util.IOExceptionSupport; 34 import org.apache.activemq.util.IdGenerator; 35 import org.apache.activemq.util.IntrospectionSupport; 36 import org.apache.activemq.util.URISupport; 37 38 import java.util.concurrent.ConcurrentHashMap ; 39 40 public class PeerTransportFactory extends TransportFactory { 41 42 final public static ConcurrentHashMap brokers = new ConcurrentHashMap (); 43 44 final public static ConcurrentHashMap connectors = new ConcurrentHashMap (); 45 46 final public static ConcurrentHashMap servers = new ConcurrentHashMap (); 47 48 private IdGenerator idGenerator = new IdGenerator("peer-"); 49 50 51 public Transport doConnect(URI location) throws Exception { 52 VMTransportFactory vmTransportFactory = createTransportFactory(location); 53 return vmTransportFactory.doConnect(location); 54 } 55 56 public Transport doCompositeConnect(URI location) throws Exception { 57 VMTransportFactory vmTransportFactory = createTransportFactory(location); 58 return vmTransportFactory.doCompositeConnect(location); 59 } 60 61 66 private VMTransportFactory createTransportFactory(URI location) throws IOException { 67 try { 68 String group = location.getHost(); 69 String broker = URISupport.stripPrefix(location.getPath(), "/"); 70 71 if( group == null ) { 72 group = "default"; 73 } 74 if (broker == null || broker.length()==0){ 75 broker = idGenerator.generateSanitizedId(); 76 } 77 78 final Map brokerOptions = new HashMap (URISupport.parseParamters(location)); 79 if (!brokerOptions.containsKey("persistent")){ 80 brokerOptions.put("persistent", "false"); 81 } 82 83 final URI finalLocation = new URI ("vm://"+broker); 84 final String finalBroker = broker; 85 final String finalGroup = group; 86 VMTransportFactory rc = new VMTransportFactory() { 87 public Transport doConnect(URI ignore) throws Exception { 88 return super.doConnect(finalLocation); 89 }; 90 public Transport doCompositeConnect(URI ignore) throws Exception { 91 return super.doCompositeConnect(finalLocation); 92 }; 93 }; 94 rc.setBrokerFactoryHandler(new BrokerFactoryHandler(){ 95 public BrokerService createBroker(URI brokerURI) throws Exception { 96 BrokerService service = new BrokerService(); 97 IntrospectionSupport.setProperties(service, brokerOptions); 98 service.setBrokerName(finalBroker); 99 TransportConnector c = service.addConnector("tcp://localhost:0"); 100 c.setDiscoveryUri(new URI ("multicast://"+finalGroup)); 101 service.addNetworkConnector("multicast://"+finalGroup); 102 return service; 103 } 104 }); 105 return rc; 106 107 } catch (URISyntaxException e) { 108 throw IOExceptionSupport.create(e); 109 } 110 } 111 112 113 public TransportServer doBind(String brokerId,URI location) throws IOException { 114 throw new IOException ("This protocol does not support being bound."); 115 } 116 117 } 118 | Popular Tags |