1 18 package org.apache.activemq; 19 20 import javax.jms.MessageConsumer ; 21 import javax.jms.MessageProducer ; 22 import javax.jms.Session ; 23 import javax.jms.TextMessage ; 24 25 import junit.framework.Test; 26 27 import org.apache.activemq.command.ActiveMQQueue; 28 29 34 public class RedeliveryPolicyTest extends JmsTestSupport { 35 36 public static Test suite() { 37 return suite(RedeliveryPolicyTest.class); 38 } 39 40 public static void main(String [] args) { 41 junit.textui.TestRunner.run(suite()); 42 } 43 44 47 public void testExponentialRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception { 48 49 RedeliveryPolicy policy = connection.getRedeliveryPolicy(); 51 policy.setInitialRedeliveryDelay(500); 52 policy.setBackOffMultiplier((short) 2); 53 policy.setUseExponentialBackOff(true); 54 55 connection.start(); 56 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 57 ActiveMQQueue destination = new ActiveMQQueue(getName()); 58 MessageProducer producer = session.createProducer(destination); 59 60 MessageConsumer consumer = session.createConsumer(destination); 61 62 producer.send(session.createTextMessage("1st")); 64 producer.send(session.createTextMessage("2nd")); 65 session.commit(); 66 67 TextMessage m; 68 m = (TextMessage )consumer.receive(1000); 69 assertNotNull(m); 70 assertEquals("1st", m.getText()); 71 session.rollback(); 72 73 m = (TextMessage )consumer.receive(100); 75 assertNotNull(m); 76 session.rollback(); 77 78 m = (TextMessage )consumer.receive(100); 80 assertNull(m); 81 82 m = (TextMessage )consumer.receive(700); 83 assertNotNull(m); 84 assertEquals("1st", m.getText()); 85 session.rollback(); 86 87 m = (TextMessage )consumer.receive(100); 89 assertNull(m); 90 m = (TextMessage )consumer.receive(500); 91 assertNull(m); 92 m = (TextMessage )consumer.receive(700); 93 assertNotNull(m); 94 assertEquals("1st", m.getText()); 95 96 } 97 98 99 102 public void testNornalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception { 103 104 RedeliveryPolicy policy = connection.getRedeliveryPolicy(); 106 policy.setInitialRedeliveryDelay(500); 107 108 connection.start(); 109 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 110 ActiveMQQueue destination = new ActiveMQQueue(getName()); 111 MessageProducer producer = session.createProducer(destination); 112 113 MessageConsumer consumer = session.createConsumer(destination); 114 115 producer.send(session.createTextMessage("1st")); 117 producer.send(session.createTextMessage("2nd")); 118 session.commit(); 119 120 TextMessage m; 121 m = (TextMessage )consumer.receive(1000); 122 assertNotNull(m); 123 assertEquals("1st", m.getText()); 124 session.rollback(); 125 126 m = (TextMessage )consumer.receive(100); 128 assertNotNull(m); 129 session.rollback(); 130 131 m = (TextMessage )consumer.receive(100); 133 assertNull(m); 134 m = (TextMessage )consumer.receive(700); 135 assertNotNull(m); 136 assertEquals("1st", m.getText()); 137 session.rollback(); 138 139 m = (TextMessage )consumer.receive(100); 142 assertNull(m); 143 m = (TextMessage )consumer.receive(700); 144 assertNotNull(m); 145 assertEquals("1st", m.getText()); 146 147 } 148 149 152 public void testDLQHandling() throws Exception { 153 154 RedeliveryPolicy policy = connection.getRedeliveryPolicy(); 156 policy.setInitialRedeliveryDelay(100); 157 policy.setUseExponentialBackOff(false); 158 policy.setMaximumRedeliveries(2); 159 160 connection.start(); 161 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 162 ActiveMQQueue destination = new ActiveMQQueue("TEST"); 163 MessageProducer producer = session.createProducer(destination); 164 165 MessageConsumer consumer = session.createConsumer(destination); 166 MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); 167 168 producer.send(session.createTextMessage("1st")); 170 producer.send(session.createTextMessage("2nd")); 171 session.commit(); 172 173 TextMessage m; 174 m = (TextMessage )consumer.receive(1000); 175 assertNotNull(m); 176 assertEquals("1st", m.getText()); 177 session.rollback(); 178 179 m = (TextMessage )consumer.receive(1000); 180 assertNotNull(m); 181 assertEquals("1st", m.getText()); 182 session.rollback(); 183 184 m = (TextMessage )consumer.receive(1000); 185 assertNotNull(m); 186 assertEquals("1st", m.getText()); 187 session.rollback(); 188 189 m = (TextMessage )consumer.receive(1000); 191 assertNotNull(m); 192 assertEquals("2nd", m.getText()); 193 session.commit(); 194 195 m = (TextMessage )dlqConsumer.receive(1000); 197 assertNotNull(m); 198 assertEquals("1st", m.getText()); 199 session.commit(); 200 201 } 202 203 204 207 public void testInfiniteMaximumNumberOfRedeliveries() throws Exception { 208 209 RedeliveryPolicy policy = connection.getRedeliveryPolicy(); 211 policy.setInitialRedeliveryDelay(100); 212 policy.setUseExponentialBackOff(false); 213 policy.setMaximumRedeliveries(-1); 215 216 217 connection.start(); 218 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 219 ActiveMQQueue destination = new ActiveMQQueue("TEST"); 220 MessageProducer producer = session.createProducer(destination); 221 222 MessageConsumer consumer = session.createConsumer(destination); 223 224 producer.send(session.createTextMessage("1st")); 226 producer.send(session.createTextMessage("2nd")); 227 session.commit(); 228 229 TextMessage m; 230 231 m = (TextMessage )consumer.receive(1000); 232 assertNotNull(m); 233 assertEquals("1st", m.getText()); 234 session.rollback(); 235 236 m = (TextMessage )consumer.receive(1000); 238 assertNotNull(m); 239 assertEquals("1st", m.getText()); 240 session.rollback(); 241 242 m = (TextMessage )consumer.receive(1000); 243 assertNotNull(m); 244 assertEquals("1st", m.getText()); 245 session.rollback(); 246 247 m = (TextMessage )consumer.receive(1000); 248 assertNotNull(m); 249 assertEquals("1st", m.getText()); 250 session.rollback(); 251 252 m = (TextMessage )consumer.receive(1000); 253 assertNotNull(m); 254 assertEquals("1st", m.getText()); 255 session.rollback(); 256 257 m = (TextMessage )consumer.receive(1000); 258 assertNotNull(m); 259 assertEquals("1st", m.getText()); 260 session.commit(); 261 262 m = (TextMessage )consumer.receive(1000); 263 assertNotNull(m); 264 assertEquals("2nd", m.getText()); 265 session.commit(); 266 267 } 268 269 272 public void testZeroMaximumNumberOfRedeliveries() throws Exception { 273 274 RedeliveryPolicy policy = connection.getRedeliveryPolicy(); 276 policy.setInitialRedeliveryDelay(100); 277 policy.setUseExponentialBackOff(false); 278 policy.setMaximumRedeliveries(0); 280 281 connection.start(); 282 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 283 ActiveMQQueue destination = new ActiveMQQueue("TEST"); 284 MessageProducer producer = session.createProducer(destination); 285 286 MessageConsumer consumer = session.createConsumer(destination); 287 288 producer.send(session.createTextMessage("1st")); 290 producer.send(session.createTextMessage("2nd")); 291 session.commit(); 292 293 TextMessage m; 294 m = (TextMessage )consumer.receive(1000); 295 assertNotNull(m); 296 assertEquals("1st", m.getText()); 297 session.rollback(); 298 299 m = (TextMessage )consumer.receive(1000); 301 assertNotNull(m); 302 assertEquals("2nd", m.getText()); 303 session.commit(); 304 305 306 307 } 308 } 309 | Popular Tags |