package org.apache.activemq.store.memory;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.SubscriptionKey;

/**
 * In-memory {@link TopicMessageStore}.
 * <p>
 * Two maps are kept, both keyed by {@link SubscriptionKey} (client id plus
 * subscription name):
 * <ul>
 * <li>{@code subscriberDatabase} maps a key to its {@link SubscriptionInfo};</li>
 * <li>{@code topicSubMap} maps a key to the {@link MemoryTopicSub} that holds
 * the messages recorded for that subscription.</li>
 * </ul>
 * Every method that writes to {@code topicSubMap}, and the method that iterates
 * it ({@link #addMessage}), is {@code synchronized} on this store, so the
 * iteration cannot observe a concurrent structural change made through this
 * class.
 */
public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {

    /** SubscriptionKey -> SubscriptionInfo. May be supplied by the caller. */
    private final Map subscriberDatabase;

    /** SubscriptionKey -> MemoryTopicSub. Always created by {@link #makeMap()}. */
    private final Map topicSubMap;

    /**
     * Creates a store for the given destination whose message table is an
     * {@code LRUCache(100, 100, 0.75f, false)} and whose subscriber database is
     * a fresh synchronized map.
     * <p>
     * NOTE(review): the LRUCache arguments presumably cap the message table at
     * 100 entries - confirm against LRUCache.
     *
     * @param destination the destination this store belongs to
     */
    public MemoryTopicMessageStore(ActiveMQDestination destination) {
        this(destination, new LRUCache(100, 100, 0.75f, false), makeMap());
    }

    /**
     * Creates a store that uses caller-supplied maps.
     *
     * @param destination the destination this store belongs to
     * @param messageTable map handed to the superclass as the message table
     * @param subscriberDatabase map used to hold SubscriptionKey -> SubscriptionInfo
     */
    public MemoryTopicMessageStore(ActiveMQDestination destination, Map messageTable, Map subscriberDatabase) {
        super(destination, messageTable);
        this.subscriberDatabase = subscriberDatabase;
        this.topicSubMap = makeMap();
    }

    /**
     * @return a new, empty {@link HashMap} wrapped by
     *         {@link Collections#synchronizedMap(Map)}
     */
    protected static Map makeMap() {
        return Collections.synchronizedMap(new HashMap());
    }

    /**
     * Stores the message through the superclass and then hands it to every
     * registered {@link MemoryTopicSub}.
     *
     * @param context the connection context, passed through to the superclass
     * @param message the message to add
     * @throws IOException declared by the store contract
     */
    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
        super.addMessage(context, message);
        // Safe to iterate: all writers of topicSubMap hold this store's monitor.
        for (Iterator i = topicSubMap.values().iterator(); i.hasNext();) {
            MemoryTopicSub sub = (MemoryTopicSub) i.next();
            sub.addMessage(message.getMessageId(), message);
        }
    }

    /**
     * Removes the given message from the matching subscription. Does nothing
     * when no subscription is registered under the key.
     *
     * @param context the connection context (not used here)
     * @param clientId client id part of the subscription key
     * @param subscriptionName name part of the subscription key
     * @param messageId id of the message being acknowledged
     * @throws IOException declared by the store contract
     */
    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
            MessageId messageId) throws IOException {
        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
        MemoryTopicSub sub = (MemoryTopicSub) topicSubMap.get(key);
        if (sub != null) {
            sub.removeMessage(messageId);
        }
    }

    /**
     * @param clientId client id part of the subscription key
     * @param subscriptionName name part of the subscription key
     * @return the stored {@link SubscriptionInfo}, or whatever the subscriber
     *         database returns for an absent key
     * @throws IOException declared by the store contract
     */
    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
        return (SubscriptionInfo) subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
    }

    /**
     * Registers a subscription: builds its {@link SubscriptionInfo}, creates a
     * new {@link MemoryTopicSub} for it and, when {@code retroactive} is set,
     * feeds every entry currently in the message table into that sub.
     *
     * @param clientId client id part of the subscription key
     * @param subscriptionName name part of the subscription key
     * @param selector selector recorded in the subscription info
     * @param retroactive whether to replay the current message table into the new sub
     * @throws IOException declared by the store contract
     */
    public synchronized void addSubsciption(String clientId, String subscriptionName, String selector,
            boolean retroactive) throws IOException {
        SubscriptionInfo info = new SubscriptionInfo();
        info.setDestination(destination);
        info.setClientId(clientId);
        info.setSelector(selector);
        info.setSubcriptionName(subscriptionName);
        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
        MemoryTopicSub sub = new MemoryTopicSub();
        topicSubMap.put(key, sub);
        if (retroactive) {
            for (Iterator i = messageTable.entrySet().iterator(); i.hasNext();) {
                Map.Entry entry = (Entry) i.next();
                sub.addMessage((MessageId) entry.getKey(), (Message) entry.getValue());
            }
        }
        subscriberDatabase.put(key, info);
    }

    /**
     * Removes the subscription from both maps.
     * <p>
     * Synchronized so that the two removals happen as a unit with respect to
     * {@link #addSubsciption}, and so that {@link #addMessage} never iterates
     * {@code topicSubMap} while an entry is being removed.
     *
     * @param clientId client id part of the subscription key
     * @param subscriptionName name part of the subscription key
     */
    public synchronized void deleteSubscription(String clientId, String subscriptionName) {
        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
        subscriberDatabase.remove(key);
        topicSubMap.remove(key);
    }

    /**
     * Delegates recovery to the matching {@link MemoryTopicSub}, if any.
     *
     * @param clientId client id part of the subscription key
     * @param subscriptionName name part of the subscription key
     * @param listener listener handed to the sub
     * @throws Exception whatever the sub's recovery throws
     */
    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
            throws Exception {
        MemoryTopicSub sub = (MemoryTopicSub) topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
        if (sub != null) {
            sub.recoverSubscription(listener);
        }
    }

    /**
     * Runs the superclass delete and then clears both subscription maps.
     * <p>
     * Synchronized for the same reason as {@link #deleteSubscription}: clearing
     * {@code topicSubMap} must not overlap with the iteration in
     * {@link #addMessage}.
     */
    public synchronized void delete() {
        super.delete();
        subscriberDatabase.clear();
        topicSubMap.clear();
    }

    /**
     * @return a snapshot array of every {@link SubscriptionInfo} in the
     *         subscriber database
     * @throws IOException declared by the store contract
     */
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        // A zero-length seed lets toArray size the result itself. Pre-sizing from a
        // separate size() call can leave a null slot if an entry is removed in between.
        return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[0]);
    }

    /**
     * @param clientId client id part of the subscription key
     * @param subscriberName name part of the subscription key
     * @return the size reported by the matching {@link MemoryTopicSub}, or 0
     *         when no subscription is registered under the key
     * @throws IOException declared by the store contract
     */
    public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
        MemoryTopicSub sub = (MemoryTopicSub) topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
        return sub != null ? sub.size() : 0;
    }

    /**
     * Delegates batched recovery to the matching {@link MemoryTopicSub}, if any.
     *
     * @param clientId client id part of the subscription key
     * @param subscriptionName name part of the subscription key
     * @param maxReturned limit passed through to the sub
     * @param listener listener handed to the sub
     * @throws Exception whatever the sub's recovery throws
     */
    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
            MessageRecoveryListener listener) throws Exception {
        MemoryTopicSub sub = (MemoryTopicSub) topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
        if (sub != null) {
            sub.recoverNextMessages(maxReturned, listener);
        }
    }

    /**
     * Asks the matching {@link MemoryTopicSub}, if any, to reset its batching state.
     *
     * @param clientId client id part of the subscription key
     * @param subscriptionName name part of the subscription key
     */
    public void resetBatching(String clientId, String subscriptionName) {
        MemoryTopicSub sub = (MemoryTopicSub) topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
        if (sub != null) {
            sub.resetBatching();
        }
    }
}