1 18 package org.apache.activemq.network; 19 20 import java.io.IOException ; 21 22 import org.apache.activemq.Service; 23 import org.apache.activemq.command.ActiveMQQueue; 24 import org.apache.activemq.command.ActiveMQTopic; 25 import org.apache.activemq.command.BrokerId; 26 import org.apache.activemq.command.BrokerInfo; 27 import org.apache.activemq.command.Command; 28 import org.apache.activemq.command.ConnectionId; 29 import org.apache.activemq.command.ConnectionInfo; 30 import org.apache.activemq.command.ConsumerInfo; 31 import org.apache.activemq.command.ExceptionResponse; 32 import org.apache.activemq.command.Message; 33 import org.apache.activemq.command.MessageAck; 34 import org.apache.activemq.command.MessageDispatch; 35 import org.apache.activemq.command.ProducerInfo; 36 import org.apache.activemq.command.Response; 37 import org.apache.activemq.command.SessionInfo; 38 import org.apache.activemq.command.ShutdownInfo; 39 import org.apache.activemq.transport.DefaultTransportListener; 40 import org.apache.activemq.transport.FutureResponse; 41 import org.apache.activemq.transport.ResponseCallback; 42 import org.apache.activemq.transport.Transport; 43 import org.apache.activemq.util.IdGenerator; 44 import org.apache.activemq.util.ServiceStopper; 45 import org.apache.activemq.util.ServiceSupport; 46 import org.apache.commons.logging.Log; 47 import org.apache.commons.logging.LogFactory; 48 49 56 public class ForwardingBridge implements Service{ 57 58 static final private Log log = LogFactory.getLog(ForwardingBridge.class); 59 60 private final Transport localBroker; 61 private final Transport remoteBroker; 62 63 IdGenerator idGenerator = new IdGenerator(); 64 ConnectionInfo connectionInfo; 65 SessionInfo sessionInfo; 66 ProducerInfo producerInfo; 67 ConsumerInfo queueConsumerInfo; 68 ConsumerInfo topicConsumerInfo; 69 70 private String clientId; 71 private int prefetchSize=1000; 72 private boolean dispatchAsync; 73 private String destinationFilter = ">"; 74 75 private int queueDispatched; 76 private int topicDispatched; 77 78 BrokerId localBrokerId; 79 BrokerId remoteBrokerId; 80 private NetworkBridgeFailedListener bridgeFailedListener; 81 82 public ForwardingBridge(Transport localBroker, Transport remoteBroker) { 83 this.localBroker = localBroker; 84 this.remoteBroker = remoteBroker; 85 } 86 87 public void start() throws Exception { 88 log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established."); 89 90 localBroker.setTransportListener(new DefaultTransportListener(){ 91 public void onCommand(Object o) { 92 Command command = (Command) o; 93 serviceLocalCommand(command); 94 } 95 public void onException(IOException error) { 96 serviceLocalException(error); 97 } 98 }); 99 100 remoteBroker.setTransportListener(new DefaultTransportListener(){ 101 public void onCommand(Object o) { 102 Command command = (Command) o; 103 serviceRemoteCommand(command); 104 } 105 public void onException(IOException error) { 106 serviceRemoteException(error); 107 } 108 }); 109 110 localBroker.start(); 111 remoteBroker.start(); 112 } 113 114 protected void triggerStartBridge() throws IOException { 115 Thread thead = new Thread () { 116 public void run() { 117 try { 118 startBridge(); 119 } 120 catch (IOException e) { 121 log.error("Failed to start network bridge: " + e, e); 122 } 123 } 124 }; 125 thead.start(); 126 } 127 128 131 private void startBridge() throws IOException { 132 connectionInfo = new ConnectionInfo(); 133 connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 134 connectionInfo.setClientId(clientId); 135 localBroker.oneway(connectionInfo); 136 remoteBroker.oneway(connectionInfo); 137 138 sessionInfo=new SessionInfo(connectionInfo, 1); 139 localBroker.oneway(sessionInfo); 140 remoteBroker.oneway(sessionInfo); 141 142 queueConsumerInfo = new ConsumerInfo(sessionInfo, 1); 143 queueConsumerInfo.setDispatchAsync(dispatchAsync); 144 queueConsumerInfo.setDestination(new ActiveMQQueue(destinationFilter)); 145 queueConsumerInfo.setPrefetchSize(prefetchSize); 146 queueConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); 147 localBroker.oneway(queueConsumerInfo); 148 149 producerInfo = new ProducerInfo(sessionInfo, 1); 150 producerInfo.setResponseRequired(false); 151 remoteBroker.oneway(producerInfo); 152 153 if( connectionInfo.getClientId()!=null ) { 154 topicConsumerInfo = new ConsumerInfo(sessionInfo, 2); 155 topicConsumerInfo.setDispatchAsync(dispatchAsync); 156 topicConsumerInfo.setSubscriptionName("topic-bridge"); 157 topicConsumerInfo.setRetroactive(true); 158 topicConsumerInfo.setDestination(new ActiveMQTopic(destinationFilter)); 159 topicConsumerInfo.setPrefetchSize(prefetchSize); 160 topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); 161 localBroker.oneway(topicConsumerInfo); 162 } 163 log.info("Network connection between " + localBroker + " and " + remoteBroker + " has been established."); 164 } 165 166 public void stop() throws Exception { 167 try { 168 if( connectionInfo!=null ) { 169 localBroker.request(connectionInfo.createRemoveCommand()); 170 remoteBroker.request(connectionInfo.createRemoveCommand()); 171 } 172 localBroker.setTransportListener(null); 173 remoteBroker.setTransportListener(null); 174 localBroker.oneway(new ShutdownInfo()); 175 remoteBroker.oneway(new ShutdownInfo()); 176 } finally { 177 ServiceStopper ss = new ServiceStopper(); 178 ss.stop(localBroker); 179 ss.stop(remoteBroker); 180 ss.throwFirstException(); 181 } 182 } 183 184 public void serviceRemoteException(Throwable error) { 185 log.info("Unexpected remote exception: "+error); 186 log.debug("Exception trace: ", error); 187 } 188 189 protected void serviceRemoteCommand(Command command) { 190 try { 191 if(command.isBrokerInfo() ) { 192 synchronized( this ) { 193 remoteBrokerId = ((BrokerInfo)command).getBrokerId(); 194 if( localBrokerId !=null) { 195 if( localBrokerId.equals(remoteBrokerId) ) { 196 log.info("Disconnecting loop back connection."); 197 ServiceSupport.dispose(this); 198 } else { 199 triggerStartBridge(); 200 } 201 } 202 } 203 } else { 204 log.warn("Unexpected remote command: "+command); 205 } 206 } catch (IOException e) { 207 serviceLocalException(e); 208 } 209 } 210 211 public void serviceLocalException(Throwable error) { 212 log.info("Unexpected local exception: "+error); 213 log.debug("Exception trace: ", error); 214 fireBridgeFailed(); 215 } 216 protected void serviceLocalCommand(Command command) { 217 try { 218 if( command.isMessageDispatch() ) { 219 final MessageDispatch md = (MessageDispatch) command; 220 Message message = md.getMessage(); 221 message.setProducerId(producerInfo.getProducerId()); 222 223 if( message.getOriginalTransactionId()==null ) 224 message.setOriginalTransactionId(message.getTransactionId()); 225 message.setTransactionId(null); 226 227 228 if( !message.isResponseRequired() ) { 229 230 remoteBroker.oneway(message); 233 localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1)); 234 235 } else { 236 237 ResponseCallback callback = new ResponseCallback() { 240 public void onCompletion(FutureResponse future) { 241 try { 242 Response response = future.getResult(); 243 if(response.isException()){ 244 ExceptionResponse er=(ExceptionResponse) response; 245 serviceLocalException(er.getException()); 246 } else { 247 localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1)); 248 } 249 } catch (IOException e) { 250 serviceLocalException(e); 251 } 252 } 253 }; 254 255 remoteBroker.asyncRequest(message, callback); 256 } 257 258 259 262 } else if(command.isBrokerInfo() ) { 278 synchronized( this ) { 279 localBrokerId = ((BrokerInfo)command).getBrokerId(); 280 if( remoteBrokerId !=null) { 281 if( remoteBrokerId.equals(localBrokerId) ) { 282 log.info("Disconnecting loop back connection."); 283 ServiceSupport.dispose(this); 284 } else { 285 triggerStartBridge(); 286 } 287 } 288 } 289 } else { 290 log.debug("Unexpected local command: "+command); 291 } 292 } catch (IOException e) { 293 serviceLocalException(e); 294 } 295 } 296 297 public String getClientId() { 298 return clientId; 299 } 300 public void setClientId(String clientId) { 301 this.clientId = clientId; 302 } 303 304 public int getPrefetchSize() { 305 return prefetchSize; 306 } 307 public void setPrefetchSize(int prefetchSize) { 308 this.prefetchSize = prefetchSize; 309 } 310 311 public boolean isDispatchAsync() { 312 return dispatchAsync; 313 } 314 public void setDispatchAsync(boolean dispatchAsync) { 315 this.dispatchAsync = dispatchAsync; 316 } 317 318 public String getDestinationFilter() { 319 return destinationFilter; 320 } 321 public void setDestinationFilter(String destinationFilter) { 322 this.destinationFilter = destinationFilter; 323 } 324 325 326 public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){ 327 this.bridgeFailedListener=listener; 328 } 329 330 private void fireBridgeFailed() { 331 NetworkBridgeFailedListener l = this.bridgeFailedListener; 332 if (l!=null) { 333 l.bridgeFailed(); 334 } 335 } 336 } 337 | Popular Tags |