KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > failover > FailoverTransport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.failover;
19
20 import java.io.IOException JavaDoc;
21 import java.io.InterruptedIOException JavaDoc;
22 import java.net.URI JavaDoc;
23 import java.util.ArrayList JavaDoc;
24 import java.util.Iterator JavaDoc;
25 import java.util.Random JavaDoc;
26
27 import org.apache.activemq.command.BrokerInfo;
28 import org.apache.activemq.command.Command;
29 import org.apache.activemq.command.Response;
30 import org.apache.activemq.state.ConnectionStateTracker;
31 import org.apache.activemq.state.Tracked;
32 import org.apache.activemq.thread.DefaultThreadPools;
33 import org.apache.activemq.thread.Task;
34 import org.apache.activemq.thread.TaskRunner;
35 import org.apache.activemq.transport.CompositeTransport;
36 import org.apache.activemq.transport.FutureResponse;
37 import org.apache.activemq.transport.ResponseCallback;
38 import org.apache.activemq.transport.Transport;
39 import org.apache.activemq.transport.TransportFactory;
40 import org.apache.activemq.transport.TransportListener;
41 import org.apache.activemq.util.IOExceptionSupport;
42 import org.apache.activemq.util.ServiceSupport;
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45
46 import java.util.concurrent.ConcurrentHashMap JavaDoc;
47 import java.util.concurrent.CopyOnWriteArrayList JavaDoc;
48
49 /**
50  * A Transport that is made reliable by being able to fail over to another
51  * transport when a transport failure is detected.
52  *
53  * @version $Revision$
54  */

