1 18 package org.apache.activemq.network; 19 20 import org.apache.activemq.command.BrokerId; 21 import org.apache.activemq.command.BrokerInfo; 22 import org.apache.activemq.command.Command; 23 import org.apache.activemq.command.ConsumerInfo; 24 import org.apache.activemq.command.NetworkBridgeFilter; 25 import org.apache.activemq.transport.Transport; 26 import org.apache.activemq.util.ServiceSupport; 27 28 import java.io.IOException ; 29 30 37 public class DemandForwardingBridge extends DemandForwardingBridgeSupport { 38 39 protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null }; 40 protected Object brokerInfoMutex = new Object (); 41 protected BrokerId remoteBrokerId; 42 43 public DemandForwardingBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){ 44 super(configuration,localBroker, remoteBroker); 45 } 46 47 protected void serviceRemoteBrokerInfo(Command command) throws IOException { 48 synchronized(brokerInfoMutex){ 49 BrokerInfo remoteBrokerInfo=(BrokerInfo) command; 50 remoteBrokerId=remoteBrokerInfo.getBrokerId(); 51 remoteBrokerPath[0]=remoteBrokerId; 52 remoteBrokerName=remoteBrokerInfo.getBrokerName(); 53 if(localBrokerId!=null){ 54 if(localBrokerId.equals(remoteBrokerId)){ 55 log.info("Disconnecting loop back connection."); 56 ServiceSupport.dispose(this); 58 } 59 } 60 remoteBrokerNameKnownLatch.countDown(); 61 } 62 } 63 64 protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) { 65 info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),getRemoteBrokerPath())); 66 } 67 68 protected void serviceLocalBrokerInfo(Command command) throws InterruptedException { 69 synchronized(brokerInfoMutex){ 70 localBrokerId=((BrokerInfo) command).getBrokerId(); 71 localBrokerPath[0]=localBrokerId; 72 if(remoteBrokerId!=null){ 73 if(remoteBrokerId.equals(localBrokerId)){ 74 log.info("Disconnecting loop back connection."); 75 waitStarted(); 76 ServiceSupport.dispose(this); 77 } 78 } 79 } 80 } 81 82 protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { 83 return new NetworkBridgeFilter(remoteBrokerPath[0], configuration.getNetworkTTL()); 84 } 85 86 protected BrokerId[] getRemoteBrokerPath(){ 87 return remoteBrokerPath; 88 } 89 } 90 | Popular Tags |