1 14 15 package org.apache.activemq.store.amq; 16 17 import java.io.IOException ; 18 import java.io.InterruptedIOException ; 19 import java.util.ArrayList ; 20 import java.util.Collections ; 21 import java.util.HashSet ; 22 import java.util.Iterator ; 23 import java.util.LinkedHashMap ; 24 import java.util.Map.Entry; 25 import java.util.concurrent.CountDownLatch ; 26 import java.util.concurrent.atomic.AtomicReference ; 27 import org.apache.activemq.broker.ConnectionContext; 28 import org.apache.activemq.command.ActiveMQDestination; 29 import org.apache.activemq.command.DataStructure; 30 import org.apache.activemq.command.JournalQueueAck; 31 import org.apache.activemq.command.Message; 32 import org.apache.activemq.command.MessageAck; 33 import org.apache.activemq.command.MessageId; 34 import org.apache.activemq.kaha.impl.async.Location; 35 import org.apache.activemq.memory.UsageManager; 36 import org.apache.activemq.store.MessageRecoveryListener; 37 import org.apache.activemq.store.MessageStore; 38 import org.apache.activemq.store.PersistenceAdapter; 39 import org.apache.activemq.store.ReferenceStore; 40 import org.apache.activemq.store.ReferenceStore.ReferenceData; 41 import org.apache.activemq.thread.Task; 42 import org.apache.activemq.thread.TaskRunner; 43 import org.apache.activemq.transaction.Synchronization; 44 import org.apache.activemq.util.Callback; 45 import org.apache.activemq.util.TransactionTemplate; 46 import org.apache.commons.logging.Log; 47 import org.apache.commons.logging.LogFactory; 48 49 54 public class AMQMessageStore implements MessageStore{ 55 56 private static final Log log=LogFactory.getLog(AMQMessageStore.class); 57 protected final AMQPersistenceAdapter peristenceAdapter; 58 protected final AMQTransactionStore transactionStore; 59 protected final ReferenceStore referenceStore; 60 protected final ActiveMQDestination destination; 61 protected final TransactionTemplate transactionTemplate; 62 private LinkedHashMap <MessageId,ReferenceData> messages=new LinkedHashMap <MessageId,ReferenceData>(); 63 private ArrayList <MessageAck> messageAcks=new ArrayList <MessageAck>(); 64 65 private LinkedHashMap <MessageId,ReferenceData> cpAddedMessageIds; 66 protected Location lastLocation; 67 protected Location lastWrittenLocation; 68 protected HashSet <Location> inFlightTxLocations=new HashSet <Location>(); 69 protected final TaskRunner asyncWriteTask; 70 protected CountDownLatch flushLatch; 71 private final boolean debug=log.isDebugEnabled(); 72 private final AtomicReference <Location> mark=new AtomicReference <Location>(); 73 74 public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore,ActiveMQDestination destination){ 75 this.peristenceAdapter=adapter; 76 this.transactionStore=adapter.getTransactionStore(); 77 this.referenceStore=referenceStore; 78 this.destination=destination; 79 this.transactionTemplate=new TransactionTemplate(adapter,new ConnectionContext()); 80 asyncWriteTask=adapter.getTaskRunnerFactory().createTaskRunner(new Task(){ 81 82 public boolean iterate(){ 83 asyncWrite(); 84 return false; 85 } 86 },"Checkpoint: "+destination); 87 } 88 89 public void setUsageManager(UsageManager usageManager){ 90 referenceStore.setUsageManager(usageManager); 91 } 92 93 97 public void addMessage(ConnectionContext context,final Message message) throws IOException { 98 final MessageId id=message.getMessageId(); 99 final Location location=peristenceAdapter.writeCommand(message,message.isResponseRequired()); 100 if(!context.isInTransaction()){ 101 if(debug) 102 log.debug("Journalled message add for: "+id+", at: "+location); 103 addMessage(message,location); 104 }else{ 105 if(debug) 106 log.debug("Journalled transacted message add for: "+id+", at: "+location); 107 synchronized(this){ 108 inFlightTxLocations.add(location); 109 } 110 transactionStore.addMessage(this,message,location); 111 context.getTransaction().addSynchronization(new Synchronization(){ 112 113 public void afterCommit() throws Exception { 114 if(debug) 115 log.debug("Transacted message add commit for: "+id+", at: "+location); 116 synchronized(AMQMessageStore.this){ 117 inFlightTxLocations.remove(location); 118 addMessage(message,location); 119 } 120 } 121 122 public void afterRollback() throws Exception { 123 if(debug) 124 log.debug("Transacted message add rollback for: "+id+", at: "+location); 125 synchronized(AMQMessageStore.this){ 126 inFlightTxLocations.remove(location); 127 } 128 } 129 }); 130 } 131 } 132 133 private void addMessage(final Message message,final Location location) throws InterruptedIOException { 134 ReferenceData data=new ReferenceData(); 135 data.setExpiration(message.getExpiration()); 136 data.setFileId(location.getDataFileId()); 137 data.setOffset(location.getOffset()); 138 synchronized(this){ 139 lastLocation=location; 140 messages.put(message.getMessageId(),data); 141 } 142 try{ 143 asyncWriteTask.wakeup(); 144 }catch(InterruptedException e){ 145 throw new InterruptedIOException (); 146 } 147 } 148 149 public boolean replayAddMessage(ConnectionContext context,Message message,Location location){ 150 MessageId id=message.getMessageId(); 151 try{ 152 ReferenceData data=referenceStore.getMessageReference(id); 154 if(data==null){ 155 data=new ReferenceData(); 156 data.setExpiration(message.getExpiration()); 157 data.setFileId(location.getDataFileId()); 158 data.setOffset(location.getOffset()); 159 referenceStore.addMessageReference(context,id,data); 160 return true; 161 } 162 }catch(Throwable e){ 163 log.warn("Could not replay add for message '"+id+"'. Message may have already been added. reason: "+e,e); 164 } 165 return false; 166 } 167 168 170 public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException { 171 JournalQueueAck remove=new JournalQueueAck(); 172 remove.setDestination(destination); 173 remove.setMessageAck(ack); 174 final Location location=peristenceAdapter.writeCommand(remove,ack.isResponseRequired()); 175 if(!context.isInTransaction()){ 176 if(debug) 177 log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location); 178 removeMessage(ack,location); 179 }else{ 180 if(debug) 181 log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location); 182 synchronized(this){ 183 inFlightTxLocations.add(location); 184 } 185 transactionStore.removeMessage(this,ack,location); 186 context.getTransaction().addSynchronization(new Synchronization(){ 187 188 public void afterCommit() throws Exception { 189 if(debug) 190 log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location); 191 synchronized(AMQMessageStore.this){ 192 inFlightTxLocations.remove(location); 193 removeMessage(ack,location); 194 } 195 } 196 197 public void afterRollback() throws Exception { 198 if(debug) 199 log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location); 200 synchronized(AMQMessageStore.this){ 201 inFlightTxLocations.remove(location); 202 } 203 } 204 }); 205 } 206 } 207 208 private void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException { 209 ReferenceData data; 210 synchronized(this){ 211 lastLocation=location; 212 MessageId id=ack.getLastMessageId(); 213 data=messages.remove(id); 214 if(data==null){ 215 messageAcks.add(ack); 216 } 217 } 218 if(data==null){ 219 try{ 220 asyncWriteTask.wakeup(); 221 }catch(InterruptedException e){ 222 throw new InterruptedIOException (); 223 } 224 } 225 } 226 227 public boolean replayRemoveMessage(ConnectionContext context,MessageAck messageAck){ 228 try{ 229 ReferenceData t=referenceStore.getMessageReference(messageAck.getLastMessageId()); 231 if(t!=null){ 232 referenceStore.removeMessage(context,messageAck); 233 return true; 234 } 235 }catch(Throwable e){ 236 log.warn("Could not replay acknowledge for message '"+messageAck.getLastMessageId() 237 +"'. Message may have already been acknowledged. reason: "+e); 238 } 239 return false; 240 } 241 242 247 public void flush() throws InterruptedIOException { 248 if(log.isDebugEnabled()){ 249 log.debug("flush starting ..."); 250 } 251 CountDownLatch countDown; 252 synchronized(this){ 253 if(lastWrittenLocation==lastLocation){ 254 return; 255 } 256 if(flushLatch==null){ 257 flushLatch=new CountDownLatch (1); 258 } 259 countDown=flushLatch; 260 } 261 try{ 262 asyncWriteTask.wakeup(); 263 countDown.await(); 264 }catch(InterruptedException e){ 265 throw new InterruptedIOException (); 266 } 267 if(log.isDebugEnabled()){ 268 log.debug("flush finished"); 269 } 270 } 271 272 276 private void asyncWrite(){ 277 try{ 278 CountDownLatch countDown; 279 synchronized(this){ 280 countDown=flushLatch; 281 flushLatch=null; 282 } 283 mark.set(doAsyncWrite()); 284 if(countDown!=null){ 285 countDown.countDown(); 286 } 287 }catch(IOException e){ 288 log.error("Checkpoint failed: "+e,e); 289 } 290 } 291 292 296 protected Location doAsyncWrite() throws IOException { 297 final ArrayList <MessageAck> cpRemovedMessageLocations; 298 final ArrayList <Location> cpActiveJournalLocations; 299 final int maxCheckpointMessageAddSize=peristenceAdapter.getMaxCheckpointMessageAddSize(); 300 final Location lastLocation; 301 synchronized(this){ 303 cpAddedMessageIds=this.messages; 304 cpRemovedMessageLocations=this.messageAcks; 305 cpActiveJournalLocations=new ArrayList <Location>(inFlightTxLocations); 306 this.messages=new LinkedHashMap <MessageId,ReferenceData>(); 307 this.messageAcks=new ArrayList <MessageAck>(); 308 lastLocation=this.lastLocation; 309 } 310 if(log.isDebugEnabled()) 311 log.debug("Doing batch update... adding: "+cpAddedMessageIds.size()+" removing: " 312 +cpRemovedMessageLocations.size()+" "); 313 transactionTemplate.run(new Callback(){ 314 315 public void execute() throws Exception { 316 int size=0; 317 PersistenceAdapter persitanceAdapter=transactionTemplate.getPersistenceAdapter(); 318 ConnectionContext context=transactionTemplate.getContext(); 319 Iterator <Entry<MessageId,ReferenceData>> iterator=cpAddedMessageIds.entrySet().iterator(); 321 while(iterator.hasNext()){ 322 Entry<MessageId,ReferenceData> entry=iterator.next(); 323 try{ 324 referenceStore.addMessageReference(context,entry.getKey(),entry.getValue()); 325 }catch(Throwable e){ 326 log.warn("Message could not be added to long term store: "+e.getMessage(),e); 327 } 328 size++; 329 if(size>=maxCheckpointMessageAddSize){ 331 persitanceAdapter.commitTransaction(context); 332 persitanceAdapter.beginTransaction(context); 333 size=0; 334 } 335 } 336 persitanceAdapter.commitTransaction(context); 337 persitanceAdapter.beginTransaction(context); 338 for(MessageAck ack:cpRemovedMessageLocations){ 340 try{ 341 referenceStore.removeMessage(transactionTemplate.getContext(),ack); 342 }catch(Throwable e){ 343 log.warn("Message could not be removed from long term store: "+e.getMessage(),e); 344 } 345 } 346 } 347 }); 348 log.debug("Batch update done."); 349 synchronized(this){ 350 cpAddedMessageIds=null; 351 lastWrittenLocation=lastLocation; 352 } 353 if(cpActiveJournalLocations.size()>0){ 354 Collections.sort(cpActiveJournalLocations); 355 return cpActiveJournalLocations.get(0); 356 }else{ 357 return lastLocation; 358 } 359 } 360 361 364 public Message getMessage(MessageId identity) throws IOException { 365 ReferenceData data=null; 366 synchronized(this){ 367 data=messages.get(identity); 369 if(data==null&&cpAddedMessageIds!=null){ 370 data=cpAddedMessageIds.get(identity); 371 } 372 } 373 if(data==null){ 374 data=referenceStore.getMessageReference(identity); 375 if(data==null){ 376 return null; 377 } 378 } 379 Location location=new Location(); 380 location.setDataFileId(data.getFileId()); 381 location.setOffset(data.getOffset()); 382 DataStructure rc=peristenceAdapter.readCommand(location); 383 try{ 384 return (Message)rc; 385 }catch(ClassCastException e){ 386 throw new IOException ("Could not read message "+identity+" at location "+location 387 +", expected a message, but got: "+rc); 388 } 389 } 390 391 398 public void recover(final MessageRecoveryListener listener) throws Exception { 399 flush(); 400 referenceStore.recover(new RecoveryListenerAdapter(this,listener)); 401 } 402 403 public void start() throws Exception { 404 referenceStore.start(); 405 } 406 407 public void stop() throws Exception { 408 flush(); 409 asyncWriteTask.shutdown(); 410 referenceStore.stop(); 411 } 412 413 416 public ReferenceStore getReferenceStore(){ 417 return referenceStore; 418 } 419 420 423 public void removeAllMessages(ConnectionContext context) throws IOException { 424 flush(); 425 referenceStore.removeAllMessages(context); 426 } 427 428 public ActiveMQDestination getDestination(){ 429 return destination; 430 } 431 432 public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef) 433 throws IOException { 434 throw new IOException ("The journal does not support message references."); 435 } 436 437 public String getMessageReference(MessageId identity) throws IOException { 438 throw new IOException ("The journal does not support message references."); 439 } 440 441 446 public int getMessageCount() throws IOException { 447 flush(); 448 return referenceStore.getMessageCount(); 449 } 450 451 public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception { 452 476 RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener); 477 referenceStore.recoverNextMessages(maxReturned,recoveryListener); 478 if(recoveryListener.size()==0&&recoveryListener.hasSpace()){ 479 flush(); 480 referenceStore.recoverNextMessages(maxReturned,recoveryListener); 481 } 482 } 483 484 Message getMessage(ReferenceData data) throws IOException { 485 Location location=new Location(); 486 location.setDataFileId(data.getFileId()); 487 location.setOffset(data.getOffset()); 488 DataStructure rc=peristenceAdapter.readCommand(location); 489 try{ 490 return (Message)rc; 491 }catch(ClassCastException e){ 492 throw new IOException ("Could not read message at location "+location+", expected a message, but got: "+rc); 493 } 494 } 495 496 public void resetBatching(){ 497 referenceStore.resetBatching(); 498 } 499 500 public Location getMark(){ 501 return mark.get(); 502 } 503 } 504 | Popular Tags |