KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > BrokerBroadcaster


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.broker;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
34 /**
35  * Used to add listeners for Broker actions
36  *
37  * @version $Revision: 1.10 $
38  */

39 public class BrokerBroadcaster extends BrokerFilter{
40     protected volatile Broker[] listeners=new Broker[0];
41
42     public BrokerBroadcaster(Broker next){
43         super(next);
44     }
45
46     public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception JavaDoc{
47         next.acknowledge(consumerExchange,ack);
48         Broker brokers[]=getListeners();
49         for(int i=0;i<brokers.length;i++){
50             brokers[i].acknowledge(consumerExchange,ack);
51         }
52     }
53
54     public void addConnection(ConnectionContext context,ConnectionInfo info) throws Exception JavaDoc{
55         next.addConnection(context,info);
56         Broker brokers[]=getListeners();
57         for(int i=0;i<brokers.length;i++){
58             brokers[i].addConnection(context,info);
59         }
60     }
61
62     public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception JavaDoc{
63         Subscription answer = next.addConsumer(context,info);
64         Broker brokers[]=getListeners();
65         for(int i=0;i<brokers.length;i++){
66             brokers[i].addConsumer(context,info);
67         }
68         return answer;
69     }
70
71     public void addProducer(ConnectionContext context,ProducerInfo info) throws Exception JavaDoc{
72         next.addProducer(context,info);
73         Broker brokers[]=getListeners();
74         for(int i=0;i<brokers.length;i++){
75             brokers[i].addProducer(context,info);
76         }
77     }
78
79     public void commitTransaction(ConnectionContext context,TransactionId xid,boolean onePhase) throws Exception JavaDoc{
80         next.commitTransaction(context,xid,onePhase);
81         Broker brokers[]=getListeners();
82         for(int i=0;i<brokers.length;i++){
83             brokers[i].commitTransaction(context,xid,onePhase);
84         }
85     }
86
87     public void removeSubscription(ConnectionContext context,RemoveSubscriptionInfo info) throws Exception JavaDoc{
88         next.removeSubscription(context,info);
89         Broker brokers[]=getListeners();
90         for(int i=0;i<brokers.length;i++){
91             brokers[i].removeSubscription(context,info);
92         }
93     }
94
95     public int prepareTransaction(ConnectionContext context,TransactionId xid) throws Exception JavaDoc{
96         int result=next.prepareTransaction(context,xid);
97         Broker brokers[]=getListeners();
98         for(int i=0;i<brokers.length;i++){
99             // TODO decide what to do with return values
100
brokers[i].prepareTransaction(context,xid);
101         }
102         return result;
103     }
104
105     public void removeConnection(ConnectionContext context,ConnectionInfo info,Throwable JavaDoc error) throws Exception JavaDoc{
106         next.removeConnection(context,info,error);
107         Broker brokers[]=getListeners();
108         for(int i=0;i<brokers.length;i++){
109             brokers[i].removeConnection(context,info,error);
110         }
111     }
112
113     public void removeConsumer(ConnectionContext context,ConsumerInfo info) throws Exception JavaDoc{
114         next.removeConsumer(context,info);
115         Broker brokers[]=getListeners();
116         for(int i=0;i<brokers.length;i++){
117             brokers[i].removeConsumer(context,info);
118         }
119     }
120
121     public void removeProducer(ConnectionContext context,ProducerInfo info) throws Exception JavaDoc{
122         next.removeProducer(context,info);
123         Broker brokers[]=getListeners();
124         for(int i=0;i<brokers.length;i++){
125             brokers[i].removeProducer(context,info);
126         }
127     }
128
129     public void rollbackTransaction(ConnectionContext context,TransactionId xid) throws Exception JavaDoc{
130         next.rollbackTransaction(context,xid);
131         Broker brokers[]=getListeners();
132         for(int i=0;i<brokers.length;i++){
133             brokers[i].rollbackTransaction(context,xid);
134         }
135     }
136
137     public void send(ProducerBrokerExchange producerExchange,Message messageSend) throws Exception JavaDoc{
138         next.send(producerExchange,messageSend);
139         Broker brokers[]=getListeners();
140         for(int i=0;i<brokers.length;i++){
141             brokers[i].send(producerExchange,messageSend);
142         }
143     }
144
145     public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception JavaDoc{
146         next.beginTransaction(context,xid);
147         Broker brokers[]=getListeners();
148         for(int i=0;i<brokers.length;i++){
149             brokers[i].beginTransaction(context,xid);
150         }
151     }
152
153     public void forgetTransaction(ConnectionContext context,TransactionId transactionId) throws Exception JavaDoc{
154         next.forgetTransaction(context,transactionId);
155         Broker brokers[]=getListeners();
156         for(int i=0;i<brokers.length;i++){
157             brokers[i].forgetTransaction(context,transactionId);
158         }
159     }
160
161     public Destination addDestination(ConnectionContext context,ActiveMQDestination destination) throws Exception JavaDoc{
162         Destination result=next.addDestination(context,destination);
163         Broker brokers[]=getListeners();
164         for(int i=0;i<brokers.length;i++){
165             brokers[i].addDestination(context,destination);
166         }
167         return result;
168     }
169
170     public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout)
171                     throws Exception JavaDoc{
172         next.removeDestination(context,destination,timeout);
173         Broker brokers[]=getListeners();
174         for(int i=0;i<brokers.length;i++){
175             brokers[i].removeDestination(context,destination,timeout);
176         }
177     }
178
179     public void start() throws Exception JavaDoc{
180         next.start();
181         Broker brokers[]=getListeners();
182         for(int i=0;i<brokers.length;i++){
183             brokers[i].start();
184         }
185     }
186
187     public void stop() throws Exception JavaDoc{
188         next.stop();
189         Broker brokers[]=getListeners();
190         for(int i=0;i<brokers.length;i++){
191             brokers[i].stop();
192         }
193     }
194
195     public void addSession(ConnectionContext context,SessionInfo info) throws Exception JavaDoc{
196         next.addSession(context,info);
197         Broker brokers[]=getListeners();
198         for(int i=0;i<brokers.length;i++){
199             brokers[i].addSession(context,info);
200         }
201     }
202
203     public void removeSession(ConnectionContext context,SessionInfo info) throws Exception JavaDoc{
204         next.removeSession(context,info);
205         Broker brokers[]=getListeners();
206         for(int i=0;i<brokers.length;i++){
207             brokers[i].removeSession(context,info);
208         }
209     }
210
211     public void gc(){
212         next.gc();
213         Broker brokers[]=getListeners();
214         for(int i=0;i<brokers.length;i++){
215             brokers[i].gc();
216         }
217     }
218     
219     public void addBroker(Connection connection,BrokerInfo info){
220         next.addBroker(connection,info);
221         Broker brokers[]=getListeners();
222         for(int i=0;i<brokers.length;i++){
223             brokers[i].addBroker(connection, info);
224         }
225     }
226
227     
228     protected Broker[] getListeners(){
229         return listeners;
230     }
231
232     public synchronized void addListener(Broker broker){
233         List JavaDoc tmp=getListenersAsList();
234         tmp.add(broker);
235         listeners=(Broker[]) tmp.toArray(new Broker[tmp.size()]);
236     }
237
238     public synchronized void removeListener(Broker broker){
239         List JavaDoc tmp=getListenersAsList();
240         tmp.remove(broker);
241         listeners=(Broker[]) tmp.toArray(new Broker[tmp.size()]);
242     }
243
244     protected List JavaDoc getListenersAsList(){
245         List JavaDoc tmp=new ArrayList JavaDoc();
246         Broker brokers[]=getListeners();
247         for(int i=0;i<brokers.length;i++){
248             tmp.add(brokers[i]);
249         }
250         return tmp;
251     }
252 }
253
Popular Tags