package org.apache.activemq.advisory;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;

/**
 * A broker filter that publishes advisory messages whenever connections,
 * consumers, producers or destinations are added to or removed from the
 * broker it wraps.
 * <p>
 * Every operation is first delegated to the next broker in the chain; only
 * then is the corresponding advisory sent. The filter also remembers the
 * currently known connections, consumers, producers and destinations so that
 * a consumer subscribing to an advisory topic can be sent the existing state.
 */
public class AdvisoryBroker extends BrokerFilter {

    /** Known connections, keyed by connection id. */
    protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections =
            new ConcurrentHashMap<ConnectionId, ConnectionInfo>();

    /** Known consumers on non-advisory destinations, keyed by consumer id. */
    protected final ConcurrentHashMap<ConsumerId, ConsumerInfo> consumers =
            new ConcurrentHashMap<ConsumerId, ConsumerInfo>();

    /** Known producers on non-advisory destinations, keyed by producer id. */
    protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers =
            new ConcurrentHashMap<ProducerId, ProducerInfo>();

    /** Known destinations, keyed by the destination itself. */
    protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations =
            new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();

    private static final IdGenerator idGenerator = new IdGenerator();

    /** Producer id stamped on every advisory message sent by this broker. */
    protected final ProducerId advisoryProducerId = new ProducerId();

    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();

    /**
     * Creates the filter and assigns a freshly generated connection id to the
     * advisory producer id.
     *
     * @param next the broker to delegate to
     */
    public AdvisoryBroker(Broker next) {
        super(next);
        advisoryProducerId.setConnectionId(idGenerator.generateId());
    }

