KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > cursors > FilePendingMessageCursor


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.broker.region.cursors;
16
17 import java.io.IOException JavaDoc;
18 import java.util.Iterator JavaDoc;
19 import java.util.LinkedList JavaDoc;
20 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
21 import java.util.concurrent.atomic.AtomicLong JavaDoc;
22 import org.apache.activemq.broker.region.Destination;
23 import org.apache.activemq.broker.region.MessageReference;
24 import org.apache.activemq.command.Message;
25 import org.apache.activemq.kaha.CommandMarshaller;
26 import org.apache.activemq.kaha.ListContainer;
27 import org.apache.activemq.kaha.Store;
28 import org.apache.activemq.memory.UsageListener;
29 import org.apache.activemq.memory.UsageManager;
30 import org.apache.activemq.openwire.OpenWireFormat;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33
34 /**
35  * perist pending messages pending message (messages awaiting disptach to a consumer) cursor
36  *
37  * @version $Revision: 509115 $
38  */

39 public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener{
40
41     static private final Log log=LogFactory.getLog(FilePendingMessageCursor.class);
42     static private final AtomicLong JavaDoc nameCount = new AtomicLong JavaDoc();
43     private Store store;
44     private String JavaDoc name;
45     private LinkedList JavaDoc memoryList=new LinkedList JavaDoc();
46     private ListContainer diskList;
47     private Iterator JavaDoc iter=null;
48     private Destination regionDestination;
49     private boolean iterating;
50     private boolean flushRequired;
51     private AtomicBoolean JavaDoc started=new AtomicBoolean JavaDoc();
52
53     /**
54      * @param name
55      * @param store
56      */

57     public FilePendingMessageCursor(String JavaDoc name,Store store){
58         this.name=nameCount.incrementAndGet() + "_"+name;
59         this.store=store;
60     }
61
62     public void start(){
63         if(started.compareAndSet(false,true)){
64             if(usageManager!=null){
65                 usageManager.addUsageListener(this);
66             }
67         }
68     }
69
70     public void stop(){
71         if(started.compareAndSet(true,false)){
72             gc();
73             if(usageManager!=null){
74                 usageManager.removeUsageListener(this);
75             }
76         }
77     }
78
79     /**
80      * @return true if there are no pending messages
81      */

82     public synchronized boolean isEmpty(){
83         return memoryList.isEmpty()&&isDiskListEmpty();
84     }
85
86     /**
87      * reset the cursor
88      *
89      */

90     public synchronized void reset(){
91         iterating=true;
92         iter=isDiskListEmpty()?memoryList.iterator():getDiskList().listIterator();
93     }
94
95     public synchronized void release(){
96         iterating=false;
97         if(flushRequired){
98             flushRequired=false;
99             flushToDisk();
100         }
101     }
102
103     public synchronized void destroy(){
104         stop();
105         for(Iterator JavaDoc i=memoryList.iterator();i.hasNext();){
106             Message node=(Message)i.next();
107             node.decrementReferenceCount();
108         }
109         memoryList.clear();
110         if(!isDiskListEmpty()){
111             getDiskList().clear();
112         }
113     }
114
115     public synchronized LinkedList JavaDoc pageInList(int maxItems){
116         LinkedList JavaDoc result=new LinkedList JavaDoc();
117         int count=0;
118         for(Iterator JavaDoc i=memoryList.iterator();i.hasNext()&&count<maxItems;){
119             result.add(i.next());
120             count++;
121         }
122         if(count<maxItems&&!isDiskListEmpty()){
123             for(Iterator JavaDoc i=getDiskList().iterator();i.hasNext()&&count<maxItems;){
124                 Message message=(Message)i.next();
125                 message.setRegionDestination(regionDestination);
126                 message.incrementReferenceCount();
127                 result.add(message);
128                 count++;
129             }
130         }
131         return result;
132     }
133
134     /**
135      * add message to await dispatch
136      *
137      * @param node
138      */

139     public synchronized void addMessageLast(MessageReference node){
140         try{
141             regionDestination=node.getMessage().getRegionDestination();
142             if(isSpaceInMemoryList()){
143                 memoryList.add(node);
144             }else{
145                 flushToDisk();
146                 node.decrementReferenceCount();
147                 getDiskList().addLast(node);
148             }
149         }catch(IOException JavaDoc e){
150             throw new RuntimeException JavaDoc(e);
151         }
152     }
153
154     /**
155      * add message to await dispatch
156      *
157      * @param node
158      */

159     public synchronized void addMessageFirst(MessageReference node){
160         try{
161             regionDestination=node.getMessage().getRegionDestination();
162             if(isSpaceInMemoryList()){
163                 memoryList.addFirst(node);
164             }else{
165                 flushToDisk();
166                 node.decrementReferenceCount();
167                 getDiskList().addFirst(node);
168             }
169         }catch(IOException JavaDoc e){
170             throw new RuntimeException JavaDoc(e);
171         }
172     }
173
174     /**
175      * @return true if there pending messages to dispatch
176      */

177     public synchronized boolean hasNext(){
178         return iter.hasNext();
179     }
180
181     /**
182      * @return the next pending message
183      */

184     public synchronized MessageReference next(){
185         Message message=(Message)iter.next();
186         if(!isDiskListEmpty()){
187             // got from disk
188
message.setRegionDestination(regionDestination);
189             message.incrementReferenceCount();
190         }
191         return message;
192     }
193
194     /**
195      * remove the message at the cursor position
196      *
197      */

198     public synchronized void remove(){
199         iter.remove();
200     }
201
202     /**
203      * @param node
204      * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
205      */

206     public synchronized void remove(MessageReference node){
207         memoryList.remove(node);
208         if(!isDiskListEmpty()){
209             getDiskList().remove(node);
210         }
211     }
212
213     /**
214      * @return the number of pending messages
215      */

216     public synchronized int size(){
217         return memoryList.size()+(isDiskListEmpty()?0:getDiskList().size());
218     }
219
220     /**
221      * clear all pending messages
222      *
223      */

224     public synchronized void clear(){
225         memoryList.clear();
226         if(!isDiskListEmpty()){
227             getDiskList().clear();
228         }
229     }
230
231     public synchronized boolean isFull(){
232         // we always have space - as we can persist to disk
233
return false;
234     }
235
236     public boolean hasMessagesBufferedToDeliver(){
237         return !isEmpty();
238     }
239
240     public void setUsageManager(UsageManager usageManager){
241         super.setUsageManager(usageManager);
242         usageManager.addUsageListener(this);
243     }
244
245     public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
246         if(newPercentUsage>=getMemoryUsageHighWaterMark()){
247             synchronized(this){
248                 flushRequired=true;
249                 if(!iterating){
250                     flushToDisk();
251                     flushRequired=false;
252                 }
253             }
254         }
255     }
256
257     protected boolean isSpaceInMemoryList(){
258         return hasSpace()&&isDiskListEmpty();
259     }
260
261     protected synchronized void flushToDisk(){
262         if(!memoryList.isEmpty()){
263             while(!memoryList.isEmpty()){
264                 MessageReference node=(MessageReference)memoryList.removeFirst();
265                 node.decrementReferenceCount();
266                 getDiskList().addLast(node);
267             }
268             memoryList.clear();
269         }
270     }
271
272     protected boolean isDiskListEmpty(){
273         return diskList==null||diskList.isEmpty();
274     }
275
276     protected ListContainer getDiskList(){
277         if(diskList==null){
278             try{
279                 diskList=store.getListContainer(name,"TopicSubscription",true);
280                 diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
281             }catch(IOException JavaDoc e){
282                 e.printStackTrace();
283                 throw new RuntimeException JavaDoc(e);
284             }
285         }
286         return diskList;
287     }
288 }
289
Popular Tags