1 18 package org.apache.activemq.store.journal; 19 20 import java.io.File ; 21 import java.io.IOException ; 22 import java.util.ArrayList ; 23 import java.util.HashSet ; 24 import java.util.Iterator ; 25 import java.util.Set ; 26 import java.util.concurrent.Callable ; 27 import java.util.concurrent.ConcurrentHashMap ; 28 import java.util.concurrent.CountDownLatch ; 29 import java.util.concurrent.FutureTask ; 30 import java.util.concurrent.LinkedBlockingQueue ; 31 import java.util.concurrent.ThreadFactory ; 32 import java.util.concurrent.ThreadPoolExecutor ; 33 import java.util.concurrent.TimeUnit ; 34 import java.util.concurrent.atomic.AtomicBoolean ; 35 36 import org.apache.activeio.journal.InvalidRecordLocationException; 37 import org.apache.activeio.journal.Journal; 38 import org.apache.activeio.journal.JournalEventListener; 39 import org.apache.activeio.journal.RecordLocation; 40 import org.apache.activeio.packet.ByteArrayPacket; 41 import org.apache.activeio.packet.Packet; 42 import org.apache.activemq.broker.ConnectionContext; 43 import org.apache.activemq.command.ActiveMQDestination; 44 import org.apache.activemq.command.ActiveMQQueue; 45 import org.apache.activemq.command.ActiveMQTopic; 46 import org.apache.activemq.command.DataStructure; 47 import org.apache.activemq.command.JournalQueueAck; 48 import org.apache.activemq.command.JournalTopicAck; 49 import org.apache.activemq.command.JournalTrace; 50 import org.apache.activemq.command.JournalTransaction; 51 import org.apache.activemq.command.Message; 52 import org.apache.activemq.command.MessageAck; 53 import org.apache.activemq.memory.UsageListener; 54 import org.apache.activemq.memory.UsageManager; 55 import org.apache.activemq.openwire.OpenWireFormat; 56 import org.apache.activemq.store.MessageStore; 57 import org.apache.activemq.store.PersistenceAdapter; 58 import org.apache.activemq.store.TopicMessageStore; 59 import org.apache.activemq.store.TransactionStore; 60 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 61 import org.apache.activemq.store.journal.JournalTransactionStore.Tx; 62 import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation; 63 import org.apache.activemq.thread.Scheduler; 64 import org.apache.activemq.thread.Task; 65 import org.apache.activemq.thread.TaskRunner; 66 import org.apache.activemq.thread.TaskRunnerFactory; 67 import org.apache.activemq.util.ByteSequence; 68 import org.apache.activemq.util.IOExceptionSupport; 69 import org.apache.activemq.wireformat.WireFormat; 70 import org.apache.commons.logging.Log; 71 import org.apache.commons.logging.LogFactory; 72 73 82 public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener { 83 84 private static final Log log = LogFactory.getLog(JournalPersistenceAdapter.class); 85 86 private final Journal journal; 87 private final PersistenceAdapter longTermPersistence; 88 89 private final WireFormat wireFormat = new OpenWireFormat(); 90 91 private final ConcurrentHashMap queues = new ConcurrentHashMap (); 92 private final ConcurrentHashMap topics = new ConcurrentHashMap (); 93 94 private UsageManager usageManager; 95 private long checkpointInterval = 1000 * 60 * 5; 96 private long lastCheckpointRequest = System.currentTimeMillis(); 97 private long lastCleanup = System.currentTimeMillis(); 98 private int maxCheckpointWorkers = 10; 99 private int maxCheckpointMessageAddSize = 1024*1024; 100 101 private JournalTransactionStore transactionStore = new JournalTransactionStore(this); 102 private ThreadPoolExecutor checkpointExecutor; 103 104 private TaskRunner checkpointTask; 105 private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch (1); 106 private boolean fullCheckPoint; 107 108 private AtomicBoolean started = new AtomicBoolean (false); 109 110 private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); 111 112 final Runnable createPeriodicCheckpointTask() { 113 return new Runnable () { 114 public void run() { 115 if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) { 116 checkpoint(false, true); 117 } 118 } 119 }; 120 } 121 122 public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException { 123 124 this.journal = journal; 125 journal.setJournalEventListener(this); 126 127 checkpointTask = taskRunnerFactory.createTaskRunner(new Task (){ 128 public boolean iterate() { 129 return doCheckpoint(); 130 } 131 }, "ActiveMQ Journal Checkpoint Worker"); 132 133 this.longTermPersistence = longTermPersistence; 134 } 135 136 139 public void setUsageManager(UsageManager usageManager) { 140 this.usageManager = usageManager; 141 longTermPersistence.setUsageManager(usageManager); 142 } 143 144 public Set getDestinations() { 145 Set destinations = new HashSet (longTermPersistence.getDestinations()); 146 destinations.addAll(queues.keySet()); 147 destinations.addAll(topics.keySet()); 148 return destinations; 149 } 150 151 private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException { 152 if (destination.isQueue()) { 153 return createQueueMessageStore((ActiveMQQueue) destination); 154 } 155 else { 156 return createTopicMessageStore((ActiveMQTopic) destination); 157 } 158 } 159 160 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 161 JournalMessageStore store = (JournalMessageStore) queues.get(destination); 162 if (store == null) { 163 MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination); 164 store = new JournalMessageStore(this, checkpointStore, destination); 165 queues.put(destination, store); 166 } 167 return store; 168 } 169 170 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException { 171 JournalTopicMessageStore store = (JournalTopicMessageStore) topics.get(destinationName); 172 if (store == null) { 173 TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName); 174 store = new JournalTopicMessageStore(this, checkpointStore, destinationName); 175 topics.put(destinationName, store); 176 } 177 return store; 178 } 179 180 public TransactionStore createTransactionStore() throws IOException { 181 return transactionStore; 182 } 183 184 public long getLastMessageBrokerSequenceId() throws IOException { 185 return longTermPersistence.getLastMessageBrokerSequenceId(); 186 } 187 188 public void beginTransaction(ConnectionContext context) throws IOException { 189 longTermPersistence.beginTransaction(context); 190 } 191 192 public void commitTransaction(ConnectionContext context) throws IOException { 193 longTermPersistence.commitTransaction(context); 194 } 195 196 public void rollbackTransaction(ConnectionContext context) throws IOException { 197 longTermPersistence.rollbackTransaction(context); 198 } 199 200 public synchronized void start() throws Exception { 201 if( !started.compareAndSet(false, true) ) 202 return; 203 204 checkpointExecutor = new ThreadPoolExecutor (maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue (), new ThreadFactory () { 205 public Thread newThread(Runnable runable) { 206 Thread t = new Thread (runable, "Journal checkpoint worker"); 207 t.setPriority(7); 208 return t; 209 } 210 }); 211 213 this.usageManager.addUsageListener(this); 214 215 if (longTermPersistence instanceof JDBCPersistenceAdapter) { 216 ((JDBCPersistenceAdapter) longTermPersistence).setCleanupPeriod(0); 219 } 220 221 longTermPersistence.start(); 222 createTransactionStore(); 223 recover(); 224 225 Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval/10); 227 228 } 229 230 public void stop() throws Exception { 231 232 this.usageManager.removeUsageListener(this); 233 if( !started.compareAndSet(true, false) ) 234 return; 235 236 Scheduler.cancel(periodicCheckpointTask); 237 238 checkpoint(true, true); 240 checkpointTask.shutdown(); 241 checkpointExecutor.shutdown(); 242 243 queues.clear(); 244 topics.clear(); 245 246 IOException firstException = null; 247 try { 248 journal.close(); 249 } catch (Exception e) { 250 firstException = IOExceptionSupport.create("Failed to close journals: " + e, e); 251 } 252 longTermPersistence.stop(); 253 254 if (firstException != null) { 255 throw firstException; 256 } 257 } 258 259 public PersistenceAdapter getLongTermPersistence() { 262 return longTermPersistence; 263 } 264 265 268 public WireFormat getWireFormat() { 269 return wireFormat; 270 } 271 272 275 281 public void overflowNotification(RecordLocation safeLocation) { 282 checkpoint(false, true); 283 } 284 285 291 public void checkpoint(boolean sync, boolean fullCheckpoint) { 292 try { 293 if (journal == null ) 294 throw new IllegalStateException ("Journal is closed."); 295 296 long now = System.currentTimeMillis(); 297 CountDownLatch latch = null; 298 synchronized(this) { 299 latch = nextCheckpointCountDownLatch; 300 lastCheckpointRequest = now; 301 if( fullCheckpoint ) { 302 this.fullCheckPoint = true; 303 } 304 } 305 306 checkpointTask.wakeup(); 307 308 if (sync) { 309 log.debug("Waking for checkpoint to complete."); 310 latch.await(); 311 } 312 } 313 catch (InterruptedException e) { 314 Thread.currentThread().interrupt(); 315 log.warn("Request to start checkpoint failed: " + e, e); 316 } 317 } 318 319 public void checkpoint(boolean sync) { 320 checkpoint(sync,sync); 321 } 322 323 327 public boolean doCheckpoint() { 328 CountDownLatch latch = null; 329 boolean fullCheckpoint; 330 synchronized(this) { 331 latch = nextCheckpointCountDownLatch; 332 nextCheckpointCountDownLatch = new CountDownLatch (1); 333 fullCheckpoint = this.fullCheckPoint; 334 this.fullCheckPoint=false; 335 } 336 try { 337 338 log.debug("Checkpoint started."); 339 RecordLocation newMark = null; 340 341 ArrayList futureTasks = new ArrayList (queues.size()+topics.size()); 342 343 if( fullCheckpoint ) { 352 Iterator iterator = queues.values().iterator(); 353 while (iterator.hasNext()) { 354 try { 355 final JournalMessageStore ms = (JournalMessageStore) iterator.next(); 356 FutureTask task = new FutureTask (new Callable () { 357 public Object call() throws Exception { 358 return ms.checkpoint(); 359 }}); 360 futureTasks.add(task); 361 checkpointExecutor.execute(task); 362 } 363 catch (Exception e) { 364 log.error("Failed to checkpoint a message store: " + e, e); 365 } 366 } 367 } 368 369 Iterator iterator = topics.values().iterator(); 370 while (iterator.hasNext()) { 371 try { 372 final JournalTopicMessageStore ms = (JournalTopicMessageStore) iterator.next(); 373 FutureTask task = new FutureTask (new Callable () { 374 public Object call() throws Exception { 375 return ms.checkpoint(); 376 }}); 377 futureTasks.add(task); 378 checkpointExecutor.execute(task); 379 } 380 catch (Exception e) { 381 log.error("Failed to checkpoint a message store: " + e, e); 382 } 383 } 384 385 try { 386 for (Iterator iter = futureTasks.iterator(); iter.hasNext();) { 387 FutureTask ft = (FutureTask ) iter.next(); 388 RecordLocation mark = (RecordLocation) ft.get(); 389 if( fullCheckpoint ) { 391 if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { 392 newMark = mark; 393 } 394 } 395 } 396 } catch (Throwable e) { 397 log.error("Failed to checkpoint a message store: " + e, e); 398 } 399 400 401 if( fullCheckpoint ) { 402 try { 403 if (newMark != null) { 404 log.debug("Marking journal at: " + newMark); 405 journal.setMark(newMark, true); 406 } 407 } 408 catch (Exception e) { 409 log.error("Failed to mark the Journal: " + e, e); 410 } 411 412 if (longTermPersistence instanceof JDBCPersistenceAdapter) { 413 long now = System.currentTimeMillis(); 416 if( now > lastCleanup+checkpointInterval ) { 417 lastCleanup = now; 418 ((JDBCPersistenceAdapter) longTermPersistence).cleanup(); 419 } 420 } 421 } 422 423 log.debug("Checkpoint done."); 424 } 425 finally { 426 latch.countDown(); 427 } 428 synchronized(this) { 429 return this.fullCheckPoint; 430 } 431 432 } 433 434 439 public DataStructure readCommand(RecordLocation location) throws IOException { 440 try { 441 Packet packet = journal.read(location); 442 return (DataStructure) wireFormat.unmarshal(toByteSequence(packet)); 443 } 444 catch (InvalidRecordLocationException e) { 445 throw createReadException(location, e); 446 } 447 catch (IOException e) { 448 throw createReadException(location, e); 449 } 450 } 451 452 461 private void recover() throws IllegalStateException , InvalidRecordLocationException, IOException , IOException { 462 463 RecordLocation pos = null; 464 int transactionCounter = 0; 465 466 log.info("Journal Recovery Started from: " + journal); 467 ConnectionContext context = new ConnectionContext(); 468 469 while ((pos = journal.getNextRecordLocation(pos)) != null) { 471 Packet data = journal.read(pos); 472 DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data)); 473 474 if (c instanceof Message ) { 475 Message message = (Message) c; 476 JournalMessageStore store = (JournalMessageStore) createMessageStore(message.getDestination()); 477 if ( message.isInTransaction()) { 478 transactionStore.addMessage(store, message, pos); 479 } 480 else { 481 store.replayAddMessage(context, message); 482 transactionCounter++; 483 } 484 } else { 485 switch (c.getDataStructureType()) { 486 case JournalQueueAck.DATA_STRUCTURE_TYPE: 487 { 488 JournalQueueAck command = (JournalQueueAck) c; 489 JournalMessageStore store = (JournalMessageStore) createMessageStore(command.getDestination()); 490 if (command.getMessageAck().isInTransaction()) { 491 transactionStore.removeMessage(store, command.getMessageAck(), pos); 492 } 493 else { 494 store.replayRemoveMessage(context, command.getMessageAck()); 495 transactionCounter++; 496 } 497 } 498 break; 499 case JournalTopicAck.DATA_STRUCTURE_TYPE: 500 { 501 JournalTopicAck command = (JournalTopicAck) c; 502 JournalTopicMessageStore store = (JournalTopicMessageStore) createMessageStore(command.getDestination()); 503 if (command.getTransactionId() != null) { 504 transactionStore.acknowledge(store, command, pos); 505 } 506 else { 507 store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()); 508 transactionCounter++; 509 } 510 } 511 break; 512 case JournalTransaction.DATA_STRUCTURE_TYPE: 513 { 514 JournalTransaction command = (JournalTransaction) c; 515 try { 516 switch (command.getType()) { 518 case JournalTransaction.XA_PREPARE: 519 transactionStore.replayPrepare(command.getTransactionId()); 520 break; 521 case JournalTransaction.XA_COMMIT: 522 case JournalTransaction.LOCAL_COMMIT: 523 Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); 524 if (tx == null) 525 break; 528 tx.getOperations(); 530 for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { 531 TxOperation op = (TxOperation) iter.next(); 532 if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { 533 op.store.replayAddMessage(context, (Message) op.data); 534 } 535 if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { 536 op.store.replayRemoveMessage(context, (MessageAck) op.data); 537 } 538 if (op.operationType == TxOperation.ACK_OPERATION_TYPE) { 539 JournalTopicAck ack = (JournalTopicAck) op.data; 540 ((JournalTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack 541 .getMessageId()); 542 } 543 } 544 transactionCounter++; 545 break; 546 case JournalTransaction.LOCAL_ROLLBACK: 547 case JournalTransaction.XA_ROLLBACK: 548 transactionStore.replayRollback(command.getTransactionId()); 549 break; 550 } 551 } 552 catch (IOException e) { 553 log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e); 554 } 555 } 556 break; 557 case JournalTrace.DATA_STRUCTURE_TYPE: 558 JournalTrace trace = (JournalTrace) c; 559 log.debug("TRACE Entry: " + trace.getMessage()); 560 break; 561 default: 562 log.error("Unknown type of record in transaction log which will be discarded: " + c); 563 } 564 } 565 } 566 567 RecordLocation location = writeTraceMessage("RECOVERED", true); 568 journal.setMark(location, true); 569 570 log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered."); 571 } 572 573 private IOException createReadException(RecordLocation location, Exception e) { 574 return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e); 575 } 576 577 protected IOException createWriteException(DataStructure packet, Exception e) { 578 return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e); 579 } 580 581 protected IOException createWriteException(String command, Exception e) { 582 return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e); 583 } 584 585 protected IOException createRecoveryFailedException(Exception e) { 586 return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e); 587 } 588 589 596 public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException { 597 if( started.get() ) 598 return journal.write(toPacket(wireFormat.marshal(command)), sync); 599 throw new IOException ("closed"); 600 } 601 602 private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException { 603 JournalTrace trace = new JournalTrace(); 604 trace.setMessage(message); 605 return writeCommand(trace, sync); 606 } 607 608 public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) { 609 newPercentUsage = ((newPercentUsage)/10)*10; 610 oldPercentUsage = ((oldPercentUsage)/10)*10; 611 if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) { 612 boolean sync = newPercentUsage >= 90; 613 checkpoint(sync, true); 614 } 615 } 616 617 public JournalTransactionStore getTransactionStore() { 618 return transactionStore; 619 } 620 621 public void deleteAllMessages() throws IOException { 622 try { 623 JournalTrace trace = new JournalTrace(); 624 trace.setMessage("DELETED"); 625 RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false); 626 journal.setMark(location, true); 627 log.info("Journal deleted: "); 628 } catch (IOException e) { 629 throw e; 630 } catch (Throwable e) { 631 throw IOExceptionSupport.create(e); 632 } 633 longTermPersistence.deleteAllMessages(); 634 } 635 636 public UsageManager getUsageManager() { 637 return usageManager; 638 } 639 640 public int getMaxCheckpointMessageAddSize() { 641 return maxCheckpointMessageAddSize; 642 } 643 644 public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) { 645 this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize; 646 } 647 648 public int getMaxCheckpointWorkers() { 649 return maxCheckpointWorkers; 650 } 651 652 public void setMaxCheckpointWorkers(int maxCheckpointWorkers) { 653 this.maxCheckpointWorkers = maxCheckpointWorkers; 654 } 655 656 public boolean isUseExternalMessageReferences() { 657 return false; 658 } 659 660 public void setUseExternalMessageReferences(boolean enable) { 661 if( enable ) 662 throw new IllegalArgumentException ("The journal does not support message references."); 663 } 664 665 public Packet toPacket(ByteSequence sequence) { 666 return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length)); 667 } 668 669 public ByteSequence toByteSequence(Packet packet) { 670 org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence(); 671 return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength()); 672 } 673 674 public void setBrokerName(String brokerName){ 675 longTermPersistence.setBrokerName(brokerName); 676 } 677 678 public String toString(){ 679 return "JournalPersistenceAdapator(" + longTermPersistence + ")"; 680 } 681 682 public void setDirectory(File dir){ 683 } 684 685 } 686 | Popular Tags |