KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > kahadaptor > KahaTopicReferenceStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.store.kahadaptor;
16
17 import java.io.IOException JavaDoc;
18 import java.util.Iterator JavaDoc;
19 import java.util.Map JavaDoc;
20 import java.util.concurrent.ConcurrentHashMap JavaDoc;
21 import org.apache.activemq.broker.ConnectionContext;
22 import org.apache.activemq.command.ActiveMQDestination;
23 import org.apache.activemq.command.Message;
24 import org.apache.activemq.command.MessageId;
25 import org.apache.activemq.command.SubscriptionInfo;
26 import org.apache.activemq.kaha.ListContainer;
27 import org.apache.activemq.kaha.MapContainer;
28 import org.apache.activemq.kaha.Marshaller;
29 import org.apache.activemq.kaha.Store;
30 import org.apache.activemq.kaha.StoreEntry;
31 import org.apache.activemq.store.MessageRecoveryListener;
32 import org.apache.activemq.store.TopicReferenceStore;
33
34 public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore{
35
36     protected ListContainer<TopicSubAck> ackContainer;
37     private Map JavaDoc subscriberContainer;
38     private Store store;
39     protected Map JavaDoc subscriberMessages=new ConcurrentHashMap JavaDoc();
40
41     public KahaTopicReferenceStore(Store store,KahaReferenceStoreAdapter adapter,MapContainer messageContainer,ListContainer ackContainer,
42             MapContainer subsContainer,ActiveMQDestination destination) throws IOException JavaDoc{
43         super(adapter,messageContainer,destination);
44         this.store=store;
45         this.ackContainer=ackContainer;
46         subscriberContainer=subsContainer;
47         // load all the Ack containers
48
for(Iterator JavaDoc i=subscriberContainer.keySet().iterator();i.hasNext();){
49             Object JavaDoc key=i.next();
50             addSubscriberMessageContainer(key);
51         }
52     }
53
54     protected MessageId getMessageId(Object JavaDoc object){
55         return new MessageId(((ReferenceRecord)object).getMessageId());
56     }
57
58     public synchronized void addMessage(ConnectionContext context,Message message) throws IOException JavaDoc{
59         throw new RuntimeException JavaDoc("Use addMessageReference instead");
60     }
61
62     public synchronized Message getMessage(MessageId identity) throws IOException JavaDoc{
63         throw new RuntimeException JavaDoc("Use addMessageReference instead");
64     }
65
66     protected void recover(MessageRecoveryListener listener,Object JavaDoc msg) throws Exception JavaDoc{
67         ReferenceRecord record=(ReferenceRecord)msg;
68         listener.recoverMessageReference(new MessageId(record.getMessageId()));
69     }
70
71     public void addMessageReference(final ConnectionContext context,final MessageId messageId,final ReferenceData data){
72         final ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
73         final int subscriberCount=subscriberMessages.size();
74         if(subscriberCount>0){
75             final StoreEntry messageEntry=messageContainer.place(messageId,record);
76             addInterest(record);
77             final TopicSubAck tsa=new TopicSubAck();
78             tsa.setCount(subscriberCount);
79             tsa.setMessageEntry(messageEntry);
80             final StoreEntry ackEntry=ackContainer.placeLast(tsa);
81             for(final Iterator JavaDoc i=subscriberMessages.values().iterator();i.hasNext();){
82                 final TopicSubContainer container=(TopicSubContainer)i.next();
83                 final ConsumerMessageRef ref=new ConsumerMessageRef();
84                 ref.setAckEntry(ackEntry);
85                 ref.setMessageEntry(messageEntry);
86                 ref.setMessageId(messageId);
87                 container.add(ref);
88             }
89         }
90     }
91
92     public ReferenceData getMessageReference(final MessageId identity) throws IOException JavaDoc{
93         final ReferenceRecord result=messageContainer.get(identity);
94         if(result==null)
95             return null;
96         return result.getData();
97     }
98
99     public void addReferenceFileIdsInUse(){
100         for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
101             TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
102             if(subAck.getCount()>0){
103                 ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
104                 addInterest(rr);
105             }
106         }
107     }
108
109     protected ListContainer addSubscriberMessageContainer(Object JavaDoc key) throws IOException JavaDoc{
110         ListContainer container=store.getListContainer(key,"topic-subs-references");
111         Marshaller marshaller=new ConsumerMessageRefMarshaller();
112         container.setMarshaller(marshaller);
113         TopicSubContainer tsc=new TopicSubContainer(container);
114         subscriberMessages.put(key,tsc);
115         return container;
116     }
117
118     public synchronized void acknowledge(ConnectionContext context,String JavaDoc clientId,String JavaDoc subscriptionName,
119             MessageId messageId) throws IOException JavaDoc{
120         String JavaDoc key=getSubscriptionKey(clientId,subscriptionName);
121         
122         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
123         if(container!=null){
124             ConsumerMessageRef ref=container.remove(messageId);
125             if(ref!=null){
126                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
127                 if(tsa!=null){
128                     if(tsa.decrementCount()<=0){
129                         StoreEntry entry=ref.getAckEntry();
130                         entry=ackContainer.refresh(entry);
131                         ackContainer.remove(entry);
132                         ReferenceRecord rr=messageContainer.get(messageId);
133                         if(rr!=null){
134                             entry=tsa.getMessageEntry();
135                             entry=messageContainer.refresh(entry);
136                             messageContainer.remove(entry);
137                             removeInterest(rr);
138                         }
139                     }else{
140                        
141                         ackContainer.update(ref.getAckEntry(),tsa);
142                     }
143                 }
144             }
145         }
146     }
147
148     public synchronized void addSubsciption(String JavaDoc clientId,String JavaDoc subscriptionName,String JavaDoc selector,boolean retroactive)
149             throws IOException JavaDoc{
150         SubscriptionInfo info=new SubscriptionInfo();
151         info.setDestination(destination);
152         info.setClientId(clientId);
153         info.setSelector(selector);
154         info.setSubcriptionName(subscriptionName);
155         String JavaDoc key=getSubscriptionKey(clientId,subscriptionName);
156         // if already exists - won't add it again as it causes data files
157
// to hang around
158
if(!subscriberContainer.containsKey(key)){
159             subscriberContainer.put(key,info);
160         }
161         // add the subscriber
162
ListContainer container=addSubscriberMessageContainer(key);
163         if(retroactive){
164             /*
165             for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
166                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
167                 ConsumerMessageRef ref=new ConsumerMessageRef();
168                 ref.setAckEntry(entry);
169                 ref.setMessageEntry(tsa.getMessageEntry());
170                 container.add(ref);
171             }
172             */

173         }
174     }
175
176     public synchronized void deleteSubscription(String JavaDoc clientId,String JavaDoc subscriptionName) throws IOException JavaDoc{
177         String JavaDoc key=getSubscriptionKey(clientId,subscriptionName);
178         removeSubscriberMessageContainer(key);
179     }
180
181     public SubscriptionInfo[] getAllSubscriptions() throws IOException JavaDoc{
182         return (SubscriptionInfo[])subscriberContainer.values().toArray(
183                 new SubscriptionInfo[subscriberContainer.size()]);
184     }
185
186     public int getMessageCount(String JavaDoc clientId,String JavaDoc subscriberName) throws IOException JavaDoc{
187         String JavaDoc key=getSubscriptionKey(clientId,subscriberName);
188         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
189         return container != null ? container.size() : 0;
190     }
191
192     public SubscriptionInfo lookupSubscription(String JavaDoc clientId,String JavaDoc subscriptionName) throws IOException JavaDoc{
193         return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
194     }
195
196     public void recoverNextMessages(String JavaDoc clientId,String JavaDoc subscriptionName,int maxReturned,
197             MessageRecoveryListener listener) throws Exception JavaDoc{
198         String JavaDoc key=getSubscriptionKey(clientId,subscriptionName);
199         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
200         if(container!=null){
201             int count=0;
202             StoreEntry entry=container.getBatchEntry();
203             if(entry==null){
204                 entry=container.getEntry();
205             }else{
206                 entry=container.refreshEntry(entry);
207                 if(entry!=null){
208                     entry=container.getNextEntry(entry);
209                 }
210             }
211             if(entry!=null){
212                 do{
213                     ConsumerMessageRef consumerRef=container.get(entry);
214                     Object JavaDoc msg=messageContainer.getValue(consumerRef.getMessageEntry());
215                     if(msg!=null){
216                         recover(listener,msg);
217                         count++;
218                     }
219                     container.setBatchEntry(entry);
220                     entry=container.getNextEntry(entry);
221                 }while(entry!=null&&count<maxReturned&&listener.hasSpace());
222             }
223         }
224         listener.finished();
225     }
226
227     public void recoverSubscription(String JavaDoc clientId,String JavaDoc subscriptionName,MessageRecoveryListener listener)
228             throws Exception JavaDoc{
229         
230         String JavaDoc key=getSubscriptionKey(clientId,subscriptionName);
231         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
232         if(container!=null){
233             for(Iterator JavaDoc i=container.iterator();i.hasNext();){
234                 ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
235                 Object JavaDoc msg=messageContainer.get(ref.getMessageEntry());
236                 if(msg!=null){
237                     recover(listener,msg);
238                 }
239             }
240         }
241         listener.finished();
242     }
243
244     public synchronized void resetBatching(String JavaDoc clientId,String JavaDoc subscriptionName){
245         String JavaDoc key=getSubscriptionKey(clientId,subscriptionName);
246         TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
247         if(topicSubContainer!=null){
248             topicSubContainer.reset();
249         }
250     }
251
252     protected void removeSubscriberMessageContainer(Object JavaDoc key) throws IOException JavaDoc{
253         subscriberContainer.remove(key);
254         TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
255         for(Iterator JavaDoc i=container.iterator();i.hasNext();){
256             ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
257             if(ref!=null){
258                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
259                 if(tsa!=null){
260                     if(tsa.decrementCount()<=0){
261                         ackContainer.remove(ref.getAckEntry());
262                         messageContainer.remove(tsa.getMessageEntry());
263                     }else{
264                         ackContainer.update(ref.getAckEntry(),tsa);
265                     }
266                 }
267             }
268         }
269         store.deleteListContainer(key,"topic-subs-references");
270     }
271
272     protected String JavaDoc getSubscriptionKey(String JavaDoc clientId,String JavaDoc subscriberName){
273         String JavaDoc result=clientId+":";
274         result+=subscriberName!=null?subscriberName:"NOT_SET";
275         return result;
276     }
277 }
278
Popular Tags