package org.apache.activemq.store.amq;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A {@link TopicMessageStore} built on {@link AMQMessageStore}.
 * <p>
 * Subscription management and message recovery are delegated to a
 * {@link TopicReferenceStore}. Acknowledgements are first written to the
 * journal as {@link JournalTopicAck} commands through the persistence adapter;
 * the most recent acknowledged message id per subscription is then remembered
 * in memory and handed to the reference store from {@link #doAsyncWrite()}.
 */
public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore {

    private static final Log log = LogFactory.getLog(AMQTopicMessageStore.class);

    private final TopicReferenceStore topicReferenceStore;

    /**
     * Latest acknowledged message id per subscription that has not yet been
     * passed to the reference store. Guarded by {@code this}; the map instance
     * is swapped out (never cleared in place) by {@link #doAsyncWrite()}.
     */
    private Map<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();

    /**
     * Creates the store.
     *
     * @param adapter the persistence adapter, passed to the superclass
     * @param topicReferenceStore the reference store that subscription and recovery calls are delegated to
     * @param destinationName the topic this store belongs to
     */
    public AMQTopicMessageStore(AMQPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore,
            ActiveMQTopic destinationName) {
        super(adapter, topicReferenceStore, destinationName);
        this.topicReferenceStore = topicReferenceStore;
    }

    /**
     * Calls {@code flush()} and then asks the reference store to recover the
     * subscription, wrapping {@code listener} in a {@link RecoveryListenerAdapter}.
     *
     * @param clientId the client id of the subscription
     * @param subscriptionName the subscription name
     * @param listener receives the recovered messages
     * @throws Exception if the flush or the delegated recovery fails
     */
    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
            throws Exception {
        flush();
        topicReferenceStore.recoverSubscription(clientId, subscriptionName,
                new RecoveryListenerAdapter(this, listener));
    }

    /**
     * Asks the reference store for the next batch of messages. If the adapter
     * reports that nothing was recovered, the store is flushed and the
     * request is issued one more time with the same adapter.
     *
     * @param clientId the client id of the subscription
     * @param subscriptionName the subscription name
     * @param maxReturned upper bound passed through to the reference store
     * @param listener receives the recovered messages
     * @throws Exception if the flush or the delegated recovery fails
     */
    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
            final MessageRecoveryListener listener) throws Exception {
        RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
        topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
        if (recoveryListener.size() == 0) {
            flush();
            topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
        }
    }

    /**
     * Delegates to {@link TopicReferenceStore#lookupSubscription}.
     *
     * @param clientId the client id of the subscription
     * @param subscriptionName the subscription name
     * @return whatever the reference store returns for this subscription
     * @throws IOException if the reference store fails
     */
    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
        return topicReferenceStore.lookupSubscription(clientId, subscriptionName);
    }

    /**
     * Delegates to {@link TopicReferenceStore#addSubsciption}. The misspelled
     * method name is part of the implemented interface and must be kept.
     *
     * @param clientId the client id of the subscription
     * @param subscriptionName the subscription name
     * @param selector the selector, passed through unchanged
     * @param retroactive passed through unchanged
     * @throws IOException if the reference store fails
     */
    public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive)
            throws IOException {
        topicReferenceStore.addSubsciption(clientId, subscriptionName, selector, retroactive);
    }

    /**
     * Journals an acknowledgement for {@code messageId} on behalf of a subscription.
     * <p>
     * Outside a transaction the acknowledgement is recorded immediately. Inside
     * a transaction the journal location is tracked in
     * {@code inFlightTxLocations}, the ack is registered with the transaction
     * store, and a {@link Synchronization} records the acknowledgement on
     * commit or simply drops the in-flight location on rollback.
     *
     * @param context the connection context; its transaction (if any) supplies the transaction id
     * @param clientId the client id of the subscription
     * @param subscriptionName the subscription name
     * @param messageId the message being acknowledged
     * @throws IOException if writing to the journal fails or the calling thread is interrupted
     */
    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
            final MessageId messageId) throws IOException {
        final boolean debug = log.isDebugEnabled();

        JournalTopicAck ack = new JournalTopicAck();
        ack.setDestination(destination);
        ack.setMessageId(messageId);
        ack.setMessageSequenceId(messageId.getBrokerSequenceId());
        ack.setSubscritionName(subscriptionName);
        ack.setClientId(clientId);
        ack.setTransactionId(
                context.getTransaction() != null ? context.getTransaction().getTransactionId() : null);

        final Location location = peristenceAdapter.writeCommand(ack, false);
        final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);

        if (!context.isInTransaction()) {
            if (debug) {
                log.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
            }
            acknowledge(messageId, location, key);
            return;
        }

        if (debug) {
            log.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
        }
        synchronized (this) {
            inFlightTxLocations.add(location);
        }
        transactionStore.acknowledge(this, ack, location);
        context.getTransaction().addSynchronization(new Synchronization() {

            public void afterCommit() throws Exception {
                if (debug) {
                    log.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
                }
                synchronized (AMQTopicMessageStore.this) {
                    inFlightTxLocations.remove(location);
                    acknowledge(messageId, location, key);
                }
            }

            public void afterRollback() throws Exception {
                if (debug) {
                    log.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
                }
                synchronized (AMQTopicMessageStore.this) {
                    inFlightTxLocations.remove(location);
                }
            }
        });
    }

    /**
     * Re-applies an acknowledgement directly against the reference store,
     * provided the subscription can still be looked up.
     * <p>
     * This is deliberately best-effort: any {@link Throwable} raised by the
     * lookup or the acknowledge is logged at debug level and reported as
     * {@code false} rather than propagated.
     *
     * @param context the connection context passed to the reference store
     * @param clientId the client id of the subscription
     * @param subscritionName the subscription name
     * @param messageId the message whose acknowledgement is replayed
     * @return {@code true} if the subscription was found and the acknowledge call
     *         completed; {@code false} if the subscription was not found or an error occurred
     */
    public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName,
            MessageId messageId) {
        try {
            SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
            if (sub != null) {
                topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId);
                return true;
            }
        } catch (Throwable e) {
            // Best-effort replay: failures are expected when the ack was already applied.
            if (log.isDebugEnabled()) {
                log.debug("Could not replay acknowledge for message '" + messageId
                        + "'. Message may have already been acknowledged. reason: " + e);
            }
        }
        return false;
    }

    /**
     * Records an acknowledgement in memory and wakes the asynchronous write task.
     * <p>
     * Under this store's monitor, {@code lastLocation} is set to
     * {@code location} and {@code messageId} replaces any earlier pending
     * acknowledgement for {@code key}.
     *
     * @param messageId the acknowledged message
     * @param location the journal location of the acknowledgement
     * @param key identifies the subscription
     * @throws InterruptedIOException if the thread is interrupted while waking the write task;
     *         the thread's interrupt status is restored before throwing
     */
    private void acknowledge(MessageId messageId, Location location, SubscriptionKey key)
            throws InterruptedIOException {
        synchronized (this) {
            lastLocation = location;
            ackedLastAckLocations.put(key, messageId);
        }
        try {
            asyncWriteTask.wakeup();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it,
            // and keep the original exception as the cause.
            Thread.currentThread().interrupt();
            InterruptedIOException interrupted = new InterruptedIOException();
            interrupted.initCause(e);
            throw interrupted;
        }
    }

    /**
     * Takes ownership of the pending acknowledgements, runs the superclass
     * write, and then applies each pending acknowledgement to the reference
     * store inside a single {@code transactionTemplate} callback.
     *
     * @return the location returned by the superclass implementation
     * @throws IOException if the superclass write or the transaction callback fails
     */
    @Override
    protected Location doAsyncWrite() throws IOException {
        final Map<SubscriptionKey, MessageId> pendingAcks;
        synchronized (this) {
            // Swap rather than clear, so concurrent acknowledge() calls fill a fresh map
            // while this snapshot is processed without holding the lock.
            pendingAcks = this.ackedLastAckLocations;
            this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
        }

        Location location = super.doAsyncWrite();

        transactionTemplate.run(new Callback() {
            public void execute() throws Exception {
                for (Map.Entry<SubscriptionKey, MessageId> entry : pendingAcks.entrySet()) {
                    SubscriptionKey subscriptionKey = entry.getKey();
                    topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
                            subscriptionKey.subscriptionName, entry.getValue());
                }
            }
        });
        return location;
    }

    /**
     * @return the reference store this message store delegates to
     */
    public TopicReferenceStore getTopicReferenceStore() {
        return topicReferenceStore;
    }

    /**
     * Delegates to {@link TopicReferenceStore#deleteSubscription}.
     *
     * @param clientId the client id of the subscription
     * @param subscriptionName the subscription name
     * @throws IOException if the reference store fails
     */
    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
        topicReferenceStore.deleteSubscription(clientId, subscriptionName);
    }

    /**
     * Delegates to {@link TopicReferenceStore#getAllSubscriptions}.
     *
     * @return the subscriptions reported by the reference store
     * @throws IOException if the reference store fails
     */
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        return topicReferenceStore.getAllSubscriptions();
    }

    /**
     * Calls {@code flush()} and then returns the reference store's message
     * count for the subscription.
     *
     * @param clientId the client id of the subscription
     * @param subscriberName the subscription name
     * @return the count reported by the reference store
     * @throws IOException if the flush or the reference store fails
     */
    public int getMessageCount(String clientId, String subscriberName) throws IOException {
        flush();
        return topicReferenceStore.getMessageCount(clientId, subscriberName);
    }

    /**
     * Delegates to {@link TopicReferenceStore#resetBatching}.
     *
     * @param clientId the client id of the subscription
     * @param subscriptionName the subscription name
     */
    public void resetBatching(String clientId, String subscriptionName) {
        topicReferenceStore.resetBatching(clientId, subscriptionName);
    }
}