package org.apache.activemq;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.test.JmsResourceProvider;
import org.apache.activemq.test.TestSupport;

/**
 * Base class for transacted JMS send/receive tests.
 * <p>
 * Each test runs against a broker created by {@link #createBroker()} and a
 * session, producer and consumer obtained from the {@link JmsResourceProvider}
 * returned by {@link #getJmsResourceProvider()}. The class itself acts as the
 * {@link MessageListener} used by {@link #testMessageListener()}.
 */
abstract public class JmsTransactionTestSupport extends TestSupport implements MessageListener {

    private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
            .getLog(JmsTransactionTestSupport.class);

    protected ConnectionFactory connectionFactory;
    protected Connection connection;
    protected Session session;
    protected MessageConsumer consumer;
    protected MessageProducer producer;
    protected JmsResourceProvider resourceProvider;
    protected Destination destination;

    // State used by testMessageListener() / onMessage().
    private final int messageCount = 5;
    private final String messageText = "message";
    // The two lists and the flag below are written by onMessage() and polled by
    // the test thread, so they must be safe to share between threads.
    private final List unackMessages = Collections.synchronizedList(new ArrayList(messageCount));
    private final List ackMessages = Collections.synchronizedList(new ArrayList(messageCount));
    private volatile boolean resendPhase = false;

    protected int batchCount = 10;
    protected int batchSize = 20;

    protected BrokerService broker;

    public JmsTransactionTestSupport() {
        super();
    }

    public JmsTransactionTestSupport(String name) {
        super(name);
    }

    /**
     * Starts the broker, configures the resource provider as transacted,
     * creates the connection factory and opens the connection and session.
     *
     * @throws Exception if the broker or the JMS resources cannot be created
     */
    protected void setUp() throws Exception {
        broker = createBroker();
        broker.start();

        resourceProvider = getJmsResourceProvider();
        topic = resourceProvider.isTopic();
        // Every session created by the provider for these tests is transacted.
        resourceProvider.setTransacted(true);
        connectionFactory = resourceProvider.createConnectionFactory();
        reconnect();
    }

    /**
     * Creates the broker used by the tests from a broker URI configured with
     * {@code persistent=false}.
     *
     * @return the (not yet started) broker
     * @throws Exception if the broker cannot be created
     * @throws URISyntaxException if the broker URI is malformed
     */
    protected BrokerService createBroker() throws Exception, URISyntaxException {
        return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
    }

    /**
     * Closes the session and connection and stops the broker.
     * <p>
     * Each resource is released even if releasing an earlier one fails, so a
     * failing close cannot leave the broker running for subsequent tests.
     *
     * @throws Exception the first failure raised while releasing resources
     */
    protected void tearDown() throws Exception {
        log.info("Closing down connection");

        try {
            if (session != null) {
                session.close();
            }
        } finally {
            session = null;
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                connection = null;
                if (broker != null) {
                    broker.stop();
                    broker = null;
                }
            }
        }

