1 18 package org.apache.activemq; 19 20 import javax.jms.BytesMessage ; 21 import javax.jms.DeliveryMode ; 22 import javax.jms.Message ; 23 import javax.jms.MessageConsumer ; 24 import javax.jms.MessageListener ; 25 import javax.jms.MessageProducer ; 26 import javax.jms.Session ; 27 import javax.jms.TextMessage ; 28 import javax.jms.Topic ; 29 30 import junit.framework.Test; 31 32 import org.apache.activemq.ActiveMQConnection; 33 import org.apache.activemq.ActiveMQMessageConsumer; 34 import org.apache.activemq.command.ActiveMQDestination; 35 import org.apache.activemq.command.ActiveMQQueue; 36 37 import java.util.concurrent.CountDownLatch ; 38 import java.util.concurrent.TimeUnit ; 39 import java.util.concurrent.atomic.AtomicInteger ; 40 41 46 public class JMSConsumerTest extends JmsTestSupport { 47 48 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 49 .getLog(JMSConsumerTest.class); 50 51 public static Test suite() { 52 return suite(JMSConsumerTest.class); 53 } 54 55 public static void main(String [] args) { 56 junit.textui.TestRunner.run(suite()); 57 } 58 59 public ActiveMQDestination destination; 60 public int deliveryMode; 61 public int prefetch; 62 public int ackMode; 63 public byte destinationType; 64 public boolean durableConsumer; 65 66 public void initCombosForTestMessageListenerWithConsumerCanBeStopped() { 67 addCombinationValues("deliveryMode", new Object [] { 68 new Integer (DeliveryMode.NON_PERSISTENT), 69 new Integer (DeliveryMode.PERSISTENT) }); 70 addCombinationValues("destinationType", new Object [] { 71 new Byte (ActiveMQDestination.QUEUE_TYPE), 72 new Byte (ActiveMQDestination.TOPIC_TYPE), 73 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 74 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE) }); 75 } 76 public void testMessageListenerWithConsumerCanBeStopped() throws Exception { 77 78 final AtomicInteger counter = new AtomicInteger (0); 79 final CountDownLatch done1 = new CountDownLatch (1); 80 final CountDownLatch done2 = new CountDownLatch (1); 81 82 connection.start(); 84 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 85 destination = createDestination(session, destinationType); 86 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination); 87 consumer.setMessageListener(new MessageListener () { 88 public void onMessage(Message m) { 89 counter.incrementAndGet(); 90 if( counter.get()==1 ) 91 done1.countDown(); 92 if( counter.get()==2 ) 93 done2.countDown(); 94 } 95 }); 96 97 sendMessages(session, destination, 1); 99 assertTrue(done1.await(1, TimeUnit.SECONDS)); 100 assertEquals(1, counter.get()); 101 102 consumer.stop(); 104 105 sendMessages(session, destination, 1); 107 assertFalse(done2.await(1, TimeUnit.SECONDS)); 108 assertEquals(1, counter.get()); 109 110 consumer.start(); 112 assertTrue(done2.await(1, TimeUnit.SECONDS)); 113 assertEquals(2, counter.get()); 114 } 115 116 public void initCombosForTestMutiReceiveWithPrefetch1() { 117 addCombinationValues("deliveryMode", new Object [] { 118 new Integer (DeliveryMode.NON_PERSISTENT), 119 new Integer (DeliveryMode.PERSISTENT) }); 120 addCombinationValues("ackMode", new Object [] { 121 new Integer (Session.AUTO_ACKNOWLEDGE), 122 new Integer (Session.DUPS_OK_ACKNOWLEDGE), 123 new Integer (Session.CLIENT_ACKNOWLEDGE) }); 124 addCombinationValues("destinationType", new Object [] { 125 new Byte (ActiveMQDestination.QUEUE_TYPE), 126 new Byte (ActiveMQDestination.TOPIC_TYPE), 127 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 128 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE) 129 }); 130 } 131 132 public void testMutiReceiveWithPrefetch1() throws Exception { 133 134 connection.getPrefetchPolicy().setAll(1); 136 connection.start(); 137 138 Session session = connection.createSession(false, ackMode); 140 destination = createDestination(session, destinationType); 141 MessageConsumer consumer = session.createConsumer(destination); 142 143 sendMessages(session, destination, 4); 145 146 Message message = null; 148 for (int i = 0; i < 4; i++) { 149 message = consumer.receive(1000); 150 assertNotNull(message); 151 } 152 assertNull(consumer.receiveNoWait()); 153 message.acknowledge(); 154 } 155 156 public void initCombosForTestDurableConsumerSelectorChange() { 157 addCombinationValues("deliveryMode", new Object [] { 158 new Integer (DeliveryMode.NON_PERSISTENT), 159 new Integer (DeliveryMode.PERSISTENT) }); 160 addCombinationValues("destinationType", new Object [] { 161 new Byte (ActiveMQDestination.TOPIC_TYPE)}); 162 } 163 public void testDurableConsumerSelectorChange() throws Exception { 164 165 connection.setClientID("test"); 167 connection.start(); 168 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 169 destination = createDestination(session, destinationType); 170 MessageProducer producer = session.createProducer(destination); 171 producer.setDeliveryMode(deliveryMode); 172 MessageConsumer consumer = session.createDurableSubscriber((Topic )destination, "test", "color='red'", false); 173 174 TextMessage message = session.createTextMessage("1st"); 176 message.setStringProperty("color", "red"); 177 producer.send(message); 178 179 Message m = consumer.receive(1000); 180 assertNotNull(m); 181 assertEquals("1st", ((TextMessage )m).getText()); 182 183 consumer.close(); 185 consumer = session.createDurableSubscriber((Topic )destination, "test", "color='blue'", false); 186 187 message = session.createTextMessage("2nd"); 188 message.setStringProperty("color", "red"); 189 producer.send(message); 190 message = session.createTextMessage("3rd"); 191 message.setStringProperty("color", "blue"); 192 producer.send(message); 193 194 m = consumer.receive(1000); 196 assertNotNull(m); 197 assertEquals("3rd", ((TextMessage )m).getText()); 198 199 assertNull(consumer.receiveNoWait()); 200 } 201 202 public void initCombosForTestSendReceiveBytesMessage() { 203 addCombinationValues("deliveryMode", new Object [] { new Integer (DeliveryMode.NON_PERSISTENT), 204 new Integer (DeliveryMode.PERSISTENT) }); 205 addCombinationValues("destinationType", new Object [] { new Byte (ActiveMQDestination.QUEUE_TYPE), 206 new Byte (ActiveMQDestination.TOPIC_TYPE), new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 207 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE) }); 208 } 209 210 public void testSendReceiveBytesMessage() throws Exception { 211 212 connection.start(); 214 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 215 destination = createDestination(session, destinationType); 216 MessageConsumer consumer = session.createConsumer(destination); 217 MessageProducer producer = session.createProducer(destination); 218 219 BytesMessage message = session.createBytesMessage(); 220 message.writeBoolean(true); 221 message.writeBoolean(false); 222 producer.send(message); 223 224 BytesMessage m = (BytesMessage )consumer.receive(1000); 226 assertNotNull(m); 227 assertTrue(m.readBoolean()); 228 assertFalse(m.readBoolean()); 229 230 assertNull(consumer.receiveNoWait()); 231 } 232 233 234 public void initCombosForTestSetMessageListenerAfterStart() { 235 addCombinationValues("deliveryMode", new Object [] { 236 new Integer (DeliveryMode.NON_PERSISTENT), 237 new Integer (DeliveryMode.PERSISTENT) }); 238 addCombinationValues("destinationType", new Object [] { 239 new Byte (ActiveMQDestination.QUEUE_TYPE), 240 new Byte (ActiveMQDestination.TOPIC_TYPE), 241 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 242 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE) }); 243 } 244 public void testSetMessageListenerAfterStart() throws Exception { 245 246 final AtomicInteger counter = new AtomicInteger (0); 247 final CountDownLatch done = new CountDownLatch (1); 248 249 connection.start(); 251 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 252 destination = createDestination(session, destinationType); 253 MessageConsumer consumer = session.createConsumer(destination); 254 255 sendMessages(session, destination, 4); 257 258 consumer.setMessageListener(new MessageListener () { 260 public void onMessage(Message m) { 261 counter.incrementAndGet(); 262 if( counter.get()==4 ) 263 done.countDown(); 264 } 265 }); 266 267 assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); 268 Thread.sleep(200); 269 270 assertEquals(4, counter.get()); 272 } 273 274 public void initCombosForTestMessageListenerUnackedWithPrefetch1StayInQueue() { 275 addCombinationValues("deliveryMode", new Object [] { 276 new Integer (DeliveryMode.NON_PERSISTENT), 277 new Integer (DeliveryMode.PERSISTENT) 278 }); 279 addCombinationValues("ackMode", new Object [] { 280 new Integer (Session.AUTO_ACKNOWLEDGE), 281 new Integer (Session.DUPS_OK_ACKNOWLEDGE), 282 new Integer (Session.CLIENT_ACKNOWLEDGE) 283 }); 284 addCombinationValues("destinationType", new Object [] { new Byte (ActiveMQDestination.QUEUE_TYPE), }); 285 } 286 287 public void testMessageListenerUnackedWithPrefetch1StayInQueue() throws Exception { 288 289 final AtomicInteger counter = new AtomicInteger (0); 290 final CountDownLatch sendDone = new CountDownLatch (1); 291 final CountDownLatch got2Done = new CountDownLatch (1); 292 293 connection.getPrefetchPolicy().setAll(1); 295 connection.setOptimizedMessageDispatch(false); 298 connection.start(); 299 300 Session session = connection.createSession(false, ackMode); 302 destination = createDestination(session, destinationType); 303 MessageConsumer consumer = session.createConsumer(destination); 304 consumer.setMessageListener(new MessageListener () { 305 public void onMessage(Message m) { 306 try { 307 TextMessage tm = (TextMessage )m; 308 log.info("Got in first listener: "+tm.getText()); 309 assertEquals( ""+counter.get(), tm.getText() ); 310 counter.incrementAndGet(); 311 m.acknowledge(); 312 if( counter.get()==2 ) { 313 sendDone.await(); 314 connection.close(); 315 got2Done.countDown(); 316 } 317 } catch (Throwable e) { 318 e.printStackTrace(); 319 } 320 } 321 }); 322 323 sendMessages(session, destination, 4); 325 sendDone.countDown(); 326 327 assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS)); 329 330 connection = (ActiveMQConnection) factory.createConnection(); 332 connections.add(connection); 333 334 connection.getPrefetchPolicy().setAll(1); 335 connection.start(); 336 337 final CountDownLatch done2 = new CountDownLatch (1); 339 session = connection.createSession(false, ackMode); 340 consumer = session.createConsumer(destination); 341 consumer.setMessageListener(new MessageListener () { 342 public void onMessage(Message m) { 343 try { 344 TextMessage tm = (TextMessage )m; 345 log.info("Got in second listener: "+tm.getText()); 346 assertEquals( ""+counter.get(), tm.getText() ); 347 counter.incrementAndGet(); 348 if( counter.get()==4 ) 349 done2.countDown(); 350 } catch (Throwable e) { 351 e.printStackTrace(); 352 } 353 } 354 }); 355 356 assertTrue(done2.await(1000, TimeUnit.MILLISECONDS)); 357 Thread.sleep(200); 358 359 assertEquals(4, counter.get()); 361 362 } 363 364 365 public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() { 366 addCombinationValues("deliveryMode", new Object [] { 367 new Integer (DeliveryMode.NON_PERSISTENT), 368 new Integer (DeliveryMode.PERSISTENT) }); 369 addCombinationValues("destinationType", new Object [] { 370 new Byte (ActiveMQDestination.QUEUE_TYPE), 371 new Byte (ActiveMQDestination.TOPIC_TYPE), 372 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 373 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE) }); 374 } 375 public void testMessageListenerWithConsumerWithPrefetch1() throws Exception { 376 377 final AtomicInteger counter = new AtomicInteger (0); 378 final CountDownLatch done = new CountDownLatch (1); 379 380 connection.getPrefetchPolicy().setAll(1); 382 connection.start(); 383 384 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 385 destination = createDestination(session, destinationType); 386 MessageConsumer consumer = session.createConsumer(destination); 387 consumer.setMessageListener(new MessageListener () { 388 public void onMessage(Message m) { 389 counter.incrementAndGet(); 390 if( counter.get()==4 ) 391 done.countDown(); 392 } 393 }); 394 395 sendMessages(session, destination, 4); 397 398 assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); 399 Thread.sleep(200); 400 401 assertEquals(4, counter.get()); 403 } 404 405 public void initCombosForTestMessageListenerWithConsumer() { 406 addCombinationValues("deliveryMode", new Object [] { 407 new Integer (DeliveryMode.NON_PERSISTENT), 408 new Integer (DeliveryMode.PERSISTENT) }); 409 addCombinationValues("destinationType", new Object [] { 410 new Byte (ActiveMQDestination.QUEUE_TYPE), 411 new Byte (ActiveMQDestination.TOPIC_TYPE), 412 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 413 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE) }); 414 } 415 public void testMessageListenerWithConsumer() throws Exception { 416 417 final AtomicInteger counter = new AtomicInteger (0); 418 final CountDownLatch done = new CountDownLatch (1); 419 420 connection.start(); 422 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 423 destination = createDestination(session, destinationType); 424 MessageConsumer consumer = session.createConsumer(destination); 425 consumer.setMessageListener(new MessageListener () { 426 public void onMessage(Message m) { 427 counter.incrementAndGet(); 428 if( counter.get()==4 ) 429 done.countDown(); 430 } 431 }); 432 433 sendMessages(session, destination, 4); 435 436 assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); 437 Thread.sleep(200); 438 439 assertEquals(4, counter.get()); 441 } 442 443 public void initCombosForTestUnackedWithPrefetch1StayInQueue() { 444 addCombinationValues("deliveryMode", new Object [] { new Integer (DeliveryMode.NON_PERSISTENT), 445 new Integer (DeliveryMode.PERSISTENT) }); 446 addCombinationValues("ackMode", new Object [] { new Integer (Session.AUTO_ACKNOWLEDGE), 447 new Integer (Session.DUPS_OK_ACKNOWLEDGE), new Integer (Session.CLIENT_ACKNOWLEDGE) }); 448 addCombinationValues("destinationType", new Object [] { new Byte (ActiveMQDestination.QUEUE_TYPE), }); 449 } 450 451 public void testUnackedWithPrefetch1StayInQueue() throws Exception { 452 453 connection.getPrefetchPolicy().setAll(1); 455 connection.start(); 456 457 Session session = connection.createSession(false, ackMode); 459 destination = createDestination(session, destinationType); 460 MessageConsumer consumer = session.createConsumer(destination); 461 462 sendMessages(session, destination, 4); 464 465 Message message = null; 467 for (int i = 0; i < 2; i++) { 468 message = consumer.receive(1000); 469 assertNotNull(message); 470 } 471 message.acknowledge(); 472 473 connection.close(); 474 connection = (ActiveMQConnection) factory.createConnection(); 475 connections.add(connection); 476 connection.getPrefetchPolicy().setAll(1); 477 connection.start(); 478 479 session = connection.createSession(false, ackMode); 481 consumer = session.createConsumer(destination); 482 483 for (int i = 0; i < 2; i++) { 485 message = consumer.receive(1000); 486 assertNotNull(message); 487 } 488 message.acknowledge(); 489 assertNull(consumer.receiveNoWait()); 490 491 } 492 public void initCombosForTestPrefetch1MessageNotDispatched() { 493 addCombinationValues("deliveryMode", new Object [] { new Integer (DeliveryMode.NON_PERSISTENT), 494 new Integer (DeliveryMode.PERSISTENT) }); 495 } 496 497 public void testPrefetch1MessageNotDispatched() throws Exception { 498 499 connection.getPrefetchPolicy().setAll(1); 501 connection.start(); 502 503 Session session = connection.createSession(true, 0); 504 destination = new ActiveMQQueue("TEST"); 505 MessageConsumer consumer = session.createConsumer(destination); 506 507 sendMessages(session, destination, 2); 509 session.commit(); 510 511 Message message1 = consumer.receive(1000); 513 assertNotNull(message1); 514 515 ActiveMQConnection connection2 = (ActiveMQConnection) factory.createConnection(); 519 connections.add(connection2); 520 Session session2 = connection2.createSession(true, 0); 521 MessageConsumer consumer2 = session2.createConsumer(destination); 522 523 Message message2 = consumer.receive(1000); 525 assertNotNull(message2); 526 527 session.commit(); 528 session2.commit(); 529 530 assertNull(consumer.receiveNoWait()); 531 532 } 533 534 public void initCombosForTestDontStart() { 535 addCombinationValues("deliveryMode", new Object [] { new Integer (DeliveryMode.NON_PERSISTENT), }); 536 addCombinationValues("destinationType", new Object [] { new Byte (ActiveMQDestination.QUEUE_TYPE), 537 new Byte (ActiveMQDestination.TOPIC_TYPE), }); 538 } 539 540 public void testDontStart() throws Exception { 541 542 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 543 destination = createDestination(session, destinationType); 544 MessageConsumer consumer = session.createConsumer(destination); 545 546 sendMessages(session, destination, 1); 548 549 assertNull(consumer.receive(1000)); 551 } 552 553 public void initCombosForTestStartAfterSend() { 554 addCombinationValues("deliveryMode", new Object [] { new Integer (DeliveryMode.NON_PERSISTENT), }); 555 addCombinationValues("destinationType", new Object [] { new Byte (ActiveMQDestination.QUEUE_TYPE), 556 new Byte (ActiveMQDestination.TOPIC_TYPE), }); 557 } 558 559 public void testStartAfterSend() throws Exception { 560 561 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 562 destination = createDestination(session, destinationType); 563 MessageConsumer consumer = session.createConsumer(destination); 564 565 sendMessages(session, destination, 1); 567 568 connection.start(); 570 571 assertNotNull(consumer.receive(1000)); 573 assertNull(consumer.receiveNoWait()); 574 } 575 576 public void initCombosForTestReceiveMessageWithConsumer() { 577 addCombinationValues("deliveryMode", new Object [] { new Integer (DeliveryMode.NON_PERSISTENT), 578 new Integer (DeliveryMode.PERSISTENT) }); 579 addCombinationValues("destinationType", new Object [] { new Byte (ActiveMQDestination.QUEUE_TYPE), 580 new Byte (ActiveMQDestination.TOPIC_TYPE), new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 581 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE) }); 582 } 583 584 public void testReceiveMessageWithConsumer() throws Exception { 585 586 connection.start(); 588 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 589 destination = createDestination(session, destinationType); 590 MessageConsumer consumer = session.createConsumer(destination); 591 592 sendMessages(session, destination, 1); 594 595 Message m = consumer.receive(1000); 597 assertNotNull(m); 598 assertEquals("0", ((TextMessage )m).getText()); 599 assertNull(consumer.receiveNoWait()); 600 } 601 602 } 603 | Popular Tags |