1 18 package org.apache.activemq.store.journal; 19 20 import java.io.IOException ; 21 import java.util.ArrayList ; 22 import java.util.Collections ; 23 import java.util.HashSet ; 24 import java.util.Iterator ; 25 import java.util.LinkedHashMap ; 26 27 import org.apache.activeio.journal.RecordLocation; 28 import org.apache.activemq.broker.ConnectionContext; 29 import org.apache.activemq.command.ActiveMQDestination; 30 import org.apache.activemq.command.JournalQueueAck; 31 import org.apache.activemq.command.Message; 32 import org.apache.activemq.command.MessageAck; 33 import org.apache.activemq.command.MessageId; 34 import org.apache.activemq.memory.UsageManager; 35 import org.apache.activemq.store.MessageRecoveryListener; 36 import org.apache.activemq.store.MessageStore; 37 import org.apache.activemq.store.PersistenceAdapter; 38 import org.apache.activemq.transaction.Synchronization; 39 import org.apache.activemq.util.Callback; 40 import org.apache.activemq.util.TransactionTemplate; 41 import org.apache.commons.logging.Log; 42 import org.apache.commons.logging.LogFactory; 43 44 49 public class JournalMessageStore implements MessageStore { 50 51 private static final Log log = LogFactory.getLog(JournalMessageStore.class); 52 53 protected final JournalPersistenceAdapter peristenceAdapter; 54 protected final JournalTransactionStore transactionStore; 55 protected final MessageStore longTermStore; 56 protected final ActiveMQDestination destination; 57 protected final TransactionTemplate transactionTemplate; 58 59 private LinkedHashMap messages = new LinkedHashMap (); 60 private ArrayList messageAcks = new ArrayList (); 61 62 63 private LinkedHashMap cpAddedMessageIds; 64 65 protected RecordLocation lastLocation; 66 protected HashSet inFlightTxLocations = new HashSet (); 67 68 private UsageManager usageManager; 69 70 public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) { 71 this.peristenceAdapter = adapter; 72 this.transactionStore = adapter.getTransactionStore(); 73 this.longTermStore = checkpointStore; 74 this.destination = destination; 75 this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); 76 } 77 78 public void setUsageManager(UsageManager usageManager) { 79 this.usageManager = usageManager; 80 longTermStore.setUsageManager(usageManager); 81 } 82 83 84 88 public void addMessage(ConnectionContext context, final Message message) throws IOException { 89 90 final MessageId id = message.getMessageId(); 91 92 final boolean debug = log.isDebugEnabled(); 93 message.incrementReferenceCount(); 94 95 final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); 96 if( !context.isInTransaction() ) { 97 if( debug ) 98 log.debug("Journalled message add for: "+id+", at: "+location); 99 addMessage(message, location); 100 } else { 101 if( debug ) 102 log.debug("Journalled transacted message add for: "+id+", at: "+location); 103 synchronized( this ) { 104 inFlightTxLocations.add(location); 105 } 106 transactionStore.addMessage(this, message, location); 107 context.getTransaction().addSynchronization(new Synchronization(){ 108 public void afterCommit() throws Exception { 109 if( debug ) 110 log.debug("Transacted message add commit for: "+id+", at: "+location); 111 synchronized( JournalMessageStore.this ) { 112 inFlightTxLocations.remove(location); 113 addMessage(message, location); 114 } 115 } 116 public void afterRollback() throws Exception { 117 if( debug ) 118 log.debug("Transacted message add rollback for: "+id+", at: "+location); 119 synchronized( JournalMessageStore.this ) { 120 inFlightTxLocations.remove(location); 121 } 122 message.decrementReferenceCount(); 123 } 124 }); 125 } 126 } 127 128 private void addMessage(final Message message, final RecordLocation location) { 129 synchronized (this) { 130 lastLocation = location; 131 MessageId id = message.getMessageId(); 132 messages.put(id, message); 133 } 134 } 135 136 public void replayAddMessage(ConnectionContext context, Message message) { 137 try { 138 Message t = longTermStore.getMessage(message.getMessageId()); 140 if( t==null ) { 141 longTermStore.addMessage(context, message); 142 } 143 } 144 catch (Throwable e) { 145 log.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e); 146 } 147 } 148 149 151 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 152 final boolean debug = log.isDebugEnabled(); 153 JournalQueueAck remove = new JournalQueueAck(); 154 remove.setDestination(destination); 155 remove.setMessageAck(ack); 156 157 final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired()); 158 if( !context.isInTransaction() ) { 159 if( debug ) 160 log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location); 161 removeMessage(ack, location); 162 } else { 163 if( debug ) 164 log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location); 165 synchronized( this ) { 166 inFlightTxLocations.add(location); 167 } 168 transactionStore.removeMessage(this, ack, location); 169 context.getTransaction().addSynchronization(new Synchronization(){ 170 public void afterCommit() throws Exception { 171 if( debug ) 172 log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location); 173 synchronized( JournalMessageStore.this ) { 174 inFlightTxLocations.remove(location); 175 removeMessage(ack, location); 176 } 177 } 178 public void afterRollback() throws Exception { 179 if( debug ) 180 log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location); 181 synchronized( JournalMessageStore.this ) { 182 inFlightTxLocations.remove(location); 183 } 184 } 185 }); 186 187 } 188 } 189 190 private void removeMessage(final MessageAck ack, final RecordLocation location) { 191 synchronized (this) { 192 lastLocation = location; 193 MessageId id = ack.getLastMessageId(); 194 Message message = (Message) messages.remove(id); 195 if (message == null) { 196 messageAcks.add(ack); 197 } else { 198 message.decrementReferenceCount(); 199 } 200 } 201 } 202 203 public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) { 204 try { 205 Message t = longTermStore.getMessage(messageAck.getLastMessageId()); 207 if( t!=null ) { 208 longTermStore.removeMessage(context, messageAck); 209 } 210 } 211 catch (Throwable e) { 212 log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e); 213 } 214 } 215 216 220 public RecordLocation checkpoint() throws IOException { 221 return checkpoint(null); 222 } 223 224 228 public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException { 229 230 231 RecordLocation rc; 232 final ArrayList cpRemovedMessageLocations; 233 final ArrayList cpActiveJournalLocations; 234 final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize(); 235 236 synchronized (this) { 238 cpAddedMessageIds = this.messages; 239 cpRemovedMessageLocations = this.messageAcks; 240 241 cpActiveJournalLocations=new ArrayList (inFlightTxLocations); 242 243 this.messages = new LinkedHashMap (); 244 this.messageAcks = new ArrayList (); 245 } 246 247 transactionTemplate.run(new Callback() { 248 public void execute() throws Exception { 249 250 int size = 0; 251 252 PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter(); 253 ConnectionContext context = transactionTemplate.getContext(); 254 255 Iterator iterator = cpAddedMessageIds.values().iterator(); 257 while (iterator.hasNext()) { 258 Message message = (Message) iterator.next(); 259 try { 260 longTermStore.addMessage(context, message); 261 } catch (Throwable e) { 262 log.warn("Message could not be added to long term store: " + e.getMessage(), e); 263 } 264 265 size += message.getSize(); 266 267 message.decrementReferenceCount(); 268 269 if( size >= maxCheckpointMessageAddSize ) { 271 persitanceAdapter.commitTransaction(context); 272 persitanceAdapter.beginTransaction(context); 273 size=0; 274 } 275 276 } 277 278 persitanceAdapter.commitTransaction(context); 279 persitanceAdapter.beginTransaction(context); 280 281 iterator = cpRemovedMessageLocations.iterator(); 283 while (iterator.hasNext()) { 284 try { 285 MessageAck ack = (MessageAck) iterator.next(); 286 longTermStore.removeMessage(transactionTemplate.getContext(), ack); 287 } catch (Throwable e) { 288 log.debug("Message could not be removed from long term store: " + e.getMessage(), e); 289 } 290 } 291 292 if( postCheckpointTest!= null ) { 293 postCheckpointTest.execute(); 294 } 295 } 296 297 }); 298 299 synchronized (this) { 300 cpAddedMessageIds = null; 301 } 302 303 if( cpActiveJournalLocations.size() > 0 ) { 304 Collections.sort(cpActiveJournalLocations); 305 return (RecordLocation) cpActiveJournalLocations.get(0); 306 } else { 307 return lastLocation; 308 } 309 } 310 311 314 public Message getMessage(MessageId identity) throws IOException { 315 Message answer = null; 316 317 synchronized (this) { 318 answer = (Message) messages.get(identity); 320 if( answer==null && cpAddedMessageIds!=null ) 321 answer = (Message) cpAddedMessageIds.get(identity); 322 } 323 324 if (answer != null ) { 325 return answer; 326 } 327 328 return longTermStore.getMessage(identity); 330 } 331 332 340 public void recover(final MessageRecoveryListener listener) throws Exception { 341 peristenceAdapter.checkpoint(true, true); 342 longTermStore.recover(listener); 343 } 344 345 public void start() throws Exception { 346 if( this.usageManager != null ) 347 this.usageManager.addUsageListener(peristenceAdapter); 348 longTermStore.start(); 349 } 350 351 public void stop() throws Exception { 352 longTermStore.stop(); 353 if( this.usageManager != null ) 354 this.usageManager.removeUsageListener(peristenceAdapter); 355 } 356 357 360 public MessageStore getLongTermMessageStore() { 361 return longTermStore; 362 } 363 364 367 public void removeAllMessages(ConnectionContext context) throws IOException { 368 peristenceAdapter.checkpoint(true, true); 369 longTermStore.removeAllMessages(context); 370 } 371 372 public ActiveMQDestination getDestination() { 373 return destination; 374 } 375 376 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { 377 throw new IOException ("The journal does not support message references."); 378 } 379 380 public String getMessageReference(MessageId identity) throws IOException { 381 throw new IOException ("The journal does not support message references."); 382 } 383 384 389 public int getMessageCount() throws IOException { 390 peristenceAdapter.checkpoint(true, true); 391 return longTermStore.getMessageCount(); 392 } 393 394 395 public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception { 396 peristenceAdapter.checkpoint(true, true); 397 longTermStore.recoverNextMessages(maxReturned,listener); 398 399 } 400 401 402 public void resetBatching(){ 403 longTermStore.resetBatching(); 404 405 } 406 407 } 408 | Popular Tags |