1 18 package org.apache.activemq.network; 19 20 import java.io.IOException ; 21 import java.net.URI ; 22 import java.net.URISyntaxException ; 23 import java.util.Iterator ; 24 25 import org.apache.activemq.command.DiscoveryEvent; 26 import org.apache.activemq.transport.Transport; 27 import org.apache.activemq.transport.TransportFactory; 28 import org.apache.activemq.transport.discovery.DiscoveryAgent; 29 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; 30 import org.apache.activemq.transport.discovery.DiscoveryListener; 31 import org.apache.activemq.util.ServiceStopper; 32 import org.apache.activemq.util.ServiceSupport; 33 34 import java.util.concurrent.ConcurrentHashMap ; 35 36 44 public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener { 45 46 private DiscoveryAgent discoveryAgent; 47 private ConcurrentHashMap bridges = new ConcurrentHashMap (); 48 49 public DiscoveryNetworkConnector() { 50 } 51 52 public DiscoveryNetworkConnector(URI discoveryURI) throws IOException { 53 setUri(discoveryURI); 54 } 55 56 public void setUri(URI discoveryURI) throws IOException { 57 setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI)); 58 } 59 60 public void onServiceAdd(DiscoveryEvent event) { 61 62 if( serviceSupport.isStopped() || serviceSupport.isStopping() ) 64 return; 65 66 String url = event.getServiceName(); 67 if (url != null) { 68 69 URI uri; 70 try { 71 uri = new URI (url); 72 } 73 catch (URISyntaxException e) { 74 log.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); 75 return; 76 } 77 78 if ( bridges.containsKey(uri) 80 || localURI.equals(uri) 81 || (connectionFilter!=null && !connectionFilter.connectTo(uri)) 82 ) 83 return; 84 85 URI connectUri = uri; 86 log.info("Establishing network connection between from " + localURI + " to " + connectUri); 87 Transport remoteTransport; 88 try { 89 remoteTransport = TransportFactory.connect(connectUri); 90 } 91 catch (Exception e) { 92 log.warn("Could not connect to remote URI: " + localURI + ": " + e.getMessage()); 93 log.debug("Connection failure exception: "+ e, e); 94 return; 95 } 96 97 Transport localTransport; 98 try { 99 localTransport = createLocalTransport(); 100 } 101 catch (Exception e) { 102 ServiceSupport.dispose(remoteTransport); 103 log.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage()); 104 log.debug("Connection failure exception: "+ e, e); 105 return; 106 } 107 108 NetworkBridge bridge = createBridge(localTransport, remoteTransport, event); 109 bridges.put(uri, bridge); 110 try { 111 bridge.start(); 112 } 113 catch (Exception e) { 114 ServiceSupport.dispose(localTransport); 115 ServiceSupport.dispose(remoteTransport); 116 log.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e); 117 log.debug("Start failure exception: "+ e, e); 118 119 try { 120 discoveryAgent.serviceFailed(event); 121 } catch (IOException e1) { 122 } 123 return; 124 } 125 } 126 } 127 128 public void onServiceRemove(DiscoveryEvent event) { 129 String url = event.getServiceName(); 130 if (url != null) { 131 URI uri; 132 try { 133 uri = new URI (url); 134 } 135 catch (URISyntaxException e) { 136 log.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); 137 return; 138 } 139 140 NetworkBridge bridge = (NetworkBridge) bridges.remove(uri); 141 if (bridge == null) 142 return; 143 144 ServiceSupport.dispose(bridge); 145 } 146 } 147 148 public DiscoveryAgent getDiscoveryAgent() { 149 return discoveryAgent; 150 } 151 152 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 153 this.discoveryAgent = discoveryAgent; 154 if (discoveryAgent != null) { 155 this.discoveryAgent.setDiscoveryListener(this); 156 this.discoveryAgent.setBrokerName(getBrokerName()); 157 } 158 } 159 160 protected void handleStart() throws Exception { 161 if (discoveryAgent == null) { 162 throw new IllegalStateException ("You must configure the 'discoveryAgent' property"); 163 } 164 this.discoveryAgent.start(); 165 super.handleStart(); 166 } 167 168 protected void handleStop(ServiceStopper stopper) throws Exception { 169 for (Iterator i = bridges.values().iterator(); i.hasNext();) { 170 NetworkBridge bridge = (NetworkBridge) i.next(); 171 try { 172 bridge.stop(); 173 } 174 catch (Exception e) { 175 stopper.onException(this, e); 176 } 177 } 178 try { 179 this.discoveryAgent.stop(); 180 } 181 catch (Exception e) { 182 stopper.onException(this, e); 183 } 184 185 super.handleStop(stopper); 186 } 187 188 protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) { 189 NetworkBridgeFailedListener listener = new NetworkBridgeFailedListener() { 190 191 public void bridgeFailed(){ 192 if( !serviceSupport.isStopped() ) { 193 try { 194 discoveryAgent.serviceFailed(event); 195 } catch (IOException e) { 196 } 197 } 198 199 } 200 201 }; 202 DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this,localTransport,remoteTransport,listener); 203 return configureBridge(result); 204 } 205 206 public String getName() { 207 return discoveryAgent.toString(); 208 } 209 210 211 212 } 213 | Popular Tags |