KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > network > DemandForwardingBridgeSupport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.network;
19
20 import java.io.IOException JavaDoc;
21
22 import org.apache.activemq.advisory.AdvisorySupport;
23 import org.apache.activemq.command.ActiveMQDestination;
24 import org.apache.activemq.command.ActiveMQTempDestination;
25 import org.apache.activemq.command.ActiveMQTopic;
26 import org.apache.activemq.command.BrokerId;
27 import org.apache.activemq.command.BrokerInfo;
28 import org.apache.activemq.command.Command;
29 import org.apache.activemq.command.ConnectionError;
30 import org.apache.activemq.command.ConnectionId;
31 import org.apache.activemq.command.ConnectionInfo;
32 import org.apache.activemq.command.ConsumerId;
33 import org.apache.activemq.command.ConsumerInfo;
34 import org.apache.activemq.command.DataStructure;
35 import org.apache.activemq.command.DestinationInfo;
36 import org.apache.activemq.command.ExceptionResponse;
37 import org.apache.activemq.command.KeepAliveInfo;
38 import org.apache.activemq.command.Message;
39 import org.apache.activemq.command.MessageAck;
40 import org.apache.activemq.command.MessageDispatch;
41 import org.apache.activemq.command.NetworkBridgeFilter;
42 import org.apache.activemq.command.ProducerInfo;
43 import org.apache.activemq.command.RemoveInfo;
44 import org.apache.activemq.command.Response;
45 import org.apache.activemq.command.SessionInfo;
46 import org.apache.activemq.command.ShutdownInfo;
47 import org.apache.activemq.command.WireFormatInfo;
48 import org.apache.activemq.filter.DestinationFilter;
49 import org.apache.activemq.transport.DefaultTransportListener;
50 import org.apache.activemq.transport.FutureResponse;
51 import org.apache.activemq.transport.ResponseCallback;
52 import org.apache.activemq.transport.Transport;
53 import org.apache.activemq.transport.TransportDisposedIOException;
54 import org.apache.activemq.transport.TransportListener;
55 import org.apache.activemq.util.IdGenerator;
56 import org.apache.activemq.util.IntrospectionSupport;
57 import org.apache.activemq.util.LongSequenceGenerator;
58 import org.apache.activemq.util.MarshallingSupport;
59 import org.apache.activemq.util.ServiceStopper;
60 import org.apache.activemq.util.ServiceSupport;
61 import org.apache.commons.logging.Log;
62 import org.apache.commons.logging.LogFactory;
63
64 import java.security.GeneralSecurityException JavaDoc;
65 import java.util.Properties JavaDoc;
66 import java.util.concurrent.ConcurrentHashMap JavaDoc;
67 import java.util.concurrent.CountDownLatch JavaDoc;
68 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
69
70 /**
71  * A useful base class for implementing demand forwarding bridges.
72  *
73  * @version $Revision: 520814 $
74  */

