1 14 15 package org.apache.activemq.broker.ft; 16 17 import java.util.concurrent.atomic.AtomicBoolean ; 18 import org.apache.activemq.broker.Connection; 19 import org.apache.activemq.broker.ConnectionContext; 20 import org.apache.activemq.broker.ConsumerBrokerExchange; 21 import org.apache.activemq.broker.InsertableMutableBrokerFilter; 22 import org.apache.activemq.broker.MutableBrokerFilter; 23 import org.apache.activemq.broker.ProducerBrokerExchange; 24 import org.apache.activemq.broker.region.Subscription; 25 import org.apache.activemq.command.Command; 26 import org.apache.activemq.command.ConnectionControl; 27 import org.apache.activemq.command.ConnectionInfo; 28 import org.apache.activemq.command.ConsumerInfo; 29 import org.apache.activemq.command.ExceptionResponse; 30 import org.apache.activemq.command.Message; 31 import org.apache.activemq.command.MessageAck; 32 import org.apache.activemq.command.MessageDispatch; 33 import org.apache.activemq.command.MessageDispatchNotification; 34 import org.apache.activemq.command.ProducerInfo; 35 import org.apache.activemq.command.RemoveInfo; 36 import org.apache.activemq.command.RemoveSubscriptionInfo; 37 import org.apache.activemq.command.Response; 38 import org.apache.activemq.command.SessionInfo; 39 import org.apache.activemq.command.TransactionId; 40 import org.apache.activemq.command.TransactionInfo; 41 import org.apache.activemq.transport.MutexTransport; 42 import org.apache.activemq.transport.ResponseCorrelator; 43 import org.apache.activemq.transport.Transport; 44 import org.apache.commons.logging.Log; 45 import org.apache.commons.logging.LogFactory; 46 47 52 public class MasterBroker extends InsertableMutableBrokerFilter{ 53 54 private static final Log log=LogFactory.getLog(MasterBroker.class); 55 private Transport slave; 56 private AtomicBoolean started=new AtomicBoolean (false); 57 58 64 public MasterBroker(MutableBrokerFilter parent,Transport transport){ 65 super(parent); 66 this.slave=transport; 67 this.slave=new MutexTransport(slave); 68 this.slave=new ResponseCorrelator(slave); 69 this.slave.setTransportListener(transport.getTransportListener()); 70 } 71 72 76 public void startProcessing(){ 77 started.set(true); 78 try{ 79 Connection[] connections=getClients(); 80 ConnectionControl command=new ConnectionControl(); 81 command.setFaultTolerant(true); 82 if(connections!=null){ 83 for(int i=0;i<connections.length;i++){ 84 if(connections[i].isActive()&&connections[i].isManageable()){ 85 connections[i].dispatchAsync(command); 86 } 87 } 88 } 89 }catch(Exception e){ 90 log.error("Failed to get Connections",e); 91 } 92 } 93 94 99 public void stop() throws Exception { 100 super.stop(); 101 stopProcessing(); 102 } 103 104 108 public void stopProcessing(){ 109 if(started.compareAndSet(true,false)){ 110 remove(); 111 } 112 } 113 114 121 public void addConnection(ConnectionContext context,ConnectionInfo info) throws Exception { 122 super.addConnection(context,info); 123 sendAsyncToSlave(info); 124 } 125 126 134 public void removeConnection(ConnectionContext context,ConnectionInfo info,Throwable error) throws Exception { 135 super.removeConnection(context,info,error); 136 sendAsyncToSlave(new RemoveInfo(info.getConnectionId())); 137 } 138 139 146 public void addSession(ConnectionContext context,SessionInfo info) throws Exception { 147 super.addSession(context,info); 148 sendAsyncToSlave(info); 149 } 150 151 158 public void removeSession(ConnectionContext context,SessionInfo info) throws Exception { 159 super.removeSession(context,info); 160 sendAsyncToSlave(new RemoveInfo(info.getSessionId())); 161 } 162 163 170 public void addProducer(ConnectionContext context,ProducerInfo info) throws Exception { 171 super.addProducer(context,info); 172 sendAsyncToSlave(info); 173 } 174 175 182 public void removeProducer(ConnectionContext context,ProducerInfo info) throws Exception { 183 super.removeProducer(context,info); 184 sendAsyncToSlave(new RemoveInfo(info.getProducerId())); 185 } 186 187 195 public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception { 196 sendAsyncToSlave(info); 197 Subscription answer=super.addConsumer(context,info); 198 return answer; 199 } 200 201 208 public void removeSubscription(ConnectionContext context,RemoveSubscriptionInfo info) throws Exception { 209 super.removeSubscription(context,info); 210 sendAsyncToSlave(info); 211 } 212 213 220 public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception { 221 TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN); 222 sendAsyncToSlave(info); 223 super.beginTransaction(context,xid); 224 } 225 226 234 public int prepareTransaction(ConnectionContext context,TransactionId xid) throws Exception { 235 TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE); 236 sendAsyncToSlave(info); 237 int result=super.prepareTransaction(context,xid); 238 return result; 239 } 240 241 248 public void rollbackTransaction(ConnectionContext context,TransactionId xid) throws Exception { 249 TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK); 250 sendAsyncToSlave(info); 251 super.rollbackTransaction(context,xid); 252 } 253 254 262 public void commitTransaction(ConnectionContext context,TransactionId xid,boolean onePhase) throws Exception { 263 TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE); 264 sendSyncToSlave(info); 265 super.commitTransaction(context,xid,onePhase); 266 } 267 268 275 public void forgetTransaction(ConnectionContext context,TransactionId xid) throws Exception { 276 TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET); 277 sendAsyncToSlave(info); 278 super.forgetTransaction(context,xid); 279 } 280 281 286 public void processDispatch(MessageDispatch messageDispatch){ 287 MessageDispatchNotification mdn=new MessageDispatchNotification(); 288 mdn.setConsumerId(messageDispatch.getConsumerId()); 289 mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId()); 290 mdn.setDestination(messageDispatch.getDestination()); 291 if(messageDispatch.getMessage()!=null) 292 mdn.setMessageId(messageDispatch.getMessage().getMessageId()); 293 sendAsyncToSlave(mdn); 294 super.processDispatch(messageDispatch); 295 } 296 297 303 public void send(ProducerBrokerExchange producerExchange,Message message) throws Exception { 304 308 sendToSlave(message); 309 super.send(producerExchange,message); 310 } 311 312 318 public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception { 319 sendToSlave(ack); 320 super.acknowledge(consumerExchange,ack); 321 } 322 323 public boolean isFaultTolerantConfiguration(){ 324 return true; 325 } 326 327 protected void sendToSlave(Message message){ 328 if(message.isResponseRequired()){ 329 sendSyncToSlave(message); 330 }else{ 331 sendAsyncToSlave(message); 332 } 333 } 334 335 protected void sendToSlave(MessageAck ack){ 336 if(ack.isResponseRequired()){ 337 sendAsyncToSlave(ack); 338 }else{ 339 sendSyncToSlave(ack); 340 } 341 } 342 343 protected void sendAsyncToSlave(Command command){ 344 try{ 345 slave.oneway(command); 346 }catch(Throwable e){ 347 log.error("Slave Failed",e); 348 stopProcessing(); 349 } 350 } 351 352 protected void sendSyncToSlave(Command command){ 353 try{ 354 Response response=(Response)slave.request(command); 355 if(response.isException()){ 356 ExceptionResponse er=(ExceptionResponse)response; 357 log.error("Slave Failed",er.getException()); 358 } 359 }catch(Throwable e){ 360 log.error("Slave Failed",e); 361 } 362 } 363 } 364 | Popular Tags |