1 18 package org.apache.activemq.network; 19 20 import org.apache.activemq.command.BrokerId; 21 import org.apache.activemq.command.BrokerInfo; 22 import org.apache.activemq.command.Command; 23 import org.apache.activemq.command.ConsumerInfo; 24 import org.apache.activemq.command.Endpoint; 25 import org.apache.activemq.command.NetworkBridgeFilter; 26 import org.apache.activemq.transport.Transport; 27 import org.apache.activemq.util.ServiceSupport; 28 29 import java.io.IOException ; 30 31 39 public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSupport { 40 41 protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null }; 42 protected Object brokerInfoMutex = new Object (); 43 44 public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration,Transport localBroker, Transport remoteBroker) { 45 super(configuration,localBroker, remoteBroker); 46 remoteBrokerName = remoteBroker.toString(); 47 remoteBrokerNameKnownLatch.countDown(); 48 } 49 50 protected void serviceRemoteBrokerInfo(Command command) throws IOException { 51 synchronized (brokerInfoMutex) { 52 BrokerInfo remoteBrokerInfo = (BrokerInfo) command; 53 BrokerId remoteBrokerId = remoteBrokerInfo.getBrokerId(); 54 55 Endpoint from = command.getFrom(); 57 if (from == null) { 58 log.warn("Incoming command does not have a from endpoint: " + command); 59 } 60 else { 61 from.setBrokerInfo(remoteBrokerInfo); 62 } 63 if (localBrokerId != null) { 64 if (localBrokerId.equals(remoteBrokerId)) { 65 log.info("Disconnecting loop back connection."); 66 ServiceSupport.dispose(this); 68 } 69 } 70 if (!disposed) { 71 triggerLocalStartBridge(); 72 } 73 } 74 } 75 76 protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException { 77 info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getFromBrokerId(info))); 78 } 79 80 83 protected BrokerId getFromBrokerId(Command command) throws IOException { 84 BrokerId answer = null; 85 Endpoint from = command.getFrom(); 86 if (from == null) { 87 log.warn("Incoming command does not have a from endpoint: " + command); 88 } 89 else { 90 answer = from.getBrokerId(); 91 } 92 if (answer != null) { 93 return answer; 94 } 95 else { 96 throw new IOException ("No broker ID is available for endpoint: " + from + " from command: " + command); 97 } 98 } 99 100 protected void serviceLocalBrokerInfo(Command command) throws InterruptedException { 101 } 103 104 protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { 105 return new NetworkBridgeFilter(getFromBrokerId(info), configuration.getNetworkTTL()); 106 } 107 108 protected BrokerId[] getRemoteBrokerPath(){ 109 return remoteBrokerPath; 110 } 111 112 } 113 | Popular Tags |