1 18 package org.apache.activemq.store.journal; 19 20 import java.io.IOException ; 21 import java.util.HashMap ; 22 import java.util.Iterator ; 23 24 import org.apache.activeio.journal.RecordLocation; 25 import org.apache.activemq.broker.ConnectionContext; 26 import org.apache.activemq.command.ActiveMQTopic; 27 import org.apache.activemq.command.JournalTopicAck; 28 import org.apache.activemq.command.Message; 29 import org.apache.activemq.command.MessageId; 30 import org.apache.activemq.command.SubscriptionInfo; 31 import org.apache.activemq.store.MessageRecoveryListener; 32 import org.apache.activemq.store.TopicMessageStore; 33 import org.apache.activemq.transaction.Synchronization; 34 import org.apache.activemq.util.Callback; 35 import org.apache.activemq.util.SubscriptionKey; 36 import org.apache.commons.logging.Log; 37 import org.apache.commons.logging.LogFactory; 38 39 44 public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore { 45 46 private static final Log log = LogFactory.getLog(JournalTopicMessageStore.class); 47 48 private TopicMessageStore longTermStore; 49 private HashMap ackedLastAckLocations = new HashMap (); 50 51 public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore, ActiveMQTopic destinationName) { 52 super(adapter, checkpointStore, destinationName); 53 this.longTermStore = checkpointStore; 54 } 55 56 public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { 57 this.peristenceAdapter.checkpoint(true, true); 58 longTermStore.recoverSubscription(clientId, subscriptionName, listener); 59 } 60 61 public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception { 62 this.peristenceAdapter.checkpoint(true, true); 63 longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned,listener); 64 65 } 66 67 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 68 return longTermStore.lookupSubscription(clientId, subscriptionName); 69 } 70 71 public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException { 72 this.peristenceAdapter.checkpoint(true, true); 73 longTermStore.addSubsciption(clientId, subscriptionName, selector, retroactive); 74 } 75 76 public void addMessage(ConnectionContext context, Message message) throws IOException { 77 super.addMessage(context, message); 78 } 79 80 82 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException { 83 final boolean debug = log.isDebugEnabled(); 84 85 JournalTopicAck ack = new JournalTopicAck(); 86 ack.setDestination(destination); 87 ack.setMessageId(messageId); 88 ack.setMessageSequenceId(messageId.getBrokerSequenceId()); 89 ack.setSubscritionName(subscriptionName); 90 ack.setClientId(clientId); 91 ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null); 92 final RecordLocation location = peristenceAdapter.writeCommand(ack, false); 93 94 final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 95 if( !context.isInTransaction() ) { 96 if( debug ) 97 log.debug("Journalled acknowledge for: "+messageId+", at: "+location); 98 acknowledge(messageId, location, key); 99 } else { 100 if( debug ) 101 log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location); 102 synchronized (this) { 103 inFlightTxLocations.add(location); 104 } 105 transactionStore.acknowledge(this, ack, location); 106 context.getTransaction().addSynchronization(new Synchronization(){ 107 public void afterCommit() throws Exception { 108 if( debug ) 109 log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location); 110 synchronized (JournalTopicMessageStore.this) { 111 inFlightTxLocations.remove(location); 112 acknowledge(messageId, location, key); 113 } 114 } 115 public void afterRollback() throws Exception { 116 if( debug ) 117 log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location); 118 synchronized (JournalTopicMessageStore.this) { 119 inFlightTxLocations.remove(location); 120 } 121 } 122 }); 123 } 124 125 } 126 127 public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) { 128 try { 129 SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName); 130 if( sub != null ) { 131 longTermStore.acknowledge(context, clientId, subscritionName, messageId); 132 } 133 } 134 catch (Throwable e) { 135 log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e); 136 } 137 } 138 139 140 145 private void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) { 146 synchronized(this) { 147 lastLocation = location; 148 ackedLastAckLocations.put(key, messageId); 149 } 150 } 151 152 public RecordLocation checkpoint() throws IOException { 153 154 final HashMap cpAckedLastAckLocations; 155 156 synchronized (this) { 158 cpAckedLastAckLocations = this.ackedLastAckLocations; 159 this.ackedLastAckLocations = new HashMap (); 160 } 161 162 return super.checkpoint( new Callback() { 163 public void execute() throws Exception { 164 165 Iterator iterator = cpAckedLastAckLocations.keySet().iterator(); 167 while (iterator.hasNext()) { 168 SubscriptionKey subscriptionKey = (SubscriptionKey) iterator.next(); 169 MessageId identity = (MessageId) cpAckedLastAckLocations.get(subscriptionKey); 170 longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity); 171 } 172 173 } 174 }); 175 176 } 177 178 181 public TopicMessageStore getLongTermTopicMessageStore() { 182 return longTermStore; 183 } 184 185 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 186 longTermStore.deleteSubscription(clientId, subscriptionName); 187 } 188 189 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 190 return longTermStore.getAllSubscriptions(); 191 } 192 193 194 public int getMessageCount(String clientId,String subscriberName) throws IOException { 195 this.peristenceAdapter.checkpoint(true, true); 196 return longTermStore.getMessageCount(clientId,subscriberName); 197 } 198 199 public void resetBatching(String clientId,String subscriptionName) { 200 longTermStore.resetBatching(clientId,subscriptionName); 201 } 202 203 204 205 } 206 | Popular Tags |