KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > cursors > StoreQueueCursor


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.broker.region.cursors;
16
17 import org.apache.activemq.broker.region.MessageReference;
18 import org.apache.activemq.broker.region.Queue;
19 import org.apache.activemq.command.Message;
20 import org.apache.activemq.kaha.Store;
21 import org.apache.activemq.memory.UsageManager;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24
25 /**
26  * Store based Cursor for Queues
27  *
28  * @version $Revision: 474985 $
29  */

30 public class StoreQueueCursor extends AbstractPendingMessageCursor{
31
32     static private final Log log=LogFactory.getLog(StoreQueueCursor.class);
33     private int pendingCount=0;
34     private Queue queue;
35     private Store tmpStore;
36     private PendingMessageCursor nonPersistent;
37     private QueueStorePrefetch persistent;
38     private boolean started;
39     private PendingMessageCursor currentCursor;
40     
41    
42     /**
43      * Construct
44      *
45      * @param queue
46      * @param tmpStore
47      */

48     public StoreQueueCursor(Queue queue,Store tmpStore){
49         this.queue=queue;
50         this.tmpStore=tmpStore;
51         this.persistent=new QueueStorePrefetch(queue);
52         currentCursor = persistent;
53     }
54
55     public synchronized void start() throws Exception JavaDoc{
56         started=true;
57         if(nonPersistent==null){
58             nonPersistent=new FilePendingMessageCursor(queue.getDestination(),tmpStore);
59             nonPersistent.setMaxBatchSize(getMaxBatchSize());
60             nonPersistent.setUsageManager(usageManager);
61         }
62         nonPersistent.start();
63         persistent.start();
64         pendingCount=persistent.size() + nonPersistent.size();
65     }
66
67     public synchronized void stop() throws Exception JavaDoc{
68         started=false;
69         if(nonPersistent!=null){
70             nonPersistent.stop();
71             nonPersistent.gc();
72         }
73         persistent.stop();
74         persistent.gc();
75         pendingCount=0;
76     }
77
78     public synchronized void addMessageLast(MessageReference node) throws Exception JavaDoc{
79         if(node!=null){
80             Message msg=node.getMessage();
81             if(started){
82                 pendingCount++;
83                 if(!msg.isPersistent()){
84                     nonPersistent.addMessageLast(node);
85                 }
86             }
87             if(msg.isPersistent()){
88                 persistent.addMessageLast(node);
89             }
90         }
91     }
92     
93     public void addMessageFirst(MessageReference node) throws Exception JavaDoc{
94         if(node!=null){
95             Message msg=node.getMessage();
96             if(started){
97                 pendingCount++;
98                 if(!msg.isPersistent()){
99                     nonPersistent.addMessageFirst(node);
100                 }
101             }
102             if(msg.isPersistent()){
103                 persistent.addMessageFirst(node);
104             }
105         }
106     }
107
108     public void clear(){
109         pendingCount=0;
110     }
111
112     public synchronized boolean hasNext(){
113         
114         boolean result=pendingCount>0;
115         if(result){
116             try{
117                 currentCursor=getNextCursor();
118             }catch(Exception JavaDoc e){
119                 log.error("Failed to get current cursor ",e);
120                 throw new RuntimeException JavaDoc(e);
121             }
122             result=currentCursor!=null?currentCursor.hasNext():false;
123         }
124         return result;
125     }
126
127     public synchronized MessageReference next(){
128         MessageReference result = currentCursor!=null?currentCursor.next():null;
129         return result;
130     }
131
132     public synchronized void remove(){
133         if(currentCursor!=null){
134             currentCursor.remove();
135         }
136         pendingCount--;
137     }
138
139     public synchronized void remove(MessageReference node){
140         if (!node.isPersistent()) {
141             nonPersistent.remove(node);
142         }else {
143             persistent.remove(node);
144         }
145         pendingCount--;
146     }
147
148     public synchronized void reset(){
149         nonPersistent.reset();
150         persistent.reset();
151     }
152
153     public int size(){
154         return pendingCount;
155     }
156
157     public synchronized boolean isEmpty(){
158         return pendingCount<=0;
159     }
160
161     /**
162      * Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber
163      * may do
164      *
165      * @see org.apache.activemq.region.cursors.PendingMessageCursor
166      * @return true if recovery required
167      */

168     public boolean isRecoveryRequired(){
169         return false;
170     }
171
172     /**
173      * @return the nonPersistent Cursor
174      */

175     public PendingMessageCursor getNonPersistent(){
176         return this.nonPersistent;
177     }
178
179     /**
180      * @param nonPersistent cursor to set
181      */

182     public void setNonPersistent(PendingMessageCursor nonPersistent){
183         this.nonPersistent=nonPersistent;
184     }
185
186     public void setMaxBatchSize(int maxBatchSize){
187         persistent.setMaxBatchSize(maxBatchSize);
188         if(nonPersistent!=null){
189             nonPersistent.setMaxBatchSize(maxBatchSize);
190         }
191         super.setMaxBatchSize(maxBatchSize);
192     }
193     
194     public void gc() {
195         if (persistent != null) {
196             persistent.gc();
197         }
198         if (nonPersistent != null) {
199             nonPersistent.gc();
200         }
201     }
202     
203     public void setUsageManager(UsageManager usageManager){
204         super.setUsageManager(usageManager);
205         if (persistent != null) {
206             persistent.setUsageManager(usageManager);
207         }
208         if (nonPersistent != null) {
209             nonPersistent.setUsageManager(usageManager);
210         }
211      }
212
213     protected synchronized PendingMessageCursor getNextCursor() throws Exception JavaDoc{
214         if(currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()){
215             currentCursor=currentCursor==persistent?nonPersistent:persistent;
216             //sanity check
217
if (currentCursor.isEmpty()) {
218                 currentCursor=currentCursor==persistent?nonPersistent:persistent;
219             }
220         }
221         return currentCursor;
222     }
223 }
224
Popular Tags