    /**
     * Adds the connection to the next broker, fires a connection advisory and
     * records the connection.
     *
     * @param context the connection context of the caller
     * @param info    the connection being added
     * @throws Exception if the delegate or the advisory send fails
     */
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        next.addConnection(context, info);

        ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
        fireAdvisory(context, topic, info);
        connections.put(info.getConnectionId(), info);
    }

    /**
     * Adds the consumer to the next broker.
     * <p>
     * For a consumer on a regular destination, the consumer is recorded and a
     * consumer advisory is fired. For a consumer on an advisory topic, the
     * currently recorded connections, destinations, producers and/or consumers
     * matching that topic are sent to the new consumer only.
     *
     * @param context the connection context of the caller
     * @param info    the consumer being added
     * @return the subscription created by the next broker
     * @throws Exception if the delegate or an advisory send fails
     */
    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        Subscription answer = next.addConsumer(context, info);

        if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
            // Regular consumer: record it first so the reported count includes it.
            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
            consumers.put(info.getConsumerId(), info);
            fireConsumerAdvisory(context, topic, info);
            return answer;
        }

        // Advisory consumer: send the known state to this consumer only.
        ConsumerId target = info.getConsumerId();

        if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) {
            for (ConnectionInfo value : connections.values()) {
                ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
                fireAdvisory(context, topic, value, target);
            }
        }

        if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) {
            for (DestinationInfo value : destinations.values()) {
                ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(value.getDestination());
                fireAdvisory(context, topic, value, target);
            }
        }

        if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) {
            for (ProducerInfo value : producers.values()) {
                ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
                fireProducerAdvisory(context, topic, value, target);
            }
        }

        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
            for (ConsumerInfo value : consumers.values()) {
                ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
                fireConsumerAdvisory(context, topic, value, target);
            }
        }

        return answer;
    }

    /**
     * Adds the producer to the next broker and, when the producer has a
     * non-advisory destination, records it and fires a producer advisory.
     * <p>
     * The producer is recorded before the advisory is fired and the advisory
     * carries the {@code producerCount} property, mirroring how
     * {@link #addConsumer} and {@link #removeProducer} behave.
     *
     * @param context the connection context of the caller
     * @param info    the producer being added
     * @throws Exception if the delegate or the advisory send fails
     */
    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        next.addProducer(context, info);

        // Producers without a destination are not advertised.
        if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
            ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
            producers.put(info.getProducerId(), info);
            fireProducerAdvisory(context, topic, info);
        }
    }

    /**
     * Adds the destination to the next broker, fires a destination advisory
     * describing the addition and records the destination.
     *
     * @param context     the connection context of the caller
     * @param destination the destination being added
     * @return the destination created by the next broker
     * @throws Exception if the delegate or the advisory send fails
     */
    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
        Destination answer = next.addDestination(context, destination);

        ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
        DestinationInfo info =
                new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
        fireAdvisory(context, topic, info);
        destinations.put(destination, info);
        return answer;
    }

    /**
     * Passes the destination info to the next broker, fires a destination
     * advisory carrying that info and records it.
     *
     * @param context the connection context of the caller
     * @param info    the destination info being added
     * @throws Exception if the delegate or the advisory send fails
     */
    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
        ActiveMQDestination destination = info.getDestination();
        next.addDestinationInfo(context, info);

        ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
        fireAdvisory(context, topic, info);
        destinations.put(destination, info);
    }

    /**
     * Removes the destination from the next broker and, if the destination was
     * recorded here, fires a removal advisory and removes the destination's
     * consumer and producer advisory topics.
     *
     * @param context     the connection context of the caller
     * @param destination the destination being removed
     * @param timeout     passed through unchanged to the next broker
     * @throws Exception if the delegate or the advisory send fails
     */
    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
            throws Exception {
        next.removeDestination(context, destination, timeout);
        fireDestinationRemoved(context, destination);
    }

    /**
     * Passes the destination info removal to the next broker and, if the
     * destination was recorded here, fires a removal advisory and removes the
     * destination's consumer and producer advisory topics.
     *
     * @param context  the connection context of the caller
     * @param destInfo describes the destination being removed
     * @throws Exception if the delegate or the advisory send fails
     */
    public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
        next.removeDestinationInfo(context, destInfo);
        fireDestinationRemoved(context, destInfo.getDestination());
    }

    /**
     * Removes the connection from the next broker, fires an advisory carrying
     * the connection's remove command and forgets the connection.
     *
     * @param context the connection context of the caller
     * @param info    the connection being removed
     * @param error   passed through unchanged to the next broker
     * @throws Exception if the delegate or the advisory send fails
     */
    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
        next.removeConnection(context, info, error);

        ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
        fireAdvisory(context, topic, info.createRemoveCommand());
        connections.remove(info.getConnectionId());
    }

    /**
     * Removes the consumer from the next broker and, for a consumer on a
     * non-advisory destination, forgets it and fires a consumer advisory
     * carrying the consumer's remove command.
     *
     * @param context the connection context of the caller
     * @param info    the consumer being removed
     * @throws Exception if the delegate or the advisory send fails
     */
    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        next.removeConsumer(context, info);

        if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
            // Forget the consumer first so the reported count excludes it.
            consumers.remove(info.getConsumerId());
            fireConsumerAdvisory(context, topic, info.createRemoveCommand());
        }
    }

    /**
     * Removes the producer from the next broker and, when the producer has a
     * non-advisory destination, forgets it and fires a producer advisory
     * carrying the producer's remove command.
     *
     * @param context the connection context of the caller
     * @param info    the producer being removed
     * @throws Exception if the delegate or the advisory send fails
     */
    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        next.removeProducer(context, info);

        if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
            ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
            // Forget the producer first so the reported count excludes it.
            producers.remove(info.getProducerId());
            fireProducerAdvisory(context, topic, info.createRemoveCommand());
        }
    }

    /**
     * Fires an advisory with no target consumer set.
     *
     * @param context the connection context used for the send
     * @param topic   the advisory topic to publish on
     * @param command the payload of the advisory
     * @throws Exception if the send fails
     */
    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
        fireAdvisory(context, topic, command, null);
    }

    /**
     * Fires an advisory using a fresh message.
     *
     * @param context          the connection context used for the send
     * @param topic            the advisory topic to publish on
     * @param command          the payload of the advisory
     * @param targetConsumerId the target consumer id to set on the message, may be {@code null}
     * @throws Exception if the send fails
     */
    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command,
            ConsumerId targetConsumerId) throws Exception {
        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
        fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
    }

    /**
     * Fires a consumer advisory with no target consumer set.
     *
     * @param context the connection context used for the send
     * @param topic   the advisory topic to publish on
     * @param command the payload of the advisory
     * @throws Exception if the send fails
     */
    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command)
            throws Exception {
        fireConsumerAdvisory(context, topic, command, null);
    }

    /**
     * Fires an advisory whose {@code consumerCount} property is the number of
     * consumers currently recorded by this broker.
     *
     * @param context          the connection context used for the send
     * @param topic            the advisory topic to publish on
     * @param command          the payload of the advisory
     * @param targetConsumerId the target consumer id to set on the message, may be {@code null}
     * @throws Exception if the send fails
     */
    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command,
            ConsumerId targetConsumerId) throws Exception {
        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
        advisoryMessage.setIntProperty("consumerCount", consumers.size());
        fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
    }

    /**
     * Fires a producer advisory with no target consumer set.
     *
     * @param context the connection context used for the send
     * @param topic   the advisory topic to publish on
     * @param command the payload of the advisory
     * @throws Exception if the send fails
     */
    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command)
            throws Exception {
        fireProducerAdvisory(context, topic, command, null);
    }

    /**
     * Fires an advisory whose {@code producerCount} property is the number of
     * producers currently recorded by this broker.
     *
     * @param context          the connection context used for the send
     * @param topic            the advisory topic to publish on
     * @param command          the payload of the advisory
     * @param targetConsumerId the target consumer id to set on the message, may be {@code null}
     * @throws Exception if the send fails
     */
    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command,
            ConsumerId targetConsumerId) throws Exception {
        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
        advisoryMessage.setIntProperty("producerCount", producers.size());
        fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
    }

    /**
     * Fills in the given message as a non-persistent advisory and sends it
     * through the next broker.
     * <p>
     * Producer flow control on the context is switched off for the duration
     * of the send and restored to its previous value afterwards, even if the
     * send throws.
     *
     * @param context          the connection context used for the send
     * @param topic            the advisory topic to publish on
     * @param command          the payload, stored as the message's data structure
     * @param targetConsumerId the target consumer id to set on the message, may be {@code null}
     * @param advisoryMessage  the message to populate and send
     * @throws Exception if the send fails
     */
    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command,
            ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
        advisoryMessage.setDataStructure(command);
        advisoryMessage.setPersistent(false);
        advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
        advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
        advisoryMessage.setTargetConsumerId(targetConsumerId);

        advisoryMessage.setDestination(topic);
        advisoryMessage.setResponseRequired(false);
        advisoryMessage.setProducerId(advisoryProducerId);

        boolean originalFlowControl = context.isProducerFlowControl();
        final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
        producerExchange.setConnectionContext(context);
        producerExchange.setMutable(true);
        try {
            context.setProducerFlowControl(false);
            next.send(producerExchange, advisoryMessage);
        } finally {
            context.setProducerFlowControl(originalFlowControl);
        }
    }

    /**
     * Forgets a recorded destination and, if it was recorded, fires the
     * removal advisory and removes its consumer and producer advisory topics.
     */
    private void fireDestinationRemoved(ConnectionContext context, ActiveMQDestination destination)
            throws Exception {
        DestinationInfo info = destinations.remove(destination);
        if (info == null) {
            return;
        }

        // Reuse the recorded info, turned into a "remove" notification.
        info.setDestination(destination);
        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
        ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
        fireAdvisory(context, topic, info);

        removeAdvisoryTopicQuietly(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()));
        removeAdvisoryTopicQuietly(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()));
    }

    /** Removes an advisory topic from the next broker, ignoring any failure. */
    private void removeAdvisoryTopicQuietly(ConnectionContext context, ActiveMQTopic advisoryTopic) {
        try {
            next.removeDestination(context, advisoryTopic, -1);
        } catch (Exception ignored) {
            // Best effort: a failure is expected if the advisory topic did not exist yet.
        }
    }

}