KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > fanout > FanoutTransport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.fanout;
19
20 import java.io.IOException JavaDoc;
21 import java.io.InterruptedIOException JavaDoc;
22 import java.net.URI JavaDoc;
23 import java.util.ArrayList JavaDoc;
24 import java.util.Iterator JavaDoc;
25
26 import org.apache.activemq.command.Command;
27 import org.apache.activemq.command.ConsumerInfo;
28 import org.apache.activemq.command.Message;
29 import org.apache.activemq.command.Response;
30 import org.apache.activemq.state.ConnectionStateTracker;
31 import org.apache.activemq.thread.DefaultThreadPools;
32 import org.apache.activemq.thread.Task;
33 import org.apache.activemq.thread.TaskRunner;
34 import org.apache.activemq.transport.CompositeTransport;
35 import org.apache.activemq.transport.DefaultTransportListener;
36 import org.apache.activemq.transport.FutureResponse;
37 import org.apache.activemq.transport.ResponseCallback;
38 import org.apache.activemq.transport.Transport;
39 import org.apache.activemq.transport.TransportFactory;
40 import org.apache.activemq.transport.TransportListener;
41 import org.apache.activemq.util.IOExceptionSupport;
42 import org.apache.activemq.util.ServiceStopper;
43 import org.apache.activemq.util.ServiceSupport;
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46
47 import java.util.concurrent.ConcurrentHashMap JavaDoc;
48 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
49
50 /**
51  * A Transport that fans out a connection to multiple brokers.
52  *
53  * @version $Revision$
54  */

55 public class FanoutTransport implements CompositeTransport {
56
57     private static final Log log = LogFactory.getLog(FanoutTransport.class);
58
59     private TransportListener transportListener;
60     private boolean disposed;
61
62     private final Object JavaDoc reconnectMutex = new Object JavaDoc();
63     private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
64     private final ConcurrentHashMap JavaDoc requestMap = new ConcurrentHashMap JavaDoc();
65
66     private final TaskRunner reconnectTask;
67     private boolean started;
68     
69     private ArrayList JavaDoc transports = new ArrayList JavaDoc();
70     private int connectedCount=0;
71     
72     private int minAckCount = 2;
73     
74     private long initialReconnectDelay = 10;
75     private long maxReconnectDelay = 1000 * 30;
76     private long backOffMultiplier = 2;
77     private boolean useExponentialBackOff = true;
78     private int maxReconnectAttempts;
79     private Exception JavaDoc connectionFailure;
80     private FanoutTransportHandler primary;
81     
82     static class RequestCounter {
83         
84         final Command command;
85         final AtomicInteger JavaDoc ackCount;
86         
87         RequestCounter(Command command, int count) {
88             this.command = command;
89             this.ackCount = new AtomicInteger JavaDoc(count);
90         }
91         
92         public String JavaDoc toString() {
93             return command.getCommandId()+"="+ackCount.get();
94         }
95     }
96
97     class FanoutTransportHandler extends DefaultTransportListener {
98         
99         private final URI JavaDoc uri;
100         private Transport transport;
101
102         private int connectFailures;
103         private long reconnectDelay = initialReconnectDelay;
104         private long reconnectDate;
105
106         public FanoutTransportHandler(URI JavaDoc uri) {
107             this.uri=uri;
108         }
109
110         public void onCommand(Object JavaDoc o) {
111             Command command = (Command) o;
112             if (command.isResponse()) {
113                 Integer JavaDoc id = new Integer JavaDoc(((Response) command).getCorrelationId());
114                 RequestCounter rc = (RequestCounter) requestMap.get(id);
115                 if( rc != null ) {
116                     if( rc.ackCount.decrementAndGet() <= 0 ) {
117                         requestMap.remove(id);
118                         transportListener.onCommand(command);
119                     }
120                 } else {
121                     transportListener.onCommand(command);
122                 }
123             } else {
124                 transportListener.onCommand(command);
125             }
126         }
127
128         public void onException(IOException JavaDoc error) {
129             try {
130                 synchronized (reconnectMutex) {
131                     if( transport == null )
132                         return;
133                     
134                     log.debug("Transport failed, starting up reconnect task", error);
135                     
136                     ServiceSupport.dispose(transport);
137                     transport=null;
138                     connectedCount--;
139                     if( primary == this) {
140                         primary = null;
141                     }
142                     reconnectTask.wakeup();
143                 }
144             }
145             catch (InterruptedException JavaDoc e) {
146                 Thread.currentThread().interrupt();
147                 transportListener.onException(new InterruptedIOException JavaDoc());
148             }
149         }
150     }
151
152     public FanoutTransport() throws InterruptedIOException JavaDoc {
153         // Setup a task that is used to reconnect the a connection async.
154
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
155             public boolean iterate() {
156                 return doConnect();
157             }
158         }, "ActiveMQ Fanout Worker: "+System.identityHashCode(this));
159     }
160     
161     /**
162      * @return
163      */

