1 18 package org.apache.activemq.usecases; 19 20 import org.apache.activemq.ActiveMQConnectionFactory; 21 import org.apache.activemq.test.TestSupport; 22 23 import javax.jms.Connection ; 24 import javax.jms.DeliveryMode ; 25 import javax.jms.Destination ; 26 import javax.jms.JMSException ; 27 import javax.jms.Message ; 28 import javax.jms.MessageConsumer ; 29 import javax.jms.MessageProducer ; 30 import javax.jms.Session ; 31 import javax.jms.TextMessage ; 32 import javax.jms.Topic ; 33 34 37 public class DurableConsumerCloseAndReconnectTest extends TestSupport { 38 protected static final long RECEIVE_TIMEOUT = 5000L; 39 40 private Connection connection; 41 private Session session; 42 private MessageConsumer consumer; 43 private MessageProducer producer; 44 private Destination destination; 45 private int messageCount=0; 46 protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { 47 return new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=false"); 48 } 49 50 public void testCreateDurableConsumerCloseThenReconnect() throws Exception { 51 Connection dummyConnection = createConnection(); 53 dummyConnection.start(); 54 55 consumeMessagesDeliveredWhileConsumerClosed(); 56 57 dummyConnection.close(); 58 59 consumeMessagesDeliveredWhileConsumerClosed(); 61 ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=true"); 63 dummyConnection = fac.createConnection(); 64 dummyConnection.start(); 65 dummyConnection.close(); 66 } 67 68 protected void consumeMessagesDeliveredWhileConsumerClosed() throws Exception { 69 makeConsumer(); 70 closeConsumer(); 71 72 publish(); 73 74 Thread.sleep(1000); 76 77 makeConsumer(); 78 79 Message message = consumer.receive(RECEIVE_TIMEOUT); 80 assertTrue("Should have received a message!", message != null); 81 82 closeConsumer(); 83 84 log.info("Now lets create the consumer again and because we didn't ack, we should get it again"); 85 makeConsumer(); 86 87 message = consumer.receive(RECEIVE_TIMEOUT); 88 assertTrue("Should have received a message!", message != null); 89 message.acknowledge(); 90 91 closeConsumer(); 92 93 log.info("Now lets create the consumer again and because we didn't ack, we should get it again"); 94 makeConsumer(); 95 96 message = consumer.receive(2000); 97 assertTrue("Should have no more messages left!", message == null); 98 99 closeConsumer(); 100 101 log.info("Lets publish one more message now"); 102 publish(); 103 104 makeConsumer(); 105 message = consumer.receive(RECEIVE_TIMEOUT); 106 assertTrue("Should have received a message!", message != null); 107 message.acknowledge(); 108 109 closeConsumer(); 110 } 111 112 protected void publish() throws Exception { 113 connection = createConnection(); 114 connection.start(); 115 116 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 117 destination = createDestination(); 118 119 producer = session.createProducer(destination); 120 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 121 TextMessage msg = session.createTextMessage("This is a test: " + messageCount++); 122 producer.send(msg); 123 124 producer.close(); 125 producer = null; 126 closeSession(); 127 } 128 129 protected Destination createDestination() throws JMSException { 130 if (isTopic()) { 131 return session.createTopic(getSubject()); 132 } 133 else { 134 return session.createQueue(getSubject()); 135 } 136 } 137 138 protected boolean isTopic() { 139 return true; 140 } 141 142 protected void closeConsumer() throws JMSException { 143 consumer.close(); 144 consumer = null; 145 closeSession(); 146 } 147 148 protected void closeSession() throws JMSException { 149 session.close(); 150 session = null; 151 connection.close(); 152 connection = null; 153 } 154 155 protected void makeConsumer() throws Exception { 156 String durableName = getName(); 157 String clientID = getSubject(); 158 log.info("Creating a durable subscribe for clientID: " + clientID + " and durable name: " + durableName); 159 createSession(clientID); 160 consumer = createConsumer(durableName); 161 } 162 163 private MessageConsumer createConsumer(String durableName) throws JMSException { 164 if (destination instanceof Topic ) { 165 return session.createDurableSubscriber((Topic ) destination, durableName); 166 } 167 else { 168 return session.createConsumer(destination); 169 } 170 } 171 172 protected void createSession(String clientID) throws Exception { 173 connection = createConnection(); 174 connection.setClientID(clientID); 175 connection.start(); 176 177 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 178 destination = createDestination(); 179 } 180 } 181 | Popular Tags |