KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > ft > MasterBroker


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */


package org.apache.activemq.broker.ft;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.InsertableMutableBrokerFilter;
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

47 /**
48  * The Message Broker which passes messages to a slave
49  *
50  * @version $Revision: 1.8 $
51  */

52 public class MasterBroker extends InsertableMutableBrokerFilter{
53
54     private static final Log log=LogFactory.getLog(MasterBroker.class);
55     private Transport slave;
56     private AtomicBoolean JavaDoc started=new AtomicBoolean JavaDoc(false);
57
58     /**
59      * Constructor
60      *
61      * @param parent
62      * @param transport
63      */

64     public MasterBroker(MutableBrokerFilter parent,Transport transport){
65         super(parent);
66         this.slave=transport;
67         this.slave=new MutexTransport(slave);
68         this.slave=new ResponseCorrelator(slave);
69         this.slave.setTransportListener(transport.getTransportListener());
70     }
71
72     /**
73      * start processing this broker
74      *
75      */

76     public void startProcessing(){
77         started.set(true);
78         try{
79             Connection[] connections=getClients();
80             ConnectionControl command=new ConnectionControl();
81             command.setFaultTolerant(true);
82             if(connections!=null){
83                 for(int i=0;i<connections.length;i++){
84                     if(connections[i].isActive()&&connections[i].isManageable()){
85                         connections[i].dispatchAsync(command);
86                     }
87                 }
88             }
89         }catch(Exception JavaDoc e){
90             log.error("Failed to get Connections",e);
91         }
92     }
93
94     /**
95      * stop the broker
96      *
97      * @throws Exception
98      */

99     public void stop() throws Exception JavaDoc{
100         super.stop();
101         stopProcessing();
102     }
103
104     /**
105      * stop processing this broker
106      *
107      */

108     public void stopProcessing(){
109         if(started.compareAndSet(true,false)){
110             remove();
111         }
112     }
113
114     /**
115      * A client is establishing a connection with the broker.
116      *
117      * @param context
118      * @param info
119      * @throws Exception
120      */

121     public void addConnection(ConnectionContext context,ConnectionInfo info) throws Exception JavaDoc{
122         super.addConnection(context,info);
123         sendAsyncToSlave(info);
124     }
125
126     /**
127      * A client is disconnecting from the broker.
128      *
129      * @param context the environment the operation is being executed under.
130      * @param info
131      * @param error null if the client requested the disconnect or the error that caused the client to disconnect.
132      * @throws Exception
133      */

134     public void removeConnection(ConnectionContext context,ConnectionInfo info,Throwable JavaDoc error) throws Exception JavaDoc{
135         super.removeConnection(context,info,error);
136         sendAsyncToSlave(new RemoveInfo(info.getConnectionId()));
137     }
138
139     /**
140      * Adds a session.
141      *
142      * @param context
143      * @param info
144      * @throws Exception
145      */

146     public void addSession(ConnectionContext context,SessionInfo info) throws Exception JavaDoc{
147         super.addSession(context,info);
148         sendAsyncToSlave(info);
149     }
150
151     /**
152      * Removes a session.
153      *
154      * @param context
155      * @param info
156      * @throws Exception
157      */

158     public void removeSession(ConnectionContext context,SessionInfo info) throws Exception JavaDoc{
159         super.removeSession(context,info);
160         sendAsyncToSlave(new RemoveInfo(info.getSessionId()));
161     }
162
163     /**
164      * Adds a producer.
165      *
166      * @param context the enviorment the operation is being executed under.
167      * @param info
168      * @throws Exception
169      */

170     public void addProducer(ConnectionContext context,ProducerInfo info) throws Exception JavaDoc{
171         super.addProducer(context,info);
172         sendAsyncToSlave(info);
173     }
174
175     /**
176      * Removes a producer.
177      *
178      * @param context the enviorment the operation is being executed under.
179      * @param info
180      * @throws Exception
181      */

182     public void removeProducer(ConnectionContext context,ProducerInfo info) throws Exception JavaDoc{
183         super.removeProducer(context,info);
184         sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
185     }
186
187     /**
188      * add a consumer
189      *
190      * @param context
191      * @param info
192      * @return the assocated subscription
193      * @throws Exception
194      */

195     public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception JavaDoc{
196         sendAsyncToSlave(info);
197         Subscription answer=super.addConsumer(context,info);
198         return answer;
199     }
200
201     /**
202      * remove a subscription
203      *
204      * @param context
205      * @param info
206      * @throws Exception
207      */

208     public void removeSubscription(ConnectionContext context,RemoveSubscriptionInfo info) throws Exception JavaDoc{
209         super.removeSubscription(context,info);
210         sendAsyncToSlave(info);
211     }
212
213     /**
214      * begin a transaction
215      *
216      * @param context
217      * @param xid
218      * @throws Exception
219      */

220     public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception JavaDoc{
221         TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN);
222         sendAsyncToSlave(info);
223         super.beginTransaction(context,xid);
224     }
225
226     /**
227      * Prepares a transaction. Only valid for xa transactions.
228      *
229      * @param context
230      * @param xid
231      * @return the state
232      * @throws Exception
233      */

