1 18 package org.apache.activemq; 19 20 import javax.jms.Connection ; 21 import javax.jms.DeliveryMode ; 22 import javax.jms.Destination ; 23 import javax.jms.JMSException ; 24 import javax.jms.Message ; 25 import javax.jms.MessageConsumer ; 26 import javax.jms.MessageListener ; 27 import javax.jms.MessageProducer ; 28 import javax.jms.Queue ; 29 import javax.jms.Session ; 30 import javax.jms.TextMessage ; 31 32 import junit.framework.TestCase; 33 34 import org.apache.commons.logging.Log; 35 import org.apache.commons.logging.LogFactory; 36 37 public class MessageListenerRedeliveryTest extends TestCase { 38 39 private Log log = LogFactory.getLog(getClass()); 40 41 private Connection connection; 42 43 protected void setUp() throws Exception { 44 connection = createConnection(); 45 } 46 47 50 protected void tearDown() throws Exception { 51 if (connection != null) { 52 connection.close(); 53 connection = null; 54 } 55 } 56 57 protected RedeliveryPolicy getRedeliveryPolicy() { 58 RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 59 redeliveryPolicy.setInitialRedeliveryDelay(1000); 60 redeliveryPolicy.setMaximumRedeliveries(3); 61 redeliveryPolicy.setBackOffMultiplier((short) 2); 62 redeliveryPolicy.setUseExponentialBackOff(true); 63 return redeliveryPolicy; 64 } 65 66 protected Connection createConnection() throws Exception { 67 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); 68 factory.setRedeliveryPolicy(getRedeliveryPolicy()); 69 return factory.createConnection(); 70 } 71 72 private class TestMessageListener implements MessageListener { 73 private Session session; 74 75 public int counter = 0; 76 77 public TestMessageListener(Session session) { 78 this.session = session; 79 } 80 81 public void onMessage(Message message) { 82 try { 83 log.info("Message Received: " + message); 84 counter++; 85 if (counter <= 4) { 86 log.info("Message Rollback."); 87 session.rollback(); 88 } else { 89 log.info("Message Commit."); 90 message.acknowledge(); 91 session.commit(); 92 } 93 } catch (JMSException e) { 94 log.error("Error when rolling back transaction"); 95 } 96 } 97 } 98 99 public void testQueueRollbackConsumerListener() throws JMSException { 100 connection.start(); 101 102 Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 103 Queue queue = session.createQueue("queue-" + getName()); 104 MessageProducer producer = createProducer(session, queue); 105 Message message = createTextMessage(session); 106 producer.send(message); 107 session.commit(); 108 109 MessageConsumer consumer = session.createConsumer(queue); 110 111 ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer; 112 mc.setRedeliveryPolicy(getRedeliveryPolicy()); 113 114 TestMessageListener listener = new TestMessageListener(session); 115 consumer.setMessageListener(listener); 116 117 try { 118 Thread.sleep(500); 119 } catch (InterruptedException e) { 120 121 } 122 123 assertEquals(2, listener.counter); 126 127 try { 128 Thread.sleep(1000); 129 } catch (InterruptedException e) { 130 131 } 132 assertEquals(3, listener.counter); 134 135 try { 136 Thread.sleep(2000); 137 } catch (InterruptedException e) { 138 139 } 140 assertEquals(4, listener.counter); 142 143 producer.send(createTextMessage(session)); 145 session.commit(); 146 147 try { 148 Thread.sleep(500); 149 } catch (InterruptedException e) { 150 } 152 assertEquals(5, listener.counter); 154 155 try { 156 Thread.sleep(1500); 157 } catch (InterruptedException e) { 158 } 160 assertEquals(5, listener.counter); 162 163 session.close(); 164 } 165 166 public void testQueueRollbackSessionListener() throws JMSException { 167 connection.start(); 168 169 Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 170 Queue queue = session.createQueue("queue-" + getName()); 171 MessageProducer producer = createProducer(session, queue); 172 Message message = createTextMessage(session); 173 producer.send(message); 174 session.commit(); 175 176 MessageConsumer consumer = session.createConsumer(queue); 177 178 ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer; 179 mc.setRedeliveryPolicy(getRedeliveryPolicy()); 180 181 TestMessageListener listener = new TestMessageListener(session); 182 consumer.setMessageListener(listener); 183 184 try { 185 Thread.sleep(500); 186 } catch (InterruptedException e) { 187 188 } 189 assertEquals(2, listener.counter); 191 192 try { 193 Thread.sleep(1000); 194 } catch (InterruptedException e) { 195 196 } 197 assertEquals(3, listener.counter); 199 200 try { 201 Thread.sleep(2000); 202 } catch (InterruptedException e) { 203 204 } 205 assertEquals(4, listener.counter); 207 208 producer.send(createTextMessage(session)); 210 session.commit(); 211 212 try { 213 Thread.sleep(500); 214 } catch (InterruptedException e) { 215 } 217 assertEquals(5, listener.counter); 219 220 try { 221 Thread.sleep(1500); 222 } catch (InterruptedException e) { 223 } 225 assertEquals(5, listener.counter); 227 228 session.close(); 229 } 230 231 private TextMessage createTextMessage(Session session) throws JMSException { 232 return session.createTextMessage("Hello"); 233 } 234 235 private MessageProducer createProducer(Session session, Destination queue) throws JMSException { 236 MessageProducer producer = session.createProducer(queue); 237 producer.setDeliveryMode(getDeliveryMode()); 238 return producer; 239 } 240 241 protected int getDeliveryMode() { 242 return DeliveryMode.PERSISTENT; 243 } 244 } 245 | Popular Tags |