164     private boolean doConnect() {
165         long closestReconnectDate=0;
166         synchronized (reconnectMutex) {
167
168             if (disposed || connectionFailure!=null) {
169                 reconnectMutex.notifyAll();
170             }
171
172             if (transports.size() == connectedCount || disposed || connectionFailure!=null) {
173                 return false;
174             } else {
175                 
176                 if( transports.isEmpty() ) {
177 // connectionFailure = new IOException("No uris available to connect to.");
178
} else {
179                     
180                     
181                     // Try to connect them up.
182
Iterator JavaDoc iter = transports.iterator();
183                     for (int i = 0; iter.hasNext() && !disposed; i++) {
184                         
185                         long now = System.currentTimeMillis();
186                         
187                         FanoutTransportHandler fanoutHandler = (FanoutTransportHandler) iter.next();
188                         if( fanoutHandler.transport!=null ) {
189                             continue;
190                         }
191                         
192                         // Are we waiting a little to try to reconnect this one?
193
if( fanoutHandler.reconnectDate!=0 && fanoutHandler.reconnectDate>now ) {
194                             if( closestReconnectDate==0 || fanoutHandler.reconnectDate < closestReconnectDate ) {
195                                 closestReconnectDate = fanoutHandler.reconnectDate;
196                             }
197                             continue;
198                         }
199                         
200                         URI JavaDoc uri = fanoutHandler.uri;
201                         try {
202                             log.debug("Stopped: "+this);
203                             log.debug("Attempting connect to: " + uri);
204                             Transport t = TransportFactory.compositeConnect(uri);
205                             log.debug("Connection established");
206                             fanoutHandler.transport = t;
207                             fanoutHandler.reconnectDelay = 10;
208                             fanoutHandler.connectFailures = 0;
209                             if( primary == null ) {
210                                 primary = fanoutHandler;
211                             }
212                             t.setTransportListener(fanoutHandler);
213                             connectedCount++;
214                             if (started) {
215                                 restoreTransport(fanoutHandler);
216                             }
217                         }
218                         catch (Exception JavaDoc e) {
219                             log.debug("Connect fail to: " + uri + ", reason: " + e);
220                             
221                             if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
222                                 log.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
223                                 connectionFailure = e;
224                                 reconnectMutex.notifyAll();
225                                 return false;
226                             } else {
227                                 
228                                 if (useExponentialBackOff) {
229                                     // Exponential increment of reconnect delay.
230
fanoutHandler.reconnectDelay *= backOffMultiplier;
231                                     if (fanoutHandler.reconnectDelay > maxReconnectDelay)
232                                         fanoutHandler.reconnectDelay = maxReconnectDelay;
233                                 }
234                                 
235                                 fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;
236                                 
237                                 if( closestReconnectDate==0 || fanoutHandler.reconnectDate < closestReconnectDate ) {
238                                     closestReconnectDate = fanoutHandler.reconnectDate;
239                                 }
240                             }
241                         }
242                     }
243                     if (transports.size() == connectedCount || disposed ) {
244                         reconnectMutex.notifyAll();
245                         return false;
246                     }
247                     
248                 }
249             }
250             
251         }
252
253         try {
254             long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
255             if(reconnectDelay>0) {
256                 log.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
257                 Thread.sleep(reconnectDelay);
258             }
259         }
260         catch (InterruptedException JavaDoc e1) {
261             Thread.currentThread().interrupt();
262         }
263         return true;
264     }
265
266     public void start() throws Exception JavaDoc {
267         synchronized (reconnectMutex) {
268             log.debug("Started.");
269             if (started)
270                 return;
271             started = true;
272             for (Iterator JavaDoc iter = transports.iterator(); iter.hasNext();) {
273                 FanoutTransportHandler th = (FanoutTransportHandler) iter.next();
274                 if( th.transport != null ) {
275                     restoreTransport(th);
276                 }
277             }
278         }
279     }
280
281     public void stop() throws Exception JavaDoc {
282         synchronized (reconnectMutex) {
283             ServiceStopper ss = new ServiceStopper();
284             
285             if (!started)
286                 return;
287             started = false;
288             disposed = true;
289
290             for (Iterator JavaDoc iter = transports.iterator(); iter.hasNext();) {
291                 FanoutTransportHandler th = (FanoutTransportHandler) iter.next();
292                 if( th.transport != null ) {
293                     ss.stop(th.transport);
294                 }
295             }
296             
297             log.debug("Stopped: "+this);
298             ss.throwFirstException();
299         }
300         reconnectTask.shutdown();
301     }
302
303     public long getInitialReconnectDelay() {
304         return initialReconnectDelay;
305     }
306
307     public void setInitialReconnectDelay(long initialReconnectDelay) {
308         this.initialReconnectDelay = initialReconnectDelay;
309     }
310
311     public long getMaxReconnectDelay() {
312         return maxReconnectDelay;
313     }
314
315     public void setMaxReconnectDelay(long maxReconnectDelay) {
316         this.maxReconnectDelay = maxReconnectDelay;
317     }
318
319     public long getReconnectDelayExponent() {
320         return backOffMultiplier;
321     }
322
323     public void setReconnectDelayExponent(long reconnectDelayExponent) {
324         this.backOffMultiplier = reconnectDelayExponent;
325     }
326
327     public int getMaxReconnectAttempts() {
328         return maxReconnectAttempts;
329     }
330
331     public void setMaxReconnectAttempts(int maxReconnectAttempts) {
332         this.maxReconnectAttempts = maxReconnectAttempts;
333     }
334
335     public void oneway(Object JavaDoc o) throws IOException JavaDoc {
336         final Command command = (Command) o;
337         try {
338             synchronized (reconnectMutex) {
339                 
340                 // If it was a request and it was not being tracked by
341
// the state tracker,
342
// then hold it in the requestMap so that we can replay
343
// it later.
344
boolean fanout = isFanoutCommand(command);
345                 if (stateTracker.track(command)==null && command.isResponseRequired() ) {
346                     int size = fanout ? minAckCount : 1;
347                     requestMap.put(new Integer JavaDoc(command.getCommandId()), new RequestCounter(command, size));
348                 }
349
350                 // Wait for transport to be connected.
351
while (connectedCount != minAckCount && !disposed && connectionFailure==null ) {
352                     log.debug("Waiting for at least "+minAckCount+" transports to be connected.");
353                     reconnectMutex.wait(1000);
354                 }
355
356                 // Still not fully connected.
357
if( connectedCount != minAckCount ) {
358
359                     Exception JavaDoc error;
360                     
361                     // Throw the right kind of error..
362
if (disposed) {
363                         error = new IOException JavaDoc("Transport disposed.");
364                     } else if (connectionFailure!=null) {
365                         error = connectionFailure;
366                     } else {
367                         error = new IOException JavaDoc("Unexpected failure.");
368                     }
369                     
370                     if( error instanceof IOException JavaDoc )
371                         throw (IOException JavaDoc)error;
372                     throw IOExceptionSupport.create(error);
373                 }
374                 
375                 // Send the message.
376
if( fanout ) {
377                     for (Iterator JavaDoc iter = transports.iterator(); iter.hasNext();) {
378                         FanoutTransportHandler th = (FanoutTransportHandler) iter.next();
379                         if( th.transport!=null ) {
380                             try {
381                                 th.transport.oneway(command);
382                             } catch (IOException JavaDoc e) {
383                                 log.debug("Send attempt: failed.");
384                                 th.onException(e);
385                             }
386                         }
387                     }
388                 } else {
389                     try {
390                         primary.transport.oneway(command);
391                     } catch (IOException JavaDoc e) {
392                         log.debug("Send attempt: failed.");
393                         primary.onException(e);
394                     }
395                 }
396                 
397             }
398         } catch (InterruptedException JavaDoc e) {
399             // Some one may be trying to stop our thread.
400
Thread.currentThread().interrupt();
401             throw new InterruptedIOException JavaDoc();
402         }
403     }
404
405     /**
406      * @param command
407      * @return
408      */