75 public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
76     protected static final Log log = LogFactory.getLog(DemandForwardingBridge.class);
77     protected final Transport localBroker;
78     protected final Transport remoteBroker;
79     protected final IdGenerator idGenerator = new IdGenerator();
80     protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
81     protected ConnectionInfo localConnectionInfo;
82     protected ConnectionInfo remoteConnectionInfo;
83     protected SessionInfo localSessionInfo;
84     protected ProducerInfo producerInfo;
85     protected String JavaDoc remoteBrokerName = "Unknown";
86     protected String JavaDoc localClientId;
87     protected ConsumerInfo demandConsumerInfo;
88     protected int demandConsumerDispatched;
89     protected final AtomicBoolean JavaDoc localBridgeStarted = new AtomicBoolean JavaDoc(false);
90     protected final AtomicBoolean JavaDoc remoteBridgeStarted = new AtomicBoolean JavaDoc(false);
91     protected boolean disposed = false;
92     protected BrokerId localBrokerId;
93     protected ActiveMQDestination[] excludedDestinations;
94     protected ActiveMQDestination[] dynamicallyIncludedDestinations;
95     protected ActiveMQDestination[] staticallyIncludedDestinations;
96     protected ActiveMQDestination[] durableDestinations;
97     protected final ConcurrentHashMap JavaDoc subscriptionMapByLocalId = new ConcurrentHashMap JavaDoc();
98     protected final ConcurrentHashMap JavaDoc subscriptionMapByRemoteId = new ConcurrentHashMap JavaDoc();
99     protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
100     protected CountDownLatch JavaDoc startedLatch = new CountDownLatch JavaDoc(2);
101     protected CountDownLatch JavaDoc remoteBrokerNameKnownLatch = new CountDownLatch JavaDoc(1);
102     protected final AtomicBoolean JavaDoc remoteInterupted = new AtomicBoolean JavaDoc(false);
103     protected final AtomicBoolean JavaDoc lastConnectSucceeded = new AtomicBoolean JavaDoc(false);
104     protected NetworkBridgeConfiguration configuration;
105     private NetworkBridgeFailedListener bridgeFailedListener;
106     private boolean createdByDuplex;
107
108     
109     public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
110         this.configuration=configuration;
111         this.localBroker = localBroker;
112         this.remoteBroker = remoteBroker;
113     }
114
115     public void start() throws Exception JavaDoc {
116         localBroker.setTransportListener(new DefaultTransportListener(){
117             public void onCommand(Object JavaDoc o){
118                 Command command = (Command) o;
119                 serviceLocalCommand(command);
120             }
121     
122             public void onException(IOException JavaDoc error){
123                 serviceLocalException(error);
124             }
125         });
126         remoteBroker.setTransportListener(new TransportListener(){
127             public void onCommand(Object JavaDoc o){
128                 Command command = (Command) o;
129                 serviceRemoteCommand(command);
130             }
131     
132             public void onException(IOException JavaDoc error){
133                 serviceRemoteException(error);
134             }
135     
136             public void transportInterupted(){
137                 //clear any subscriptions - to try and prevent the bridge from stalling the broker
138
if( remoteInterupted.compareAndSet(false, true) ) {
139                     
140                     log.info("Outbound transport to " + remoteBrokerName + " interrupted.");
141
142                     if( localBridgeStarted.get() ) {
143                         clearDownSubscriptions();
144                         synchronized( DemandForwardingBridgeSupport.this ) {
145                             try{
146                                 localBroker.oneway(localConnectionInfo.createRemoveCommand());
147                             }catch(TransportDisposedIOException td){
148                                 log.debug("local broker is now disposed",td);
149                             }
150                             catch(IOException JavaDoc e){
151                                 log.warn("Caught exception from local start",e);
152                             }
153                         }
154                     }
155                     
156                     localBridgeStarted.set(false);
157                     remoteBridgeStarted.set(false);
158                     startedLatch = new CountDownLatch JavaDoc(2);
159                 }
160                 
161             }
162     
163             public void transportResumed(){
164                 if( remoteInterupted.compareAndSet(true, false) ) {
165                     
166                     // We want to slow down false connects so that we don't get in a busy loop.
167
// False connects can occurr if you using SSH tunnels.
168
if( !lastConnectSucceeded.get() ) {
169                         try {
170                             log.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");
171                             Thread.sleep(1000);
172                         } catch (InterruptedException JavaDoc e) {
173                             Thread.currentThread().interrupt();
174                         }
175                     }
176                     lastConnectSucceeded.set(false);
177                     try {
178                         startLocalBridge();
179                         remoteBridgeStarted.set(true);
180                         startedLatch.countDown();
181                         log.info("Outbound transport to " + remoteBrokerName + " resumed");
182                     }catch(Exception JavaDoc e) {
183                         log.error("Caught exception from local start in resume transport",e );
184                     }
185                                                          
186                 }
187             }
188         });
189         localBroker.start();
190         remoteBroker.start();
191         
192         try{
193             triggerRemoteStartBridge();
194         }catch(IOException JavaDoc e){
195             log.warn("Caught exception from remote start",e);
196         }
197     }
198
199     protected void triggerLocalStartBridge() throws IOException JavaDoc {
200         Thread JavaDoc thead=new Thread JavaDoc(){
201             public void run(){
202                 try{
203                     startLocalBridge();
204                 }catch(Exception JavaDoc e){
205                     serviceLocalException(e);
206                 }
207             }
208         };
209         thead.start();
210     }
211
212     protected void triggerRemoteStartBridge() throws IOException JavaDoc {
213         Thread JavaDoc thead=new Thread JavaDoc(){
214             public void run(){
215                 try{
216                     startRemoteBridge();
217                 }catch(Exception JavaDoc e){
218                     serviceRemoteException(e);
219                 }
220             }
221         };
222         thead.start();
223     }
224
225     protected void startLocalBridge() throws Exception JavaDoc {
226         if(localBridgeStarted.compareAndSet(false,true)){
227             synchronized( this ) {
228                 
229                 remoteBrokerNameKnownLatch.await();
230
231                 localConnectionInfo=new ConnectionInfo();
232                 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
233                 localClientId="NC_"+remoteBrokerName+"_inbound"+configuration.getBrokerName();
234                 localConnectionInfo.setClientId(localClientId);
235                 localConnectionInfo.setUserName(configuration.getUserName());
236                 localConnectionInfo.setPassword(configuration.getPassword());
237                 localBroker.oneway(localConnectionInfo);
238     
239                 localSessionInfo=new SessionInfo(localConnectionInfo,1);
240                 localBroker.oneway(localSessionInfo);
241                 
242                 log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
243                                 +") has been established.");
244                 
245                 startedLatch.countDown();
246                 setupStaticDestinations();
247             }
248         }
249     }
250
251     protected void startRemoteBridge() throws Exception JavaDoc {
252         if(remoteBridgeStarted.compareAndSet(false,true)) {
253     
254             synchronized (this) {
255                                 
256                 if( remoteConnectionInfo!=null ) {
257                     remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
258                 }
259                 
260                 remoteConnectionInfo=new ConnectionInfo();
261                 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
262                 remoteConnectionInfo.setClientId("NC_"+configuration.getBrokerName()+"_outbound");
263                 remoteConnectionInfo.setUserName(configuration.getUserName());
264                 remoteConnectionInfo.setPassword(configuration.getPassword());
265                 remoteBroker.oneway(remoteConnectionInfo);
266                 if (isCreatedByDuplex()==false) {
267                 BrokerInfo brokerInfo=new BrokerInfo();
268                 brokerInfo.setBrokerName(configuration.getBrokerName());
269                 brokerInfo.setNetworkConnection(true);
270                 brokerInfo.setDuplexConnection(configuration.isDuplex());
271               
272                 //set our properties
273
Properties JavaDoc props = new Properties JavaDoc();
274                 IntrospectionSupport.getProperties(this,props,null);
275                 String JavaDoc str = MarshallingSupport.propertiesToString(props);
276                 brokerInfo.setNetworkProperties(str);
277                 remoteBroker.oneway(brokerInfo);
278                 }
279
280                 SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
281                 remoteBroker.oneway(remoteSessionInfo);
282
283                 producerInfo=new ProducerInfo(remoteSessionInfo,1);
284                 producerInfo.setResponseRequired(false);
285                 remoteBroker.oneway(producerInfo);
286
287                 // Listen to consumer advisory messages on the remote broker to determine demand.
288
demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
289                 demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
290                 String JavaDoc advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+configuration.getDestinationFilter();
291                 if( configuration.isBridgeTempDestinations() ) {
292                     advisoryTopic += ","+AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
293                 }
294                 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
295                 demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
296                 remoteBroker.oneway(demandConsumerInfo);
297                 startedLatch.countDown();
298                 
299                 if (!disposed){
300                     triggerLocalStartBridge();
301                 }
302                 
303             }
304         }
305     }
306
307     public void stop() throws Exception JavaDoc{
308         log.debug(" stopping "+configuration.getBrokerName()+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
309         boolean wasDisposedAlready=disposed;
310         if(!disposed){
311             try{
312                 disposed=true;
313                 remoteBridgeStarted.set(false);
314                 localBroker.oneway(new ShutdownInfo());
315                 remoteBroker.oneway(new ShutdownInfo());
316             }catch(IOException JavaDoc e){
317                 log.debug("Caught exception stopping",e);
318             }finally{
319                 ServiceStopper ss=new ServiceStopper();
320                 ss.stop(localBroker);
321                 ss.stop(remoteBroker);
322                 ss.throwFirstException();
323             }
324         }
325         if(wasDisposedAlready){
326             log.debug(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped");
327         }else{
328             log.info(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped");
329         }
330     }
331     
332     public void serviceRemoteException(Throwable JavaDoc error){
333         if(!disposed){
334             if(error instanceof SecurityException JavaDoc||error instanceof GeneralSecurityException JavaDoc){
335                 log.error("Network connection between "+localBroker+" and "+remoteBroker
336                         +" shutdown due to a remote error: "+error);
337             }else{
338                 log.warn("Network connection between "+localBroker+" and "+remoteBroker
339                         +" shutdown due to a remote error: "+error);
340             }
341             log.debug("The remote Exception was: "+error,error);
342             new Thread JavaDoc(){
343
344                 public void run(){
345                     ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
346                 }
347             }.start();
348             fireBridgeFailed();
349         }
350     }
351
352     protected void serviceRemoteCommand(Command command) {
353         if(!disposed){
354             try{
355                 if(command.isMessageDispatch()){
356                     waitStarted();
357                     MessageDispatch md=(MessageDispatch) command;
358                     serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
359                     demandConsumerDispatched++;
360                     if(demandConsumerDispatched>(demandConsumerInfo.getPrefetchSize()*.75)){
361                         remoteBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,demandConsumerDispatched));
362                         demandConsumerDispatched=0;
363                     }
364                 }else if(command.isBrokerInfo()){
365                     
366                     lastConnectSucceeded.set(true);
367                     serviceRemoteBrokerInfo(command);
368                     // Let the local broker know the remote broker's ID.
369
localBroker.oneway(command);
370                     
371                 }else if(command.getClass() == ConnectionError.class ) {
372                     ConnectionError ce = (ConnectionError) command;
373                     serviceRemoteException(ce.getException());
374                 }else{
375                     switch(command.getDataStructureType()){
376                     case KeepAliveInfo.DATA_STRUCTURE_TYPE:
377                     case WireFormatInfo.DATA_STRUCTURE_TYPE:
378                     case ShutdownInfo.DATA_STRUCTURE_TYPE:
379                         break;
380                     default:
381                         log.warn("Unexpected remote command: "+command);
382                     }
383                 }
384             }catch(Exception JavaDoc e){
385                 serviceRemoteException(e);
386             }
387         }
388     }
389
390     private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException JavaDoc {
391        
392         final int networkTTL = configuration.getNetworkTTL();
393         if(data.getClass()==ConsumerInfo.class){
394             // Create a new local subscription
395
ConsumerInfo info=(ConsumerInfo) data;
396             BrokerId[] path=info.getBrokerPath();
397             if((path!=null&&path.length>= networkTTL)){
398                 if(log.isDebugEnabled())
399                     log.debug(configuration.getBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
400                 return;
401             }
402             if(contains(info.getBrokerPath(),localBrokerPath[0])){
403                 // Ignore this consumer as it's a consumer we locally sent to the broker.
404
if(log.isDebugEnabled())
405                     log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already routed through this broker once");
406                 return;
407             }
408             if (!isPermissableDestination(info.getDestination())){
409                 //ignore if not in the permited or in the excluded list
410
if(log.isDebugEnabled())
411                     log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
412                 return;
413             }
414             // Update the packet to show where it came from.
415
info=info.copy();
416             addRemoteBrokerToBrokerPath(info);
417             DemandSubscription sub=createDemandSubscription(info);
418             if (sub != null){
419                 addSubscription(sub);
420                 if(log.isDebugEnabled())
421                     log.debug(configuration.getBrokerName() + " Forwarding sub on "+localBroker+" from "+remoteBrokerName+" : "+info);
422             }else {
423                 if(log.isDebugEnabled())
424                     log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already subscribed to matching destination");
425             }
426         }
427         else if (data.getClass()==DestinationInfo.class){
428 // It's a destination info - we want to pass up
429
//infomation about temporary destinations
430
DestinationInfo destInfo = (DestinationInfo) data;
431             BrokerId[] path=destInfo.getBrokerPath();
432             if((path!=null&&path.length>= networkTTL)){
433                 if(log.isDebugEnabled())
434                     log.debug("Ignoring Subscription " + destInfo + " restricted to " + networkTTL + " network hops only");
435                 return;
436             }
437             if(contains(destInfo.getBrokerPath(),localBrokerPath[0])){
438                 // Ignore this consumer as it's a consumer we locally sent to the broker.
439
if(log.isDebugEnabled())
440                     log.debug("Ignoring sub " + destInfo + " already routed through this broker once");
441                 return;
442             }
443             
444             destInfo.setConnectionId(localConnectionInfo.getConnectionId());
445             if (destInfo.getDestination() instanceof ActiveMQTempDestination){
446                 //re-set connection id so comes from here
447
ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
448                 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
449             }
450                         
451             destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath()));
452             
453             log.debug("Replying destination control command: "+destInfo);
454             localBroker.oneway(destInfo);
455             
456         }
457         else if(data.getClass()==RemoveInfo.class){
458             ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId();
459             removeDemandSubscription(id);
460         }
461     }
462
463     public void serviceLocalException(Throwable JavaDoc error) {
464         if( !disposed ) {
465             log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "+error);
466             log.debug("The local Exception was:"+error,error);
467             new Thread JavaDoc() {
468                 public void run() {
469                     ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
470                 }
471             }.start();
472             fireBridgeFailed();
473         }
474     }
475
476     protected void addSubscription(DemandSubscription sub) throws IOException JavaDoc {
477         if (sub != null){
478             localBroker.oneway(sub.getLocalInfo());
479         }
480     }
481
482     protected void removeSubscription(DemandSubscription sub) throws IOException JavaDoc {
483         if(sub!=null){
484             subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
485             localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
486         }
487     }
488
489     protected DemandSubscription getDemandSubscription(MessageDispatch md) {
490         return (DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
491     }
492
493     protected Message configureMessage(MessageDispatch md) {
494         Message message=md.getMessage().copy();
495         // Update the packet to show where it came from.
496
message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(),localBrokerPath));
497         message.setProducerId(producerInfo.getProducerId());
498         message.setDestination(md.getDestination());
499         if(message.getOriginalTransactionId()==null)
500             message.setOriginalTransactionId(message.getTransactionId());
501         message.setTransactionId(null);
502         return message;
503     }
504
505     protected void serviceLocalCommand(Command command) {
506         if(!disposed){
507             final boolean trace=log.isTraceEnabled();
508             try{
509                 if(command.isMessageDispatch()){
510                     waitStarted();
511                     final MessageDispatch md=(MessageDispatch) command;
512                     DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
513                     if(sub!=null){
514                         Message message= configureMessage(md);
515                         if(trace)
516                             log.trace("bridging "+configuration.getBrokerName()+" -> "+remoteBrokerName+": "+message);
517                         
518                         
519                         
520                         if( !message.isResponseRequired() ) {
521                             
522                             // If the message was originally sent using async send, we will preserve that QOS
523
// by bridging it using an async send (small chance of message loss).
524
remoteBroker.oneway(message);
525                             localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
526                             
527                         } else {
528                             
529                             // The message was not sent using async send, so we should only ack the local
530
// broker when we get confirmation that the remote broker has received the message.
531
ResponseCallback callback = new ResponseCallback() {
532                                 public void onCompletion(FutureResponse future) {
533                                     try {
534                                         Response response = future.getResult();
535                                         if(response.isException()){
536                                             ExceptionResponse er=(ExceptionResponse) response;
537                                             serviceLocalException(er.getException());
538                                         } else {
539                                             localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
540                                         }
541                                     } catch (IOException JavaDoc e) {
542                                         serviceLocalException(e);
543                                     }
544                                 }
545                             };
546
547                             remoteBroker.asyncRequest(message, callback);
548                         }
549                         
550                     }else{
551                         if (trace)log.trace("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
552                     }
553                 }else if(command.isBrokerInfo()){
554                     serviceLocalBrokerInfo(command);
555                 }else if(command.isShutdownInfo()){
556                     log.info(configuration.getBrokerName()+" Shutting down");
557                     // Don't shut down the whole connector if the remote side was interrupted.
558
// the local transport is just shutting down temporarily until the remote side
559
// is restored.
560
if( !remoteInterupted.get() ) {
561                         stop();
562                     }
563                 }else if(command.getClass() == ConnectionError.class ) {
564                     ConnectionError ce = (ConnectionError) command;
565                     serviceLocalException(ce.getException());
566                 }else{
567                     switch(command.getDataStructureType()){
568                     case WireFormatInfo.DATA_STRUCTURE_TYPE:
569                         break;
570                     default:
571                         log.warn("Unexpected local command: "+command);
572                     }
573                 }
574             }catch(Exception JavaDoc e){
575                 serviceLocalException(e);
576             }
577         }
578     }
579
580     /**
581      * @return Returns the dynamicallyIncludedDestinations.
582      */

583     public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
584         return dynamicallyIncludedDestinations;
585     }
586
587     /**
588      * @param dynamicallyIncludedDestinations The dynamicallyIncludedDestinations to set.
589      */

