package org.apache.activemq.broker;

import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;

/**
 * A {@link BrokerFilter} that repeats each broker operation on a set of
 * registered listener brokers.
 *
 * <p>Every operation is first invoked on {@code next}. Afterwards the same
 * operation, with the same arguments, is invoked on each registered listener
 * in the order the listeners were added. Values returned by listeners are
 * discarded; where an operation returns a value, it is the one produced by
 * {@code next}.
 *
 * <p>No exception is caught here: if {@code next} throws, no listener is
 * invoked, and if a listener throws, the listeners after it are not invoked.
 *
 * <p>The listener set is held in a {@code volatile} array that is replaced
 * (never modified in place) by the synchronized {@link #addListener(Broker)}
 * and {@link #removeListener(Broker)} methods. Each operation fetches the
 * array once through {@link #getListeners()} and iterates over that array.
 */
public class BrokerBroadcaster extends BrokerFilter {

    /** Currently registered listeners; replaced wholesale on add/remove. */
    protected volatile Broker[] listeners = new Broker[0];

    /**
     * Creates a broadcaster with no listeners.
     *
     * @param next the broker handed to the {@link BrokerFilter} constructor
     */
    public BrokerBroadcaster(Broker next) {
        super(next);
    }

    /**
     * Acknowledges on {@code next}, then on every listener.
     *
     * @param consumerExchange the consumer exchange forwarded unchanged
     * @param ack the acknowledgement forwarded unchanged
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
        next.acknowledge(consumerExchange, ack);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].acknowledge(consumerExchange, ack);
        }
    }

    /**
     * Adds the connection on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param info the connection description forwarded unchanged
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        next.addConnection(context, info);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].addConnection(context, info);
        }
    }

    /**
     * Adds the consumer on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param info the consumer description forwarded unchanged
     * @return the subscription returned by {@code next}; subscriptions
     *         returned by listeners are discarded
     * @throws Exception whatever {@code next} or a listener throws
     */
    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        final Subscription subscription = next.addConsumer(context, info);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].addConsumer(context, info);
        }
        return subscription;
    }

    /**
     * Adds the producer on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param info the producer description forwarded unchanged
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        next.addProducer(context, info);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].addProducer(context, info);
        }
    }

    /**
     * Commits the transaction on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param xid the transaction id forwarded unchanged
     * @param onePhase forwarded unchanged
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
        next.commitTransaction(context, xid, onePhase);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].commitTransaction(context, xid, onePhase);
        }
    }

    /**
     * Removes the subscription on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param info the subscription-removal request forwarded unchanged
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
        next.removeSubscription(context, info);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].removeSubscription(context, info);
        }
    }

    /**
     * Prepares the transaction on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param xid the transaction id forwarded unchanged
     * @return the value returned by {@code next}; values returned by
     *         listeners are discarded
     * @throws Exception whatever {@code next} or a listener throws
     */
    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        final int outcome = next.prepareTransaction(context, xid);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            // Listener results are intentionally not combined with the primary result.
            targets[i].prepareTransaction(context, xid);
        }
        return outcome;
    }

    /**
     * Removes the connection on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param info the connection description forwarded unchanged
     * @param error forwarded unchanged
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
        next.removeConnection(context, info, error);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].removeConnection(context, info, error);
        }
    }

    /**
     * Removes the consumer on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param info the consumer description forwarded unchanged
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        next.removeConsumer(context, info);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].removeConsumer(context, info);
        }
    }

    /**
     * Removes the producer on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param info the producer description forwarded unchanged
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        next.removeProducer(context, info);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].removeProducer(context, info);
        }
    }

    /**
     * Rolls the transaction back on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param xid the transaction id forwarded unchanged
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        next.rollbackTransaction(context, xid);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].rollbackTransaction(context, xid);
        }
    }

    /**
     * Sends the message through {@code next}, then through every listener.
     *
     * @param producerExchange the producer exchange forwarded unchanged
     * @param messageSend the message forwarded unchanged; the same instance
     *        is handed to {@code next} and to each listener
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
        next.send(producerExchange, messageSend);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].send(producerExchange, messageSend);
        }
    }

    /**
     * Begins the transaction on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param xid the transaction id forwarded unchanged
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        next.beginTransaction(context, xid);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].beginTransaction(context, xid);
        }
    }

    /**
     * Forgets the transaction on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param transactionId the transaction id forwarded unchanged
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
        next.forgetTransaction(context, transactionId);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].forgetTransaction(context, transactionId);
        }
    }

    /**
     * Adds the destination on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param destination the destination forwarded unchanged
     * @return the {@link Destination} returned by {@code next}; destinations
     *         returned by listeners are discarded
     * @throws Exception whatever {@code next} or a listener throws
     */
    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
        final Destination created = next.addDestination(context, destination);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].addDestination(context, destination);
        }
        return created;
    }

    /**
     * Removes the destination on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param destination the destination forwarded unchanged
     * @param timeout forwarded unchanged
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
            throws Exception {
        next.removeDestination(context, destination, timeout);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].removeDestination(context, destination, timeout);
        }
    }

    /**
     * Starts {@code next}, then every listener.
     *
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void start() throws Exception {
        next.start();
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].start();
        }
    }

    /**
     * Stops {@code next}, then every listener.
     *
     * @throws Exception whatever {@code next} or a listener throws; a failure
     *         leaves the remaining listeners un-stopped by this call
     */
    public void stop() throws Exception {
        next.stop();
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].stop();
        }
    }

    /**
     * Adds the session on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param info the session description forwarded unchanged
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
        next.addSession(context, info);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].addSession(context, info);
        }
    }

    /**
     * Removes the session on {@code next}, then on every listener.
     *
     * @param context the connection context forwarded unchanged
     * @param info the session description forwarded unchanged
     * @throws Exception whatever {@code next} or a listener throws
     */
    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
        next.removeSession(context, info);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].removeSession(context, info);
        }
    }

    /**
     * Invokes {@code gc()} on {@code next}, then on every listener.
     */
    public void gc() {
        next.gc();
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].gc();
        }
    }

    /**
     * Adds the broker on {@code next}, then on every listener.
     *
     * @param connection the connection forwarded unchanged
     * @param info the broker description forwarded unchanged
     */
    public void addBroker(Connection connection, BrokerInfo info) {
        next.addBroker(connection, info);
        final Broker[] targets = getListeners();
        for (int i = 0; i < targets.length; i++) {
            targets[i].addBroker(connection, info);
        }
    }

    /**
     * Returns the array currently stored in {@link #listeners}.
     *
     * @return the live array reference, not a copy
     */
    protected Broker[] getListeners() {
        return listeners;
    }

    /**
     * Registers a listener by appending it to a copy of the current listener
     * list and storing the result as the new listener array.
     *
     * @param broker the listener to append; no null or duplicate check is made
     */
    public synchronized void addListener(Broker broker) {
        final List updated = getListenersAsList();
        updated.add(broker);
        listeners = (Broker[]) updated.toArray(new Broker[updated.size()]);
    }

    /**
     * Unregisters a listener by removing it from a copy of the current
     * listener list and storing the result as the new listener array.
     *
     * @param broker the listener to remove; {@code List.remove(Object)} is
     *        used, so at most the first matching entry is dropped and an
     *        unknown listener leaves the contents unchanged
     */
    public synchronized void removeListener(Broker broker) {
        final List updated = getListenersAsList();
        updated.remove(broker);
        listeners = (Broker[]) updated.toArray(new Broker[updated.size()]);
    }

    /**
     * Copies the array returned by {@link #getListeners()} into a new list.
     *
     * @return a fresh, mutable {@link ArrayList} holding the listeners in
     *         array order
     */
    protected List getListenersAsList() {
        final List copy = new ArrayList();
        final Broker[] current = getListeners();
        for (int i = 0; i < current.length; i++) {
            copy.add(current[i]);
        }
        return copy;
    }
}