KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > network > DurableConduitBridge


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.network;
19
20 import org.apache.activemq.command.ActiveMQDestination;
21 import org.apache.activemq.command.ConsumerId;
22 import org.apache.activemq.command.ConsumerInfo;
23 import org.apache.activemq.filter.DestinationFilter;
24 import org.apache.activemq.transport.Transport;
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27
28 import java.io.IOException JavaDoc;
29 import java.util.Iterator JavaDoc;
30 /**
31  * Consolidates subscriptions
32  *
33  * @version $Revision: 1.1 $
34  */

35 public class DurableConduitBridge extends ConduitBridge{
36     static final private Log log=LogFactory.getLog(DurableConduitBridge.class);
37
38     /**
39      * Constructor
40      * @param configuration
41      *
42      * @param localBroker
43      * @param remoteBroker
44      */

45     public DurableConduitBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
46         super(configuration,localBroker,remoteBroker);
47     }
48
49     /**
50      * Subscriptions for these destinations are always created
51      *
52      */

53     protected void setupStaticDestinations(){
54         super.setupStaticDestinations();
55         ActiveMQDestination[] dests=durableDestinations;
56         if(dests!=null){
57             for(int i=0;i<dests.length;i++){
58                 ActiveMQDestination dest=dests[i];
59                 if(isPermissableDestination(dest) && !doesConsumerExist(dest)){
60                     DemandSubscription sub=createDemandSubscription(dest);
61                     if(dest.isTopic()){
62                         sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
63                     }
64                     try{
65                         addSubscription(sub);
66                     }catch(IOException JavaDoc e){
67                         log.error("Failed to add static destination "+dest,e);
68                     }
69                     if(log.isTraceEnabled())
70                         log.trace("Forwarding messages for durable destination: "+dest);
71                 }
72             }
73         }
74     }
75
76     protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException JavaDoc{
77         if(addToAlreadyInterestedConsumers(info)){
78             return null; // don't want this subscription added
79
}
80         // not matched so create a new one
81
// but first, if it's durable - changed set the
82
// ConsumerId here - so it won't be removed if the
83
// durable subscriber goes away on the other end
84
if(info.isDurable()||(info.getDestination().isQueue()&&!info.getDestination().isTemporary())){
85             info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator.getNextSequenceId()));
86         }
87         if(info.isDurable()){
88             // set the subscriber name to something reproducible
89

90             info.setSubscriptionName(getSubscriberName(info.getDestination()));
91         }
92         return doCreateDemandSubscription(info);
93     }
94     
95     protected String JavaDoc getSubscriberName(ActiveMQDestination dest){
96         String JavaDoc subscriberName = configuration.getBrokerName()+"_"+dest.getPhysicalName();
97         return subscriberName;
98     }
99
100     protected boolean doesConsumerExist(ActiveMQDestination dest){
101         DestinationFilter filter=DestinationFilter.parseFilter(dest);
102         for(Iterator JavaDoc i=subscriptionMapByLocalId.values().iterator();i.hasNext();){
103             DemandSubscription ds=(DemandSubscription) i.next();
104             if(filter.matches(ds.getLocalInfo().getDestination())){
105                 return true;
106             }
107         }
108         return false;
109     }
110 }
111
Popular Tags