KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > cursors > TopicStorePrefetch


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.broker.region.cursors;
16
17 import java.io.IOException JavaDoc;
18 import java.util.LinkedList JavaDoc;
19 import org.apache.activemq.broker.region.Destination;
20 import org.apache.activemq.broker.region.MessageReference;
21 import org.apache.activemq.broker.region.Topic;
22 import org.apache.activemq.command.Message;
23 import org.apache.activemq.command.MessageId;
24 import org.apache.activemq.store.MessageRecoveryListener;
25 import org.apache.activemq.store.TopicMessageStore;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28
29 /**
30  * perist pendingCount messages pendingCount message (messages awaiting disptach to a consumer) cursor
31  *
32  * @version $Revision: 512642 $
33  */

34 class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener{
35
36     static private final Log log=LogFactory.getLog(TopicStorePrefetch.class);
37     private TopicMessageStore store;
38     private final LinkedList JavaDoc<Message> batchList=new LinkedList JavaDoc<Message>();
39     private String JavaDoc clientId;
40     private String JavaDoc subscriberName;
41     private Destination regionDestination;
42     private MessageId firstMessageId;
43     private MessageId lastMessageId;
44     private int pendingCount;
45     private boolean started;
46
47     /**
48      * @param topic
49      * @param clientId
50      * @param subscriberName
51      */

52     public TopicStorePrefetch(Topic topic,String JavaDoc clientId,String JavaDoc subscriberName){
53         this.regionDestination=topic;
54         this.store=(TopicMessageStore)topic.getMessageStore();
55         this.clientId=clientId;
56         this.subscriberName=subscriberName;
57     }
58
59     public synchronized void start(){
60         if(!started){
61             started=true;
62             pendingCount=getStoreSize();
63             try{
64                 fillBatch();
65             }catch(Exception JavaDoc e){
66                 log.error("Failed to fill batch",e);
67                 throw new RuntimeException JavaDoc(e);
68             }
69         }
70     }
71
72     public synchronized void stop(){
73         if(started){
74             started=false;
75             store.resetBatching(clientId,subscriberName);
76             gc();
77         }
78     }
79
80     /**
81      * @return true if there are no pendingCount messages
82      */

83     public boolean isEmpty(){
84         return pendingCount <= 0;
85     }
86
87     public synchronized int size(){
88         return getPendingCount();
89     }
90
91     public synchronized void addMessageLast(MessageReference node) throws Exception JavaDoc{
92         if(node!=null){
93             if(isEmpty() && started){
94                 firstMessageId=node.getMessageId();
95             }
96             lastMessageId=node.getMessageId();
97             node.decrementReferenceCount();
98             pendingCount++;
99         }
100     }
101     
102     public void addMessageFirst(MessageReference node) throws Exception JavaDoc{
103         if(node!=null){
104             if(started){
105                 firstMessageId=node.getMessageId();
106             }
107             node.decrementReferenceCount();
108             pendingCount++;
109         }
110     }
111     
112     public synchronized void remove(){
113         pendingCount--;
114     }
115     
116     public synchronized void remove(MessageReference node){
117         pendingCount--;
118     }
119     
120     public synchronized void clear(){
121         pendingCount=0;
122     }
123
124     public synchronized boolean hasNext(){
125         return !isEmpty();
126     }
127
128     public synchronized MessageReference next(){
129         Message result=null;
130         if(!isEmpty()){
131             if(batchList.isEmpty()){
132                 try{
133                     fillBatch();
134                 }catch(final Exception JavaDoc e){
135                     log.error("Failed to fill batch",e);
136                     throw new RuntimeException JavaDoc(e);
137                 }
138                 if(batchList.isEmpty()){
139                     return null;
140                 }
141             }
142             if(!batchList.isEmpty()){
143                 result=batchList.removeFirst();
144                 if(lastMessageId!=null){
145                     if(result.getMessageId().equals(lastMessageId)){
146                         //pendingCount=0;
147
}
148                 }
149                 result.setRegionDestination(regionDestination);
150             }
151         }
152         return result;
153     }
154
155     public void reset(){
156     }
157
158     // MessageRecoveryListener implementation
159
public void finished(){
160     }
161
162     public synchronized void recoverMessage(Message message) throws Exception JavaDoc{
163         message.setRegionDestination(regionDestination);
164         // only increment if count is zero (could have been cached)
165
if(message.getReferenceCount()==0){
166             message.incrementReferenceCount();
167         }
168         batchList.addLast(message);
169     }
170
171     public void recoverMessageReference(MessageId messageReference) throws Exception JavaDoc{
172         // shouldn't get called
173
throw new RuntimeException JavaDoc("Not supported");
174     }
175
176     // implementation
177
protected synchronized void fillBatch() throws Exception JavaDoc{
178         if(!isEmpty()){
179             store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this);
180             if(firstMessageId!=null){
181                 int pos=0;
182                 for(Message msg:batchList){
183                     if(msg.getMessageId().equals(firstMessageId)){
184                         firstMessageId=null;
185                         break;
186                     }
187                     pos++;
188                 }
189                 if(pos>0){
190                     for(int i=0;i<pos&&!batchList.isEmpty();i++){
191                         batchList.removeFirst();
192                     }
193                     if(batchList.isEmpty()){
194                         log.debug("Refilling batch - haven't got past first message = " + firstMessageId);
195                         fillBatch();
196                     }
197                 }
198             }
199         }
200     }
201     
202     protected synchronized int getPendingCount(){
203         if(pendingCount <= 0){
204             pendingCount = getStoreSize();
205         }
206         return pendingCount;
207     }
208     
209     protected synchronized int getStoreSize(){
210         try{
211             return store.getMessageCount(clientId,subscriberName);
212         }catch(IOException JavaDoc e){
213             log.error(this+" Failed to get the outstanding message count from the store",e);
214             throw new RuntimeException JavaDoc(e);
215         }
216     }
217     
218     
219
220     public synchronized void gc(){
221         for(Message msg:batchList){
222             msg.decrementReferenceCount();
223         }
224         batchList.clear();
225     }
226
227     public String JavaDoc toString(){
228         return "TopicStorePrefetch"+System.identityHashCode(this)+"("+clientId+","+subscriberName+")";
229     }
230 }
231
Popular Tags