1 18 package org.apache.activemq; 19 20 import javax.jms.Connection ; 21 import javax.jms.Destination ; 22 import javax.jms.JMSException ; 23 import javax.jms.Message ; 24 import javax.jms.MessageConsumer ; 25 import javax.jms.MessageProducer ; 26 import javax.jms.Session ; 27 import javax.jms.TextMessage ; 28 import javax.jms.Topic ; 29 30 33 public class JmsTopicRedeliverTest extends TestSupport { 34 35 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 36 .getLog(JmsTopicRedeliverTest.class); 37 38 39 protected Connection connection; 40 protected Session session; 41 protected Session consumeSession; 42 protected MessageConsumer consumer; 43 protected MessageProducer producer; 44 protected Destination consumerDestination; 45 protected Destination producerDestination; 46 protected boolean topic = true; 47 protected boolean durable = false; 48 protected boolean verbose = false; 49 protected long initRedeliveryDelay = 0; 50 51 protected void setUp() throws Exception { 52 super.setUp(); 53 54 connectionFactory = createConnectionFactory(); 55 connection = createConnection(); 56 initRedeliveryDelay = ((ActiveMQConnection)connection).getRedeliveryPolicy().getInitialRedeliveryDelay(); 57 58 if (durable) { 59 connection.setClientID(getClass().getName()); 60 } 61 62 log.info("Created connection: " + connection); 63 64 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 65 consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 66 67 log.info("Created session: " + session); 68 log.info("Created consumeSession: " + consumeSession); 69 producer = session.createProducer(null); 70 72 log.info("Created producer: " + producer); 73 74 if (topic) { 75 consumerDestination = session.createTopic(getConsumerSubject()); 76 producerDestination = session.createTopic(getProducerSubject()); 77 } else { 78 consumerDestination = session.createQueue(getConsumerSubject()); 79 producerDestination = session.createQueue(getProducerSubject()); 80 } 81 82 log.info("Created consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass()); 83 log.info("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass()); 84 consumer = createConsumer(); 85 connection.start(); 86 87 log.info("Created connection: " + connection); 88 } 89 90 91 protected void tearDown() throws Exception { 92 if (connection != null) { 93 connection.close(); 94 } 95 super.tearDown(); 96 } 97 98 99 105 protected String getConsumerSubject() { 106 return "TEST"; 107 } 108 109 115 protected String getProducerSubject() { 116 return "TEST"; 117 } 118 119 124 public void testRecover() throws Exception { 125 String text = "TEST"; 126 Message sendMessage = session.createTextMessage(text); 127 128 if (verbose) { 129 log.info("About to send a message: " + sendMessage + " with text: " + text); 130 } 131 producer.send(producerDestination, sendMessage); 132 133 Message unackMessage = consumer.receive(initRedeliveryDelay + 1000); 135 assertNotNull(unackMessage); 136 String unackId = unackMessage.getJMSMessageID(); 137 assertEquals(((TextMessage ) unackMessage).getText(), text); 138 assertFalse(unackMessage.getJMSRedelivered()); 139 141 consumeSession.recover(); 143 Message ackMessage = consumer.receive(initRedeliveryDelay + 1000); 144 assertNotNull(ackMessage); 145 ackMessage.acknowledge(); 146 String ackId = ackMessage.getJMSMessageID(); 147 assertEquals(((TextMessage ) ackMessage).getText(), text); 148 assertTrue(ackMessage.getJMSRedelivered()); 149 assertEquals(unackId, ackId); 151 consumeSession.recover(); 152 assertNull(consumer.receiveNoWait()); 153 } 154 155 protected MessageConsumer createConsumer() throws JMSException { 156 if (durable) { 157 log.info("Creating durable consumer"); 158 return consumeSession.createDurableSubscriber((Topic ) consumerDestination, getName()); 159 } 160 return consumeSession.createConsumer(consumerDestination); 161 } 162 163 } 164 | Popular Tags |