KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > memory > MemoryTopicMessageStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.store.memory;
16
17 import java.io.IOException JavaDoc;
18 import java.util.Collections JavaDoc;
19 import java.util.HashMap JavaDoc;
20 import java.util.Iterator JavaDoc;
21 import java.util.Map JavaDoc;
22 import java.util.Map.Entry;
23 import org.apache.activemq.broker.ConnectionContext;
24 import org.apache.activemq.command.ActiveMQDestination;
25 import org.apache.activemq.command.Message;
26 import org.apache.activemq.command.MessageId;
27 import org.apache.activemq.command.SubscriptionInfo;
28 import org.apache.activemq.store.MessageRecoveryListener;
29 import org.apache.activemq.store.TopicMessageStore;
30 import org.apache.activemq.util.LRUCache;
31 import org.apache.activemq.util.SubscriptionKey;
32
33 /**
34  * @version $Revision: 1.5 $
35  */

36 public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore{
37
38     private Map JavaDoc subscriberDatabase;
39     private Map JavaDoc topicSubMap;
40
41     public MemoryTopicMessageStore(ActiveMQDestination destination){
42         this(destination,new LRUCache(100,100,0.75f,false),makeMap());
43     }
44
45     protected static Map JavaDoc makeMap(){
46         return Collections.synchronizedMap(new HashMap JavaDoc());
47     }
48
49     public MemoryTopicMessageStore(ActiveMQDestination destination,Map JavaDoc messageTable,Map JavaDoc subscriberDatabase){
50         super(destination,messageTable);
51         this.subscriberDatabase=subscriberDatabase;
52         this.topicSubMap=makeMap();
53     }
54
55     public synchronized void addMessage(ConnectionContext context,Message message) throws IOException JavaDoc{
56         super.addMessage(context,message);
57         for(Iterator JavaDoc i=topicSubMap.values().iterator();i.hasNext();){
58             MemoryTopicSub sub=(MemoryTopicSub)i.next();
59             sub.addMessage(message.getMessageId(),message);
60         }
61     }
62
63     public synchronized void acknowledge(ConnectionContext context,String JavaDoc clientId,String JavaDoc subscriptionName,
64             MessageId messageId) throws IOException JavaDoc{
65         SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
66         MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(key);
67         if(sub!=null){
68             sub.removeMessage(messageId);
69         }
70     }
71
72     public SubscriptionInfo lookupSubscription(String JavaDoc clientId,String JavaDoc subscriptionName) throws IOException JavaDoc{
73         return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId,subscriptionName));
74     }
75
76     public synchronized void addSubsciption(String JavaDoc clientId,String JavaDoc subscriptionName,String JavaDoc selector,boolean retroactive)
77             throws IOException JavaDoc{
78         SubscriptionInfo info=new SubscriptionInfo();
79         info.setDestination(destination);
80         info.setClientId(clientId);
81         info.setSelector(selector);
82         info.setSubcriptionName(subscriptionName);
83         SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
84         MemoryTopicSub sub=new MemoryTopicSub();
85         topicSubMap.put(key,sub);
86         if(retroactive){
87             for(Iterator JavaDoc i=messageTable.entrySet().iterator();i.hasNext();){
88                 Map.Entry JavaDoc entry=(Entry)i.next();
89                 sub.addMessage((MessageId)entry.getKey(),(Message)entry.getValue());
90             }
91         }
92         subscriberDatabase.put(key,info);
93     }
94
95     public void deleteSubscription(String JavaDoc clientId,String JavaDoc subscriptionName){
96         org.apache.activemq.util.SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
97         subscriberDatabase.remove(key);
98         topicSubMap.remove(key);
99     }
100
101     public void recoverSubscription(String JavaDoc clientId,String JavaDoc subscriptionName,MessageRecoveryListener listener)
102             throws Exception JavaDoc{
103         MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
104         if(sub!=null){
105             sub.recoverSubscription(listener);
106         }
107     }
108
109     public void delete(){
110         super.delete();
111         subscriberDatabase.clear();
112         topicSubMap.clear();
113     }
114
115     public SubscriptionInfo[] getAllSubscriptions() throws IOException JavaDoc{
116         return (SubscriptionInfo[])subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
117     }
118
119     public synchronized int getMessageCount(String JavaDoc clientId,String JavaDoc subscriberName) throws IOException JavaDoc{
120         int result=0;
121         MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriberName));
122         if(sub!=null){
123             result=sub.size();
124         }
125         return result;
126     }
127
128     public void recoverNextMessages(String JavaDoc clientId,String JavaDoc subscriptionName,int maxReturned,
129             MessageRecoveryListener listener) throws Exception JavaDoc{
130         MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
131         if(sub!=null){
132             sub.recoverNextMessages(maxReturned,listener);
133         }
134     }
135
136     public void resetBatching(String JavaDoc clientId,String JavaDoc subscriptionName){
137         MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
138         if(sub!=null){
139             sub.resetBatching();
140         }
141     }
142 }
143
Popular Tags