590     public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
591         this.dynamicallyIncludedDestinations=dynamicallyIncludedDestinations;
592     }
593
594     /**
595      * @return Returns the excludedDestinations.
596      */

597     public ActiveMQDestination[] getExcludedDestinations() {
598         return excludedDestinations;
599     }
600
601     /**
602      * @param excludedDestinations The excludedDestinations to set.
603      */

604     public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
605         this.excludedDestinations=excludedDestinations;
606     }
607
608     /**
609      * @return Returns the staticallyIncludedDestinations.
610      */

611     public ActiveMQDestination[] getStaticallyIncludedDestinations() {
612         return staticallyIncludedDestinations;
613     }
614
615     /**
616      * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
617      */

618     public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
619         this.staticallyIncludedDestinations=staticallyIncludedDestinations;
620     }
621
622     /**
623      * @return Returns the durableDestinations.
624      */

625     public ActiveMQDestination[] getDurableDestinations() {
626         return durableDestinations;
627     }
628
629     /**
630      * @param durableDestinations The durableDestinations to set.
631      */

632     public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
633         this.durableDestinations=durableDestinations;
634     }
635
636     /**
637      * @return Returns the localBroker.
638      */

639     public Transport getLocalBroker() {
640         return localBroker;
641     }
642
643     /**
644      * @return Returns the remoteBroker.
645      */

