KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > DurableTopicSubscription


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.broker.region;
16
17 import java.io.IOException JavaDoc;
18 import java.util.Iterator JavaDoc;
19 import java.util.concurrent.ConcurrentHashMap JavaDoc;
20 import javax.jms.InvalidSelectorException JavaDoc;
21 import org.apache.activemq.broker.Broker;
22 import org.apache.activemq.broker.ConnectionContext;
23 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
24 import org.apache.activemq.command.ConsumerInfo;
25 import org.apache.activemq.command.Message;
26 import org.apache.activemq.command.MessageAck;
27 import org.apache.activemq.command.MessageDispatch;
28 import org.apache.activemq.memory.UsageListener;
29 import org.apache.activemq.memory.UsageManager;
30 import org.apache.activemq.util.SubscriptionKey;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33
34 public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener{
35
36     static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
37     private final ConcurrentHashMap JavaDoc redeliveredMessages=new ConcurrentHashMap JavaDoc();
38     private final ConcurrentHashMap JavaDoc destinations=new ConcurrentHashMap JavaDoc();
39     private final SubscriptionKey subscriptionKey;
40     private final boolean keepDurableSubsActive;
41     private final UsageManager usageManager;
42     private boolean active=false;
43
44     public DurableTopicSubscription(Broker broker,UsageManager usageManager,ConnectionContext context,
45             ConsumerInfo info,boolean keepDurableSubsActive) throws InvalidSelectorException JavaDoc{
46         super(broker,context,info,new StoreDurableSubscriberCursor(context.getClientId(),info.getSubscriptionName(),
47                 broker.getTempDataStore(),info.getPrefetchSize()));
48         this.usageManager=usageManager;
49         this.keepDurableSubsActive=keepDurableSubsActive;
50         subscriptionKey=new SubscriptionKey(context.getClientId(),info.getSubscriptionName());
51     }
52
53     synchronized public boolean isActive(){
54         return active;
55     }
56
57     protected boolean isFull(){
58         return !active||super.isFull();
59     }
60
61     synchronized public void gc(){
62     }
63
64     public synchronized void add(ConnectionContext context,Destination destination) throws Exception JavaDoc{
65         super.add(context,destination);
66         destinations.put(destination.getActiveMQDestination(),destination);
67         if(active||keepDurableSubsActive){
68             Topic topic=(Topic)destination;
69             topic.activate(context,this);
70             if(pending.isEmpty(topic)){
71                 topic.recoverRetroactiveMessages(context,this);
72             }
73         }
74         dispatchMatched();
75     }
76
77     public void activate(UsageManager memoryManager,ConnectionContext context,ConsumerInfo info) throws Exception JavaDoc{
78         log.debug("Deactivating "+this);
79         if(!active){
80             this.active=true;
81             this.context=context;
82             this.info=info;
83             if(!keepDurableSubsActive){
84                 for(Iterator JavaDoc iter=destinations.values().iterator();iter.hasNext();){
85                     Topic topic=(Topic)iter.next();
86                     topic.activate(context,this);
87                 }
88             }
89             synchronized(pending){
90                 pending.setUsageManager(memoryManager);
91                 pending.start();
92             }
93             // If nothing was in the persistent store, then try to use the recovery policy.
94
if(pending.isEmpty()){
95                 for(Iterator JavaDoc iter=destinations.values().iterator();iter.hasNext();){
96                     Topic topic=(Topic)iter.next();
97                     topic.recoverRetroactiveMessages(context,this);
98                 }
99             }
100             dispatchMatched();
101             this.usageManager.addUsageListener(this);
102         }
103     }
104
105     synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception JavaDoc{
106         active=false;
107         this.usageManager.removeUsageListener(this);
108         synchronized(pending){
109             pending.stop();
110         }
111         if(!keepDurableSubsActive){
112             for(Iterator JavaDoc iter=destinations.values().iterator();iter.hasNext();){
113                 Topic topic=(Topic)iter.next();
114                 topic.deactivate(context,this);
115             }
116         }
117         synchronized(dispatched){
118             for(Iterator JavaDoc iter=dispatched.iterator();iter.hasNext();){
119                 // Mark the dispatched messages as redelivered for next time.
120
MessageReference node=(MessageReference)iter.next();
121                 Integer JavaDoc count=(Integer JavaDoc)redeliveredMessages.get(node.getMessageId());
122                 if(count!=null){
123                     redeliveredMessages.put(node.getMessageId(),new Integer JavaDoc(count.intValue()+1));
124                 }else{
125                     redeliveredMessages.put(node.getMessageId(),new Integer JavaDoc(1));
126                 }
127                 if(keepDurableSubsActive){
128                     synchronized(pending){
129                         pending.addMessageFirst(node);
130                     }
131                 }else{
132                     node.decrementReferenceCount();
133                 }
134                 iter.remove();
135             }
136         }
137         if(!keepDurableSubsActive){
138             synchronized(pending){
139                 try{
140                     pending.reset();
141                     while(pending.hasNext()){
142                         MessageReference node=pending.next();
143                         node.decrementReferenceCount();
144                         pending.remove();
145                     }
146                 }finally{
147                     pending.release();
148                 }
149             }
150         }
151         prefetchExtension=0;
152     }
153
154     protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
155         MessageDispatch md=super.createMessageDispatch(node,message);
156         Integer JavaDoc count=(Integer JavaDoc)redeliveredMessages.get(node.getMessageId());
157         if(count!=null){
158             md.setRedeliveryCounter(count.intValue());
159         }
160         return md;
161     }
162
163     public void add(MessageReference node) throws Exception JavaDoc{
164         if(!active&&!keepDurableSubsActive){
165             return;
166         }
167         node.incrementReferenceCount();
168         super.add(node);
169     }
170
171     protected void doAddRecoveredMessage(MessageReference message) throws Exception JavaDoc{
172         pending.addRecoveredMessage(message);
173     }
174
175     public int getPendingQueueSize(){
176         if(active||keepDurableSubsActive){
177             return super.getPendingQueueSize();
178         }
179         // TODO: need to get from store
180
return 0;
181     }
182
183     public void setSelector(String JavaDoc selector) throws InvalidSelectorException JavaDoc{
184         throw new UnsupportedOperationException JavaDoc(
185                 "You cannot dynamically change the selector for durable topic subscriptions");
186     }
187
188     protected boolean canDispatch(MessageReference node){
189         return active;
190     }
191
192     protected void acknowledge(ConnectionContext context,MessageAck ack,MessageReference node) throws IOException JavaDoc{
193         node.getRegionDestination().acknowledge(context,this,ack,node);
194         redeliveredMessages.remove(node.getMessageId());
195         node.decrementReferenceCount();
196     }
197
198     public String JavaDoc getSubscriptionName(){
199         return subscriptionKey.getSubscriptionName();
200     }
201
202     public String JavaDoc toString(){
203         return "DurableTopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
204                 +", total="+enqueueCounter+", pending="+getPendingQueueSize()+", dispatched="+dispatchCounter
205                 +", inflight="+dispatched.size()+", prefetchExtension="+this.prefetchExtension;
206     }
207
208     public String JavaDoc getClientId(){
209         return subscriptionKey.getClientId();
210     }
211
212     public SubscriptionKey getSubscriptionKey(){
213         return subscriptionKey;
214     }
215
216     /**
217      * Release any references that we are holding.
218      */

219     public void destroy(){
220         try{
221             synchronized(pending){
222                 pending.reset();
223                 while(pending.hasNext()){
224                     MessageReference node=pending.next();
225                     node.decrementReferenceCount();
226                 }
227             }
228         }finally{
229             pending.release();
230             pending.clear();
231         }
232         for(Iterator JavaDoc iter=dispatched.iterator();iter.hasNext();){
233             MessageReference node=(MessageReference)iter.next();
234             node.decrementReferenceCount();
235         }
236         dispatched.clear();
237     }
238
239     /**
240      * @param memoryManager
241      * @param oldPercentUsage
242      * @param newPercentUsage
243      * @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager, int,
244      * int)
245      */

246     public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
247         if(oldPercentUsage>newPercentUsage&&oldPercentUsage>=90){
248             try{
249                 dispatchMatched();
250             }catch(IOException JavaDoc e){
251                 log.warn("problem calling dispatchMatched",e);
252             }
253         }
254     }
255 }
256
Popular Tags