KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > TopicRegion


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.region;
19
20 import java.io.IOException JavaDoc;
21 import java.util.Iterator JavaDoc;
22 import java.util.Set JavaDoc;
23
24 import javax.jms.InvalidDestinationException JavaDoc;
25 import javax.jms.JMSException JavaDoc;
26
27 import org.apache.activemq.advisory.AdvisorySupport;
28 import org.apache.activemq.broker.ConnectionContext;
29 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
30 import org.apache.activemq.broker.region.policy.PolicyEntry;
31 import org.apache.activemq.command.ActiveMQDestination;
32 import org.apache.activemq.command.ConnectionId;
33 import org.apache.activemq.command.ConsumerId;
34 import org.apache.activemq.command.ConsumerInfo;
35 import org.apache.activemq.command.RemoveSubscriptionInfo;
36 import org.apache.activemq.command.SessionId;
37 import org.apache.activemq.command.SubscriptionInfo;
38 import org.apache.activemq.memory.UsageManager;
39 import org.apache.activemq.store.TopicMessageStore;
40 import org.apache.activemq.thread.TaskRunnerFactory;
41 import org.apache.activemq.util.LongSequenceGenerator;
42 import org.apache.activemq.util.SubscriptionKey;
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45
46 import java.util.concurrent.ConcurrentHashMap JavaDoc;
47
48 /**
49  *
50  * @version $Revision: 1.12 $
51  */

