KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > cursors > StoreDurableSubscriberCursor


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.broker.region.cursors;
16
17 import java.io.IOException JavaDoc;
18 import java.util.HashMap JavaDoc;
19 import java.util.Iterator JavaDoc;
20 import java.util.LinkedList JavaDoc;
21 import java.util.Map JavaDoc;
22 import org.apache.activemq.advisory.AdvisorySupport;
23 import org.apache.activemq.broker.ConnectionContext;
24 import org.apache.activemq.broker.region.Destination;
25 import org.apache.activemq.broker.region.MessageReference;
26 import org.apache.activemq.broker.region.Topic;
27 import org.apache.activemq.command.Message;
28 import org.apache.activemq.kaha.Store;
29 import org.apache.activemq.memory.UsageManager;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32
33 /**
34  * perist pending messages pending message (messages awaiting disptach to a consumer) cursor
35  *
36  * @version $Revision: 512640 $
37  */

38 public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
39
40     static private final Log log=LogFactory.getLog(StoreDurableSubscriberCursor.class);
41     private int pendingCount=0;
42     private String JavaDoc clientId;
43     private String JavaDoc subscriberName;
44     private Map JavaDoc topics=new HashMap JavaDoc();
45     private LinkedList JavaDoc <PendingMessageCursor>storePrefetches=new LinkedList JavaDoc<PendingMessageCursor>();
46     private boolean started;
47     private PendingMessageCursor nonPersistent;
48     private PendingMessageCursor currentCursor;
49
50     /**
51      * @param topic
52      * @param clientId
53      * @param subscriberName
54      * @throws IOException
55      */

56     public StoreDurableSubscriberCursor(String JavaDoc clientId,String JavaDoc subscriberName,Store store,int maxBatchSize){
57         this.clientId=clientId;
58         this.subscriberName=subscriberName;
59         this.nonPersistent=new FilePendingMessageCursor(clientId+subscriberName,store);
60         storePrefetches.add(nonPersistent);
61     }
62
63     public synchronized void start() throws Exception JavaDoc{
64         if(!started){
65             started=true;
66             for(PendingMessageCursor tsp: storePrefetches){
67                 tsp.start();
68                 pendingCount+=tsp.size();
69             }
70         }
71     }
72
73     public synchronized void stop() throws Exception JavaDoc{
74         if(started){
75             started=false;
76             for(PendingMessageCursor tsp: storePrefetches){
77                 tsp.stop();
78             }
79            
80             pendingCount=0;
81         }
82     }
83
84     /**
85      * Add a destination
86      *
87      * @param context
88      * @param destination
89      * @throws Exception
90      */

91     public synchronized void add(ConnectionContext context,Destination destination) throws Exception JavaDoc{
92         if(destination!=null&&!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())){
93             TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName);
94             tsp.setMaxBatchSize(getMaxBatchSize());
95             tsp.setUsageManager(usageManager);
96             topics.put(destination,tsp);
97             storePrefetches.add(tsp);
98             if(started){
99                 tsp.start();
100                 pendingCount+=tsp.size();
101             }
102         }
103     }
104
105     /**
106      * remove a destination
107      *
108      * @param context
109      * @param destination
110      * @throws Exception
111      */

112     public synchronized void remove(ConnectionContext context,Destination destination) throws Exception JavaDoc{
113         Object JavaDoc tsp=topics.remove(destination);
114         if(tsp!=null){
115             storePrefetches.remove(tsp);
116         }
117     }
118
119     /**
120      * @return true if there are no pending messages
121      */

122     public synchronized boolean isEmpty(){
123         return pendingCount<=0;
124     }
125
126     public boolean isEmpty(Destination destination){
127         boolean result=true;
128         TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(destination);
129         if(tsp!=null){
130             result=tsp.size()<=0;
131         }
132         return result;
133     }
134
135     /**
136      * Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber
137      * may do
138      *
139      * @see org.apache.activemq.region.cursors.PendingMessageCursor
140      * @return true if recovery required
141      */