55 public class FailoverTransport implements CompositeTransport {
56
57     private static final Log log = LogFactory.getLog(FailoverTransport.class);
58
59     private TransportListener transportListener;
60     private boolean disposed;
61     private final CopyOnWriteArrayList JavaDoc uris = new CopyOnWriteArrayList JavaDoc();
62
63     private final Object JavaDoc reconnectMutex = new Object JavaDoc();
64     private final Object JavaDoc sleepMutex = new Object JavaDoc();
65     private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
66     private final ConcurrentHashMap JavaDoc requestMap = new ConcurrentHashMap JavaDoc();
67
68     private URI JavaDoc connectedTransportURI;
69     private Transport connectedTransport;
70     private final TaskRunner reconnectTask;
71     private boolean started;
72
73     private long initialReconnectDelay = 10;
74     private long maxReconnectDelay = 1000 * 30;
75     private long backOffMultiplier = 2;
76     private boolean useExponentialBackOff = true;
77     private boolean randomize = true;
78     private boolean initialized;
79     private int maxReconnectAttempts;
80     private int connectFailures;
81     private long reconnectDelay = initialReconnectDelay;
82     private Exception JavaDoc connectionFailure;
83
84     private final TransportListener myTransportListener = createTransportListener();
85     
86     TransportListener createTransportListener() {
87         return new TransportListener() {
88             public void onCommand(Object JavaDoc o) {
89                 Command command = (Command) o;
90                 if (command == null) {
91                     return;
92                 }
93                 if (command.isResponse()) {
94                     Object JavaDoc object = requestMap.remove(new Integer JavaDoc(((Response) command).getCorrelationId()));
95                     if( object!=null && object.getClass() == Tracked.class ) {
96                        ((Tracked)object).onResponses();
97                     }
98                 }
99                 if (!initialized){
100                     if (command.isBrokerInfo()){
101                         BrokerInfo info = (BrokerInfo)command;
102                         BrokerInfo[] peers = info.getPeerBrokerInfos();
103                         if (peers!= null){
104                             for (int i =0; i < peers.length;i++){
105                                 String JavaDoc brokerString = peers[i].getBrokerURL();
106                                 add(brokerString);
107                             }
108                         }
109                     initialized = true;
110                     }
111                     
112                 }
113                 if (transportListener != null) {
114                     transportListener.onCommand(command);
115                 }
116             }
117     
118             public void onException(IOException JavaDoc error) {
119                 try {
120                     handleTransportFailure(error);
121                 }
122                 catch (InterruptedException JavaDoc e) {
123                     Thread.currentThread().interrupt();
124                     transportListener.onException(new InterruptedIOException JavaDoc());
125                 }
126             }
127             
128             public void transportInterupted(){
129                 if (transportListener != null){
130                     transportListener.transportInterupted();
131                 }
132             }
133     
134             public void transportResumed(){
135                 if(transportListener != null){
136                     transportListener.transportResumed();
137                 }
138             }
139         };
140     }
141     
142     public FailoverTransport() throws InterruptedIOException JavaDoc {
143
144         stateTracker.setTrackTransactions(true);
145         
146         // Setup a task that is used to reconnect the a connection async.
147
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
148
149             public boolean iterate() {
150
151                 Exception JavaDoc failure=null;
152                 synchronized (reconnectMutex) {
153
154                     if (disposed || connectionFailure!=null) {
155                         reconnectMutex.notifyAll();
156                     }
157
158                     if (connectedTransport != null || disposed || connectionFailure!=null) {
159                         return false;
160                     } else {
161                         ArrayList JavaDoc connectList = getConnectList();
162                         if( connectList.isEmpty() ) {
163                             failure = new IOException JavaDoc("No uris available to connect to.");
164                         } else {
165                             if (!useExponentialBackOff){
166                                 reconnectDelay = initialReconnectDelay;
167                             }
168                             Iterator JavaDoc iter = connectList.iterator();
169                             for (int i = 0; iter.hasNext() && connectedTransport == null && !disposed; i++) {
170                                 URI JavaDoc uri = (URI JavaDoc) iter.next();
171                                 try {
172                                     log.debug("Attempting connect to: " + uri);
173                                     Transport t = TransportFactory.compositeConnect(uri);
174                                     t.setTransportListener(myTransportListener);
175                                     t.start();
176                                     
177                                     if (started) {
178                                         restoreTransport(t);
179                                     }
180                                     
181                                     log.debug("Connection established");
182                                     reconnectDelay = initialReconnectDelay;
183                                     connectedTransportURI = uri;
184                                     connectedTransport = t;
185                                     reconnectMutex.notifyAll();
186                                     connectFailures = 0;
187                                     if (transportListener != null){
188                                         transportListener.transportResumed();
189                                     }
190                                     log.info("Successfully reconnected to " + uri);
191                                     return false;
192                                 }
193                                 catch (Exception JavaDoc e) {
194                                     failure = e;
195                                     log.debug("Connect fail to: " + uri + ", reason: " + e);
196                                 }
197                             }
198                         }
199                     }
200                     
201                     if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
202                         log.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
203                         connectionFailure = failure;
204                         reconnectMutex.notifyAll();
205                         return false;
206                     }
207                 }
208
209                 if(!disposed){
210                     
211                         log.debug("Waiting "+reconnectDelay+" ms before attempting connection. ");
212                         synchronized(sleepMutex){
213                             try{
214                                 sleepMutex.wait(reconnectDelay);
215                             }catch(InterruptedException JavaDoc e){
216                                Thread.currentThread().interrupt();
217                             }
218                         }
219                         
220                     
221                     if(useExponentialBackOff){
222                         // Exponential increment of reconnect delay.
223
reconnectDelay*=backOffMultiplier;
224                         if(reconnectDelay>maxReconnectDelay)
225                             reconnectDelay=maxReconnectDelay;
226                     }
227                 }
228                 return !disposed;
229             }
230
231         }, "ActiveMQ Failover Worker: "+System.identityHashCode(this));
232     }
233
234     private void handleTransportFailure(IOException JavaDoc e) throws InterruptedException JavaDoc {
235         if (transportListener != null){
236             transportListener.transportInterupted();
237         }
238         synchronized (reconnectMutex) {
239             log.warn("Transport failed, attempting to automatically reconnect due to: " + e, e);
240             if (connectedTransport != null) {
241                 initialized = false;
242                 ServiceSupport.dispose(connectedTransport);
243                 connectedTransport = null;
244                 connectedTransportURI = null;
245             }
246             reconnectTask.wakeup();
247         }
248     }
249
250     public void start() throws Exception JavaDoc {
251         synchronized (reconnectMutex) {
252             log.debug("Started.");
253             if (started)
254                 return;
255             started = true;
256             if (connectedTransport != null) {
257                 stateTracker.restore(connectedTransport);
258             }
259         }
260     }
261
262     public void stop() throws Exception JavaDoc {
263         synchronized (reconnectMutex) {
264             log.debug("Stopped.");
265             if (!started)
266                 return;
267             started = false;
268             disposed = true;
269
270             if (connectedTransport != null) {
271                 connectedTransport.stop();
272                 connectedTransport=null;
273             }
274             reconnectMutex.notifyAll();
275         }
276         synchronized(sleepMutex){
277             sleepMutex.notifyAll();
278         }
279         reconnectTask.shutdown();
280     }
281
282     public long getInitialReconnectDelay() {
283         return initialReconnectDelay;
284     }
285
286     public void setInitialReconnectDelay(long initialReconnectDelay) {
287         this.initialReconnectDelay = initialReconnectDelay;
288     }
289
290     public long getMaxReconnectDelay() {
291         return maxReconnectDelay;
292     }
293
294     public void setMaxReconnectDelay(long maxReconnectDelay) {
295         this.maxReconnectDelay = maxReconnectDelay;
296     }
297
298     public long getReconnectDelay() {
299         return reconnectDelay;
300     }
301
302     public void setReconnectDelay(long reconnectDelay) {
303         this.reconnectDelay = reconnectDelay;
304     }
305
306     public long getReconnectDelayExponent() {
307         return backOffMultiplier;
308     }
309
310     public void setReconnectDelayExponent(long reconnectDelayExponent) {
311         this.backOffMultiplier = reconnectDelayExponent;
312     }
313
314     public Transport getConnectedTransport() {
315         return connectedTransport;
316     }
317
318     public URI JavaDoc getConnectedTransportURI() {
319         return connectedTransportURI;
320     }
321
322     public int getMaxReconnectAttempts() {
323         return maxReconnectAttempts;
324     }
325
326     public void setMaxReconnectAttempts(int maxReconnectAttempts) {
327         this.maxReconnectAttempts = maxReconnectAttempts;
328     }
329
330     /**
331      * @return Returns the randomize.
332      */

