package org.apache.activemq.broker.ft;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;

import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Connects the local (slave) broker to a remote (master) broker.
 * <p>
 * Two transports are opened: one to the local broker and one to the remote
 * broker. Commands received from the remote broker are forwarded to the local
 * broker; when the remote broker reports a shutdown, or its transport fails,
 * the owning {@link BrokerService} is notified through
 * {@link BrokerService#masterFailed()} and this connector is disposed.
 */
public class MasterConnector implements Service, BrokerServiceAware {

    private static final Log log = LogFactory.getLog(MasterConnector.class);

    private BrokerService broker;
    private URI remoteURI;
    private URI localURI;
    private Transport localBroker;
    private Transport remoteBroker;
    private TransportConnector connector;
    /** True from {@link #start()} until the master is stopped, lost, or the bridge fails to start. */
    private final AtomicBoolean masterActive = new AtomicBoolean(false);
    /** Guards {@link #start()} / {@link #stop()} so that each runs at most once per cycle. */
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final IdGenerator idGenerator = new IdGenerator();
    private String userName;
    private String password;
    private ConnectionInfo connectionInfo;
    private SessionInfo sessionInfo;
    private ProducerInfo producerInfo;

    /**
     * Creates a connector with no remote URI; one must be supplied through
     * {@link #setRemoteURI(URI)} before {@link #start()} is called.
     */
    public MasterConnector() {
    }

    /**
     * Creates a connector for the given remote broker.
     *
     * @param remoteUri the URI of the remote broker, in string form
     * @throws URISyntaxException if {@code remoteUri} cannot be parsed as a URI
     */
    public MasterConnector(String remoteUri) throws URISyntaxException {
        remoteURI = new URI(remoteUri);
    }

    /**
     * Injects the owning broker. If no local URI has been configured, the
     * broker's VM connector URI is used; if no transport connector has been
     * chosen, the broker's first transport connector (if any) is used.
     *
     * @param broker the broker service this connector belongs to
     */
    public void setBrokerService(BrokerService broker) {
        this.broker = broker;
        if (localURI == null) {
            localURI = broker.getVmConnectorURI();
        }
        if (connector == null) {
            List transportConnectors = broker.getTransportConnectors();
            if (!transportConnectors.isEmpty()) {
                connector = (TransportConnector) transportConnectors.get(0);
            }
        }
    }

    /**
     * @return {@code true} while the master is considered active, i.e. after
     *         {@link #start()} and before a stop, a master shutdown or a
     *         failure to start the bridge
     */
    public boolean isSlave() {
        return masterActive.get();
    }

    /**
     * Opens the local and remote transports, installs their listeners and
     * starts the bridge on a separate thread. Calling this method while
     * already started is a no-op.
     *
     * @throws IllegalArgumentException if no remote URI has been configured
     * @throws Exception if either transport cannot be created
     */
    public void start() throws Exception {
        if (!started.compareAndSet(false, true)) {
            return;
        }
        if (remoteURI == null) {
            // Nothing has been allocated yet: undo the flag so that start()
            // can be retried once a remote URI has been configured.
            started.set(false);
            throw new IllegalArgumentException("You must specify a remoteURI");
        }
        localBroker = TransportFactory.connect(localURI);
        remoteBroker = TransportFactory.connect(remoteURI);
        log.info("Starting a network connection between " + localBroker + " and " + remoteBroker);

        localBroker.setTransportListener(new DefaultTransportListener() {
            public void onCommand(Object command) {
                // Commands coming back from the local broker are ignored.
            }

            public void onException(IOException error) {
                if (started.get()) {
                    serviceLocalException(error);
                }
            }
        });

        remoteBroker.setTransportListener(new DefaultTransportListener() {
            public void onCommand(Object o) {
                Command command = (Command) o;
                if (started.get()) {
                    serviceRemoteCommand(command);
                }
            }

            public void onException(IOException error) {
                if (started.get()) {
                    serviceRemoteException(error);
                }
            }
        });

        masterActive.set(true);
        // The transports are started off the calling thread; a failure here
        // only clears masterActive and is reported through the log.
        Thread bridgeStarter = new Thread() {
            public void run() {
                try {
                    localBroker.start();
                    remoteBroker.start();
                    startBridge();
                }
                catch (Exception e) {
                    masterActive.set(false);
                    log.error("Failed to start network bridge: " + e, e);
                }
            }
        };
        bridgeStarter.start();
    }

    /**
     * Registers this connector with both brokers: sends the connection and
     * session info to both sides, then the producer info and the broker info
     * (marked as a slave broker) to the remote broker.
     *
     * @throws Exception if any of the commands cannot be sent
     */
    protected void startBridge() throws Exception {
        connectionInfo = new ConnectionInfo();
        connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
        connectionInfo.setClientId(idGenerator.generateId());
        connectionInfo.setUserName(userName);
        connectionInfo.setPassword(password);
        localBroker.oneway(connectionInfo);

        // The remote broker receives a copy flagged as a master connector.
        // The flagged copy (not the local connectionInfo) must be the one sent,
        // otherwise the flag never reaches the remote side.
        ConnectionInfo remoteInfo = new ConnectionInfo();
        connectionInfo.copy(remoteInfo);
        remoteInfo.setBrokerMasterConnector(true);
        remoteBroker.oneway(remoteInfo);

        sessionInfo = new SessionInfo(connectionInfo, 1);
        localBroker.oneway(sessionInfo);
        remoteBroker.oneway(sessionInfo);

        producerInfo = new ProducerInfo(sessionInfo, 1);
        producerInfo.setResponseRequired(false);
        remoteBroker.oneway(producerInfo);

        BrokerInfo brokerInfo;
        if (connector != null) {
            brokerInfo = connector.getBrokerInfo();
        }
        else {
            brokerInfo = new BrokerInfo();
        }
        brokerInfo.setBrokerName(broker.getBrokerName());
        brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
        brokerInfo.setSlaveBroker(true);
        remoteBroker.oneway(brokerInfo);

        log.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established.");
    }

    /**
     * Sends a shutdown notice over both transports (best effort) and then
     * stops them. Calling this method while not started is a no-op.
     *
     * @throws Exception the first exception raised while stopping a transport
     */
    public void stop() throws Exception {
        if (!started.compareAndSet(true, false)) {
            return;
        }

        masterActive.set(false);
        try {
            // Either transport may be null if start() failed while creating them.
            if (remoteBroker != null) {
                remoteBroker.oneway(new ShutdownInfo());
            }
            if (localBroker != null) {
                localBroker.oneway(new ShutdownInfo());
            }
        }
        catch (IOException e) {
            log.debug("Caught exception stopping", e);
        }
        finally {
            ServiceStopper ss = new ServiceStopper();
            if (localBroker != null) {
                ss.stop(localBroker);
            }
            if (remoteBroker != null) {
                ss.stop(remoteBroker);
            }
            ss.throwFirstException();
        }
    }

    /**
     * Handles a failure of the remote transport: logs it and shuts this
     * connector down, notifying the broker that the master has failed.
     *
     * @param error the transport failure
     */
    protected void serviceRemoteException(IOException error) {
        log.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
        shutDown();
    }

    /**
     * Handles a command received from the remote broker. A message dispatch
     * is unwrapped to its message first. A shutdown notice triggers
     * {@link #shutDown()}; any other command is forwarded to the local broker
     * and, if it required a response, an empty {@link Response} correlated to
     * the command id is sent back to the remote broker.
     *
     * @param command the command received from the remote broker
     */
    protected void serviceRemoteCommand(Command command) {
        try {
            if (command.isMessageDispatch()) {
                MessageDispatch md = (MessageDispatch) command;
                command = md.getMessage();
            }
            if (command.getDataStructureType() == CommandTypes.SHUTDOWN_INFO) {
                log.warn("The Master has shutdown");
                shutDown();
            }
            else {
                // Read these before forwarding the command to the local broker.
                boolean responseRequired = command.isResponseRequired();
                int commandId = command.getCommandId();
                localBroker.oneway(command);
                if (responseRequired) {
                    Response response = new Response();
                    response.setCorrelationId(commandId);
                    remoteBroker.oneway(response);
                }
            }
        }
        catch (IOException e) {
            serviceRemoteException(e);
        }
    }

    /**
     * Handles a failure of the local transport: logs it and disposes this
     * connector. Unlike a remote failure, the broker is not told that the
     * master has failed.
     *
     * @param error the transport failure
     */
    protected void serviceLocalException(Throwable error) {
        log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
        ServiceSupport.dispose(this);
    }

    /**
     * @return the URI used to connect to the local broker
     */
    public URI getLocalURI() {
        return localURI;
    }

    /**
     * @param localURI the URI used to connect to the local broker
     */
    public void setLocalURI(URI localURI) {
        this.localURI = localURI;
    }

    /**
     * @return the URI used to connect to the remote broker
     */
    public URI getRemoteURI() {
        return remoteURI;
    }

    /**
     * @param remoteURI the URI used to connect to the remote broker
     */
    public void setRemoteURI(URI remoteURI) {
        this.remoteURI = remoteURI;
    }

    /**
     * @return the password placed in the connection info sent to the brokers
     */
    public String getPassword() {
        return password;
    }

    /**
     * @param password the password placed in the connection info sent to the brokers
     */
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * @return the user name placed in the connection info sent to the brokers
     */
    public String getUserName() {
        return userName;
    }

    /**
     * @param userName the user name placed in the connection info sent to the brokers
     */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /**
     * Marks the master as inactive, tells the broker that the master has
     * failed and disposes this connector.
     */
    private void shutDown() {
        masterActive.set(false);
        broker.masterFailed();
        ServiceSupport.dispose(this);
    }

}