234     public int prepareTransaction(ConnectionContext context,TransactionId xid) throws Exception JavaDoc{
235         TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE);
236         sendAsyncToSlave(info);
237         int result=super.prepareTransaction(context,xid);
238         return result;
239     }
240
241     /**
242      * Rollsback a transaction.
243      *
244      * @param context
245      * @param xid
246      * @throws Exception
247      */

248     public void rollbackTransaction(ConnectionContext context,TransactionId xid) throws Exception JavaDoc{
249         TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
250         sendAsyncToSlave(info);
251         super.rollbackTransaction(context,xid);
252     }
253
254     /**
255      * Commits a transaction.
256      *
257      * @param context
258      * @param xid
259      * @param onePhase
260      * @throws Exception
261      */

262     public void commitTransaction(ConnectionContext context,TransactionId xid,boolean onePhase) throws Exception JavaDoc{
263         TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
264         sendSyncToSlave(info);
265         super.commitTransaction(context,xid,onePhase);
266     }
267
268     /**
269      * Forgets a transaction.
270      *
271      * @param context
272      * @param xid
273      * @throws Exception
274      */

275     public void forgetTransaction(ConnectionContext context,TransactionId xid) throws Exception JavaDoc{
276         TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET);
277         sendAsyncToSlave(info);
278         super.forgetTransaction(context,xid);
279     }
280
281     /**
282      * Notifiy the Broker that a dispatch has happened
283      *
284      * @param messageDispatch
285      */

286     public void processDispatch(MessageDispatch messageDispatch){
287         MessageDispatchNotification mdn=new MessageDispatchNotification();
288         mdn.setConsumerId(messageDispatch.getConsumerId());
289         mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
290         mdn.setDestination(messageDispatch.getDestination());
291         if(messageDispatch.getMessage()!=null)
292             mdn.setMessageId(messageDispatch.getMessage().getMessageId());
293         sendAsyncToSlave(mdn);
294         super.processDispatch(messageDispatch);
295     }
296
297     /**
298      * @param context
299      * @param message
300      * @throws Exception
301      *
302      */

303     public void send(ProducerBrokerExchange producerExchange,Message message) throws Exception JavaDoc{
304         /**
305          * A message can be dispatched before the super.send() method returns so - here the order is switched to avoid
306          * problems on the slave with receiving acks for messages not received yey
307          */

308         sendToSlave(message);
309         super.send(producerExchange,message);
310     }
311
312     /**
313      * @param context
314      * @param ack
315      * @throws Exception
316      *
317      */

318     public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception JavaDoc{
319         sendToSlave(ack);
320         super.acknowledge(consumerExchange,ack);
321     }
322
323     public boolean isFaultTolerantConfiguration(){
324         return true;
325     }
326
327     protected void sendToSlave(Message message){
328         if(message.isResponseRequired()){
329             sendSyncToSlave(message);
330         }else{
331             sendAsyncToSlave(message);
332         }
333     }
334
335     protected void sendToSlave(MessageAck ack){
336         if(ack.isResponseRequired()){
337             sendAsyncToSlave(ack);
338         }else{
339             sendSyncToSlave(ack);
340         }
341     }
342
343     protected void sendAsyncToSlave(Command command){
344         try{
345             slave.oneway(command);
346         }catch(Throwable JavaDoc e){
347             log.error("Slave Failed",e);
348             stopProcessing();
349         }
350     }
351
352     protected void sendSyncToSlave(Command command){
353         try{
354             Response response=(Response)slave.request(command);
355             if(response.isException()){
356                 ExceptionResponse er=(ExceptionResponse)response;
357                 log.error("Slave Failed",er.getException());
358             }
359         }catch(Throwable JavaDoc e){
360             log.error("Slave Failed",e);
361         }
362     }
363 }
364
Popular Tags