333     public boolean isRandomize(){
334         return randomize;
335     }
336
337     /**
338      * @param randomize The randomize to set.
339      */

340     public void setRandomize(boolean randomize){
341         this.randomize=randomize;
342     }
343
344     public void oneway(Object JavaDoc o) throws IOException JavaDoc {
345         Command command = (Command) o;
346         Exception JavaDoc error = null;
347         try {
348
349             synchronized (reconnectMutex) {
350                 // Keep trying until the message is sent.
351
for (int i = 0;!disposed; i++) {
352                     try {
353
354                         // Wait for transport to be connected.
355
while (connectedTransport == null && !disposed && connectionFailure==null ) {
356                             log.trace("Waiting for transport to reconnect.");
357                             try {
358                                 reconnectMutex.wait(1000);
359                             }
360                             catch (InterruptedException JavaDoc e) {
361                                 Thread.currentThread().interrupt();
362                                 log.debug("Interupted: " + e, e);
363                             }
364                         }
365
366                         if( connectedTransport==null ) {
367                             // Previous loop may have exited due to use being
368
// disposed.
369
if (disposed) {
370                                 error = new IOException JavaDoc("Transport disposed.");
371                             } else if (connectionFailure!=null) {
372                                 error = connectionFailure;
373                             } else {
374                                 error = new IOException JavaDoc("Unexpected failure.");
375                             }
376                             break;
377                         }
378
379                         // If it was a request and it was not being tracked by
380
// the state tracker,
381
// then hold it in the requestMap so that we can replay
382
// it later.
383
Tracked tracked = stateTracker.track(command);
384                         if( tracked!=null && tracked.isWaitingForResponse() ) {
385                             requestMap.put(new Integer JavaDoc(command.getCommandId()), tracked);
386                         } else if ( tracked==null && command.isResponseRequired()) {
387                             requestMap.put(new Integer JavaDoc(command.getCommandId()), command);
388                         }
389                                                 
390                         // Send the message.
391
try {
392                             connectedTransport.oneway(command);
393                         } catch (IOException JavaDoc e) {
394                             
395                             // If the command was not tracked.. we will retry in this method
396
if( tracked==null ) {
397                                 
398                                 // since we will retry in this method.. take it out of the request
399
// map so that it is not sent 2 times on recovery
400
if( command.isResponseRequired() ) {
401                                     requestMap.remove(new Integer JavaDoc(command.getCommandId()));
402                                 }
403                                 
404                                 // Rethrow the exception so it will handled by the outer catch
405
throw e;
406                             }
407                             
408                         }
409                         
410                         return;
411
412                     }
413                     catch (IOException JavaDoc e) {
414                         log.debug("Send oneway attempt: " + i + " failed.");
415                         handleTransportFailure(e);
416                     }
417                 }
418             }
419         }
420         catch (InterruptedException JavaDoc e) {
421             // Some one may be trying to stop our thread.
422
Thread.currentThread().interrupt();
423             throw new InterruptedIOException JavaDoc();
424         }
425         if(!disposed){
426             if(error!=null){
427                 if(error instanceof IOException JavaDoc)
428                     throw (IOException JavaDoc) error;
429                 throw IOExceptionSupport.create(error);
430             }
431         }
432     }
433
434     public FutureResponse asyncRequest(Object JavaDoc command, ResponseCallback responseCallback) throws IOException JavaDoc {
435         throw new AssertionError JavaDoc("Unsupported Method");
436     }
437
438     public Object JavaDoc request(Object JavaDoc command) throws IOException JavaDoc {
439         throw new AssertionError JavaDoc("Unsupported Method");
440     }
441     
442     public Object JavaDoc request(Object JavaDoc command,int timeout) throws IOException JavaDoc {
443         throw new AssertionError JavaDoc("Unsupported Method");
444     }
445
446     public void add(URI JavaDoc u[]) {
447         for (int i = 0; i < u.length; i++) {
448             if( !uris.contains(u[i]) )
449                 uris.add(u[i]);
450         }
451         reconnect();
452     }
453
454     public void remove(URI JavaDoc u[]) {
455         for (int i = 0; i < u.length; i++) {
456             uris.remove(u[i]);
457         }
458         reconnect();
459     }
460     
461     public void add(String JavaDoc u){
462         try {
463         URI JavaDoc uri = new URI JavaDoc(u);
464         if (!uris.contains(uri))
465             uris.add(uri);
466
467         reconnect();
468         }catch(Exception JavaDoc e){
469             log.error("Failed to parse URI: " + u);
470         }
471     }
472
473
474     public void reconnect() {
475         log.debug("Waking up reconnect task");
476         try {
477             reconnectTask.wakeup();
478         } catch (InterruptedException JavaDoc e) {
479             Thread.currentThread().interrupt();
480         }
481     }
482
483     private ArrayList JavaDoc getConnectList(){
484         ArrayList JavaDoc l=new ArrayList JavaDoc(uris);
485         if (randomize){
486             // Randomly, reorder the list by random swapping
487
Random JavaDoc r=new Random JavaDoc();
488             r.setSeed(System.currentTimeMillis());
489             for (int i=0;i<l.size();i++){
490                 int p=r.nextInt(l.size());
491                 Object JavaDoc t=l.get(p);
492                 l.set(p,l.get(i));
493                 l.set(i,t);
494             }
495         }
496         return l;
497     }
498
499     public TransportListener getTransportListener() {
500         return transportListener;
501     }
502
503     public void setTransportListener(TransportListener commandListener) {
504         this.transportListener = commandListener;
505     }
506
507     public Object JavaDoc narrow(Class JavaDoc target) {
508
509         if (target.isAssignableFrom(getClass())) {
510             return this;
511         }
512         synchronized (reconnectMutex) {
513             if (connectedTransport != null) {
514                 return connectedTransport.narrow(target);
515             }
516         }
517         return null;
518
519     }
520
521     protected void restoreTransport(Transport t) throws Exception JavaDoc, IOException JavaDoc {
522         t.start();
523         stateTracker.restore(t);
524         for (Iterator JavaDoc iter2 = requestMap.values().iterator(); iter2.hasNext();) {
525             Command command = (Command) iter2.next();
526             t.oneway(command);
527         }
528     }
529
530     public boolean isUseExponentialBackOff() {
531         return useExponentialBackOff;
532     }
533
534     public void setUseExponentialBackOff(boolean useExponentialBackOff) {
535         this.useExponentialBackOff = useExponentialBackOff;
536     }
537
538     public String JavaDoc toString() {
539         return connectedTransportURI==null ? "unconnected" : connectedTransportURI.toString();
540     }
541
542     public String JavaDoc getRemoteAddress() {
543         if(connectedTransport != null){
544             return connectedTransport.getRemoteAddress();
545         }
546         return null;
547     }
548
549 }
550
Popular Tags