409     private boolean isFanoutCommand(Command command) {
410         if( command.isMessage() ) {
411             return ((Message)command).getDestination().isTopic();
412         }
413         if( command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE ) {
414             return false;
415         }
416         return true;
417     }
418
419     public FutureResponse asyncRequest(Object JavaDoc command, ResponseCallback responseCallback) throws IOException JavaDoc {
420         throw new AssertionError JavaDoc("Unsupported Method");
421     }
422
423     public Object JavaDoc request(Object JavaDoc command) throws IOException JavaDoc {
424         throw new AssertionError JavaDoc("Unsupported Method");
425     }
426     
427     public Object JavaDoc request(Object JavaDoc command,int timeout) throws IOException JavaDoc {
428         throw new AssertionError JavaDoc("Unsupported Method");
429     }
430
431     public void reconnect() {
432         log.debug("Waking up reconnect task");
433         try {
434             reconnectTask.wakeup();
435         } catch (InterruptedException JavaDoc e) {
436             Thread.currentThread().interrupt();
437         }
438     }
439
440     public TransportListener getTransportListener() {
441         return transportListener;
442     }
443
444     public void setTransportListener(TransportListener commandListener) {
445         this.transportListener = commandListener;
446     }
447
448     public Object JavaDoc narrow(Class JavaDoc target) {
449
450         if (target.isAssignableFrom(getClass())) {
451             return this;
452         }
453         
454         synchronized (reconnectMutex) {
455             for (Iterator JavaDoc iter = transports.iterator(); iter.hasNext();) {
456                 FanoutTransportHandler th = (FanoutTransportHandler) iter.next();
457                 if( th.transport!=null ) {
458                     Object JavaDoc rc = th.transport.narrow(target);
459                     if( rc !=null )
460                         return rc;
461                 }
462             }
463         }
464         
465         return null;
466
467     }
468
469     protected void restoreTransport(FanoutTransportHandler th) throws Exception JavaDoc, IOException JavaDoc {
470         th.transport.start();
471         stateTracker.setRestoreConsumers(th.transport==primary);
472         stateTracker.restore(th.transport);
473         for (Iterator JavaDoc iter2 = requestMap.values().iterator(); iter2.hasNext();) {
474             RequestCounter rc = (RequestCounter) iter2.next();
475             th.transport.oneway(rc.command);
476         }
477     }
478
479     public void add(URI JavaDoc uris[]) {
480         
481         synchronized (reconnectMutex) {
482             for (int i = 0; i < uris.length; i++) {
483                 URI JavaDoc uri = uris[i];
484                 
485                 boolean match=false;
486                 for (Iterator JavaDoc iter = transports.iterator(); iter.hasNext();) {
487                     FanoutTransportHandler th = (FanoutTransportHandler) iter.next();
488                     if( th.uri.equals(uri)) {
489                         match=true;
490                         break;
491                     }
492                 }
493                 if( !match ) {
494                     FanoutTransportHandler th = new FanoutTransportHandler(uri);
495                     transports.add(th);
496                     reconnect();
497                 }
498             }
499         }
500         
501     }
502     
503     public void remove(URI JavaDoc uris[]) {
504         
505         synchronized (reconnectMutex) {
506             for (int i = 0; i < uris.length; i++) {
507                 URI JavaDoc uri = uris[i];
508                 
509                 boolean match=false;
510                 for (Iterator JavaDoc iter = transports.iterator(); iter.hasNext();) {
511                     FanoutTransportHandler th = (FanoutTransportHandler) iter.next();
512                     if( th.uri.equals(uri)) {
513                         if( th.transport!=null ) {
514                             ServiceSupport.dispose(th.transport);
515                             connectedCount--;
516                         }
517                         iter.remove();
518                         break;
519                     }
520                 }
521             }
522         }
523         
524     }
525
526     public String JavaDoc getRemoteAddress() {
527         if(primary != null){
528            if(primary.transport != null){
529                return primary.transport.getRemoteAddress();
530            }
531         }
532         return null;
533     }
534
535 }
536
Popular Tags