package org.apache.activemq.store.kahadaptor;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;

/**
 * A {@link TopicMessageStore} backed by Kaha containers.
 * <p>
 * Each stored message is placed once in the inherited {@code messageContainer}. A
 * {@link TopicSubAck} in {@code ackContainer} holds a count that is initialised to the
 * number of subscribers present when the message was added and is decremented on each
 * acknowledgement (or when a subscription is deleted). When the count reaches zero the
 * ack record and the message are removed. Every subscription has its own
 * {@link TopicSubContainer} holding {@link ConsumerMessageRef}s that point at the shared
 * message and ack entries.
 */
public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore {

    /** One {@link TopicSubAck} per stored message; holds the outstanding-ack count. */
    protected ListContainer<TopicSubAck> ackContainer;
    /** Subscription key -> {@link SubscriptionInfo}. */
    private Map subscriberContainer;
    /** Used to obtain and delete the per-subscription "topic-subs" list containers. */
    private Store store;
    /** Subscription key -> {@link TopicSubContainer} of pending message references. */
    protected Map subscriberMessages = new ConcurrentHashMap();

    /**
     * Creates the store and registers a message container for every subscription key
     * already present in {@code subsContainer}.
     *
     * @param store            store used to obtain the per-subscription list containers
     * @param messageContainer container holding the messages, passed to the superclass
     * @param ackContainer     container holding the per-message {@link TopicSubAck} records
     * @param subsContainer    container mapping subscription keys to {@link SubscriptionInfo}
     * @param destination      destination this store serves, passed to the superclass
     * @throws IOException if a subscription message container cannot be obtained
     */
    public KahaTopicMessageStore(Store store, MapContainer messageContainer,
            ListContainer<TopicSubAck> ackContainer, MapContainer subsContainer,
            ActiveMQDestination destination) throws IOException {
        super(messageContainer, destination);
        this.store = store;
        this.ackContainer = ackContainer;
        subscriberContainer = subsContainer;
        // load all the ack containers for the known subscriptions
        for (Iterator i = subscriberContainer.keySet().iterator(); i.hasNext();) {
            Object key = i.next();
            addSubscriberMessageContainer(key);
        }
    }

    /**
     * Stores the message and adds a reference to it for every current subscriber.
     * If there are no subscribers, nothing is stored.
     *
     * @param context the connection context (not used here)
     * @param message the message to store
     * @throws IOException declared by the store contract
     */
    @Override
    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
        int subscriberCount = subscriberMessages.size();
        if (subscriberCount > 0) {
            MessageId id = message.getMessageId();
            StoreEntry messageEntry = messageContainer.place(id, message);
            // one ack record per message, expecting an ack from each current subscriber
            TopicSubAck tsa = new TopicSubAck();
            tsa.setCount(subscriberCount);
            tsa.setMessageEntry(messageEntry);
            StoreEntry ackEntry = ackContainer.placeLast(tsa);
            for (Iterator i = subscriberMessages.values().iterator(); i.hasNext();) {
                TopicSubContainer container = (TopicSubContainer) i.next();
                ConsumerMessageRef ref = new ConsumerMessageRef();
                ref.setAckEntry(ackEntry);
                ref.setMessageEntry(messageEntry);
                ref.setMessageId(id);
                container.add(ref);
            }
        }
    }

    /**
     * Acknowledges a message for one subscription. The subscription's reference is
     * removed and the message's ack count is decremented; when the count reaches zero
     * both the ack record and the message are removed from their containers.
     * Unknown subscriptions and unknown message ids are ignored.
     *
     * @param context          the connection context (not used here)
     * @param clientId         client id of the subscription
     * @param subscriptionName name of the subscription, may be {@code null}
     * @param messageId        id of the message being acknowledged
     * @throws IOException declared by the store contract
     */
    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
            MessageId messageId) throws IOException {
        String subcriberId = getSubscriptionKey(clientId, subscriptionName);
        TopicSubContainer container = (TopicSubContainer) subscriberMessages.get(subcriberId);
        if (container != null) {
            ConsumerMessageRef ref = container.remove(messageId);
            if (container.isEmpty()) {
                container.reset();
            }
            if (ref != null) {
                TopicSubAck tsa = (TopicSubAck) ackContainer.get(ref.getAckEntry());
                if (tsa != null) {
                    if (tsa.decrementCount() <= 0) {
                        // last outstanding ack: drop the ack record and the message itself;
                        // each entry is refreshed before being passed to remove()
                        StoreEntry entry = ref.getAckEntry();
                        entry = ackContainer.refresh(entry);
                        ackContainer.remove(entry);
                        entry = tsa.getMessageEntry();
                        entry = messageContainer.refresh(entry);
                        messageContainer.remove(entry);
                    } else {
                        ackContainer.update(ref.getAckEntry(), tsa);
                    }
                }
            }
        }
    }

    /**
     * Looks up the stored subscription information.
     *
     * @param clientId         client id of the subscription
     * @param subscriptionName name of the subscription, may be {@code null}
     * @return the stored {@link SubscriptionInfo}, or whatever the underlying map returns
     *         for an absent key
     * @throws IOException declared by the store contract
     */
    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
        return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
    }

    /**
     * Registers a subscription. The {@link SubscriptionInfo} is stored only if the key is
     * not already present; the subscriber's message container is (re)registered in either
     * case.
     *
     * @param clientId         client id of the subscription
     * @param subscriptionName name of the subscription, may be {@code null}
     * @param selector         selector recorded in the subscription info
     * @param retroactive      currently ignored by this implementation
     * @throws IOException if the subscriber message container cannot be obtained
     */
    public synchronized void addSubsciption(String clientId, String subscriptionName, String selector,
            boolean retroactive) throws IOException {
        SubscriptionInfo info = new SubscriptionInfo();
        info.setDestination(destination);
        info.setClientId(clientId);
        info.setSelector(selector);
        info.setSubcriptionName(subscriptionName);
        String key = getSubscriptionKey(clientId, subscriptionName);
        // if the subscription already exists its info is left untouched
        if (!subscriberContainer.containsKey(key)) {
            subscriberContainer.put(key, info);
        }
        addSubscriberMessageContainer(key);
    }

    /**
     * Deletes a subscription together with its pending message references.
     * Deleting a subscription that is not registered is a no-op apart from asking the
     * store to delete the (possibly absent) list container.
     *
     * @param clientId         client id of the subscription
     * @param subscriptionName name of the subscription, may be {@code null}
     * @throws IOException if the list container cannot be deleted
     */
    public synchronized void deleteSubscription(String clientId, String subscriptionName) throws IOException {
        String key = getSubscriptionKey(clientId, subscriptionName);
        removeSubscriberMessageContainer(key);
    }

    /**
     * Replays every pending message of a subscription to the listener and then calls
     * {@link MessageRecoveryListener#finished()}.
     * <p>
     * NOTE(review): this method is not synchronized, unlike the mutating methods of this
     * class - confirm that is intentional.
     *
     * @param clientId         client id of the subscription
     * @param subscriptionName name of the subscription, may be {@code null}
     * @param listener         receives the recovered messages
     * @throws Exception if recovery of a message fails
     */
    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
            throws Exception {
        String key = getSubscriptionKey(clientId, subscriptionName);
        TopicSubContainer container = (TopicSubContainer) subscriberMessages.get(key);
        if (container != null) {
            for (Iterator i = container.iterator(); i.hasNext();) {
                ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
                // NOTE(review): recoverNextMessages() resolves the same kind of entry via
                // getValue(StoreEntry); confirm get(...) with a StoreEntry is intended here.
                Object msg = messageContainer.get(ref.getMessageEntry());
                if (msg != null) {
                    recover(listener, msg);
                }
            }
        }
        listener.finished();
    }

    /**
     * Replays the next batch of pending messages of a subscription, continuing after the
     * container's remembered batch entry (or from the container's first entry when no
     * batch entry is set). The batch stops when there are no more entries, when
     * {@code maxReturned} messages have been recovered, or when the listener reports it
     * has no more space; the condition is checked after each entry, so at least one entry
     * is processed when one is available. {@link MessageRecoveryListener#finished()} is
     * always called at the end.
     *
     * @param clientId         client id of the subscription
     * @param subscriptionName name of the subscription, may be {@code null}
     * @param maxReturned      maximum number of messages to recover in this batch
     * @param listener         receives the recovered messages
     * @throws Exception if recovery of a message fails
     */
    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
            MessageRecoveryListener listener) throws Exception {
        String key = getSubscriptionKey(clientId, subscriptionName);
        TopicSubContainer container = (TopicSubContainer) subscriberMessages.get(key);
        if (container != null) {
            int count = 0;
            StoreEntry entry = container.getBatchEntry();
            if (entry == null) {
                entry = container.getEntry();
            } else {
                // resume after the last entry handed out by the previous batch
                entry = container.refreshEntry(entry);
                if (entry != null) {
                    entry = container.getNextEntry(entry);
                }
            }
            if (entry != null) {
                do {
                    ConsumerMessageRef consumerRef = container.get(entry);
                    Object msg = messageContainer.getValue(consumerRef.getMessageEntry());
                    if (msg != null) {
                        recover(listener, msg);
                        count++;
                    }
                    // remember the position even when the message itself was missing
                    container.setBatchEntry(entry);
                    entry = container.getNextEntry(entry);
                } while (entry != null && count < maxReturned && listener.hasSpace());
            }
        }
        listener.finished();
    }

    /**
     * Deletes the store contents: delegates to the superclass, then clears the ack and
     * subscription containers.
     */
    @Override
    public void delete() {
        super.delete();
        ackContainer.clear();
        subscriberContainer.clear();
    }

    /**
     * @return all stored subscription infos as an array
     * @throws IOException declared by the store contract
     */
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        return (SubscriptionInfo[]) subscriberContainer.values()
                .toArray(new SubscriptionInfo[subscriberContainer.size()]);
    }

    /**
     * Builds the key under which a subscription is stored:
     * {@code clientId + ":" + subscriberName}, with {@code "NOT_SET"} substituted for a
     * {@code null} subscriber name.
     *
     * @param clientId       client id of the subscription
     * @param subscriberName name of the subscription, may be {@code null}
     * @return the subscription key
     */
    protected String getSubscriptionKey(String clientId, String subscriberName) {
        String result = clientId + ":";
        result += subscriberName != null ? subscriberName : "NOT_SET";
        return result;
    }

    /**
     * Obtains the "topic-subs" list container for the key from the store, configures its
     * marshaller and registers a new {@link TopicSubContainer} wrapping it, replacing any
     * wrapper previously registered under the same key.
     *
     * @param key the subscription key
     * @return the underlying list container
     * @throws IOException if the container cannot be obtained from the store
     */
    protected ListContainer addSubscriberMessageContainer(Object key) throws IOException {
        ListContainer container = store.getListContainer(key, "topic-subs");
        Marshaller marshaller = new ConsumerMessageRefMarshaller();
        container.setMarshaller(marshaller);
        TopicSubContainer tsc = new TopicSubContainer(container);
        subscriberMessages.put(key, tsc);
        return container;
    }

    /**
     * Removes a subscription: its info, its pending references and its list container.
     * Each pending reference counts as an acknowledgement, so messages whose ack count
     * drops to zero are removed as well.
     *
     * @param key the subscription key
     * @throws IOException if the list container cannot be deleted
     */
    protected void removeSubscriberMessageContainer(Object key) throws IOException {
        subscriberContainer.remove(key);
        TopicSubContainer container = (TopicSubContainer) subscriberMessages.remove(key);
        // The subscription may be unknown (e.g. deleteSubscription() for a key that was
        // never added); there is then nothing to release, but the store is still asked
        // to delete the list container below.
        if (container != null) {
            for (Iterator i = container.iterator(); i.hasNext();) {
                ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
                if (ref != null) {
                    TopicSubAck tsa = (TopicSubAck) ackContainer.get(ref.getAckEntry());
                    if (tsa != null) {
                        if (tsa.decrementCount() <= 0) {
                            ackContainer.remove(ref.getAckEntry());
                            messageContainer.remove(tsa.getMessageEntry());
                        } else {
                            ackContainer.update(ref.getAckEntry(), tsa);
                        }
                    }
                }
            }
        }
        store.deleteListContainer(key, "topic-subs");
    }

    /**
     * @param clientId       client id of the subscription
     * @param subscriberName name of the subscription, may be {@code null}
     * @return the number of pending references for the subscription, or 0 if the
     *         subscription is not registered
     * @throws IOException declared by the store contract
     */
    public int getMessageCount(String clientId, String subscriberName) throws IOException {
        String key = getSubscriptionKey(clientId, subscriberName);
        TopicSubContainer container = (TopicSubContainer) subscriberMessages.get(key);
        return container != null ? container.size() : 0;
    }

    /**
     * Removes all messages, ack records and pending references. The subscriptions
     * themselves stay registered.
     *
     * @param context the connection context (not used here)
     * @throws IOException declared by the store contract
     */
    public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
        messageContainer.clear();
        ackContainer.clear();
        for (Iterator i = subscriberMessages.values().iterator(); i.hasNext();) {
            TopicSubContainer container = (TopicSubContainer) i.next();
            container.clear();
        }
    }

    /**
     * Resets the batch position of a subscription by calling
     * {@link TopicSubContainer#reset()}; does nothing for an unknown subscription.
     *
     * @param clientId         client id of the subscription
     * @param subscriptionName name of the subscription, may be {@code null}
     */
    public synchronized void resetBatching(String clientId, String subscriptionName) {
        String key = getSubscriptionKey(clientId, subscriptionName);
        TopicSubContainer topicSubContainer = (TopicSubContainer) subscriberMessages.get(key);
        if (topicSubContainer != null) {
            topicSubContainer.reset();
        }
    }
}