1 18 package org.apache.activemq.broker.policy; 19 20 import org.apache.activemq.TestSupport; 21 import org.apache.activemq.broker.BrokerService; 22 23 import javax.jms.Connection ; 24 import javax.jms.DeliveryMode ; 25 import javax.jms.Destination ; 26 import javax.jms.JMSException ; 27 import javax.jms.Message ; 28 import javax.jms.MessageConsumer ; 29 import javax.jms.MessageProducer ; 30 import javax.jms.Session ; 31 import javax.jms.TextMessage ; 32 import javax.jms.Topic ; 33 34 37 public abstract class DeadLetterTestSupport extends TestSupport { 38 39 protected int messageCount = 10; 40 protected long timeToLive = 0; 41 protected Connection connection; 42 protected Session session; 43 protected MessageConsumer consumer; 44 protected MessageProducer producer; 45 private Destination destination; 46 protected int deliveryMode = DeliveryMode.PERSISTENT; 47 protected boolean durableSubscriber = false; 48 protected Destination dlqDestination; 49 protected MessageConsumer dlqConsumer; 50 protected BrokerService broker; 51 protected boolean transactedMode = false; 52 protected int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE; 53 54 protected void setUp() throws Exception { 55 super.setUp(); 56 broker = createBroker(); 57 broker.start(); 58 connection = createConnection(); 59 connection.setClientID(toString()); 60 61 session = connection.createSession(transactedMode, acknowledgeMode); 62 connection.start(); 63 } 64 65 protected void tearDown() throws Exception { 66 if (connection != null) { 67 connection.close(); 68 } 69 if (broker != null) { 70 broker.stop(); 71 } 72 } 73 74 protected abstract void doTest() throws Exception ; 75 76 protected BrokerService createBroker() throws Exception { 77 BrokerService broker = new BrokerService(); 78 broker.setPersistent(false); 79 return broker; 80 } 81 82 protected void makeConsumer() throws JMSException { 83 Destination destination = getDestination(); 84 log.info("Consuming from: " + destination); 85 if (durableSubscriber) { 86 consumer = session.createDurableSubscriber((Topic ) destination, destination.toString()); 87 } 88 else { 89 consumer = session.createConsumer(destination); 90 } 91 } 92 93 protected void makeDlqConsumer() throws JMSException { 94 dlqDestination = createDlqDestination(); 95 96 log.info("Consuming from dead letter on: " + dlqDestination); 97 dlqConsumer = session.createConsumer(dlqDestination); 98 } 99 100 protected void sendMessages() throws JMSException { 101 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 102 producer = session.createProducer(getDestination()); 103 producer.setDeliveryMode(deliveryMode); 104 producer.setTimeToLive(timeToLive); 105 106 log.info("Sending " + messageCount + " messages to: " + getDestination()); 107 for (int i = 0; i < messageCount; i++) { 108 Message message = createMessage(session, i); 109 producer.send(message); 110 } 111 } 112 113 protected TextMessage createMessage(Session session, int i) throws JMSException { 114 return session.createTextMessage(getMessageText(i)); 115 } 116 117 protected String getMessageText(int i) { 118 return "message: " + i; 119 } 120 121 protected void assertMessage(Message message, int i) throws Exception { 122 log.info("Received message: " + message); 123 assertNotNull("No message received for index: " + i, message); 124 assertTrue("Should be a TextMessage not: " + message, message instanceof TextMessage ); 125 TextMessage textMessage = (TextMessage ) message; 126 assertEquals("text of message: " + i, getMessageText(i), textMessage .getText()); 127 } 128 129 protected abstract Destination createDlqDestination(); 130 131 public void testTransientTopicMessage() throws Exception { 132 super.topic = true; 133 deliveryMode = DeliveryMode.NON_PERSISTENT; 134 durableSubscriber = true; 135 doTest(); 136 } 137 138 public void testDurableTopicMessage() throws Exception { 139 super.topic = true; 140 deliveryMode = DeliveryMode.PERSISTENT; 141 durableSubscriber = true; 142 doTest(); 143 } 144 145 public void testTransientQueueMessage() throws Exception { 146 super.topic = false; 147 deliveryMode = DeliveryMode.NON_PERSISTENT; 148 durableSubscriber = false; 149 doTest(); 150 } 151 152 public void testDurableQueueMessage() throws Exception { 153 super.topic = false; 154 deliveryMode = DeliveryMode.PERSISTENT; 155 durableSubscriber = false; 156 doTest(); 157 } 158 159 public Destination getDestination() { 160 if (destination == null) { 161 destination = createDestination(); 162 } 163 return destination; 164 } 165 } 166 | Popular Tags |