|                                                                                                              1
 14
 15  package org.apache.activemq.broker.ft;
 16
 17  import java.util.concurrent.atomic.AtomicBoolean
  ; 18  import org.apache.activemq.broker.Connection;
 19  import org.apache.activemq.broker.ConnectionContext;
 20  import org.apache.activemq.broker.ConsumerBrokerExchange;
 21  import org.apache.activemq.broker.InsertableMutableBrokerFilter;
 22  import org.apache.activemq.broker.MutableBrokerFilter;
 23  import org.apache.activemq.broker.ProducerBrokerExchange;
 24  import org.apache.activemq.broker.region.Subscription;
 25  import org.apache.activemq.command.Command;
 26  import org.apache.activemq.command.ConnectionControl;
 27  import org.apache.activemq.command.ConnectionInfo;
 28  import org.apache.activemq.command.ConsumerInfo;
 29  import org.apache.activemq.command.ExceptionResponse;
 30  import org.apache.activemq.command.Message;
 31  import org.apache.activemq.command.MessageAck;
 32  import org.apache.activemq.command.MessageDispatch;
 33  import org.apache.activemq.command.MessageDispatchNotification;
 34  import org.apache.activemq.command.ProducerInfo;
 35  import org.apache.activemq.command.RemoveInfo;
 36  import org.apache.activemq.command.RemoveSubscriptionInfo;
 37  import org.apache.activemq.command.Response;
 38  import org.apache.activemq.command.SessionInfo;
 39  import org.apache.activemq.command.TransactionId;
 40  import org.apache.activemq.command.TransactionInfo;
 41  import org.apache.activemq.transport.MutexTransport;
 42  import org.apache.activemq.transport.ResponseCorrelator;
 43  import org.apache.activemq.transport.Transport;
 44  import org.apache.commons.logging.Log;
 45  import org.apache.commons.logging.LogFactory;
 46
 47
 52  public class MasterBroker extends InsertableMutableBrokerFilter{
 53
 54      private static final Log log=LogFactory.getLog(MasterBroker.class);
 55      private Transport slave;
 56      private AtomicBoolean
  started=new AtomicBoolean  (false); 57
 58
 64      public MasterBroker(MutableBrokerFilter parent,Transport transport){
 65          super(parent);
 66          this.slave=transport;
 67          this.slave=new MutexTransport(slave);
 68          this.slave=new ResponseCorrelator(slave);
 69          this.slave.setTransportListener(transport.getTransportListener());
 70      }
 71
 72
 76      public void startProcessing(){
 77          started.set(true);
 78          try{
 79              Connection[] connections=getClients();
 80              ConnectionControl command=new ConnectionControl();
 81              command.setFaultTolerant(true);
 82              if(connections!=null){
 83                  for(int i=0;i<connections.length;i++){
 84                      if(connections[i].isActive()&&connections[i].isManageable()){
 85                          connections[i].dispatchAsync(command);
 86                      }
 87                  }
 88              }
 89          }catch(Exception
  e){ 90              log.error("Failed to get Connections",e);
 91          }
 92      }
 93
 94
 99      public void stop() throws Exception
  { 100         super.stop();
 101         stopProcessing();
 102     }
 103
 104
 108     public void stopProcessing(){
 109         if(started.compareAndSet(true,false)){
 110             remove();
 111         }
 112     }
 113
 114
 121     public void addConnection(ConnectionContext context,ConnectionInfo info) throws Exception
  { 122         super.addConnection(context,info);
 123         sendAsyncToSlave(info);
 124     }
 125
 126
 134     public void removeConnection(ConnectionContext context,ConnectionInfo info,Throwable
  error) throws Exception  { 135         super.removeConnection(context,info,error);
 136         sendAsyncToSlave(new RemoveInfo(info.getConnectionId()));
 137     }
 138
 139
 146     public void addSession(ConnectionContext context,SessionInfo info) throws Exception
  { 147         super.addSession(context,info);
 148         sendAsyncToSlave(info);
 149     }
 150
 151
 158     public void removeSession(ConnectionContext context,SessionInfo info) throws Exception
  { 159         super.removeSession(context,info);
 160         sendAsyncToSlave(new RemoveInfo(info.getSessionId()));
 161     }
 162
 163
 170     public void addProducer(ConnectionContext context,ProducerInfo info) throws Exception
  { 171         super.addProducer(context,info);
 172         sendAsyncToSlave(info);
 173     }
 174
 175
 182     public void removeProducer(ConnectionContext context,ProducerInfo info) throws Exception
  { 183         super.removeProducer(context,info);
 184         sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
 185     }
 186
 187
 195     public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception
  { 196         sendAsyncToSlave(info);
 197         Subscription answer=super.addConsumer(context,info);
 198         return answer;
 199     }
 200
 201
 208     public void removeSubscription(ConnectionContext context,RemoveSubscriptionInfo info) throws Exception
  { 209         super.removeSubscription(context,info);
 210         sendAsyncToSlave(info);
 211     }
 212
 213
 220     public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception
  { 221         TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN);
 222         sendAsyncToSlave(info);
 223         super.beginTransaction(context,xid);
 224     }
 225
 226
 234     public int prepareTransaction(ConnectionContext context,TransactionId xid) throws Exception
  { 235         TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE);
 236         sendAsyncToSlave(info);
 237         int result=super.prepareTransaction(context,xid);
 238         return result;
 239     }
 240
 241
 248     public void rollbackTransaction(ConnectionContext context,TransactionId xid) throws Exception
  { 249         TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
 250         sendAsyncToSlave(info);
 251         super.rollbackTransaction(context,xid);
 252     }
 253
 254
 262     public void commitTransaction(ConnectionContext context,TransactionId xid,boolean onePhase) throws Exception
  { 263         TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
 264         sendSyncToSlave(info);
 265         super.commitTransaction(context,xid,onePhase);
 266     }
 267
 268
 275     public void forgetTransaction(ConnectionContext context,TransactionId xid) throws Exception
  { 276         TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET);
 277         sendAsyncToSlave(info);
 278         super.forgetTransaction(context,xid);
 279     }
 280
 281
 286     public void processDispatch(MessageDispatch messageDispatch){
 287         MessageDispatchNotification mdn=new MessageDispatchNotification();
 288         mdn.setConsumerId(messageDispatch.getConsumerId());
 289         mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
 290         mdn.setDestination(messageDispatch.getDestination());
 291         if(messageDispatch.getMessage()!=null)
 292             mdn.setMessageId(messageDispatch.getMessage().getMessageId());
 293         sendAsyncToSlave(mdn);
 294         super.processDispatch(messageDispatch);
 295     }
 296
 297
 303     public void send(ProducerBrokerExchange producerExchange,Message message) throws Exception
  { 304
 308         sendToSlave(message);
 309         super.send(producerExchange,message);
 310     }
 311
 312
 318     public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception
  { 319         sendToSlave(ack);
 320         super.acknowledge(consumerExchange,ack);
 321     }
 322
 323     public boolean isFaultTolerantConfiguration(){
 324         return true;
 325     }
 326
 327     protected void sendToSlave(Message message){
 328         if(message.isResponseRequired()){
 329             sendSyncToSlave(message);
 330         }else{
 331             sendAsyncToSlave(message);
 332         }
 333     }
 334
 335     protected void sendToSlave(MessageAck ack){
 336         if(ack.isResponseRequired()){
 337             sendAsyncToSlave(ack);
 338         }else{
 339             sendSyncToSlave(ack);
 340         }
 341     }
 342
 343     protected void sendAsyncToSlave(Command command){
 344         try{
 345             slave.oneway(command);
 346         }catch(Throwable
  e){ 347             log.error("Slave Failed",e);
 348             stopProcessing();
 349         }
 350     }
 351
 352     protected void sendSyncToSlave(Command command){
 353         try{
 354             Response response=(Response)slave.request(command);
 355             if(response.isException()){
 356                 ExceptionResponse er=(ExceptionResponse)response;
 357                 log.error("Slave Failed",er.getException());
 358             }
 359         }catch(Throwable
  e){ 360             log.error("Slave Failed",e);
 361         }
 362     }
 363 }
 364
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |