1 18 package org.apache.activemq; 19 20 import javax.jms.Connection ; 21 import javax.jms.DeliveryMode ; 22 import javax.jms.Destination ; 23 import javax.jms.JMSException ; 24 import javax.jms.Message ; 25 import javax.jms.MessageConsumer ; 26 import javax.jms.MessageProducer ; 27 import javax.jms.Queue ; 28 import javax.jms.Session ; 29 import javax.jms.TextMessage ; 30 import javax.jms.Topic ; 31 32 import org.apache.activemq.ActiveMQConnectionFactory; 33 34 import junit.framework.Test; 35 import junit.framework.TestCase; 36 import junit.framework.TestSuite; 37 38 41 public class JmsRedeliveredTest extends TestCase { 42 43 private Connection connection; 44 45 48 protected void setUp() throws Exception { 49 connection = createConnection(); 50 } 51 52 55 protected void tearDown() throws Exception { 56 if (connection != null) { 57 connection.close(); 58 connection = null; 59 } 60 } 61 62 68 protected Connection createConnection() throws Exception { 69 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); 70 return factory.createConnection(); 71 } 72 73 78 public void testQueueSessionCloseMarksMessageRedelivered() throws JMSException { 79 connection.start(); 80 81 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 82 Queue queue = session.createQueue("queue-"+getName()); 83 MessageProducer producer = createProducer(session, queue); 84 producer.send(createTextMessage(session)); 85 86 MessageConsumer consumer = session.createConsumer(queue); 88 Message msg = consumer.receive(1000); 89 assertNotNull(msg); 90 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); 91 93 session.close(); 95 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 96 97 consumer = session.createConsumer(queue); 99 msg = consumer.receive(2000); 100 assertNotNull(msg); 101 assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); 102 msg.acknowledge(); 103 104 session.close(); 105 } 106 107 113 public void testQueueRecoverMarksMessageRedelivered() throws JMSException { 114 connection.start(); 115 116 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 117 Queue queue = session.createQueue("queue-"+getName()); 118 MessageProducer producer = createProducer(session, queue); 119 producer.send(createTextMessage(session)); 120 121 MessageConsumer consumer = session.createConsumer(queue); 123 Message msg = consumer.receive(1000); 124 assertNotNull(msg); 125 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); 126 128 session.recover(); 130 131 msg = consumer.receive(2000); 133 assertNotNull(msg); 134 assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); 135 msg.acknowledge(); 136 137 session.close(); 138 } 139 140 146 public void testQueueRollbackMarksMessageRedelivered() throws JMSException { 147 connection.start(); 148 149 Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 150 Queue queue = session.createQueue("queue-"+getName()); 151 MessageProducer producer = createProducer(session, queue); 152 producer.send(createTextMessage(session)); 153 session.commit(); 154 155 MessageConsumer consumer = session.createConsumer(queue); 157 Message msg = consumer.receive(1000); 158 assertNotNull(msg); 159 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); 160 161 session.rollback(); 163 164 msg = consumer.receive(2000); 166 assertNotNull(msg); 167 assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); 168 169 session.commit(); 170 session.close(); 171 } 172 173 181 public void testDurableTopicSessionCloseMarksMessageRedelivered() throws JMSException { 182 connection.setClientID(getName()); 183 connection.start(); 184 185 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 186 Topic topic = session.createTopic("topic-"+getName()); 187 MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1"); 188 189 MessageProducer producer = session.createProducer(topic); 192 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 193 producer.send(createTextMessage(session)); 194 195 Message msg = consumer.receive(1000); 197 assertNotNull(msg); 198 assertFalse("Message should not be re-delivered.", msg.getJMSRedelivered()); 199 201 session.close(); 203 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 204 205 consumer = session.createDurableSubscriber(topic, "sub1"); 207 msg = consumer.receive(2000); 208 assertNotNull(msg); 209 assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); 210 msg.acknowledge(); 211 212 session.close(); 213 } 214 215 222 public void testDurableTopicRecoverMarksMessageRedelivered() throws JMSException { 223 connection.setClientID(getName()); 224 connection.start(); 225 226 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 227 Topic topic = session.createTopic("topic-"+getName()); 228 MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1"); 229 230 MessageProducer producer = createProducer(session, topic); 231 producer.send(createTextMessage(session)); 232 233 Message msg = consumer.receive(1000); 235 assertNotNull(msg); 236 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); 237 239 session.recover(); 241 242 msg = consumer.receive(2000); 244 assertNotNull(msg); 245 assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); 246 msg.acknowledge(); 247 248 session.close(); 249 } 250 251 257 public void testDurableTopicRollbackMarksMessageRedelivered() throws JMSException { 258 connection.setClientID(getName()); 259 connection.start(); 260 261 Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 262 Topic topic = session.createTopic("topic-"+getName()); 263 MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1"); 264 265 MessageProducer producer = createProducer(session, topic); 266 producer.send(createTextMessage(session)); 267 session.commit(); 268 269 Message msg = consumer.receive(1000); 271 assertNotNull(msg); 272 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); 273 274 session.rollback(); 276 277 msg = consumer.receive(2000); 279 assertNotNull(msg); 280 assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); 281 282 283 session.commit(); 284 session.close(); 285 } 286 287 292 public void testTopicRecoverMarksMessageRedelivered() throws JMSException { 293 294 connection.setClientID(getName()); 295 connection.start(); 296 297 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 298 Topic topic = session.createTopic("topic-"+getName()); 299 MessageConsumer consumer = session.createConsumer(topic); 300 301 MessageProducer producer = createProducer(session, topic); 302 producer.send(createTextMessage(session)); 303 304 Message msg = consumer.receive(1000); 306 assertNotNull(msg); 307 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); 308 310 session.recover(); 312 313 msg = consumer.receive(2000); 315 assertNotNull(msg); 316 assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); 317 msg.acknowledge(); 318 319 session.close(); 320 } 321 322 328 public void testTopicRollbackMarksMessageRedelivered() throws JMSException { 329 connection.setClientID(getName()); 330 connection.start(); 331 332 Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 333 Topic topic = session.createTopic("topic-"+getName()); 334 MessageConsumer consumer = session.createConsumer(topic); 335 336 MessageProducer producer = createProducer(session, topic); 337 producer.send(createTextMessage(session)); 338 session.commit(); 339 340 Message msg = consumer.receive(1000); 342 assertNotNull(msg); 343 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); 344 345 session.rollback(); 347 348 msg = consumer.receive(2000); 350 assertNotNull(msg); 351 assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); 352 353 session.commit(); 354 session.close(); 355 } 356 357 364 private TextMessage createTextMessage(Session session) throws JMSException { 365 return session.createTextMessage("Hello"); 366 } 367 368 376 private MessageProducer createProducer(Session session, Destination queue) throws JMSException { 377 MessageProducer producer = session.createProducer(queue); 378 producer.setDeliveryMode(getDeliveryMode()); 379 return producer; 380 } 381 382 387 protected int getDeliveryMode() { 388 return DeliveryMode.PERSISTENT; 389 } 390 391 394 static final public class PersistentCase extends JmsRedeliveredTest { 395 396 401 protected int getDeliveryMode() { 402 return DeliveryMode.PERSISTENT; 403 } 404 } 405 406 409 static final public class TransientCase extends JmsRedeliveredTest { 410 411 416 protected int getDeliveryMode() { 417 return DeliveryMode.NON_PERSISTENT; 418 } 419 } 420 421 public static Test suite() { 422 TestSuite suite= new TestSuite(); 423 suite.addTestSuite(PersistentCase.class); 424 suite.addTestSuite(TransientCase.class); 425 return suite; 426 } 427 } 428 | Popular Tags |