1 18 package org.apache.activemq.command; 19 20 import java.io.IOException ; 21 22 import javax.jms.JMSException ; 23 24 import org.apache.activemq.filter.BooleanExpression; 25 import org.apache.activemq.filter.MessageEvaluationContext; 26 import org.apache.activemq.util.JMSExceptionSupport; 27 import org.apache.commons.logging.Log; 28 import org.apache.commons.logging.LogFactory; 29 30 34 public class NetworkBridgeFilter implements DataStructure, BooleanExpression { 35 36 static final Log log=LogFactory.getLog(NetworkBridgeFilter.class); 37 public static final byte DATA_STRUCTURE_TYPE=CommandTypes.NETWORK_BRIDGE_FILTER; 38 39 private BrokerId networkBrokerId; 40 private int networkTTL; 41 42 public NetworkBridgeFilter() { 43 } 44 45 public NetworkBridgeFilter(BrokerId remoteBrokerPath, int networkTTL) { 46 this.networkBrokerId = remoteBrokerPath; 47 this.networkTTL = networkTTL; 48 } 49 50 public byte getDataStructureType() { 51 return DATA_STRUCTURE_TYPE; 52 } 53 54 public boolean isMarshallAware() { 55 return false; 56 } 57 58 59 public boolean matches(MessageEvaluationContext mec) throws JMSException { 60 try{ 61 Message message = mec.getMessage(); 65 return message != null && matchesForwardingFilter(message); 66 }catch(IOException e){ 67 throw JMSExceptionSupport.create(e); 68 } 69 } 70 71 public Object evaluate(MessageEvaluationContext message) throws JMSException { 72 return matches(message)?Boolean.TRUE:Boolean.FALSE; 73 } 74 75 protected boolean matchesForwardingFilter(Message message){ 76 77 if ( contains(message.getBrokerPath(), networkBrokerId) ){ 78 if (log.isTraceEnabled()){ 79 log.trace("Message all ready routed once through this broker - ignoring: " + message); 80 } 81 return false; 82 } 83 84 int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length; 85 86 if(hops >= networkTTL){ 87 if (log.isTraceEnabled()){ 88 log.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message); 89 } 90 return false; 91 } 92 93 if(message.isAdvisory()&&message.getDataStructure()!=null 95 &&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){ 96 ConsumerInfo info=(ConsumerInfo) message.getDataStructure(); 97 hops = info.getBrokerPath() == null ? 0 : message.getBrokerPath().length; 98 if(hops >= networkTTL ){ 99 if (log.isTraceEnabled()){ 100 log.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message); 101 } 102 return false; 103 } 104 } 105 return true; 106 } 107 108 public static boolean contains(BrokerId[] brokerPath,BrokerId brokerId){ 109 if(brokerPath!=null){ 110 for(int i=0;i<brokerPath.length;i++){ 111 if(brokerId.equals(brokerPath[i])) 112 return true; 113 } 114 } 115 return false; 116 } 117 118 121 public int getNetworkTTL() { 122 return networkTTL; 123 } 124 public void setNetworkTTL(int networkTTL) { 125 this.networkTTL = networkTTL; 126 } 127 128 131 public BrokerId getNetworkBrokerId() { 132 return networkBrokerId; 133 } 134 135 public void setNetworkBrokerId(BrokerId remoteBrokerPath) { 136 this.networkBrokerId = remoteBrokerPath; 137 } 138 139 } 140 | Popular Tags |