KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > cursors > QueueStorePrefetch


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with this
5  * work for additional information regarding copyright ownership. The ASF
6  * licenses this file to You under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15  * License for the specific language governing permissions and limitations under
16  * the License.
17  */

18
19 package org.apache.activemq.broker.region.cursors;
20
21 import java.io.IOException JavaDoc;
22 import java.util.LinkedList JavaDoc;
23 import org.apache.activemq.broker.region.Destination;
24 import org.apache.activemq.broker.region.MessageReference;
25 import org.apache.activemq.broker.region.Queue;
26 import org.apache.activemq.command.Message;
27 import org.apache.activemq.command.MessageId;
28 import org.apache.activemq.store.MessageRecoveryListener;
29 import org.apache.activemq.store.MessageStore;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32
33 /**
34  * perist pending messages pending message (messages awaiting disptach to a
35  * consumer) cursor
36  *
37  * @version $Revision: 474985 $
38  */

39 class QueueStorePrefetch extends AbstractPendingMessageCursor implements
40         MessageRecoveryListener {
41
42     static private final Log log=LogFactory.getLog(QueueStorePrefetch.class);
43    
44     private MessageStore store;
45     private final LinkedList JavaDoc <Message>batchList=new LinkedList JavaDoc<Message>();
46     private Destination regionDestination;
47     private int size = 0;
48
49     /**
50      * @param topic
51      * @param clientId
52      * @param subscriberName
53      * @throws IOException
54      */

55     public QueueStorePrefetch(Queue queue){
56         this.regionDestination = queue;
57         this.store=(MessageStore)queue.getMessageStore();
58         
59     }
60
61     public void start() throws Exception JavaDoc{
62     }
63
64     public void stop() throws Exception JavaDoc{
65         store.resetBatching();
66         gc();
67     }
68
69     /**
70      * @return true if there are no pending messages
71      */

72     public boolean isEmpty(){
73         return size <= 0;
74     }
75     
76     public boolean hasMessagesBufferedToDeliver() {
77         return !batchList.isEmpty();
78     }
79     
80     public synchronized int size(){
81         try {
82         size = store.getMessageCount();
83         }catch(IOException JavaDoc e) {
84             log.error("Failed to get message count",e);
85             throw new RuntimeException JavaDoc(e);
86         }
87         return size;
88     }
89     
90     public synchronized void addMessageLast(MessageReference node) throws Exception JavaDoc{
91         size++;
92     }
93     
94     public void addMessageFirst(MessageReference node) throws Exception JavaDoc{
95         size++;
96     }
97     
98     public void remove(){
99         size--;
100     }
101
102     public void remove(MessageReference node){
103         size--;
104     }
105
106
107     public synchronized boolean hasNext(){
108         if(batchList.isEmpty()){
109             try{
110                 fillBatch();
111             }catch(Exception JavaDoc e){
112                 log.error("Failed to fill batch",e);
113                 throw new RuntimeException JavaDoc(e);
114             }
115         }
116         return !batchList.isEmpty();
117     }
118
119     public synchronized MessageReference next(){
120         Message result = batchList.removeFirst();
121         result.decrementReferenceCount();
122         result.setRegionDestination(regionDestination);
123         return result;
124     }
125
126     public void reset(){
127     }
128
129     // MessageRecoveryListener implementation
130
public void finished(){
131     }
132
133     public void recoverMessage(Message message) throws Exception JavaDoc{
134         message.setRegionDestination(regionDestination);
135         message.incrementReferenceCount();
136         batchList.addLast(message);
137     }
138
139     public void recoverMessageReference(MessageId messageReference) throws Exception JavaDoc {
140         Message msg=store.getMessage(messageReference);
141         if(msg!=null){
142             recoverMessage(msg);
143         }else{
144             String JavaDoc err = "Failed to retrieve message for id: "+messageReference;
145             log.error(err);
146             throw new IOException JavaDoc(err);
147         }
148     }
149     
150     public void gc() {
151         for (Message msg:batchList) {
152             msg.decrementReferenceCount();
153         }
154         batchList.clear();
155     }
156
157     // implementation
158
protected void fillBatch() throws Exception JavaDoc{
159         store.recoverNextMessages(maxBatchSize,this);
160     }
161     
162     public String JavaDoc toString() {
163         return "QueueStorePrefetch" + System.identityHashCode(this) ;
164     }
165     
166 }
167
Popular Tags