1 18 package org.apache.activemq.broker.policy; 19 20 import org.apache.activemq.ActiveMQConnection; 21 import org.apache.activemq.ActiveMQConnectionFactory; 22 import org.apache.activemq.RedeliveryPolicy; 23 import org.apache.activemq.command.ActiveMQQueue; 24 25 import javax.jms.Destination ; 26 import javax.jms.Message ; 27 28 32 public class DeadLetterTest extends DeadLetterTestSupport { 33 34 private int rollbackCount; 35 36 protected void doTest() throws Exception { 37 connection.start(); 38 39 ActiveMQConnection amqConnection = (ActiveMQConnection) connection; 40 rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1; 41 log.info("Will redeliver messages: " + rollbackCount + " times"); 42 43 makeConsumer(); 44 makeDlqConsumer(); 45 46 sendMessages(); 47 48 for (int i = 0; i < messageCount; i++) { 50 consumeAndRollback(i); 51 } 52 53 for (int i = 0; i < messageCount; i++) { 54 Message msg = dlqConsumer.receive(1000); 55 assertMessage(msg, i); 56 assertNotNull("Should be a DLQ message for loop: " + i, msg); 57 } 58 } 59 60 protected void consumeAndRollback(int messageCounter) throws Exception { 61 for (int i = 0; i < rollbackCount; i++) { 62 Message message = consumer.receive(5000); 63 assertNotNull("No message received for message: " + messageCounter + " and rollback loop: " + i, message); 64 assertMessage(message, messageCounter); 65 66 session.rollback(); 67 } 68 log.info("Rolled back: " + rollbackCount + " times"); 69 } 70 71 protected void setUp() throws Exception { 72 transactedMode = true; 73 super.setUp(); 74 } 75 76 protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { 77 ActiveMQConnectionFactory answer = super.createConnectionFactory(); 78 RedeliveryPolicy policy = new RedeliveryPolicy(); 79 policy.setMaximumRedeliveries(3); 80 policy.setBackOffMultiplier((short) 1); 81 policy.setInitialRedeliveryDelay(10); 82 policy.setUseExponentialBackOff(false); 83 answer.setRedeliveryPolicy(policy); 84 return answer; 85 } 86 87 protected Destination createDlqDestination() { 88 return new ActiveMQQueue("ActiveMQ.DLQ"); 89 } 90 91 } 92 | Popular Tags |