package org.apache.activemq.network;

import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.Transport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * A {@link DemandForwardingBridge} that tries to attach a new consumer to
 * demand subscriptions that already exist, and only creates a new demand
 * subscription when none of the existing ones matches.
 */
public class ConduitBridge extends DemandForwardingBridge {

    private static final Log log = LogFactory.getLog(ConduitBridge.class);

    /**
     * Creates the bridge; all three arguments are handed unchanged to the
     * superclass constructor.
     *
     * @param configuration the network bridge configuration
     * @param localBroker   transport to the local broker
     * @param remoteBroker  transport to the remote broker
     */
    public ConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
        super(configuration, localBroker, remoteBroker);
    }

    /**
     * Creates a demand subscription for the consumer unless the consumer could
     * be attached to at least one existing subscription.
     *
     * @param info the consumer to create demand for
     * @return {@code null} when {@link #addToAlreadyInterestedConsumers(ConsumerInfo)}
     *         reports a match, otherwise the result of
     *         {@code doCreateDemandSubscription(info)}
     * @throws IOException declared for the delegate call
     */
    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
        // A consumer that was attached to an existing subscription needs no new one.
        return addToAlreadyInterestedConsumers(info) ? null : doCreateDemandSubscription(info);
    }

    /**
     * Adds the consumer's id to every subscription held in
     * {@code subscriptionMapByLocalId} whose local destination is matched by
     * the filter parsed from the consumer's destination.
     *
     * @param info the consumer to attach
     * @return {@code true} if the consumer was added to at least one existing
     *         subscription; {@code false} if nothing matched or the consumer
     *         has a selector
     */
    protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
        // Consumers with a selector are never attached to an existing subscription.
        if (info.getSelector() != null) {
            return false;
        }

        DestinationFilter destinationFilter = DestinationFilter.parseFilter(info.getDestination());
        boolean attached = false;

        Iterator subscriptions = subscriptionMapByLocalId.values().iterator();
        while (subscriptions.hasNext()) {
            DemandSubscription existing = (DemandSubscription) subscriptions.next();
            if (destinationFilter.matches(existing.getLocalInfo().getDestination())) {
                existing.add(info.getConsumerId());
                attached = true;
            }
        }
        return attached;
    }

    /**
     * Removes the consumer id from every subscription held in
     * {@code subscriptionMapByLocalId} and then removes each subscription that
     * reports itself empty afterwards.
     *
     * @param id the consumer id to detach
     * @throws IOException declared for the {@code removeSubscription} call
     */
    protected void removeDemandSubscription(ConsumerId id) throws IOException {
        // Emptied subscriptions are collected first and removed in a second pass,
        // presumably so the map is not modified while its values are iterated.
        List emptied = new ArrayList();

        Iterator subscriptions = subscriptionMapByLocalId.values().iterator();
        while (subscriptions.hasNext()) {
            DemandSubscription current = (DemandSubscription) subscriptions.next();
            current.remove(id);
            if (current.isEmpty()) {
                emptied.add(current);
            }
        }

        Iterator pending = emptied.iterator();
        while (pending.hasNext()) {
            DemandSubscription stale = (DemandSubscription) pending.next();
            // NOTE(review): the map name suggests it is keyed by the local consumer id,
            // yet the key used here comes from the remote info - confirm against
            // DemandForwardingBridge before relying on this removal.
            subscriptionMapByLocalId.remove(stale.getRemoteInfo().getConsumerId());
            removeSubscription(stale);
            if (log.isTraceEnabled()) {
                log.trace("removing sub on "+localBroker+" from "+remoteBrokerName+" : "+stale.getRemoteInfo());
            }
        }
    }
}