52 public class TopicRegion extends AbstractRegion {
53     private static final Log log = LogFactory.getLog(TopicRegion.class);
54     protected final ConcurrentHashMap JavaDoc durableSubscriptions = new ConcurrentHashMap JavaDoc();
55     private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
56     private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
57     private boolean keepDurableSubsActive=false;
58
59     public TopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
60             DestinationFactory destinationFactory) {
61         super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
62         
63     }
64
65     public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception JavaDoc{
66         if(info.isDurable()){
67             ActiveMQDestination destination=info.getDestination();
68             if(!destination.isPattern()){
69                 // Make sure the destination is created.
70
lookup(context,destination);
71             }
72             String JavaDoc clientId=context.getClientId();
73             String JavaDoc subcriptionName=info.getSubscriptionName();
74             SubscriptionKey key=new SubscriptionKey(clientId,subcriptionName);
75             DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key);
76             if(sub!=null){
77                 if(sub.isActive()){
78                     throw new JMSException JavaDoc("Durable consumer is in use for client: "+clientId+" and subscriptionName: "
79                             +subcriptionName);
80                 }
81                 // Has the selector changed??
82
if(hasDurableSubChanged(info,sub.getConsumerInfo())){
83                     // Remove the consumer first then add it.
84
durableSubscriptions.remove(key);
85                     for(Iterator JavaDoc iter=destinations.values().iterator();iter.hasNext();){
86                         Topic topic=(Topic)iter.next();
87                         topic.deleteSubscription(context,key);
88                     }
89                     super.removeConsumer(context,sub.getConsumerInfo());
90                     super.addConsumer(context,info);
91                     sub=(DurableTopicSubscription)durableSubscriptions.get(key);
92                 }else{
93                     // Change the consumer id key of the durable sub.
94
if(sub.getConsumerInfo().getConsumerId()!=null)
95                         subscriptions.remove(sub.getConsumerInfo().getConsumerId());
96                     subscriptions.put(info.getConsumerId(),sub);
97                 }
98             }else{
99                 super.addConsumer(context,info);
100                 sub=(DurableTopicSubscription)durableSubscriptions.get(key);
101                 if(sub==null){
102                     throw new JMSException JavaDoc("Cannot use the same consumerId: "+info.getConsumerId()
103                             +" for two different durable subscriptions clientID: "+key.getClientId()
104                             +" subscriberName: "+key.getSubscriptionName());
105                 }
106             }
107             sub.activate(memoryManager,context,info);
108             return sub;
109         }else{
110             return super.addConsumer(context,info);
111         }
112     }
113
114     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception JavaDoc {
115         if (info.isDurable()) {
116
117             SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
118             DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
119             if (sub != null) {
120                 sub.deactivate(keepDurableSubsActive);
121             }
122
123         }
124         else {
125             super.removeConsumer(context, info);
126         }
127     }
128
129     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception JavaDoc {
130         SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubcriptionName());
131         DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
132         if (sub == null) {
133             throw new InvalidDestinationException JavaDoc("No durable subscription exists for: " + info.getSubcriptionName());
134         }
135         if (sub.isActive()) {
136             throw new JMSException JavaDoc("Durable consumer is in use");
137         }
138
139         durableSubscriptions.remove(key);
140         for (Iterator JavaDoc iter = destinations.values().iterator(); iter.hasNext();) {
141             Topic topic = (Topic) iter.next();
142             topic.deleteSubscription(context, key);
143         }
144         super.removeConsumer(context, sub.getConsumerInfo());
145     }
146
147     public String JavaDoc toString() {
148         return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage()
149                 + "%";
150     }
151
152     // Implementation methods
153
// -------------------------------------------------------------------------
154
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception JavaDoc {
155         Topic topic = (Topic) super.createDestination(context, destination);
156  
157         recoverDurableSubscriptions(context, topic);
158         
159         return topic;
160     }
161
162     private void recoverDurableSubscriptions(ConnectionContext context, Topic topic) throws IOException JavaDoc, JMSException JavaDoc, Exception JavaDoc {
163         TopicMessageStore store = (TopicMessageStore) topic.getMessageStore();
164         // Eagerly recover the durable subscriptions
165
if (store != null) {
166             SubscriptionInfo[] infos = store.getAllSubscriptions();
167             for (int i = 0; i < infos.length; i++) {
168                 
169                 SubscriptionInfo info = infos[i];
170                 log.debug("Restoring durable subscription: "+infos);
171                 SubscriptionKey key = new SubscriptionKey(info);
172                 
173                 // A single durable sub may be subscribing to multiple topics. so it might exist already.
174
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
175                 ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
176                 if( sub == null ) {
177                     ConnectionContext c = new ConnectionContext();
178                     c.setBroker(context.getBroker());
179                     c.setClientId(key.getClientId());
180                     c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
181                     sub = (DurableTopicSubscription) createSubscription(c, consumerInfo );
182                 }
183                 
184                 topic.addSubscription(context, sub);
185             }
186         }
187     }
188     
189     private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
190         ConsumerInfo rc = new ConsumerInfo();
191         rc.setSelector(info.getSelector());
192         rc.setSubscriptionName(info.getSubcriptionName());
193         rc.setDestination(info.getDestination());
194         rc.setConsumerId(createConsumerId());
195         return rc;
196     }
197
198     private ConsumerId createConsumerId() {
199         return new ConsumerId(recoveredDurableSubSessionId,recoveredDurableSubIdGenerator.getNextSequenceId());
200     }
201
202     protected void configureTopic(Topic topic, ActiveMQDestination destination) {
203         if (broker.getDestinationPolicy() != null) {
204             PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
205             if (entry != null) {
206                 entry.configure(topic);
207             }
208         }
209     }
210
211     protected Subscription createSubscription(ConnectionContext context,ConsumerInfo info) throws JMSException JavaDoc{
212         if(info.isDurable()){
213             if(AdvisorySupport.isAdvisoryTopic(info.getDestination())){
214                 throw new JMSException JavaDoc("Cannot create a durable subscription for an advisory Topic");
215             }
216             SubscriptionKey key=new SubscriptionKey(context.getClientId(),info.getSubscriptionName());
217             DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key);
218             if(sub==null){
219                 sub=new DurableTopicSubscription(broker,memoryManager,context,info,keepDurableSubsActive);
220                 ActiveMQDestination destination=info.getDestination();
221                 if(destination!=null&&broker.getDestinationPolicy()!=null){
222                     PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
223                     if(entry!=null){
224                         entry.configure(broker,memoryManager,sub);
225                     }
226                 }
227                 durableSubscriptions.put(key,sub);
228             }else{
229                 throw new JMSException JavaDoc("That durable subscription is already active.");
230             }
231             return sub;
232         }
233         try{
234             TopicSubscription answer=new TopicSubscription(broker,context,info,memoryManager);
235             // lets configure the subscription depending on the destination
236
ActiveMQDestination destination=info.getDestination();
237             if(destination!=null&&broker.getDestinationPolicy()!=null){
238                 PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
239                 if(entry!=null){
240                     entry.configure(broker,memoryManager,answer);
241                 }
242             }
243             answer.init();
244             return answer;
245         }catch(Exception JavaDoc e){
246             log.error("Failed to create TopicSubscription ",e);
247             JMSException JavaDoc jmsEx=new JMSException JavaDoc("Couldn't create TopicSubscription");
248             jmsEx.setLinkedException(e);
249             throw jmsEx;
250         }
251     }
252
253     /**
254      */

255     private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {
256         if (info1.getSelector() != null ^ info2.getSelector() != null)
257             return true;
258         if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector()))
259             return true;
260         return !info1.getDestination().equals(info2.getDestination());
261     }
262
263     protected Set JavaDoc getInactiveDestinations() {
264         Set JavaDoc inactiveDestinations = super.getInactiveDestinations();
265         for (Iterator JavaDoc iter = inactiveDestinations.iterator(); iter.hasNext();) {
266             ActiveMQDestination dest = (ActiveMQDestination) iter.next();
267             if (!dest.isTopic())
268                 iter.remove();
269         }
270         return inactiveDestinations;
271     }
272
273     public boolean isKeepDurableSubsActive() {
274         return keepDurableSubsActive;
275     }
276
277     public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
278         this.keepDurableSubsActive = keepDurableSubsActive;
279     }
280
281 }
282
Popular Tags