1 14 15 package org.apache.activemq.store.kahadaptor; 16 17 import java.io.IOException ; 18 import java.util.Iterator ; 19 import java.util.Map ; 20 import java.util.concurrent.ConcurrentHashMap ; 21 import org.apache.activemq.broker.ConnectionContext; 22 import org.apache.activemq.command.ActiveMQDestination; 23 import org.apache.activemq.command.Message; 24 import org.apache.activemq.command.MessageId; 25 import org.apache.activemq.command.SubscriptionInfo; 26 import org.apache.activemq.kaha.ListContainer; 27 import org.apache.activemq.kaha.MapContainer; 28 import org.apache.activemq.kaha.Marshaller; 29 import org.apache.activemq.kaha.Store; 30 import org.apache.activemq.kaha.StoreEntry; 31 import org.apache.activemq.store.MessageRecoveryListener; 32 import org.apache.activemq.store.TopicReferenceStore; 33 34 public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore{ 35 36 protected ListContainer<TopicSubAck> ackContainer; 37 private Map subscriberContainer; 38 private Store store; 39 protected Map subscriberMessages=new ConcurrentHashMap (); 40 41 public KahaTopicReferenceStore(Store store,KahaReferenceStoreAdapter adapter,MapContainer messageContainer,ListContainer ackContainer, 42 MapContainer subsContainer,ActiveMQDestination destination) throws IOException { 43 super(adapter,messageContainer,destination); 44 this.store=store; 45 this.ackContainer=ackContainer; 46 subscriberContainer=subsContainer; 47 for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){ 49 Object key=i.next(); 50 addSubscriberMessageContainer(key); 51 } 52 } 53 54 protected MessageId getMessageId(Object object){ 55 return new MessageId(((ReferenceRecord)object).getMessageId()); 56 } 57 58 public synchronized void addMessage(ConnectionContext context,Message message) throws IOException { 59 throw new RuntimeException ("Use addMessageReference instead"); 60 } 61 62 public synchronized Message getMessage(MessageId identity) throws IOException { 63 throw new RuntimeException ("Use addMessageReference instead"); 64 } 65 66 protected void recover(MessageRecoveryListener listener,Object msg) throws Exception { 67 ReferenceRecord record=(ReferenceRecord)msg; 68 listener.recoverMessageReference(new MessageId(record.getMessageId())); 69 } 70 71 public void addMessageReference(final ConnectionContext context,final MessageId messageId,final ReferenceData data){ 72 final ReferenceRecord record=new ReferenceRecord(messageId.toString(),data); 73 final int subscriberCount=subscriberMessages.size(); 74 if(subscriberCount>0){ 75 final StoreEntry messageEntry=messageContainer.place(messageId,record); 76 addInterest(record); 77 final TopicSubAck tsa=new TopicSubAck(); 78 tsa.setCount(subscriberCount); 79 tsa.setMessageEntry(messageEntry); 80 final StoreEntry ackEntry=ackContainer.placeLast(tsa); 81 for(final Iterator i=subscriberMessages.values().iterator();i.hasNext();){ 82 final TopicSubContainer container=(TopicSubContainer)i.next(); 83 final ConsumerMessageRef ref=new ConsumerMessageRef(); 84 ref.setAckEntry(ackEntry); 85 ref.setMessageEntry(messageEntry); 86 ref.setMessageId(messageId); 87 container.add(ref); 88 } 89 } 90 } 91 92 public ReferenceData getMessageReference(final MessageId identity) throws IOException { 93 final ReferenceRecord result=messageContainer.get(identity); 94 if(result==null) 95 return null; 96 return result.getData(); 97 } 98 99 public void addReferenceFileIdsInUse(){ 100 for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){ 101 TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry); 102 if(subAck.getCount()>0){ 103 ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry()); 104 addInterest(rr); 105 } 106 } 107 } 108 109 protected ListContainer addSubscriberMessageContainer(Object key) throws IOException { 110 ListContainer container=store.getListContainer(key,"topic-subs-references"); 111 Marshaller marshaller=new ConsumerMessageRefMarshaller(); 112 container.setMarshaller(marshaller); 113 TopicSubContainer tsc=new TopicSubContainer(container); 114 subscriberMessages.put(key,tsc); 115 return container; 116 } 117 118 public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName, 119 MessageId messageId) throws IOException { 120 String key=getSubscriptionKey(clientId,subscriptionName); 121 122 TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); 123 if(container!=null){ 124 ConsumerMessageRef ref=container.remove(messageId); 125 if(ref!=null){ 126 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); 127 if(tsa!=null){ 128 if(tsa.decrementCount()<=0){ 129 StoreEntry entry=ref.getAckEntry(); 130 entry=ackContainer.refresh(entry); 131 ackContainer.remove(entry); 132 ReferenceRecord rr=messageContainer.get(messageId); 133 if(rr!=null){ 134 entry=tsa.getMessageEntry(); 135 entry=messageContainer.refresh(entry); 136 messageContainer.remove(entry); 137 removeInterest(rr); 138 } 139 }else{ 140 141 ackContainer.update(ref.getAckEntry(),tsa); 142 } 143 } 144 } 145 } 146 } 147 148 public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive) 149 throws IOException { 150 SubscriptionInfo info=new SubscriptionInfo(); 151 info.setDestination(destination); 152 info.setClientId(clientId); 153 info.setSelector(selector); 154 info.setSubcriptionName(subscriptionName); 155 String key=getSubscriptionKey(clientId,subscriptionName); 156 if(!subscriberContainer.containsKey(key)){ 159 subscriberContainer.put(key,info); 160 } 161 ListContainer container=addSubscriberMessageContainer(key); 163 if(retroactive){ 164 173 } 174 } 175 176 public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException { 177 String key=getSubscriptionKey(clientId,subscriptionName); 178 removeSubscriberMessageContainer(key); 179 } 180 181 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 182 return (SubscriptionInfo[])subscriberContainer.values().toArray( 183 new SubscriptionInfo[subscriberContainer.size()]); 184 } 185 186 public int getMessageCount(String clientId,String subscriberName) throws IOException { 187 String key=getSubscriptionKey(clientId,subscriberName); 188 TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); 189 return container != null ? container.size() : 0; 190 } 191 192 public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException { 193 return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName)); 194 } 195 196 public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, 197 MessageRecoveryListener listener) throws Exception { 198 String key=getSubscriptionKey(clientId,subscriptionName); 199 TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); 200 if(container!=null){ 201 int count=0; 202 StoreEntry entry=container.getBatchEntry(); 203 if(entry==null){ 204 entry=container.getEntry(); 205 }else{ 206 entry=container.refreshEntry(entry); 207 if(entry!=null){ 208 entry=container.getNextEntry(entry); 209 } 210 } 211 if(entry!=null){ 212 do{ 213 ConsumerMessageRef consumerRef=container.get(entry); 214 Object msg=messageContainer.getValue(consumerRef.getMessageEntry()); 215 if(msg!=null){ 216 recover(listener,msg); 217 count++; 218 } 219 container.setBatchEntry(entry); 220 entry=container.getNextEntry(entry); 221 }while(entry!=null&&count<maxReturned&&listener.hasSpace()); 222 } 223 } 224 listener.finished(); 225 } 226 227 public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener) 228 throws Exception { 229 230 String key=getSubscriptionKey(clientId,subscriptionName); 231 TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); 232 if(container!=null){ 233 for(Iterator i=container.iterator();i.hasNext();){ 234 ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); 235 Object msg=messageContainer.get(ref.getMessageEntry()); 236 if(msg!=null){ 237 recover(listener,msg); 238 } 239 } 240 } 241 listener.finished(); 242 } 243 244 public synchronized void resetBatching(String clientId,String subscriptionName){ 245 String key=getSubscriptionKey(clientId,subscriptionName); 246 TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key); 247 if(topicSubContainer!=null){ 248 topicSubContainer.reset(); 249 } 250 } 251 252 protected void removeSubscriberMessageContainer(Object key) throws IOException { 253 subscriberContainer.remove(key); 254 TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key); 255 for(Iterator i=container.iterator();i.hasNext();){ 256 ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); 257 if(ref!=null){ 258 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); 259 if(tsa!=null){ 260 if(tsa.decrementCount()<=0){ 261 ackContainer.remove(ref.getAckEntry()); 262 messageContainer.remove(tsa.getMessageEntry()); 263 }else{ 264 ackContainer.update(ref.getAckEntry(),tsa); 265 } 266 } 267 } 268 } 269 store.deleteListContainer(key,"topic-subs-references"); 270 } 271 272 protected String getSubscriptionKey(String clientId,String subscriberName){ 273 String result=clientId+":"; 274 result+=subscriberName!=null?subscriberName:"NOT_SET"; 275 return result; 276 } 277 } 278 | Popular Tags |