        log.info("Connection closed.");
    }

    /**
     * @return the provider that creates the connection factory, connection,
     *         session, destination, producer and consumer used by the tests
     */
    protected abstract JmsResourceProvider getJmsResourceProvider();

    /**
     * Sends {@link #batchCount} batches of {@link #batchSize} text messages,
     * committing after each batch is sent and again after it is consumed.
     *
     * @throws Exception on any JMS failure
     */
    public void testSendReceiveTransactedBatches() throws Exception {

        TextMessage message = session.createTextMessage("Batch Message");

        for (int j = 0; j < batchCount; j++) {
            log.info("Producing bacth " + j + " of " + batchSize + " messages");

            for (int i = 0; i < batchSize; i++) {
                producer.send(message);
            }

            session.commit();
            log.info("Consuming bacth " + j + " of " + batchSize + " messages");

            for (int i = 0; i < batchSize; i++) {
                message = (TextMessage) consumer.receive(1000 * 5);
                assertNotNull("Received only " + i + " messages in batch " + j, message);
                assertEquals("Batch Message", message.getText());
            }

            session.commit();
        }
    }

    /**
     * Sends a committed message, a rolled-back message and another committed
     * message, then checks that only the two committed messages are received.
     *
     * @throws Exception on any JMS failure
     */
    public void testSendRollback() throws Exception {
        Message[] outbound = new Message[]{
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message")
        };

        // Send and commit the first message.
        producer.send(outbound[0]);
        session.commit();

        // Send a message that is rolled back.
        producer.send(session.createTextMessage("I'm going to get rolled back."));
        session.rollback();

        // Send and commit the second message.
        producer.send(outbound[1]);
        session.commit();

        assertCommittedMessagesReceived(outbound);
    }

    /**
     * Like {@link #testSendRollback()}, but the uncommitted send is abandoned
     * by closing the consumer and re-creating the session instead of calling
     * rollback.
     *
     * @throws Exception on any JMS failure
     */
    public void testSendSessionClose() throws Exception {
        Message[] outbound = new Message[]{
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message")
        };

        // Send and commit the first message.
        producer.send(outbound[0]);
        session.commit();

        // Send a message that is never committed, then replace the session.
        producer.send(session.createTextMessage("I'm going to get rolled back."));
        consumer.close();

        reconnectSession();

        // Send and commit the second message.
        producer.send(outbound[1]);
        session.commit();

        assertCommittedMessagesReceived(outbound);
    }

    /**
     * Like {@link #testSendSessionClose()}, but the session is closed
     * explicitly and the whole connection is re-created.
     *
     * @throws Exception on any JMS failure
     */
    public void testSendSessionAndConnectionClose() throws Exception {
        Message[] outbound = new Message[]{
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message")
        };

        // Send and commit the first message.
        producer.send(outbound[0]);
        session.commit();

        // Send a message that is never committed, then replace the connection.
        producer.send(session.createTextMessage("I'm going to get rolled back."));
        consumer.close();
        session.close();

        reconnect();

        // Send and commit the second message.
        producer.send(outbound[1]);
        session.commit();

        assertCommittedMessagesReceived(outbound);
    }

    /**
     * Receives two messages (waiting up to 1s for the first and 4s for the
     * second), commits, and asserts that they match {@code outbound}.
     *
     * @param outbound the messages expected to have been delivered
     * @throws Exception on any JMS failure
     */
    private void assertCommittedMessagesReceived(Message[] outbound) throws Exception {
        ArrayList messages = new ArrayList();

        log.info("About to consume message 1");
        Message message = consumer.receive(1000);
        messages.add(message);
        log.info("Received: " + message);

        log.info("About to consume message 2");
        message = consumer.receive(4000);
        messages.add(message);
        log.info("Received: " + message);

        session.commit();

        Message inbound[] = new Message[messages.size()];
        messages.toArray(inbound);
        assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
    }

    /**
     * Commits the receipt of the first message, rolls back the receipt of the
     * second, and checks that the second message is delivered again.
     *
     * @throws Exception on any JMS failure
     */
    public void testReceiveRollback() throws Exception {
        Message[] outbound = new Message[]{
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message")
        };

        // Drain anything already waiting on the destination.
        while (consumer.receive(1000) != null) {
        }
        session.commit();

        producer.send(outbound[0]);
        producer.send(outbound[1]);
        session.commit();

        log.info("Sent 0: " + outbound[0]);
        log.info("Sent 1: " + outbound[1]);

        ArrayList messages = new ArrayList();
        Message message = consumer.receive(1000);
        messages.add(message);
        assertEquals(outbound[0], message);
        session.commit();

        // Receive the second message and roll the receipt back.
        message = consumer.receive(1000);
        assertNotNull(message);
        assertEquals(outbound[1], message);
        session.rollback();

        // The rolled-back message must be delivered again.
        message = consumer.receive(5000);
        assertNotNull("Should have re-received the message again!", message);
        messages.add(message);
        session.commit();

        Message inbound[] = new Message[messages.size()];
        messages.toArray(inbound);
        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
    }

    /**
     * Receives two messages in one transaction, rolls it back, and checks that
     * both messages are delivered again and that nothing else follows them.
     *
     * @throws Exception on any JMS failure
     */
    public void testReceiveTwoThenRollback() throws Exception {
        Message[] outbound = new Message[]{
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message")
        };

        // Drain anything already waiting on the destination.
        while (consumer.receive(1000) != null) {
        }
        session.commit();

        producer.send(outbound[0]);
        producer.send(outbound[1]);
        session.commit();

        log.info("Sent 0: " + outbound[0]);
        log.info("Sent 1: " + outbound[1]);

        ArrayList messages = new ArrayList();
        Message message = consumer.receive(1000);
        assertEquals(outbound[0], message);

        message = consumer.receive(1000);
        assertNotNull(message);
        assertEquals(outbound[1], message);
        session.rollback();

        // Both messages must be delivered again after the rollback.
        message = consumer.receive(5000);
        assertNotNull("Should have re-received the first message again!", message);
        messages.add(message);
        assertEquals(outbound[0], message);
        message = consumer.receive(5000);
        assertNotNull("Should have re-received the second message again!", message);
        messages.add(message);
        assertEquals(outbound[1], message);

        assertNull(consumer.receiveNoWait());
        session.commit();

        Message inbound[] = new Message[messages.size()];
        messages.toArray(inbound);
        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
    }

    /**
     * Sends four messages in one transaction and receives all four after
     * calling {@link #setPrefetchToOne()}.
     *
     * @throws Exception on any JMS failure
     */
    public void testSendReceiveWithPrefetchOne() throws Exception {
        setPrefetchToOne();
        Message[] outbound = new Message[]{
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message"),
            session.createTextMessage("Third Message"),
            session.createTextMessage("Fourth Message")
        };

        for (int i = 0; i < outbound.length; i++) {
            producer.send(outbound[i]);
        }
        session.commit();

        for (int i = 0; i < outbound.length; i++) {
            log.info("About to consume message 1");
            Message message = consumer.receive(1000);
            assertNotNull(message);
            log.info("Received: " + message);
        }

        session.commit();
    }

    /**
     * Runs {@link #testReceiveTwoThenRollback()} five times in a row on the
     * same session.
     *
     * @throws Exception on any JMS failure
     */
    public void testReceiveTwoThenRollbackManyTimes() throws Exception {
        for (int i = 0; i < 5; i++) {
            testReceiveTwoThenRollback();
        }
    }

    /**
     * Runs {@link #testSendRollback()} after calling
     * {@link #setPrefetchToOne()}.
     *
     * @throws Exception on any JMS failure
     */
    public void testSendRollbackWithPrefetchOfOne() throws Exception {
        setPrefetchToOne();
        testSendRollback();
    }

    /**
     * Runs {@link #testReceiveRollback()} after calling
     * {@link #setPrefetchToOne()}.
     *
     * @throws Exception on any JMS failure
     */
    public void testReceiveRollbackWithPrefetchOfOne() throws Exception {
        setPrefetchToOne();
        testReceiveRollback();
    }

    /**
     * Receives one message, closes the consumer before committing, and checks
     * that a new consumer on the same session receives the second message.
     *
     * @throws Exception on any JMS failure
     */
    public void testCloseConsumerBeforeCommit() throws Exception {
        TextMessage[] outbound = new TextMessage[]{
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message")
        };

        // Drain anything already waiting on the destination.
        while (consumer.receiveNoWait() != null) {
        }

        session.commit();

        producer.send(outbound[0]);
        producer.send(outbound[1]);
        session.commit();
        log.info("Sent 0: " + outbound[0]);
        log.info("Sent 1: " + outbound[1]);

        TextMessage message = (TextMessage) consumer.receive(1000);
        // Fail with a clear assertion rather than a NullPointerException.
        assertNotNull("Should have received the first message", message);
        assertEquals(outbound[0].getText(), message.getText());
        // Close the consumer before the receipt is committed.
        consumer.close();
        session.commit();

        // Create a new consumer on the same session.
        consumer = resourceProvider.createConsumer(session, destination);
        log.info("Created consumer: " + consumer);

        message = (TextMessage) consumer.receive(1000);
        assertNotNull("Should have received the second message", message);
        assertEquals(outbound[1].getText(), message.getText());
        session.commit();
    }

    /**
     * Mutates the list body of a received {@link ObjectMessage}, rolls back,
     * and checks that the redelivered message still carries the original body
     * and property.
     *
     * @throws Exception on any JMS failure
     */
    public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {
        ArrayList list = new ArrayList();
        list.add("First");
        Message outbound = session.createObjectMessage(list);
        outbound.setStringProperty("foo", "abc");

        producer.send(outbound);
        session.commit();

        log.info("About to consume message 1");
        Message message = consumer.receive(5000);

        List body = assertReceivedObjectMessageWithListBody(message);

        // Setting a property on the received message is expected to be rejected.
        try {
            message.setStringProperty("foo", "def");
            fail("Cannot change properties of the object!");
        }
        catch (JMSException e) {
            log.info("Caught expected exception: " + e, e);
        }
        body.clear();
        body.add("This should never be seen!");
        session.rollback();

        message = consumer.receive(5000);
        List secondBody = assertReceivedObjectMessageWithListBody(message);
        assertNotSame("Second call should return a different body", secondBody, body);
        session.commit();
    }

    /**
     * Asserts that {@code message} is a non-null {@link ObjectMessage} whose
     * {@code foo} property is {@code "abc"} and whose body is a one-element
     * list containing {@code "First"}.
     *
     * @param message the received message to check
     * @return the list body of the message
     * @throws JMSException if the message cannot be read
     */
    protected List assertReceivedObjectMessageWithListBody(Message message) throws JMSException {
        assertNotNull("Should have received a message!", message);
        assertEquals("foo header", "abc", message.getStringProperty("foo"));

        assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage);
        ObjectMessage objectMessage = (ObjectMessage) message;
        List body = (List) objectMessage.getObject();
        log.info("Received body: " + body);

        assertEquals("Size of list should be 1", 1, body.size());
        assertEquals("element 0 of list", "First", body.get(0));
        return body;
    }

    /**
     * Closes the current connection (if any), creates a new one, re-creates
     * the session, destination, producer and consumer, and starts the
     * connection.
     *
     * @throws JMSException if any JMS resource cannot be closed or created
     */
    protected void reconnect() throws JMSException {

        if (connection != null) {
            // Close the previous connection.
            connection.close();
        }
        // Cleared so that reconnectSession() does not try to close the old
        // session again.
        session = null;
        connection = resourceProvider.createConnection(connectionFactory);
        reconnectSession();
        connection.start();
    }

    /**
     * Closes the current session (if any) and creates a new session,
     * destination, producer and consumer on the current connection.
     *
     * @throws JMSException if any JMS resource cannot be closed or created
     */
    protected void reconnectSession() throws JMSException {
        if (session != null) {
            session.close();
        }

        session = resourceProvider.createSession(connection);
        destination = resourceProvider.createDestination(session, getSubject());
        producer = resourceProvider.createProducer(session, destination);
        consumer = resourceProvider.createConsumer(session, destination);
    }

    /**
     * Sets the queue, topic, durable topic and optimized durable topic
     * prefetch values of the connection's prefetch policy to 1. The connection
     * is cast to {@link ActiveMQConnection}.
     * <p>
     * NOTE(review): the consumer has already been created by
     * {@link #reconnect()} when tests call this method; confirm that the
     * policy change applies to it.
     */
    protected void setPrefetchToOne() {
        ActiveMQPrefetchPolicy prefetchPolicy = ((ActiveMQConnection) connection).getPrefetchPolicy();
        prefetchPolicy.setQueuePrefetch(1);
        prefetchPolicy.setTopicPrefetch(1);
        prefetchPolicy.setDurableTopicPrefetch(1);
        prefetchPolicy.setOptimizeDurableTopicPrefetch(1);
    }

    /**
     * Sends {@code messageCount} messages and consumes them through
     * {@link #onMessage(Message)}: the first delivery of the full set is
     * rolled back, the second is committed, and afterwards nothing more must
     * be delivered.
     *
     * @throws Exception on any JMS failure or if delivery does not complete
     *         within the polling window
     */
    public void testMessageListener() throws Exception {
        // Send the messages.
        for (int i = 0; i < messageCount; i++) {
            producer.send(session.createTextMessage(messageText + i));
        }
        session.commit();
        consumer.setMessageListener(this);
        // Wait until the first delivery has been received and rolled back.
        waitReceiveUnack();
        assertEquals(messageCount, unackMessages.size());
        // Wait until the redelivery has been received and committed.
        waitReceiveAck();
        assertEquals(messageCount, ackMessages.size());
        // With the listener removed, nothing further should be delivered.
        consumer.setMessageListener(null);
        assertNull(consumer.receive(500));
        reconnect();
    }

    /**
     * Listener callback for {@link #testMessageListener()}. Until
     * {@code messageCount} messages have been seen the messages are collected
     * and the session is then rolled back; after that, messages are collected
     * separately and the session is committed once {@code messageCount} have
     * arrived.
     *
     * @param message the delivered message
     */
    public void onMessage(Message message) {
        if (!resendPhase) {
            unackMessages.add(message);
            if (unackMessages.size() == messageCount) {
                try {
                    session.rollback();
                    resendPhase = true;
                } catch (Exception e) {
                    // Not rethrown: the waiting test fails on its own timeout.
                    log.error("Failed to roll back session in message listener: " + e, e);
                }
            }
        } else {
            ackMessages.add(message);
            if (ackMessages.size() == messageCount) {
                try {
                    session.commit();
                } catch (Exception e) {
                    // Not rethrown: the waiting test fails on its own assertions.
                    log.error("Failed to commit session in message listener: " + e, e);
                }
            }
        }
    }

    /**
     * Polls (up to 100 times, 100ms apart) until the listener has rolled back
     * the first delivery, then asserts that it did.
     *
     * @throws Exception if interrupted while waiting
     */
    private void waitReceiveUnack() throws Exception {
        for (int i = 0; i < 100 && !resendPhase; i++) {
            Thread.sleep(100);
        }
        assertTrue(resendPhase);
    }

    /**
     * Polls (up to 100 times, 100ms apart) until the listener has collected
     * {@code messageCount} redelivered messages, then asserts that it did.
     *
     * @throws Exception if interrupted while waiting
     */
    private void waitReceiveAck() throws Exception {
        for (int i = 0; i < 100 && ackMessages.size() < messageCount; i++) {
            Thread.sleep(100);
        }
        assertFalse(ackMessages.size() < messageCount);
    }
}