1 18 package org.apache.activemq.network; 19 20 import org.apache.activemq.command.ActiveMQDestination; 21 import org.apache.activemq.command.ConsumerId; 22 import org.apache.activemq.command.ConsumerInfo; 23 import org.apache.activemq.filter.DestinationFilter; 24 import org.apache.activemq.transport.Transport; 25 import org.apache.commons.logging.Log; 26 import org.apache.commons.logging.LogFactory; 27 28 import java.io.IOException ; 29 import java.util.Iterator ; 30 35 public class DurableConduitBridge extends ConduitBridge{ 36 static final private Log log=LogFactory.getLog(DurableConduitBridge.class); 37 38 45 public DurableConduitBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){ 46 super(configuration,localBroker,remoteBroker); 47 } 48 49 53 protected void setupStaticDestinations(){ 54 super.setupStaticDestinations(); 55 ActiveMQDestination[] dests=durableDestinations; 56 if(dests!=null){ 57 for(int i=0;i<dests.length;i++){ 58 ActiveMQDestination dest=dests[i]; 59 if(isPermissableDestination(dest) && !doesConsumerExist(dest)){ 60 DemandSubscription sub=createDemandSubscription(dest); 61 if(dest.isTopic()){ 62 sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest)); 63 } 64 try{ 65 addSubscription(sub); 66 }catch(IOException e){ 67 log.error("Failed to add static destination "+dest,e); 68 } 69 if(log.isTraceEnabled()) 70 log.trace("Forwarding messages for durable destination: "+dest); 71 } 72 } 73 } 74 } 75 76 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 77 if(addToAlreadyInterestedConsumers(info)){ 78 return null; } 80 if(info.isDurable()||(info.getDestination().isQueue()&&!info.getDestination().isTemporary())){ 85 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator.getNextSequenceId())); 86 } 87 if(info.isDurable()){ 88 90 info.setSubscriptionName(getSubscriberName(info.getDestination())); 91 } 92 return doCreateDemandSubscription(info); 93 } 94 95 protected String getSubscriberName(ActiveMQDestination dest){ 96 String subscriberName = configuration.getBrokerName()+"_"+dest.getPhysicalName(); 97 return subscriberName; 98 } 99 100 protected boolean doesConsumerExist(ActiveMQDestination dest){ 101 DestinationFilter filter=DestinationFilter.parseFilter(dest); 102 for(Iterator i=subscriptionMapByLocalId.values().iterator();i.hasNext();){ 103 DemandSubscription ds=(DemandSubscription) i.next(); 104 if(filter.matches(ds.getLocalInfo().getDestination())){ 105 return true; 106 } 107 } 108 return false; 109 } 110 } 111 | Popular Tags |