KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > ft > MasterConnector


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.ft;
19
20 import java.io.IOException JavaDoc;
21 import java.net.URI JavaDoc;
22 import java.net.URISyntaxException JavaDoc;
23 import java.util.List JavaDoc;
24
25 import org.apache.activemq.Service;
26 import org.apache.activemq.broker.BrokerService;
27 import org.apache.activemq.broker.BrokerServiceAware;
28 import org.apache.activemq.broker.TransportConnector;
29 import org.apache.activemq.command.BrokerInfo;
30 import org.apache.activemq.command.Command;
31 import org.apache.activemq.command.CommandTypes;
32 import org.apache.activemq.command.ConnectionId;
33 import org.apache.activemq.command.ConnectionInfo;
34 import org.apache.activemq.command.MessageDispatch;
35 import org.apache.activemq.command.ProducerInfo;
36 import org.apache.activemq.command.Response;
37 import org.apache.activemq.command.SessionInfo;
38 import org.apache.activemq.command.ShutdownInfo;
39 import org.apache.activemq.transport.DefaultTransportListener;
40 import org.apache.activemq.transport.Transport;
41 import org.apache.activemq.transport.TransportFactory;
42 import org.apache.activemq.util.IdGenerator;
43 import org.apache.activemq.util.ServiceStopper;
44 import org.apache.activemq.util.ServiceSupport;
45 import org.apache.commons.logging.Log;
46 import org.apache.commons.logging.LogFactory;
47 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
48
49 /**
50  * Connects a Slave Broker to a Master when using <a
51  * HREF="http://activemq.apache.org/masterslave.html">Master Slave</a>
52  * for High Availability of messages.
53  *
54  * @org.apache.xbean.XBean
55  *
56  * @version $Revision$
57  */