646     public Transport getRemoteBroker() {
647         return remoteBroker;
648     }
649     
650     /**
651      * @return the createdByDuplex
652      */

653     public boolean isCreatedByDuplex(){
654         return this.createdByDuplex;
655     }
656     
657     /**
658      * @param createdByDuplex the createdByDuplex to set
659      */

660     public void setCreatedByDuplex(boolean createdByDuplex){
661         this.createdByDuplex=createdByDuplex;
662     }
663   
664     public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
665         if(brokerPath!=null){
666             for(int i=0;i<brokerPath.length;i++){
667                 if(brokerId.equals(brokerPath[i]))
668                     return true;
669             }
670         }
671         return false;
672     }
673
674     protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
675         if (brokerPath == null || brokerPath.length == 0)
676             return pathsToAppend;
677         BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
678         System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
679         System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
680         return rc;
681     }
682
683     protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
684         if (brokerPath == null || brokerPath.length == 0)
685             return new BrokerId[] { idToAppend };
686         BrokerId rc[] = new BrokerId[brokerPath.length + 1];
687         System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
688         rc[brokerPath.length] = idToAppend;
689         return rc;
690     }
691
692     protected boolean isPermissableDestination(ActiveMQDestination destination) {
693         // Are we not bridging temp destinations?
694
if( destination.isTemporary() && !configuration.isBridgeTempDestinations() ) {
695             return false;
696         }
697         
698         DestinationFilter filter=DestinationFilter.parseFilter(destination);
699         ActiveMQDestination[] dests = excludedDestinations;
700         if(dests!=null&&dests.length>0){
701             for(int i=0;i<dests.length;i++){
702                 ActiveMQDestination match=dests[i];
703                 if(match!=null&&filter.matches(match)){
704                     return false;
705                 }
706             }
707         }
708         dests = dynamicallyIncludedDestinations;
709         if(dests!=null&&dests.length>0){
710             for(int i=0;i<dests.length;i++){
711                 ActiveMQDestination match=dests[i];
712                 if(match!=null&&filter.matches(match)){
713                     return true;
714                 }
715             }
716             return false;
717         }
718     
719         return true;
720     }
721
722     /**
723      * Subscriptions for these desitnations are always created
724      *
725      */

