KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > kahadaptor > KahaMessageStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.store.kahadaptor;
16
17 import java.io.IOException JavaDoc;
18 import org.apache.activemq.broker.ConnectionContext;
19 import org.apache.activemq.command.ActiveMQDestination;
20 import org.apache.activemq.command.Message;
21 import org.apache.activemq.command.MessageAck;
22 import org.apache.activemq.command.MessageId;
23 import org.apache.activemq.kaha.MapContainer;
24 import org.apache.activemq.kaha.StoreEntry;
25 import org.apache.activemq.memory.UsageManager;
26 import org.apache.activemq.store.MessageRecoveryListener;
27 import org.apache.activemq.store.MessageStore;
28
29 /**
30  * An implementation of {@link org.apache.activemq.store.MessageStore} which uses a JPS Container
31  *
32  * @version $Revision: 1.7 $
33  */

34 public class KahaMessageStore implements MessageStore{
35
36     protected final ActiveMQDestination destination;
37     protected final MapContainer<MessageId,Message> messageContainer;
38     protected StoreEntry batchEntry=null;
39
40     public KahaMessageStore(MapContainer<MessageId,Message> container,ActiveMQDestination destination)
41             throws IOException JavaDoc{
42         this.messageContainer=container;
43         this.destination=destination;
44     }
45
46     protected MessageId getMessageId(Object JavaDoc object){
47         return ((Message)object).getMessageId();
48     }
49
50     public Object JavaDoc getId(){
51         return messageContainer.getId();
52     }
53
54     public synchronized void addMessage(ConnectionContext context,Message message) throws IOException JavaDoc{
55         messageContainer.put(message.getMessageId(),message);
56         // TODO: we should do the following but it is not need if the message is being added within a persistence
57
// transaction
58
// but since I can't tell if one is running right now.. I'll leave this out for now.
59
// if( message.isResponseRequired() ) {
60
// messageContainer.force();
61
// }
62
}
63
64     public synchronized Message getMessage(MessageId identity) throws IOException JavaDoc{
65         Message result=messageContainer.get(identity);
66         return result;
67     }
68
69     protected void recover(MessageRecoveryListener listener,Object JavaDoc msg) throws Exception JavaDoc{
70         listener.recoverMessage((Message)msg);
71     }
72
73     public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException JavaDoc{
74         removeMessage(ack.getLastMessageId());
75     }
76
77     
78     
79     public synchronized void removeMessage(MessageId msgId) throws IOException JavaDoc{
80         StoreEntry entry=messageContainer.getEntry(msgId);
81         if(entry!=null){
82             messageContainer.remove(entry);
83             if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
84                 resetBatching();
85             }
86         }
87     }
88
89     public synchronized void recover(MessageRecoveryListener listener) throws Exception JavaDoc{
90         for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
91             Message msg=(Message)messageContainer.getValue(entry);
92             recover(listener,msg);
93         }
94         listener.finished();
95     }
96
97     public void start(){
98     }
99
100     public void stop(){
101     }
102
103     public synchronized void removeAllMessages(ConnectionContext context) throws IOException JavaDoc{
104         messageContainer.clear();
105     }
106
107     public ActiveMQDestination getDestination(){
108         return destination;
109     }
110
111     public synchronized void delete(){
112         messageContainer.clear();
113     }
114
115     /**
116      * @param usageManager The UsageManager that is controlling the destination's memory usage.
117      */

118     public void setUsageManager(UsageManager usageManager){
119     }
120
121     /**
122      * @return the number of messages held by this destination
123      * @see org.apache.activemq.store.MessageStore#getMessageCount()
124      */

125     public int getMessageCount(){
126         return messageContainer.size();
127     }
128
129     /**
130      * @param id
131      * @return null
132      * @throws Exception
133      * @see org.apache.activemq.store.MessageStore#getPreviousMessageIdToDeliver(org.apache.activemq.command.MessageId)
134      */

135     public MessageId getPreviousMessageIdToDeliver(MessageId id) throws Exception JavaDoc{
136         return null;
137     }
138
139     /**
140      * @param lastMessageId
141      * @param maxReturned
142      * @param listener
143      * @throws Exception
144      * @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId, int,
145      * org.apache.activemq.store.MessageRecoveryListener)
146      */

147     public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception JavaDoc{
148         StoreEntry entry=batchEntry;
149         if(entry==null){
150             entry=messageContainer.getFirst();
151         }else{
152             entry=messageContainer.refresh(entry);
153             entry=messageContainer.getNext(entry);
154             if(entry==null){
155                 batchEntry=null;
156             }
157         }
158         if(entry!=null){
159             int count=0;
160             do{
161                 Object JavaDoc msg=messageContainer.getValue(entry);
162                 if(msg!=null){
163                     recover(listener,msg);
164                     count++;
165                 }
166                 batchEntry=entry;
167                 entry=messageContainer.getNext(entry);
168             }while(entry!=null&&count<maxReturned&&listener.hasSpace());
169         }
170         listener.finished();
171     }
172
173     /**
174      * @param nextToDispatch
175      * @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId)
176      */

177     public void resetBatching(){
178         batchEntry=null;
179     }
180
181     /**
182      * @return true if the store supports cursors
183      */

184     public boolean isSupportForCursors(){
185         return true;
186     }
187 }
188
Popular Tags