KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > kahadaptor > KahaTopicMessageStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.store.kahadaptor;
16
17 import java.io.IOException JavaDoc;
18 import java.util.Iterator JavaDoc;
19 import java.util.Map JavaDoc;
20 import java.util.concurrent.ConcurrentHashMap JavaDoc;
21 import org.apache.activemq.broker.ConnectionContext;
22 import org.apache.activemq.command.ActiveMQDestination;
23 import org.apache.activemq.command.Message;
24 import org.apache.activemq.command.MessageId;
25 import org.apache.activemq.command.SubscriptionInfo;
26 import org.apache.activemq.kaha.ListContainer;
27 import org.apache.activemq.kaha.MapContainer;
28 import org.apache.activemq.kaha.Marshaller;
29 import org.apache.activemq.kaha.Store;
30 import org.apache.activemq.kaha.StoreEntry;
31 import org.apache.activemq.store.MessageRecoveryListener;
32 import org.apache.activemq.store.TopicMessageStore;
33
34 /**
35  * @version $Revision: 1.5 $
36  */

37 public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore{
38
39     protected ListContainer<TopicSubAck> ackContainer;
40     private Map JavaDoc subscriberContainer;
41     private Store store;
42     protected Map JavaDoc subscriberMessages=new ConcurrentHashMap JavaDoc();
43
44     public KahaTopicMessageStore(Store store,MapContainer messageContainer,ListContainer<TopicSubAck> ackContainer,
45             MapContainer subsContainer,ActiveMQDestination destination) throws IOException JavaDoc{
46         super(messageContainer,destination);
47         this.store=store;
48         this.ackContainer=ackContainer;
49         subscriberContainer=subsContainer;
50         // load all the Ack containers
51
for(Iterator JavaDoc i=subscriberContainer.keySet().iterator();i.hasNext();){
52             Object JavaDoc key=i.next();
53             addSubscriberMessageContainer(key);
54         }
55     }
56
57     @Override JavaDoc
58     public synchronized void addMessage(ConnectionContext context,Message message) throws IOException JavaDoc{
59         int subscriberCount=subscriberMessages.size();
60         if(subscriberCount>0){
61             MessageId id = message.getMessageId();
62             StoreEntry messageEntry=messageContainer.place(id,message);
63             TopicSubAck tsa=new TopicSubAck();
64             tsa.setCount(subscriberCount);
65             tsa.setMessageEntry(messageEntry);
66             StoreEntry ackEntry=ackContainer.placeLast(tsa);
67             for(Iterator JavaDoc i=subscriberMessages.values().iterator();i.hasNext();){
68                 TopicSubContainer container=(TopicSubContainer)i.next();
69                 ConsumerMessageRef ref=new ConsumerMessageRef();
70                 ref.setAckEntry(ackEntry);
71                 ref.setMessageEntry(messageEntry);
72                 ref.setMessageId(id);
73                 container.add(ref);
74             }
75         }
76     }
77
78     public synchronized void acknowledge(ConnectionContext context,String JavaDoc clientId,String JavaDoc subscriptionName,
79             MessageId messageId) throws IOException JavaDoc{
80         String JavaDoc subcriberId=getSubscriptionKey(clientId,subscriptionName);
81         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
82         if(container!=null){
83             ConsumerMessageRef ref=container.remove(messageId);
84             if(container.isEmpty()){
85                 container.reset();
86             }
87             if(ref!=null){
88                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
89                 if(tsa!=null){
90                     if(tsa.decrementCount()<=0){
91                         StoreEntry entry = ref.getAckEntry();
92                         entry = ackContainer.refresh(entry);
93                         ackContainer.remove(entry);
94                         entry = tsa.getMessageEntry();
95                         entry =messageContainer.refresh(entry);
96                         messageContainer.remove(entry);
97                     }else{
98                         ackContainer.update(ref.getAckEntry(),tsa);
99                     }
100                 }
101             }
102         }
103     }
104
105     public SubscriptionInfo lookupSubscription(String JavaDoc clientId,String JavaDoc subscriptionName) throws IOException JavaDoc{
106         return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
107     }
108
109     public synchronized void addSubsciption(String JavaDoc clientId,String JavaDoc subscriptionName,String JavaDoc selector,boolean retroactive)
110             throws IOException JavaDoc{
111         SubscriptionInfo info=new SubscriptionInfo();
112         info.setDestination(destination);
113         info.setClientId(clientId);
114         info.setSelector(selector);
115         info.setSubcriptionName(subscriptionName);
116         String JavaDoc key=getSubscriptionKey(clientId,subscriptionName);
117         // if already exists - won't add it again as it causes data files
118
// to hang around
119
if(!subscriberContainer.containsKey(key)){
120             subscriberContainer.put(key,info);
121         }
122         // add the subscriber
123
ListContainer container=addSubscriberMessageContainer(key);
124         /*
125         if(retroactive){
126             for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
127                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
128                 ConsumerMessageRef ref=new ConsumerMessageRef();
129                 ref.setAckEntry(entry);
130                 ref.setMessageEntry(tsa.getMessageEntry());
131                 container.add(ref);
132             }
133         }
134         */

135     }
136
137     public synchronized void deleteSubscription(String JavaDoc clientId,String JavaDoc subscriptionName) throws IOException JavaDoc{
138         String JavaDoc key=getSubscriptionKey(clientId,subscriptionName);
139         removeSubscriberMessageContainer(key);
140     }
141
142     public void recoverSubscription(String JavaDoc clientId,String JavaDoc subscriptionName,MessageRecoveryListener listener)
143             throws Exception JavaDoc{
144         String JavaDoc key=getSubscriptionKey(clientId,subscriptionName);
145         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
146         if(container!=null){
147             for(Iterator JavaDoc i=container.iterator();i.hasNext();){
148                 ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
149                 Object JavaDoc msg=messageContainer.get(ref.getMessageEntry());
150                 if(msg!=null){
151                     recover(listener, msg);
152                 }
153             }
154         }
155         listener.finished();
156     }
157
158     public void recoverNextMessages(String JavaDoc clientId,String JavaDoc subscriptionName,int maxReturned,
159             MessageRecoveryListener listener) throws Exception JavaDoc{
160         String JavaDoc key=getSubscriptionKey(clientId,subscriptionName);
161         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
162         if(container!=null){
163             int count=0;
164             StoreEntry entry=container.getBatchEntry();
165             if(entry==null){
166                 entry=container.getEntry();
167             }else{
168                 entry=container.refreshEntry(entry);
169                 if(entry!=null){
170                     entry=container.getNextEntry(entry);
171                 }
172             }
173             if(entry!=null){
174                 do{
175                     ConsumerMessageRef consumerRef=container.get(entry);
176                     Object JavaDoc msg=messageContainer.getValue(consumerRef.getMessageEntry());
177                     if(msg!=null){
178                         recover(listener, msg);
179                         count++;
180                     }
181                     container.setBatchEntry(entry);
182                     entry=container.getNextEntry(entry);
183                 }while(entry!=null&&count<maxReturned&&listener.hasSpace());
184             }
185         }
186         listener.finished();
187     }
188
189     public void delete(){
190         super.delete();
191         ackContainer.clear();
192         subscriberContainer.clear();
193     }
194
195     public SubscriptionInfo[] getAllSubscriptions() throws IOException JavaDoc{
196         return (SubscriptionInfo[])subscriberContainer.values().toArray(new SubscriptionInfo[subscriberContainer.size()]);
197     }
198
199     protected String JavaDoc getSubscriptionKey(String JavaDoc clientId,String JavaDoc subscriberName){
200         String JavaDoc result=clientId+":";
201         result+=subscriberName!=null?subscriberName:"NOT_SET";
202         return result;
203     }
204
205     protected ListContainer addSubscriberMessageContainer(Object JavaDoc key) throws IOException JavaDoc{
206         ListContainer container=store.getListContainer(key,"topic-subs");
207         Marshaller marshaller=new ConsumerMessageRefMarshaller();
208         container.setMarshaller(marshaller);
209         TopicSubContainer tsc=new TopicSubContainer(container);
210         subscriberMessages.put(key,tsc);
211         return container;
212     }
213
214     protected void removeSubscriberMessageContainer(Object JavaDoc key) throws IOException JavaDoc{
215         subscriberContainer.remove(key);
216         TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
217         for(Iterator JavaDoc i=container.iterator();i.hasNext();){
218             ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
219             if(ref!=null){
220                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
221                 if(tsa!=null){
222                     if(tsa.decrementCount()<=0){
223                         ackContainer.remove(ref.getAckEntry());
224                         messageContainer.remove(tsa.getMessageEntry());
225                     }else{
226                         ackContainer.update(ref.getAckEntry(),tsa);
227                     }
228                 }
229             }
230         }
231         store.deleteListContainer(key,"topic-subs");
232     }
233
234     public int getMessageCount(String JavaDoc clientId,String JavaDoc subscriberName) throws IOException JavaDoc{
235         String JavaDoc key=getSubscriptionKey(clientId,subscriberName);
236         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
237         return container != null ? container.size() : 0;
238     }
239
240     /**
241      * @param context
242      * @throws IOException
243      * @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
244      */

245     public synchronized void removeAllMessages(ConnectionContext context) throws IOException JavaDoc{
246         messageContainer.clear();
247         ackContainer.clear();
248         for(Iterator JavaDoc i=subscriberMessages.values().iterator();i.hasNext();){
249             TopicSubContainer container=(TopicSubContainer)i.next();
250             container.clear();
251         }
252     }
253
254     public synchronized void resetBatching(String JavaDoc clientId,String JavaDoc subscriptionName){
255         String JavaDoc key=getSubscriptionKey(clientId,subscriptionName);
256         TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
257         if(topicSubContainer!=null){
258             topicSubContainer.reset();
259         }
260     }
261 }
262
Popular Tags