package org.apache.activemq.store.amq;

import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.amq.AMQTransactionStore.Tx;
import org.apache.activemq.store.amq.AMQTransactionStore.TxOperation;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A {@link PersistenceAdapter} that writes commands to a journal managed by an
 * {@link AsyncDataManager} and delegates message references to a
 * {@link ReferenceStoreAdapter}.
 * <p>
 * While started, the adapter schedules two periodic tasks: one requesting a
 * checkpoint and one consolidating journal data files that the reference store
 * no longer reports as in use. It also requests a checkpoint when reported
 * memory usage climbs (see {@link #onMemoryUseChanged}).
 */
public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {

    private static final Log log = LogFactory.getLog(AMQPersistenceAdapter.class);
    private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
    private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
    private AsyncDataManager asyncDataManager;
    private ReferenceStoreAdapter referenceStoreAdapter;
    private TaskRunnerFactory taskRunnerFactory;
    private WireFormat wireFormat = new OpenWireFormat();
    private UsageManager usageManager;
    private long cleanupInterval = 1000 * 60;
    private long checkpointInterval = 1000 * 10;
    private int maxCheckpointWorkers = 1;
    private int maxCheckpointMessageAddSize = 1024 * 4;
    private AMQTransactionStore transactionStore = new AMQTransactionStore(this);
    private TaskRunner checkpointTask;
    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private Runnable periodicCheckpointTask;
    private Runnable periodicCleanupTask;
    private boolean deleteAllMessages;
    private boolean syncOnWrite;
    private String brokerName = "";
    private File directory;
    private BrokerService brokerService;

    public String getBrokerName() {
        return this.brokerName;
    }

    /**
     * Records the broker name and, when a reference store adapter is already
     * set, forwards the name to it.
     */
    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
        if (this.referenceStoreAdapter != null) {
            this.referenceStoreAdapter.setBrokerName(brokerName);
        }
    }

    public BrokerService getBrokerService() {
        return brokerService;
    }

    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }

    /**
     * Starts the adapter; a second call while already started is a no-op.
     * <p>
     * Resolves the data directory, creates any collaborator that was not
     * injected, starts the journal, optionally wipes it, starts the reference
     * store, recovers from the journal if the reference store reports itself
     * invalid, and finally schedules the periodic checkpoint and cleanup tasks.
     *
     * @throws Exception if a collaborator fails to start or recovery fails
     */
    public synchronized void start() throws Exception {
        if (!started.compareAndSet(false, true)) {
            return;
        }
        if (this.directory == null) {
            if (brokerService != null) {
                this.directory = brokerService.getBrokerDataDirectory();
            } else {
                this.directory = new File(IOHelper.getDefaultDataDirectory(), brokerName);
                this.directory = new File(directory, "amqstore");
            }
        }
        log.info("AMQStore starting using directory: " + directory);
        this.directory.mkdirs();

        if (this.usageManager != null) {
            this.usageManager.addUsageListener(this);
        }
        if (asyncDataManager == null) {
            asyncDataManager = createAsyncDataManager();
        }
        if (referenceStoreAdapter == null) {
            referenceStoreAdapter = createReferenceStoreAdapter();
        }
        referenceStoreAdapter.setDirectory(new File(directory, "kr-store"));
        referenceStoreAdapter.setBrokerName(getBrokerName());
        referenceStoreAdapter.setUsageManager(usageManager);
        if (taskRunnerFactory == null) {
            taskRunnerFactory = createTaskRunnerFactory();
        }
        asyncDataManager.start();
        if (deleteAllMessages) {
            asyncDataManager.delete();
            try {
                JournalTrace trace = new JournalTrace();
                trace.setMessage("DELETED " + new Date());
                Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
                asyncDataManager.setMark(location, true);
                log.info("Journal deleted: ");
                deleteAllMessages = false;
            } catch (IOException e) {
                throw e;
            } catch (Throwable e) {
                throw IOExceptionSupport.create(e);
            }
            referenceStoreAdapter.deleteAllMessages();
        }
        referenceStoreAdapter.start();
        Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
        log.info("Active data files: " + files);
        checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {

            public boolean iterate() {
                doCheckpoint();
                return false;
            }
        }, "ActiveMQ Journal Checkpoint Worker");
        createTransactionStore();
        if (!referenceStoreAdapter.isStoreValid()) {
            log.warn("The ReferenceStore is not valid - recovering ...");
            recover();
            log.info("Finished recovering the ReferenceStore");
        } else {
            Location location = writeTraceMessage("RECOVERED " + new Date(), true);
            asyncDataManager.setMark(location, true);
        }
        periodicCheckpointTask = new Runnable() {

            public void run() {
                checkpoint(false);
            }
        };
        Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
        periodicCleanupTask = new Runnable() {

            public void run() {
                cleanup();
            }
        };
        Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval);
    }

    /**
     * Stops the adapter; a call while not started is a no-op.
     * <p>
     * Cancels the periodic tasks, stops every message store, performs a
     * synchronous checkpoint, then stops the reference store and closes the
     * journal. The journal close is attempted even when stopping the reference
     * store fails; the first failure encountered is the one rethrown.
     *
     * @throws Exception the first failure raised while shutting down
     */
    public void stop() throws Exception {
        if (!started.compareAndSet(true, false)) {
            return;
        }
        // start() accepts a null usage manager, so stop() must tolerate it too.
        if (this.usageManager != null) {
            this.usageManager.removeUsageListener(this);
        }
        Scheduler.cancel(periodicCheckpointTask);
        Scheduler.cancel(periodicCleanupTask);
        for (AMQMessageStore ms : queues.values()) {
            ms.stop();
        }
        for (AMQMessageStore ms : topics.values()) {
            ms.stop();
        }
        // Not synchronized on purpose: this waits on a latch that doCheckpoint()
        // can only release after taking this object's monitor.
        checkpoint(true);
        checkpointTask.shutdown();
        queues.clear();
        topics.clear();
        Exception firstException = null;
        try {
            referenceStoreAdapter.stop();
        } catch (Exception e) {
            // Remember the failure but keep going so the journal is still closed.
            firstException = e;
        }
        try {
            log.debug("Journal close");
            asyncDataManager.close();
        } catch (Exception e) {
            if (firstException == null) {
                firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
            } else {
                log.error("Failed to close journals: " + e, e);
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    /**
     * Wakes the checkpoint worker and then checkpoints the reference store.
     * Failures are logged rather than thrown.
     *
     * @param sync when {@code true}, block until the checkpoint that was
     *             pending at the time of the call has completed
     */
    public void checkpoint(boolean sync) {
        try {
            if (asyncDataManager == null) {
                throw new IllegalStateException("Journal is closed.");
            }
            CountDownLatch latch = null;
            synchronized (this) {
                latch = nextCheckpointCountDownLatch;
            }
            checkpointTask.wakeup();
            if (sync) {
                if (log.isDebugEnabled()) {
                    log.debug("Waitng for checkpoint to complete.");
                }
                latch.await();
            }
            referenceStoreAdapter.checkpoint(sync);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Request to start checkpoint failed: " + e, e);
        } catch (IOException e) {
            log.error("checkpoint failed: " + e, e);
        }
    }

    /**
     * Performs one checkpoint pass: picks the greatest mark reported by the
     * queue and topic stores and, if there is one, sets it as the journal mark
     * and writes a CHECKPOINT trace record. Threads waiting in
     * {@link #checkpoint(boolean)} on the previous latch are released when the
     * pass ends, whether or not marking succeeded.
     *
     * @return always {@code true}
     */
    public boolean doCheckpoint() {
        CountDownLatch latch = null;
        synchronized (this) {
            latch = nextCheckpointCountDownLatch;
            nextCheckpointCountDownLatch = new CountDownLatch(1);
        }
        try {
            if (log.isDebugEnabled()) {
                log.debug("Checkpoint started.");
            }

            Location newMark = latestMark(queues.values(), null);
            newMark = latestMark(topics.values(), newMark);
            try {
                if (newMark != null) {
                    if (log.isDebugEnabled()) {
                        log.debug("Marking journal at: " + newMark);
                    }
                    asyncDataManager.setMark(newMark, false);
                    writeTraceMessage("CHECKPOINT " + new Date(), true);
                }
            } catch (Exception e) {
                log.error("Failed to mark the Journal: " + e, e);
            }
            if (log.isDebugEnabled()) {
                log.debug("Checkpoint done.");
            }
        } finally {
            latch.countDown();
        }
        return true;
    }

    /**
     * Returns the greatest of {@code current} and the non-null marks reported
     * by {@code stores}; {@code null} when there is no mark at all.
     */
    private static Location latestMark(Iterable<AMQMessageStore> stores, Location current) {
        Location newest = current;
        for (AMQMessageStore ms : stores) {
            Location mark = (Location) ms.getMark();
            if (mark != null && (newest == null || newest.compareTo(mark) < 0)) {
                newest = mark;
            }
        }
        return newest;
    }

    /**
     * Asks the journal to consolidate data files whose ids the reference store
     * does not report as in use. I/O failures are logged, not thrown.
     */
    public void cleanup() {
        try {
            Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
            asyncDataManager.consolidateDataFilesNotIn(inUse);
        } catch (IOException e) {
            log.error("Could not cleanup data files: " + e, e);
        }
    }

    /**
     * @return a new set holding the reference store's destinations plus every
     *         queue and topic this adapter has created a store for
     */
    public Set<ActiveMQDestination> getDestinations() {
        Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
        destinations.addAll(queues.keySet());
        destinations.addAll(topics.keySet());
        return destinations;
    }

    /** Dispatches to the queue or topic factory based on the destination kind. */
    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
        if (destination.isQueue()) {
            return createQueueMessageStore((ActiveMQQueue) destination);
        }
        return createTopicMessageStore((ActiveMQTopic) destination);
    }

    /**
     * Returns the store registered for {@code destination}, creating, starting
     * and registering one first if none exists.
     *
     * @throws IOException if the new store fails to start
     */
    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        AMQMessageStore store = queues.get(destination);
        if (store == null) {
            ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
            store = new AMQMessageStore(this, checkpointStore, destination);
            try {
                store.start();
            } catch (Exception e) {
                throw IOExceptionSupport.create(e);
            }
            queues.put(destination, store);
        }
        return store;
    }

    /**
     * Returns the store registered for {@code destinationName}, creating,
     * starting and registering one first if none exists.
     *
     * @throws IOException if the new store fails to start
     */
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
        AMQTopicMessageStore store = (AMQTopicMessageStore) topics.get(destinationName);
        if (store == null) {
            TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
            store = new AMQTopicMessageStore(this, checkpointStore, destinationName);
            try {
                store.start();
            } catch (Exception e) {
                throw IOExceptionSupport.create(e);
            }
            topics.put(destinationName, store);
        }
        return store;
    }

    /** @return the single transaction store owned by this adapter */
    public TransactionStore createTransactionStore() throws IOException {
        return transactionStore;
    }

    public long getLastMessageBrokerSequenceId() throws IOException {
        return referenceStoreAdapter.getLastMessageBrokerSequenceId();
    }

    public void beginTransaction(ConnectionContext context) throws IOException {
        referenceStoreAdapter.beginTransaction(context);
    }

    public void commitTransaction(ConnectionContext context) throws IOException {
        referenceStoreAdapter.commitTransaction(context);
    }

    public void rollbackTransaction(ConnectionContext context) throws IOException {
        referenceStoreAdapter.rollbackTransaction(context);
    }

    /**
     * Reads the journal record at {@code location} and unmarshals it with the
     * configured wire format.
     *
     * @throws IOException wrapping the underlying failure together with the
     *                     location that could not be read
     */
    public DataStructure readCommand(Location location) throws IOException {
        try {
            ByteSequence packet = asyncDataManager.read(location);
            return (DataStructure) wireFormat.unmarshal(packet);
        } catch (IOException e) {
            throw createReadException(location, e);
        }
    }

    /**
     * Walks every record in the journal and replays it: messages and acks are
     * applied to their store (or queued on the transaction store when they
     * belong to a transaction), and transaction records prepare, commit or
     * roll back the queued operations. Finishes by writing a RECOVERED trace
     * record and marking the journal there.
     */
    private void recover() throws IllegalStateException, IOException {
        Location pos = null;
        int redoCounter = 0;
        log.info("Journal Recovery Started from: " + asyncDataManager);
        long start = System.currentTimeMillis();
        ConnectionContext context = new ConnectionContext();
        while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
            ByteSequence data = asyncDataManager.read(pos);
            DataStructure c = (DataStructure) wireFormat.unmarshal(data);
            if (c instanceof Message) {
                Message message = (Message) c;
                AMQMessageStore store = (AMQMessageStore) createMessageStore(message.getDestination());
                if (message.isInTransaction()) {
                    transactionStore.addMessage(store, message, pos);
                } else if (store.replayAddMessage(context, message, pos)) {
                    redoCounter++;
                }
            } else {
                switch (c.getDataStructureType()) {
                case JournalQueueAck.DATA_STRUCTURE_TYPE: {
                    JournalQueueAck command = (JournalQueueAck) c;
                    AMQMessageStore store = (AMQMessageStore) createMessageStore(command.getDestination());
                    if (command.getMessageAck().isInTransaction()) {
                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
                    } else if (store.replayRemoveMessage(context, command.getMessageAck())) {
                        redoCounter++;
                    }
                }
                    break;
                case JournalTopicAck.DATA_STRUCTURE_TYPE: {
                    JournalTopicAck command = (JournalTopicAck) c;
                    AMQTopicMessageStore store = (AMQTopicMessageStore) createMessageStore(command.getDestination());
                    if (command.getTransactionId() != null) {
                        transactionStore.acknowledge(store, command, pos);
                    } else if (replayTopicAck(store, context, command)) {
                        redoCounter++;
                    }
                }
                    break;
                case JournalTransaction.DATA_STRUCTURE_TYPE: {
                    JournalTransaction command = (JournalTransaction) c;
                    try {
                        switch (command.getType()) {
                        case JournalTransaction.XA_PREPARE:
                            transactionStore.replayPrepare(command.getTransactionId());
                            break;
                        case JournalTransaction.XA_COMMIT:
                        case JournalTransaction.LOCAL_COMMIT:
                            Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
                            if (tx == null) {
                                // Nothing queued for this transaction id; nothing to replay.
                                break;
                            }
                            // Counter is bumped in place so operations replayed before
                            // a mid-transaction failure are still counted.
                            for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
                                TxOperation op = (TxOperation) iter.next();
                                if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
                                    if (op.store.replayAddMessage(context, (Message) op.data, op.location)) {
                                        redoCounter++;
                                    }
                                }
                                if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
                                    if (op.store.replayRemoveMessage(context, (MessageAck) op.data)) {
                                        redoCounter++;
                                    }
                                }
                                if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
                                    JournalTopicAck ack = (JournalTopicAck) op.data;
                                    if (replayTopicAck((AMQTopicMessageStore) op.store, context, ack)) {
                                        redoCounter++;
                                    }
                                }
                            }
                            break;
                        case JournalTransaction.LOCAL_ROLLBACK:
                        case JournalTransaction.XA_ROLLBACK:
                            transactionStore.replayRollback(command.getTransactionId());
                            break;
                        }
                    } catch (IOException e) {
                        log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
                    }
                }
                    break;
                case JournalTrace.DATA_STRUCTURE_TYPE:
                    JournalTrace trace = (JournalTrace) c;
                    log.debug("TRACE Entry: " + trace.getMessage());
                    break;
                default:
                    log.error("Unknown type of record in transaction log which will be discarded: " + c);
                }
            }
        }
        Location location = writeTraceMessage("RECOVERED " + new Date(), true);
        asyncDataManager.setMark(location, true);
        long end = System.currentTimeMillis();
        log.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
    }

    /** Replays a journaled topic acknowledgement against {@code store}. */
    private boolean replayTopicAck(AMQTopicMessageStore store, ConnectionContext context, JournalTopicAck ack)
            throws IOException {
        return store.replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
    }

    private IOException createReadException(Location location, Exception e) {
        return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
    }

    protected IOException createWriteException(DataStructure packet, Exception e) {
        return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
    }

    protected IOException createWriteException(String command, Exception e) {
        return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
    }

    protected IOException createRecoveryFailedException(Exception e) {
        return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
    }

    /**
     * Marshals {@code command} and appends it to the journal.
     *
     * @param syncHint requests a synchronous write; only honoured when
     *                 {@link #isSyncOnWrite()} is also {@code true}
     * @return the journal location of the written record
     */
    public Location writeCommand(DataStructure command, boolean syncHint) throws IOException {
        return asyncDataManager.write(wireFormat.marshal(command), (syncHint && syncOnWrite));
    }

    /** Writes a {@link JournalTrace} record carrying {@code message}. */
    private Location writeTraceMessage(String message, boolean sync) throws IOException {
        JournalTrace trace = new JournalTrace();
        trace.setMessage(message);
        return writeCommand(trace, sync);
    }

    /**
     * Requests an asynchronous checkpoint when usage, rounded down to a
     * multiple of ten, has risen and is at least 70.
     */
    public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
        int newBucket = (newPercentUsage / 10) * 10;
        int oldBucket = (oldPercentUsage / 10) * 10;
        if (newBucket >= 70 && oldBucket < newBucket) {
            checkpoint(false);
        }
    }

    public AMQTransactionStore getTransactionStore() {
        return transactionStore;
    }

    /**
     * Flags the journal and reference store for deletion; the deletion itself
     * is carried out by {@link #start()}.
     */
    public void deleteAllMessages() throws IOException {
        deleteAllMessages = true;
    }

    @Override
    public String toString() {
        return "AMQPersistenceAdapter(" + directory + ")";
    }

    /** Creates the default journal, rooted at {@code <directory>/journal}. */
    protected AsyncDataManager createAsyncDataManager() {
        AsyncDataManager manager = new AsyncDataManager();
        manager.setDirectory(new File(directory, "journal"));
        return manager;
    }

    protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
        KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter();
        return adaptor;
    }

    protected TaskRunnerFactory createTaskRunnerFactory() {
        return DefaultThreadPools.getDefaultTaskRunnerFactory();
    }

    public AsyncDataManager getAsyncDataManager() {
        return asyncDataManager;
    }

    public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
        this.asyncDataManager = asyncDataManager;
    }

    public ReferenceStoreAdapter getReferenceStoreAdapter() {
        return referenceStoreAdapter;
    }

    public TaskRunnerFactory getTaskRunnerFactory() {
        return taskRunnerFactory;
    }

    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
        this.taskRunnerFactory = taskRunnerFactory;
    }

    public WireFormat getWireFormat() {
        return wireFormat;
    }

    public void setWireFormat(WireFormat wireFormat) {
        this.wireFormat = wireFormat;
    }

    public UsageManager getUsageManager() {
        return usageManager;
    }

    public void setUsageManager(UsageManager usageManager) {
        this.usageManager = usageManager;
    }

    public int getMaxCheckpointMessageAddSize() {
        return maxCheckpointMessageAddSize;
    }

    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
    }

    public int getMaxCheckpointWorkers() {
        return maxCheckpointWorkers;
    }

    public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
        this.maxCheckpointWorkers = maxCheckpointWorkers;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public boolean isSyncOnWrite() {
        return this.syncOnWrite;
    }

    public void setSyncOnWrite(boolean syncOnWrite) {
        this.syncOnWrite = syncOnWrite;
    }

    public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
        this.referenceStoreAdapter = referenceStoreAdapter;
    }
}