58 public class MasterConnector implements Service, BrokerServiceAware {
59
60     private static final Log log = LogFactory.getLog(MasterConnector.class);
61     private BrokerService broker;
62     private URI JavaDoc remoteURI;
63     private URI JavaDoc localURI;
64     private Transport localBroker;
65     private Transport remoteBroker;
66     private TransportConnector connector;
67     private AtomicBoolean JavaDoc masterActive = new AtomicBoolean JavaDoc(false);
68     private AtomicBoolean JavaDoc started = new AtomicBoolean JavaDoc(false);
69     private final IdGenerator idGenerator = new IdGenerator();
70     private String JavaDoc userName;
71     private String JavaDoc password;
72     private ConnectionInfo connectionInfo;
73     private SessionInfo sessionInfo;
74     private ProducerInfo producerInfo;
75
76     public MasterConnector() {
77     }
78
79     public MasterConnector(String JavaDoc remoteUri) throws URISyntaxException JavaDoc {
80         remoteURI = new URI JavaDoc(remoteUri);
81     }
82
83     public void setBrokerService(BrokerService broker) {
84         this.broker = broker;
85         if (localURI == null) {
86             localURI = broker.getVmConnectorURI();
87         }
88         if (connector == null) {
89             List JavaDoc transportConnectors = broker.getTransportConnectors();
90             if (!transportConnectors.isEmpty()) {
91                 connector = (TransportConnector) transportConnectors.get(0);
92             }
93         }
94     }
95
96     public boolean isSlave() {
97         return masterActive.get();
98     }
99
100     public void start() throws Exception JavaDoc {
101         if (!started.compareAndSet(false, true)) {
102             return;
103         }
104         if (remoteURI == null) {
105             throw new IllegalArgumentException JavaDoc("You must specify a remoteURI");
106         }
107         localBroker = TransportFactory.connect(localURI);
108         remoteBroker = TransportFactory.connect(remoteURI);
109         log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established.");
110
111         localBroker.setTransportListener(new DefaultTransportListener() {
112             public void onCommand(Object JavaDoc command) {
113             }
114
115             public void onException(IOException JavaDoc error) {
116                 if (started.get()) {
117                     serviceLocalException(error);
118                 }
119             }
120         });
121
122         remoteBroker.setTransportListener(new DefaultTransportListener() {
123             public void onCommand(Object JavaDoc o) {
124                 Command command = (Command) o;
125                 if (started.get()) {
126                     serviceRemoteCommand(command);
127                 }
128             }
129
130             public void onException(IOException JavaDoc error) {
131                 if (started.get()) {
132                     serviceRemoteException(error);
133                 }
134             }
135         });
136
137         masterActive.set(true);
138         Thread JavaDoc thead = new Thread JavaDoc() {
139             public void run() {
140                 try {
141                     localBroker.start();
142                     remoteBroker.start();
143                     startBridge();
144                 }
145                 catch (Exception JavaDoc e) {
146                     masterActive.set(false);
147                     log.error("Failed to start network bridge: " + e, e);
148                 }
149             }
150         };
151         thead.start();
152
153     }
154
155     protected void startBridge() throws Exception JavaDoc {
156         connectionInfo = new ConnectionInfo();
157         connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
158         connectionInfo.setClientId(idGenerator.generateId());
159         connectionInfo.setUserName(userName);
160         connectionInfo.setPassword(password);
161         localBroker.oneway(connectionInfo);
162         ConnectionInfo remoteInfo = new ConnectionInfo();
163         connectionInfo.copy(remoteInfo);
164         remoteInfo.setBrokerMasterConnector(true);
165         remoteBroker.oneway(connectionInfo);
166
167         sessionInfo = new SessionInfo(connectionInfo, 1);
168         localBroker.oneway(sessionInfo);
169         remoteBroker.oneway(sessionInfo);
170
171         producerInfo = new ProducerInfo(sessionInfo, 1);
172         producerInfo.setResponseRequired(false);
173         remoteBroker.oneway(producerInfo);
174
175         BrokerInfo brokerInfo = null;
176         if (connector != null) {
177
178             brokerInfo = connector.getBrokerInfo();
179         }
180         else {
181             brokerInfo = new BrokerInfo();
182         }
183         brokerInfo.setBrokerName(broker.getBrokerName());
184         brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
185         brokerInfo.setSlaveBroker(true);
186         remoteBroker.oneway(brokerInfo);
187
188         log.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established.");
189     }
190
191     public void stop() throws Exception JavaDoc {
192         if (!started.compareAndSet(true, false)) {
193             return;
194         }
195
196         masterActive.set(false);
197         try {
198             // if (connectionInfo!=null){
199
// localBroker.request(connectionInfo.createRemoveCommand());
200
// }
201
// localBroker.setTransportListener(null);
202
// remoteBroker.setTransportListener(null);
203
remoteBroker.oneway(new ShutdownInfo());
204             localBroker.oneway(new ShutdownInfo());
205         }
206         catch (IOException JavaDoc e) {
207             log.debug("Caught exception stopping", e);
208         }
209         finally {
210             ServiceStopper ss = new ServiceStopper();
211             ss.stop(localBroker);
212             ss.stop(remoteBroker);
213             ss.throwFirstException();
214         }
215     }
216
217     protected void serviceRemoteException(IOException JavaDoc error) {
218         log.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
219         shutDown();
220     }
221
222     protected void serviceRemoteCommand(Command command) {
223         try {
224             if (command.isMessageDispatch()) {
225                 MessageDispatch md = (MessageDispatch) command;
226                 command = md.getMessage();
227             }
228             if (command.getDataStructureType() == CommandTypes.SHUTDOWN_INFO) {
229                 log.warn("The Master has shutdown");
230                 shutDown();
231
232             }
233             else {
234                 boolean responseRequired = command.isResponseRequired();
235                 int commandId = command.getCommandId();
236                 localBroker.oneway(command);
237                 if (responseRequired) {
238                     Response response = new Response();
239                     response.setCorrelationId(commandId);
240                     remoteBroker.oneway(response);
241                 }
242             }
243         }
244         catch (IOException JavaDoc e) {
245             serviceRemoteException(e);
246         }
247     }
248
249     protected void serviceLocalException(Throwable JavaDoc error) {
250         log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
251         ServiceSupport.dispose(this);
252     }
253
254     /**
255      * @return Returns the localURI.
256      */

257     public URI JavaDoc getLocalURI() {
258         return localURI;
259     }
260
261     /**
262      * @param localURI
263      * The localURI to set.
264      */

265     public void setLocalURI(URI JavaDoc localURI) {
266         this.localURI = localURI;
267     }
268
269     /**
270      * @return Returns the remoteURI.
271      */

272     public URI JavaDoc getRemoteURI() {
273         return remoteURI;
274     }
275
276     /**
277      * @param remoteURI
278      * The remoteURI to set.
279      */

280     public void setRemoteURI(URI JavaDoc remoteURI) {
281         this.remoteURI = remoteURI;
282     }
283
284     /**
285      * @return Returns the password.
286      */

287     public String JavaDoc getPassword() {
288         return password;
289     }
290
291     /**
292      * @param password
293      * The password to set.
294      */

295     public void setPassword(String JavaDoc password) {
296         this.password = password;
297     }
298
299     /**
300      * @return Returns the userName.
301      */

302     public String JavaDoc getUserName() {
303         return userName;
304     }
305
306     /**
307      * @param userName
308      * The userName to set.
309      */

310     public void setUserName(String JavaDoc userName) {
311         this.userName = userName;
312     }
313
314     private void shutDown() {
315         masterActive.set(false);
316         broker.masterFailed();
317         ServiceSupport.dispose(this);
318     }
319
320 }
321
Popular Tags