142     public boolean isRecoveryRequired(){
143         return false;
144     }
145
146     public synchronized void addMessageLast(MessageReference node) throws Exception JavaDoc{
147         if(node!=null){
148             Message msg=node.getMessage();
149             if(started){
150                 pendingCount++;
151                 if(!msg.isPersistent()){
152                     nonPersistent.addMessageLast(node);
153                 }
154             }
155             if(msg.isPersistent()){
156                 Destination dest=msg.getRegionDestination();
157                 TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest);
158                 if(tsp!=null){
159                     tsp.addMessageLast(node);
160                 }
161             }
162         }
163     }
164
165     public void addRecoveredMessage(MessageReference node) throws Exception JavaDoc{
166         nonPersistent.addMessageLast(node);
167     }
168
169     public void clear(){
170         pendingCount=0;
171         nonPersistent.clear();
172         for(PendingMessageCursor tsp: storePrefetches){
173             tsp.clear();
174         }
175     }
176
177     public synchronized boolean hasNext(){
178         boolean result=pendingCount>0;
179         if(result){
180             try{
181                 currentCursor=getNextCursor();
182             }catch(Exception JavaDoc e){
183                 log.error("Failed to get current cursor ",e);
184                 throw new RuntimeException JavaDoc(e);
185             }
186             result=currentCursor!=null?currentCursor.hasNext():false;
187         }
188         return result;
189     }
190
191     public synchronized MessageReference next(){
192         MessageReference result = currentCursor!=null?currentCursor.next():null;
193         return result;
194     }
195
196     public synchronized void remove(){
197         if(currentCursor!=null){
198             currentCursor.remove();
199         }
200         pendingCount--;
201     }
202
203     public synchronized void remove(MessageReference node){
204         if(currentCursor!=null){
205             currentCursor.remove(node);
206         }
207         pendingCount--;
208     }
209
210     public synchronized void reset(){
211         for(Iterator JavaDoc i=storePrefetches.iterator();i.hasNext();){
212             AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
213             tsp.reset();
214         }
215     }
216
217     public synchronized void release(){
218         for(Iterator JavaDoc i=storePrefetches.iterator();i.hasNext();){
219             AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
220             tsp.release();
221         }
222     }
223
224     public int size(){
225         return pendingCount;
226     }
227
228     public synchronized void setMaxBatchSize(int maxBatchSize){
229         for(Iterator JavaDoc i=storePrefetches.iterator();i.hasNext();){
230             AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
231             tsp.setMaxBatchSize(maxBatchSize);
232         }
233         super.setMaxBatchSize(maxBatchSize);
234     }
235
236     public synchronized void gc(){
237         for(Iterator JavaDoc i=storePrefetches.iterator();i.hasNext();){
238             PendingMessageCursor tsp=(PendingMessageCursor)i.next();
239             tsp.gc();
240         }
241     }
242
243     public synchronized void setUsageManager(UsageManager usageManager){
244         super.setUsageManager(usageManager);
245         for(Iterator JavaDoc i=storePrefetches.iterator();i.hasNext();){
246             PendingMessageCursor tsp=(PendingMessageCursor)i.next();
247             tsp.setUsageManager(usageManager);
248         }
249     }
250
251     protected synchronized PendingMessageCursor getNextCursor() throws Exception JavaDoc{
252         if(currentCursor==null||currentCursor.isEmpty()){
253             currentCursor=null;
254             for(Iterator JavaDoc i=storePrefetches.iterator();i.hasNext();){
255                 AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
256                 if(tsp.hasNext()){
257                     currentCursor=tsp;
258                     break;
259                 }
260             }
261             // round-robin
262
storePrefetches.addLast(storePrefetches.removeFirst());
263         }
264         return currentCursor;
265     }
266
267     public String JavaDoc toString(){
268         return "StoreDurableSubscriber("+clientId+":"+subscriberName+")";
269     }
270 }
271
Popular Tags