1 18 package org.apache.activemq.broker; 19 20 import org.apache.activemq.command.ActiveMQDestination; 21 import org.apache.activemq.command.Message; 22 import org.apache.activemq.command.ProducerInfo; 23 24 39 public class CompositeDestinationBroker extends BrokerFilter { 40 41 public CompositeDestinationBroker(Broker next) { 42 super(next); 43 } 44 45 48 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 49 ActiveMQDestination destination = info.getDestination(); 51 if( destination!=null && destination.isComposite() ) { 52 ActiveMQDestination[] destinations = destination.getCompositeDestinations(); 53 for (int i = 0; i < destinations.length; i++) { 54 ProducerInfo copy = info.copy(); 55 copy.setDestination(destinations[i]); 56 next.addProducer(context, copy); 57 } 58 } else { 59 next.addProducer(context, info); 60 } 61 } 62 63 66 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 67 ActiveMQDestination destination = info.getDestination(); 69 if( destination!=null && destination.isComposite() ) { 70 ActiveMQDestination[] destinations = destination.getCompositeDestinations(); 71 for (int i = 0; i < destinations.length; i++) { 72 ProducerInfo copy = info.copy(); 73 copy.setDestination(destinations[i]); 74 next.removeProducer(context, copy); 75 } 76 } else { 77 next.removeProducer(context, info); 78 } 79 } 80 81 84 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 85 ActiveMQDestination destination = message.getDestination(); 86 if( destination.isComposite() ) { 87 ActiveMQDestination[] destinations = destination.getCompositeDestinations(); 88 for (int i = 0; i < destinations.length; i++) { 89 if( i!=0 ) { 90 message = message.copy(); 91 } 92 message.setOriginalDestination(destination); 93 message.setDestination(destinations[i]); 94 next.send(producerExchange, message); 95 } 96 } else { 97 next.send(producerExchange, message); 98 } 99 } 100 101 } 102 | Popular Tags |