726     protected void setupStaticDestinations() {
727         ActiveMQDestination[] dests = staticallyIncludedDestinations;
728         if (dests != null){
729             for(int i=0;i<dests.length;i++){
730                 ActiveMQDestination dest=dests[i];
731                 DemandSubscription sub = createDemandSubscription(dest);
732                 try{
733                     addSubscription(sub);
734                 }catch(IOException JavaDoc e){
735                    log.error("Failed to add static destination " + dest,e);
736                 }
737                 if(log.isTraceEnabled())
738                     log.trace("Forwarding messages for static destination: " + dest);
739             }
740         }
741     }
742
743     protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException JavaDoc {
744         return doCreateDemandSubscription(info);
745     }
746
747     protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException JavaDoc {
748         DemandSubscription result=new DemandSubscription(info);
749         result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
750                         .getNextSequenceId()));
751         
752         if( configuration.isDecreaseNetworkConsumerPriority() ) {
753             byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
754             if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
755                 // The longer the path to the consumer, the less it's consumer priority.
756
priority-=info.getBrokerPath().length+1;
757             }
758             result.getLocalInfo().setPriority(priority);
759         }
760         configureDemandSubscription(info, result);
761         return result;
762     }
763
764     protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
765         ConsumerInfo info = new ConsumerInfo();
766         info.setDestination(destination);
767         //the remote info held by the DemandSubscription holds the original consumerId,
768
//the local info get's overwritten
769
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
770                         .getNextSequenceId()));
771         DemandSubscription result=new DemandSubscription(info);
772         result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
773         return result;
774     }
775
776     protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException JavaDoc {
777         sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
778         sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
779         subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(),sub);
780         subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(),sub);
781     
782         // This works for now since we use a VM connection to the local broker.
783
// may need to change if we ever subscribe to a remote broker.
784
sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
785     }
786
787     protected void removeDemandSubscription(ConsumerId id) throws IOException JavaDoc {
788         DemandSubscription sub=(DemandSubscription) subscriptionMapByRemoteId.remove(id);
789         if (sub != null){
790             removeSubscription(sub);
791             if(log.isTraceEnabled())
792                 log.trace("removing sub on "+localBroker+" from "+remoteBrokerName+" : "+sub.getRemoteInfo());
793         }
794     }
795
796     protected void waitStarted() throws InterruptedException JavaDoc {
797         startedLatch.await();
798     }
799
800     protected void clearDownSubscriptions() {
801         subscriptionMapByLocalId.clear();
802         subscriptionMapByRemoteId.clear();
803     }
804
805     protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException JavaDoc;
806
807     protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException JavaDoc;
808
809     protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException JavaDoc;
810
811     protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException JavaDoc;
812     
813     protected abstract BrokerId[] getRemoteBrokerPath();
814     
815     public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){
816         this.bridgeFailedListener=listener;
817       }
818       
819       private void fireBridgeFailed() {
820           NetworkBridgeFailedListener l = this.bridgeFailedListener;
821           if (l!=null) {
822               l.bridgeFailed();
823           }
824       }
825 }
826
Popular Tags