1 18 package org.apache.activemq.broker; 19 20 import javax.jms.DeliveryMode ; 21 import javax.jms.JMSException ; 22 23 import junit.framework.Test; 24 25 import org.apache.activemq.command.ActiveMQDestination; 26 import org.apache.activemq.command.ActiveMQQueue; 27 import org.apache.activemq.command.ActiveMQTopic; 28 import org.apache.activemq.command.ConnectionInfo; 29 import org.apache.activemq.command.ConsumerInfo; 30 import org.apache.activemq.command.DestinationInfo; 31 import org.apache.activemq.command.LocalTransactionId; 32 import org.apache.activemq.command.Message; 33 import org.apache.activemq.command.MessageAck; 34 import org.apache.activemq.command.ProducerInfo; 35 import org.apache.activemq.command.SessionInfo; 36 37 import java.util.concurrent.TimeUnit ; 38 39 public class BrokerTest extends BrokerTestSupport { 40 41 public ActiveMQDestination destination; 42 public int deliveryMode; 43 public int prefetch; 44 public byte destinationType; 45 public boolean durableConsumer; 46 47 public void initCombosForTestConsumerPrefetchAndStandardAck() { 48 addCombinationValues( "deliveryMode", new Object []{ 49 new Integer (DeliveryMode.PERSISTENT)} ); 51 addCombinationValues( "destinationType", new Object []{ 52 new Byte (ActiveMQDestination.QUEUE_TYPE), 53 new Byte (ActiveMQDestination.TOPIC_TYPE), 54 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 55 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE) 56 } ); 57 } 58 59 public void testConsumerPrefetchAndStandardAck() throws Exception { 60 61 StubConnection connection = createConnection(); 63 ConnectionInfo connectionInfo = createConnectionInfo(); 64 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 65 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 66 connection.send(connectionInfo); 67 connection.send(sessionInfo); 68 connection.send(producerInfo); 69 70 destination = createDestinationInfo(connection, connectionInfo, destinationType); 71 72 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 73 consumerInfo.setPrefetchSize(1); 74 connection.send(consumerInfo); 75 76 connection.send(createMessage(producerInfo, destination, deliveryMode)); 78 connection.send(createMessage(producerInfo, destination, deliveryMode)); 79 connection.send(createMessage(producerInfo, destination, deliveryMode)); 80 81 Message m1 = receiveMessage(connection); 83 assertNotNull(m1); 84 assertNoMessagesLeft(connection); 85 86 connection.send(createAck(consumerInfo, m1, 1, MessageAck.STANDARD_ACK_TYPE)); 88 89 Message m2 = receiveMessage(connection); 90 assertNotNull(m2); 91 connection.send(createAck(consumerInfo, m2, 1, MessageAck.STANDARD_ACK_TYPE)); 92 93 Message m3 = receiveMessage(connection); 94 assertNotNull(m3); 95 connection.send(createAck(consumerInfo, m3, 1, MessageAck.STANDARD_ACK_TYPE)); 96 97 connection.send(closeConnectionInfo(connectionInfo)); 98 } 99 100 101 public void initCombosForTestTransactedAckWithPrefetchOfOne() { 102 addCombinationValues( "deliveryMode", new Object []{ 103 new Integer (DeliveryMode.NON_PERSISTENT), 104 new Integer (DeliveryMode.PERSISTENT)} ); 105 addCombinationValues( "destinationType", new Object []{ 106 new Byte (ActiveMQDestination.QUEUE_TYPE), 107 new Byte (ActiveMQDestination.TOPIC_TYPE), 108 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 109 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE) 110 } ); 111 } 112 113 public void testTransactedAckWithPrefetchOfOne() throws Exception { 114 115 StubConnection connection1 = createConnection(); 117 ConnectionInfo connectionInfo1 = createConnectionInfo(); 118 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 119 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 120 connection1.send(connectionInfo1); 121 connection1.send(sessionInfo1); 122 connection1.send(producerInfo1); 123 124 destination = createDestinationInfo(connection1, connectionInfo1, destinationType); 125 126 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 127 consumerInfo1.setPrefetchSize(1); 128 connection1.send(consumerInfo1); 129 130 for( int i=0; i < 4 ; i++ ) { 132 Message message = createMessage(producerInfo1, destination, deliveryMode); 133 connection1.send(message); 134 } 135 136 LocalTransactionId txid = createLocalTransaction(sessionInfo1); 138 connection1.send(createBeginTransaction(connectionInfo1, txid)); 139 140 for( int i=0; i < 4 ; i++ ) { 142 Message m1 = receiveMessage(connection1); 143 assertNotNull(m1); 144 MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE); 145 ack.setTransactionId(txid); 146 connection1.send(ack); 147 } 148 149 connection1.send(createCommitTransaction1Phase(connectionInfo1, txid)); 151 152 assertNoMessagesLeft(connection1); 153 } 154 155 public void initCombosForTestTransactedSend() { 156 addCombinationValues( "deliveryMode", new Object []{ 157 new Integer (DeliveryMode.NON_PERSISTENT), 158 new Integer (DeliveryMode.PERSISTENT)} ); 159 addCombinationValues( "destinationType", new Object []{ 160 new Byte (ActiveMQDestination.QUEUE_TYPE), 161 new Byte (ActiveMQDestination.TOPIC_TYPE), 162 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 163 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE)} ); 164 } 165 public void testTransactedSend() throws Exception { 166 167 StubConnection connection1 = createConnection(); 169 ConnectionInfo connectionInfo1 = createConnectionInfo(); 170 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 171 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 172 connection1.send(connectionInfo1); 173 connection1.send(sessionInfo1); 174 connection1.send(producerInfo1); 175 176 destination = createDestinationInfo(connection1, connectionInfo1, destinationType); 177 178 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 179 consumerInfo1.setPrefetchSize(100); 180 connection1.send(consumerInfo1); 181 182 LocalTransactionId txid = createLocalTransaction(sessionInfo1); 184 connection1.send(createBeginTransaction(connectionInfo1, txid)); 185 186 for( int i=0; i < 4 ; i++ ) { 188 Message message = createMessage(producerInfo1, destination, deliveryMode); 189 message.setTransactionId(txid); 190 connection1.send(message); 191 } 192 193 assertNull(receiveMessage(connection1)); 196 197 connection1.send(createCommitTransaction1Phase(connectionInfo1, txid)); 199 200 for( int i=0; i < 4 ; i++ ) { 202 Message m1 = receiveMessage(connection1); 203 assertNotNull(m1); 204 } 205 206 assertNoMessagesLeft(connection1); 207 } 208 209 public void initCombosForTestQueueTransactedAck() { 210 addCombinationValues( "deliveryMode", new Object []{ 211 new Integer (DeliveryMode.NON_PERSISTENT), 212 new Integer (DeliveryMode.PERSISTENT)} ); 213 addCombinationValues( "destinationType", new Object []{ 214 new Byte (ActiveMQDestination.QUEUE_TYPE), 215 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 216 } ); 217 } 218 219 public void testQueueTransactedAck() throws Exception { 220 221 StubConnection connection1 = createConnection(); 223 ConnectionInfo connectionInfo1 = createConnectionInfo(); 224 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 225 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 226 connection1.send(connectionInfo1); 227 connection1.send(sessionInfo1); 228 connection1.send(producerInfo1); 229 230 destination = createDestinationInfo(connection1, connectionInfo1, destinationType); 231 232 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 233 consumerInfo1.setPrefetchSize(100); 234 connection1.send(consumerInfo1); 235 236 for( int i=0; i < 4 ; i++ ) { 238 Message message = createMessage(producerInfo1, destination, deliveryMode); 239 connection1.send(message); 240 } 241 242 LocalTransactionId txid = createLocalTransaction(sessionInfo1); 244 connection1.send(createBeginTransaction(connectionInfo1, txid)); 245 246 for( int i=0; i < 2 ; i++ ) { 248 Message m1 = receiveMessage(connection1); 249 assertNotNull("m1 is null for index: " + i, m1); 250 MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE); 251 ack.setTransactionId(txid); 252 connection1.request(ack); 253 } 254 255 connection1.send(createCommitTransaction1Phase(connectionInfo1, txid)); 257 258 assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination)); 260 } 261 262 public void initCombosForTestConsumerCloseCausesRedelivery() { 263 addCombinationValues( "deliveryMode", new Object []{ 264 new Integer (DeliveryMode.NON_PERSISTENT), 265 new Integer (DeliveryMode.PERSISTENT)} ); 266 addCombinationValues( "destination", new Object []{ 267 new ActiveMQQueue("TEST")} ); 268 } 269 270 public void testConsumerCloseCausesRedelivery() throws Exception { 271 272 StubConnection connection1 = createConnection(); 274 ConnectionInfo connectionInfo1 = createConnectionInfo(); 275 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 276 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 277 connection1.send(connectionInfo1); 278 connection1.send(sessionInfo1); 279 connection1.send(producerInfo1); 280 281 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 282 consumerInfo1.setPrefetchSize(100); 283 connection1.request(consumerInfo1); 284 285 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 287 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 288 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 289 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 290 291 for( int i=0; i < 4 ; i++ ) { 293 Message m1 = receiveMessage(connection1); 294 assertNotNull("m1 is null for index: " + i, m1); 295 assertFalse(m1.isRedelivered()); 296 } 297 298 connection1.send(consumerInfo1.createRemoveCommand()); 300 301 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo1, destination); 303 consumerInfo2.setPrefetchSize(100); 304 connection1.request(consumerInfo2); 305 306 for( int i=0; i < 4 ; i++ ) { 308 Message m1 = receiveMessage(connection1); 309 assertNotNull("m1 is null for index: " + i, m1); 310 assertTrue(m1.isRedelivered()); 311 } 312 assertNoMessagesLeft(connection1); 313 314 } 315 316 public void testTopicDurableSubscriptionCanBeRestored() throws Exception { 317 318 ActiveMQDestination destination = new ActiveMQTopic("TEST"); 319 320 StubConnection connection1 = createConnection(); 322 ConnectionInfo connectionInfo1 = createConnectionInfo(); 323 connectionInfo1.setClientId("clientid1"); 324 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 325 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 326 connection1.send(connectionInfo1); 327 connection1.send(sessionInfo1); 328 connection1.send(producerInfo1); 329 330 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 331 consumerInfo1.setPrefetchSize(100); 332 consumerInfo1.setSubscriptionName("test"); 333 connection1.send(consumerInfo1); 334 335 connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT)); 337 connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT)); 338 connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT)); 339 connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT)); 340 341 Message m=null; 343 for( int i=0; i < 2 ; i++ ) { 344 m = receiveMessage(connection1); 345 assertNotNull(m); 346 } 347 connection1.send(createAck(consumerInfo1, m, 2, MessageAck.STANDARD_ACK_TYPE)); 349 connection1.request(closeConnectionInfo(connectionInfo1)); 351 connection1.stop(); 352 353 StubConnection connection2 = createConnection(); 355 ConnectionInfo connectionInfo2 = createConnectionInfo(); 356 connectionInfo2.setClientId("clientid1"); 357 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 358 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 359 consumerInfo2.setPrefetchSize(100); 360 consumerInfo2.setSubscriptionName("test"); 361 362 connection2.send(connectionInfo2); 363 connection2.send(sessionInfo2); 364 connection2.send(consumerInfo2); 365 366 for( int i=0; i < 2 ; i++ ) { 368 Message m1 = receiveMessage(connection2); 369 assertNotNull("m1 is null for index: " + i, m1); 370 } 371 assertNoMessagesLeft(connection2); 372 } 373 374 375 public void initCombosForTestGroupedMessagesDeliveredToOnlyOneConsumer() { 376 addCombinationValues( "deliveryMode", new Object []{ 377 new Integer (DeliveryMode.NON_PERSISTENT), 378 new Integer (DeliveryMode.PERSISTENT)} ); 379 } 380 public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception { 381 382 ActiveMQDestination destination = new ActiveMQQueue("TEST"); 383 384 StubConnection connection1 = createConnection(); 386 ConnectionInfo connectionInfo1 = createConnectionInfo(); 387 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 388 ProducerInfo producerInfo = createProducerInfo(sessionInfo1); 389 connection1.send(connectionInfo1); 390 connection1.send(sessionInfo1); 391 connection1.send(producerInfo); 392 393 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 394 consumerInfo1.setPrefetchSize(1); 395 connection1.send(consumerInfo1); 396 397 for( int i=0; i < 4 ; i++ ) { 399 Message message = createMessage(producerInfo, destination, deliveryMode); 400 message.setGroupID("TEST-GROUP"); 401 message.setGroupSequence(i+1); 402 connection1.request(message); 403 } 404 405 StubConnection connection2 = createConnection(); 407 ConnectionInfo connectionInfo2 = createConnectionInfo(); 408 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 409 connection2.send(connectionInfo2); 410 connection2.send(sessionInfo2); 411 412 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 413 consumerInfo2.setPrefetchSize(1); 414 connection2.send(consumerInfo2); 415 416 for( int i=0; i < 3 ; i++ ) { 418 Message m1 = receiveMessage(connection1); 419 assertNotNull("m1 is null for index: " + i, m1); 420 connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); 421 } 422 423 connection1.send(closeConsumerInfo(consumerInfo1)); 425 426 for( int i=0; i < 1 ; i++ ) { 428 Message m1 = receiveMessage(connection2); 429 assertNotNull("m1 is null for index: " + i, m1); 430 connection2.send(createAck(consumerInfo2, m1, 1, MessageAck.STANDARD_ACK_TYPE)); 431 } 432 433 assertNoMessagesLeft(connection2); 434 } 435 436 public void initCombosForTestTopicConsumerOnlySeeMessagesAfterCreation() { 437 addCombinationValues( "deliveryMode", new Object []{ 438 new Integer (DeliveryMode.NON_PERSISTENT), 439 new Integer (DeliveryMode.PERSISTENT)} ); 440 addCombinationValues( "durableConsumer", new Object []{ 441 Boolean.TRUE, 442 Boolean.FALSE}); 443 } 444 445 public void testTopicConsumerOnlySeeMessagesAfterCreation() throws Exception { 446 447 ActiveMQDestination destination = new ActiveMQTopic("TEST"); 448 449 StubConnection connection1 = createConnection(); 451 ConnectionInfo connectionInfo1 = createConnectionInfo(); 452 connectionInfo1.setClientId("A"); 453 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 454 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 455 connection1.send(connectionInfo1); 456 connection1.send(sessionInfo1); 457 connection1.send(producerInfo1); 458 459 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 461 462 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 464 if( durableConsumer ) { 465 consumerInfo1.setSubscriptionName("test"); 466 } 467 consumerInfo1.setPrefetchSize(100); 468 connection1.send(consumerInfo1); 469 470 Message m = createMessage(producerInfo1, destination, deliveryMode); 471 connection1.send(m); 472 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 473 474 Message m2 = receiveMessage(connection1); 476 assertNotNull(m2); 477 assertEquals(m.getMessageId(), m2.getMessageId()); 478 m2 = receiveMessage(connection1); 479 assertNotNull(m2); 480 481 assertNoMessagesLeft(connection1); 482 } 483 484 public void initCombosForTestTopicRetroactiveConsumerSeeMessagesBeforeCreation() { 485 addCombinationValues( "deliveryMode", new Object []{ 486 new Integer (DeliveryMode.NON_PERSISTENT), 487 new Integer (DeliveryMode.PERSISTENT)} ); 488 addCombinationValues( "durableConsumer", new Object []{ 489 Boolean.TRUE, 490 Boolean.FALSE}); 491 } 492 493 public void testTopicRetroactiveConsumerSeeMessagesBeforeCreation() throws Exception { 494 495 ActiveMQDestination destination = new ActiveMQTopic("TEST"); 496 497 StubConnection connection1 = createConnection(); 499 ConnectionInfo connectionInfo1 = createConnectionInfo(); 500 connectionInfo1.setClientId("A"); 501 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 502 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 503 connection1.send(connectionInfo1); 504 connection1.send(sessionInfo1); 505 connection1.send(producerInfo1); 506 507 Message m = createMessage(producerInfo1, destination, deliveryMode); 509 connection1.send(m); 510 511 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 513 if( durableConsumer ) { 514 consumerInfo1.setSubscriptionName("test"); 515 } 516 consumerInfo1.setPrefetchSize(100); 517 consumerInfo1.setRetroactive(true); 518 connection1.send(consumerInfo1); 519 520 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 521 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 522 523 526 Message m2 = receiveMessage(connection1); 528 assertNotNull(m2); 529 assertEquals(m.getMessageId(), m2.getMessageId()); 530 for( int i=0; i < 2 ; i++ ) { 531 m2 = receiveMessage(connection1); 532 assertNotNull(m2); 533 } 534 535 assertNoMessagesLeft(connection1); 536 } 537 538 539 589 590 601 602 627 628 629 630 public void initCombosForTestTempDestinationsOnlyAllowsLocalConsumers() { 631 addCombinationValues( "deliveryMode", new Object []{ 632 new Integer (DeliveryMode.NON_PERSISTENT), 633 new Integer (DeliveryMode.PERSISTENT)} ); 634 addCombinationValues( "destinationType", new Object []{ 635 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 636 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE)} ); 637 } 638 639 public void testTempDestinationsOnlyAllowsLocalConsumers() throws Exception { 640 641 StubConnection connection1 = createConnection(); 643 ConnectionInfo connectionInfo1 = createConnectionInfo(); 644 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 645 connection1.send(connectionInfo1); 646 connection1.send(sessionInfo1); 647 648 DestinationInfo destinationInfo = createTempDestinationInfo(connectionInfo1, destinationType); 649 connection1.request(destinationInfo); 650 destination = destinationInfo.getDestination(); 651 652 StubConnection connection2 = createConnection(); 654 ConnectionInfo connectionInfo2 = createConnectionInfo(); 655 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 656 connection2.send(connectionInfo2); 657 connection2.send(sessionInfo2); 658 659 try { 661 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 662 connection2.request(consumerInfo2); 663 fail("Expected JMSException."); 664 } catch ( JMSException success ) { 665 } 666 667 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 669 connection1.send(consumerInfo1); 670 } 671 672 public void initCombosForTestExclusiveQueueDeliversToOnlyOneConsumer() { 673 addCombinationValues( "deliveryMode", new Object []{ 674 new Integer (DeliveryMode.NON_PERSISTENT), 675 new Integer (DeliveryMode.PERSISTENT)} ); 676 } 677 public void testExclusiveQueueDeliversToOnlyOneConsumer() throws Exception { 678 679 ActiveMQDestination destination = new ActiveMQQueue("TEST"); 680 681 StubConnection connection1 = createConnection(); 683 ConnectionInfo connectionInfo1 = createConnectionInfo(); 684 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 685 ProducerInfo producerInfo = createProducerInfo(sessionInfo1); 686 connection1.send(connectionInfo1); 687 connection1.send(sessionInfo1); 688 connection1.send(producerInfo); 689 690 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 691 consumerInfo1.setPrefetchSize(1); 692 consumerInfo1.setExclusive(true); 693 connection1.send(consumerInfo1); 694 695 connection1.request(createMessage(producerInfo, destination, deliveryMode)); 697 698 StubConnection connection2 = createConnection(); 700 ConnectionInfo connectionInfo2 = createConnectionInfo(); 701 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 702 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 703 consumerInfo2.setPrefetchSize(1); 704 consumerInfo2.setExclusive(true); 705 connection2.send(connectionInfo2); 706 connection2.send(sessionInfo2); 707 connection2.request(consumerInfo2); 708 709 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 712 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 713 714 for( int i=0; i < 2 ; i++ ) { 716 Message m1 = receiveMessage(connection1); 717 assertNotNull(m1); 718 connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); 719 } 720 721 connection1.send(closeConsumerInfo(consumerInfo1)); 723 724 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 726 727 for( int i=0; i < 2 ; i++ ) { 728 Message m1 = receiveMessage(connection2); 729 assertNotNull(m1); 730 connection2.send(createAck(consumerInfo2, m1, 1, MessageAck.STANDARD_ACK_TYPE)); 731 } 732 733 assertNoMessagesLeft(connection2); 734 } 735 736 public void initCombosForTestWildcardConsume() { 737 addCombinationValues( "deliveryMode", new Object []{ 738 new Integer (DeliveryMode.NON_PERSISTENT), 739 new Integer (DeliveryMode.PERSISTENT)} ); 740 addCombinationValues( "destinationType", new Object []{ 741 new Byte (ActiveMQDestination.QUEUE_TYPE), 742 new Byte (ActiveMQDestination.TOPIC_TYPE)} ); 743 } 744 745 public void testWildcardConsume() throws Exception { 746 747 StubConnection connection1 = createConnection(); 749 ConnectionInfo connectionInfo1 = createConnectionInfo(); 750 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 751 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 752 connection1.send(connectionInfo1); 753 connection1.send(sessionInfo1); 754 connection1.send(producerInfo1); 755 756 ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("WILD.*.TEST", destinationType); 758 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, compositeDestination); 759 consumerInfo1.setPrefetchSize(100); 760 connection1.send(consumerInfo1); 761 762 connection1.send(createMessage(producerInfo1, ActiveMQDestination.createDestination("WILD.CARD", destinationType), deliveryMode)); 764 connection1.send(createMessage(producerInfo1, ActiveMQDestination.createDestination("WILD.TEST", destinationType), deliveryMode)); 765 766 ActiveMQDestination d1 = ActiveMQDestination.createDestination("WILD.CARD.TEST", destinationType); 768 connection1.send(createMessage(producerInfo1, d1, deliveryMode)); 769 ActiveMQDestination d2 = ActiveMQDestination.createDestination("WILD.FOO.TEST", destinationType); 770 connection1.send(createMessage(producerInfo1, d2, deliveryMode)); 771 772 Message m = receiveMessage(connection1); 773 assertNotNull(m); 774 assertEquals(d1,m.getDestination()); 775 m = receiveMessage(connection1); 776 assertNotNull(m); 777 assertEquals(d2,m.getDestination()); 778 779 assertNoMessagesLeft(connection1); 780 connection1.send(closeConnectionInfo(connectionInfo1)); 781 } 782 783 public void initCombosForTestCompositeConsume() { 784 addCombinationValues( "deliveryMode", new Object []{ 785 new Integer (DeliveryMode.NON_PERSISTENT), 786 new Integer (DeliveryMode.PERSISTENT)} ); 787 addCombinationValues( "destinationType", new Object []{ 788 new Byte (ActiveMQDestination.QUEUE_TYPE), 789 new Byte (ActiveMQDestination.TOPIC_TYPE)} ); 790 } 791 792 public void testCompositeConsume() throws Exception { 793 794 StubConnection connection1 = createConnection(); 796 ConnectionInfo connectionInfo1 = createConnectionInfo(); 797 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 798 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 799 connection1.send(connectionInfo1); 800 connection1.send(sessionInfo1); 801 connection1.send(producerInfo1); 802 803 ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B", destinationType); 805 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, compositeDestination); 806 consumerInfo1.setRetroactive(true); 807 consumerInfo1.setPrefetchSize(100); 808 connection1.send(consumerInfo1); 809 810 ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType); 812 ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", destinationType); 813 814 connection1.send(createMessage(producerInfo1, destinationA, deliveryMode)); 816 connection1.send(createMessage(producerInfo1, destinationB, deliveryMode)); 817 818 for( int i=0; i < 2 ; i++ ) { 820 Message m1 = receiveMessage(connection1); 821 assertNotNull(m1); 822 } 823 824 assertNoMessagesLeft(connection1); 825 connection1.send(closeConnectionInfo(connectionInfo1)); 826 } 827 828 public void initCombosForTestCompositeSend() { 829 addCombinationValues( "deliveryMode", new Object []{ 830 new Integer (DeliveryMode.NON_PERSISTENT), 831 new Integer (DeliveryMode.PERSISTENT)} ); 832 addCombinationValues( "destinationType", new Object []{ 833 new Byte (ActiveMQDestination.QUEUE_TYPE), 834 new Byte (ActiveMQDestination.TOPIC_TYPE)} ); 835 } 836 837 public void testCompositeSend() throws Exception { 838 839 StubConnection connection1 = createConnection(); 841 ConnectionInfo connectionInfo1 = createConnectionInfo(); 842 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 843 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 844 connection1.send(connectionInfo1); 845 connection1.send(sessionInfo1); 846 connection1.send(producerInfo1); 847 848 ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType); 849 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destinationA); 850 consumerInfo1.setRetroactive(true); 851 consumerInfo1.setPrefetchSize(100); 852 connection1.send(consumerInfo1); 853 854 StubConnection connection2 = createConnection(); 856 ConnectionInfo connectionInfo2 = createConnectionInfo(); 857 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 858 connection2.send(connectionInfo2); 859 connection2.send(sessionInfo2); 860 861 ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", destinationType); 862 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationB); 863 consumerInfo2.setRetroactive(true); 864 consumerInfo2.setPrefetchSize(100); 865 connection2.send(consumerInfo2); 866 867 ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B", destinationType); 869 for( int i=0; i < 4 ; i++ ) { 870 connection1.send(createMessage(producerInfo1, compositeDestination, deliveryMode)); 871 } 872 873 for( int i=0; i < 4 ; i++ ) { 875 Message m1 = receiveMessage(connection1); 876 Message m2 = receiveMessage(connection2); 877 878 assertNotNull(m1); 879 assertNotNull(m2); 880 881 assertEquals( m1.getMessageId(), m2.getMessageId() ); 882 assertEquals( compositeDestination, m1.getOriginalDestination()); 883 assertEquals( compositeDestination, m2.getOriginalDestination()); 884 885 connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); 886 connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE)); 887 888 } 889 890 assertNoMessagesLeft(connection1); 891 assertNoMessagesLeft(connection2); 892 893 connection1.send(closeConnectionInfo(connectionInfo1)); 894 connection2.send(closeConnectionInfo(connectionInfo2)); 895 } 896 897 public void initCombosForTestConnectionCloseCascades() { 898 addCombinationValues( "deliveryMode", new Object []{ 899 new Integer (DeliveryMode.NON_PERSISTENT), 900 new Integer (DeliveryMode.PERSISTENT)} ); 901 addCombinationValues( "destination", new Object []{ 902 new ActiveMQTopic("TEST"), 903 new ActiveMQQueue("TEST")} ); 904 } 905 906 public void testConnectionCloseCascades() throws Exception { 907 908 StubConnection connection1 = createConnection(); 910 ConnectionInfo connectionInfo1 = createConnectionInfo(); 911 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 912 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 913 connection1.send(connectionInfo1); 914 connection1.send(sessionInfo1); 915 connection1.send(producerInfo1); 916 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 917 consumerInfo1.setPrefetchSize(100); 918 consumerInfo1.setNoLocal(true); 919 connection1.request(consumerInfo1); 920 921 StubConnection connection2 = createConnection(); 923 ConnectionInfo connectionInfo2 = createConnectionInfo(); 924 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 925 ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); 926 connection2.send(connectionInfo2); 927 connection2.send(sessionInfo2); 928 connection2.send(producerInfo2); 929 930 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 932 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 933 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 934 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 935 936 for( int i=0; i < 4 ; i++ ) { 937 Message m1 = receiveMessage(connection1); 938 assertNotNull(m1); 939 connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); 940 } 941 942 connection1.request(closeConnectionInfo(connectionInfo1)); 944 945 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 947 948 assertNull(connection1.getDispatchQueue().poll(MAX_WAIT, TimeUnit.MILLISECONDS)); 949 } 950 951 public void initCombosForTestSessionCloseCascades() { 952 addCombinationValues( "deliveryMode", new Object []{ 953 new Integer (DeliveryMode.NON_PERSISTENT), 954 new Integer (DeliveryMode.PERSISTENT)} ); 955 addCombinationValues( "destination", new Object []{ 956 new ActiveMQTopic("TEST"), 957 new ActiveMQQueue("TEST")} ); 958 } 959 960 public void testSessionCloseCascades() throws Exception { 961 962 StubConnection connection1 = createConnection(); 964 ConnectionInfo connectionInfo1 = createConnectionInfo(); 965 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 966 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 967 connection1.send(connectionInfo1); 968 connection1.send(sessionInfo1); 969 connection1.send(producerInfo1); 970 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 971 consumerInfo1.setPrefetchSize(100); 972 consumerInfo1.setNoLocal(true); 973 connection1.request(consumerInfo1); 974 975 StubConnection connection2 = createConnection(); 977 ConnectionInfo connectionInfo2 = createConnectionInfo(); 978 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 979 ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); 980 connection2.send(connectionInfo2); 981 connection2.send(sessionInfo2); 982 connection2.send(producerInfo2); 983 984 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 986 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 987 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 988 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 989 990 for( int i=0; i < 4 ; i++ ) { 991 Message m1 = receiveMessage(connection1); 992 assertNotNull(m1); 993 connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); 994 } 995 996 connection1.request(closeSessionInfo(sessionInfo1)); 998 999 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 1001 1002 assertNull(connection1.getDispatchQueue().poll(MAX_WAIT, TimeUnit.MILLISECONDS)); 1003 } 1004 1005 public void initCombosForTestConsumerClose() { 1006 addCombinationValues( "deliveryMode", new Object []{ 1007 new Integer (DeliveryMode.NON_PERSISTENT), 1008 new Integer (DeliveryMode.PERSISTENT)} ); 1009 addCombinationValues( "destination", new Object []{ 1010 new ActiveMQTopic("TEST"), 1011 new ActiveMQQueue("TEST")} ); 1012 } 1013 1014 public void testConsumerClose() throws Exception { 1015 1016 StubConnection connection1 = createConnection(); 1018 ConnectionInfo connectionInfo1 = createConnectionInfo(); 1019 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 1020 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 1021 connection1.send(connectionInfo1); 1022 connection1.send(sessionInfo1); 1023 connection1.send(producerInfo1); 1024 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 1025 consumerInfo1.setPrefetchSize(100); 1026 consumerInfo1.setNoLocal(true); 1027 connection1.request(consumerInfo1); 1028 1029 StubConnection connection2 = createConnection(); 1031 ConnectionInfo connectionInfo2 = createConnectionInfo(); 1032 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 1033 ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); 1034 connection2.send(connectionInfo2); 1035 connection2.send(sessionInfo2); 1036 connection2.send(producerInfo2); 1037 1038 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 1040 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 1041 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 1042 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 1043 1044 for( int i=0; i < 4 ; i++ ) { 1045 Message m1 = receiveMessage(connection1); 1046 assertNotNull(m1); 1047 connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); 1048 } 1049 1050 connection1.request(closeConsumerInfo(consumerInfo1)); 1052 1053 connection2.send(createMessage(producerInfo2, destination, deliveryMode)); 1055 1056 assertNull(connection1.getDispatchQueue().poll(MAX_WAIT, TimeUnit.MILLISECONDS)); 1057 } 1058 public void initCombosForTestTopicNoLocal() { 1059 addCombinationValues( "deliveryMode", new Object []{ 1060 new Integer (DeliveryMode.NON_PERSISTENT), 1061 new Integer (DeliveryMode.PERSISTENT)} ); 1062 } 1063 1064 public void testTopicNoLocal() throws Exception { 1065 1066 ActiveMQDestination destination = new ActiveMQTopic("TEST"); 1067 1068 StubConnection connection1 = createConnection(); 1070 ConnectionInfo connectionInfo1 = createConnectionInfo(); 1071 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 1072 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 1073 connection1.send(connectionInfo1); 1074 connection1.send(sessionInfo1); 1075 connection1.send(producerInfo1); 1076 1077 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 1078 consumerInfo1.setRetroactive(true); 1079 consumerInfo1.setPrefetchSize(100); 1080 consumerInfo1.setNoLocal(true); 1081 connection1.send(consumerInfo1); 1082 1083 StubConnection connection2 = createConnection(); 1085 ConnectionInfo connectionInfo2 = createConnectionInfo(); 1086 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 1087 ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); 1088 connection2.send(connectionInfo2); 1089 connection2.send(sessionInfo2); 1090 connection2.send(producerInfo2); 1091 1092 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 1093 consumerInfo2.setRetroactive(true); 1094 consumerInfo2.setPrefetchSize(100); 1095 consumerInfo2.setNoLocal(true); 1096 connection2.send(consumerInfo2); 1097 1098 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 1100 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 1101 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 1102 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 1103 1104 for( int i=0; i < 4 ; i++ ) { 1106 Message m1 = receiveMessage(connection2); 1107 assertNotNull(m1); 1108 } 1109 1110 Message message = createMessage(producerInfo2, destination, deliveryMode); 1112 connection2.send(message); 1113 1114 Message m = receiveMessage(connection1); 1117 assertNotNull(m); 1118 assertEquals(message.getMessageId(), m.getMessageId()); 1119 1120 assertNoMessagesLeft(connection1); 1121 assertNoMessagesLeft(connection2); 1122 } 1123 1124 1125 public void initCombosForTopicDispatchIsBroadcast() { 1126 addCombinationValues( "deliveryMode", new Object []{ 1127 new Integer (DeliveryMode.NON_PERSISTENT), 1128 new Integer (DeliveryMode.PERSISTENT)} ); 1129 } 1130 1131 public void testTopicDispatchIsBroadcast() throws Exception { 1132 1133 ActiveMQDestination destination = new ActiveMQTopic("TEST"); 1134 1135 StubConnection connection1 = createConnection(); 1137 ConnectionInfo connectionInfo1 = createConnectionInfo(); 1138 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 1139 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 1140 connection1.send(connectionInfo1); 1141 connection1.send(sessionInfo1); 1142 connection1.send(producerInfo1); 1143 1144 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 1145 consumerInfo1.setRetroactive(true); 1146 consumerInfo1.setPrefetchSize(100); 1147 connection1.send(consumerInfo1); 1148 1149 StubConnection connection2 = createConnection(); 1151 ConnectionInfo connectionInfo2 = createConnectionInfo(); 1152 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 1153 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 1154 consumerInfo2.setRetroactive(true); 1155 consumerInfo2.setPrefetchSize(100); 1156 connection2.send(connectionInfo2); 1157 connection2.send(sessionInfo2); 1158 connection2.send(consumerInfo2); 1159 1160 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 1162 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 1163 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 1164 connection1.send(createMessage(producerInfo1, destination, deliveryMode)); 1165 1166 for( int i=0; i < 4 ; i++ ) { 1168 Message m1 = receiveMessage(connection1); 1169 assertNotNull(m1); 1170 m1 = receiveMessage(connection2); 1171 assertNotNull(m1); 1172 } 1173 } 1174 1175 public void initCombosForTestQueueDispatchedAreRedeliveredOnConsumerClose() { 1176 addCombinationValues( "deliveryMode", new Object []{ 1177 new Integer (DeliveryMode.NON_PERSISTENT), 1178 new Integer (DeliveryMode.PERSISTENT)} ); 1179 addCombinationValues( "destinationType", new Object []{ 1180 new Byte (ActiveMQDestination.QUEUE_TYPE), 1181 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 1182 } ); 1183 } 1184 1185 public void testQueueDispatchedAreRedeliveredOnConsumerClose() throws Exception { 1186 1187 StubConnection connection1 = createConnection(); 1189 ConnectionInfo connectionInfo1 = createConnectionInfo(); 1190 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 1191 ProducerInfo producerInfo = createProducerInfo(sessionInfo1); 1192 connection1.send(connectionInfo1); 1193 connection1.send(sessionInfo1); 1194 connection1.send(producerInfo); 1195 1196 destination = createDestinationInfo(connection1, connectionInfo1, destinationType); 1197 1198 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 1199 consumerInfo1.setPrefetchSize(100); 1200 connection1.send(consumerInfo1); 1201 1202 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 1204 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 1205 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 1206 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 1207 1208 for( int i=0; i < 4 ; i++ ) { 1210 Message m1 = receiveMessage(connection1); 1211 assertNotNull(m1); 1212 assertFalse(m1.isRedelivered()); 1213 } 1214 connection1.send(closeConsumerInfo(consumerInfo1)); 1216 1217 while(connection1.getDispatchQueue().poll(0, TimeUnit.MILLISECONDS)!=null){ 1219 } 1220 1221 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo1, destination); 1223 consumerInfo2.setPrefetchSize(100); 1224 connection1.send(consumerInfo2); 1225 1226 for( int i=0; i < 4 ; i++ ) { 1228 Message m1 = receiveMessage(connection1); 1229 assertNotNull(m1); 1230 assertTrue(m1.isRedelivered()); 1231 } 1232 } 1233 1234 public void initCombosForTestQueueBrowseMessages() { 1235 addCombinationValues( "deliveryMode", new Object []{ 1236 new Integer (DeliveryMode.NON_PERSISTENT), 1237 new Integer (DeliveryMode.PERSISTENT)} ); 1238 addCombinationValues( "destinationType", new Object []{ 1239 new Byte (ActiveMQDestination.QUEUE_TYPE), 1240 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 1241 } ); 1242 } 1243 public void testQueueBrowseMessages() throws Exception { 1244 1245 StubConnection connection = createConnection(); 1247 ConnectionInfo connectionInfo = createConnectionInfo(); 1248 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 1249 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 1250 connection.send(connectionInfo); 1251 connection.send(sessionInfo); 1252 connection.send(producerInfo); 1253 1254 destination = createDestinationInfo(connection, connectionInfo, destinationType); 1255 1256 connection.send(createMessage(producerInfo, destination, deliveryMode)); 1257 connection.send(createMessage(producerInfo, destination, deliveryMode)); 1258 connection.send(createMessage(producerInfo, destination, deliveryMode)); 1259 connection.send(createMessage(producerInfo, destination, deliveryMode)); 1260 1261 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 1263 consumerInfo.setBrowser(true); 1264 connection.send(consumerInfo); 1265 1266 for( int i=0; i < 4; i++ ) { 1267 Message m = receiveMessage(connection); 1268 assertNotNull(m); 1269 connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE)); 1270 } 1271 1272 assertNoMessagesLeft(connection); 1273 } 1274 1275 public void initCombosForTestQueuBrowserWith2Consumers() { 1276 addCombinationValues( "deliveryMode", new Object []{ 1277 new Integer (DeliveryMode.NON_PERSISTENT), 1278 new Integer (DeliveryMode.PERSISTENT)} ); 1279 } 1280 1281 public void testQueueBrowserWith2Consumers() throws Exception { 1282 1283 ActiveMQDestination destination = new ActiveMQQueue("TEST"); 1284 1285 StubConnection connection1 = createConnection(); 1287 ConnectionInfo connectionInfo1 = createConnectionInfo(); 1288 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 1289 ProducerInfo producerInfo = createProducerInfo(sessionInfo1); 1290 connection1.send(connectionInfo1); 1291 connection1.send(sessionInfo1); 1292 connection1.send(producerInfo); 1293 1294 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 1295 consumerInfo1.setPrefetchSize(1); 1296 connection1.request(consumerInfo1); 1297 1298 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 1300 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 1301 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 1302 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 1303 1304 StubConnection connection2 = createConnection(); 1306 ConnectionInfo connectionInfo2 = createConnectionInfo(); 1307 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 1308 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 1309 consumerInfo2.setPrefetchSize(1); 1310 consumerInfo2.setBrowser(true); 1311 connection2.send(connectionInfo2); 1312 connection2.send(sessionInfo2); 1313 connection2.request(consumerInfo2); 1314 1315 for( int i=0; i < 4; i++ ) { 1316 Message m1 = receiveMessage(connection1); 1317 Message m2 = receiveMessage(connection2); 1318 assertNotNull("m1 is null for index: " + i, m1); 1319 assertNotNull("m2 is null for index: " + i, m2); 1320 assertEquals(m1.getMessageId(), m2.getMessageId()); 1321 connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); 1322 connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.DELIVERED_ACK_TYPE)); 1323 } 1324 1325 assertNoMessagesLeft(connection1); 1326 assertNoMessagesLeft(connection2); 1327 } 1328 1329 public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() { 1330 addCombinationValues( "deliveryMode", new Object []{ 1331 new Integer (DeliveryMode.NON_PERSISTENT), 1332 new Integer (DeliveryMode.PERSISTENT)} ); 1333 } 1334 public void testQueueOnlyOnceDeliveryWith2Consumers() throws Exception { 1335 1336 ActiveMQDestination destination = new ActiveMQQueue("TEST"); 1337 1338 StubConnection connection1 = createConnection(); 1340 ConnectionInfo connectionInfo1 = createConnectionInfo(); 1341 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 1342 ProducerInfo producerInfo = createProducerInfo(sessionInfo1); 1343 connection1.send(connectionInfo1); 1344 connection1.send(sessionInfo1); 1345 connection1.send(producerInfo); 1346 1347 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 1348 consumerInfo1.setPrefetchSize(1); 1349 connection1.send(consumerInfo1); 1350 1351 StubConnection connection2 = createConnection(); 1353 ConnectionInfo connectionInfo2 = createConnectionInfo(); 1354 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 1355 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 1356 consumerInfo2.setPrefetchSize(1); 1357 connection2.send(connectionInfo2); 1358 connection2.send(sessionInfo2); 1359 connection2.send(consumerInfo2); 1360 1361 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 1363 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 1364 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 1365 connection1.send(createMessage(producerInfo, destination, deliveryMode)); 1366 1367 for( int i=0; i < 2 ; i++ ) { 1368 Message m1 = receiveMessage(connection1); 1369 Message m2 = receiveMessage(connection2); 1370 1371 assertNotNull("m1 is null for index: " + i, m1); 1372 assertNotNull("m2 is null for index: " + i, m2); 1373 1374 assertNotSame(m1.getMessageId(), m2.getMessageId()); 1375 connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); 1376 connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE)); 1377 } 1378 1379 assertNoMessagesLeft(connection1); 1380 assertNoMessagesLeft(connection2); 1381 } 1382 1383 public void initCombosForTestQueueSendThenAddConsumer() { 1384 addCombinationValues( "deliveryMode", new Object []{ 1385 new Integer (DeliveryMode.NON_PERSISTENT), 1386 new Integer (DeliveryMode.PERSISTENT)} ); 1387 addCombinationValues( "destinationType", new Object []{ 1388 new Byte (ActiveMQDestination.QUEUE_TYPE), 1389 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 1390 } ); 1391 } 1392 public void testQueueSendThenAddConsumer() throws Exception { 1393 1394 StubConnection connection = createConnection(); 1396 ConnectionInfo connectionInfo = createConnectionInfo(); 1397 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 1398 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 1399 connection.send(connectionInfo); 1400 connection.send(sessionInfo); 1401 connection.send(producerInfo); 1402 1403 destination = createDestinationInfo(connection, connectionInfo, destinationType); 1404 1405 connection.send(createMessage(producerInfo, destination, deliveryMode)); 1407 1408 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 1410 connection.send(consumerInfo); 1411 1412 Message m = receiveMessage(connection); 1414 assertNotNull(m); 1415 1416 } 1417 1418 public void initCombosForTestQueueAckRemovesMessage() { 1419 addCombinationValues( "deliveryMode", new Object []{ 1420 new Integer (DeliveryMode.NON_PERSISTENT), 1421 new Integer (DeliveryMode.PERSISTENT)} ); 1422 addCombinationValues( "destinationType", new Object []{ 1423 new Byte (ActiveMQDestination.QUEUE_TYPE), 1424 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 1425 } ); 1426 } 1427 1428 public void testQueueAckRemovesMessage() throws Exception { 1429 1430 StubConnection connection = createConnection(); 1432 ConnectionInfo connectionInfo = createConnectionInfo(); 1433 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 1434 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 1435 connection.send(connectionInfo); 1436 connection.send(sessionInfo); 1437 connection.send(producerInfo); 1438 1439 destination = createDestinationInfo(connection, connectionInfo, destinationType); 1440 1441 Message message1 = createMessage(producerInfo, destination, deliveryMode); 1442 Message message2 = createMessage(producerInfo, destination, deliveryMode); 1443 connection.send(message1); 1444 connection.send(message2); 1445 1446 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 1448 connection.request(consumerInfo); 1449 Message m = receiveMessage(connection); 1450 assertNotNull(m); assertEquals(m.getMessageId(), message1.getMessageId()); 1451 1452 assertTrue(countMessagesInQueue(connection, connectionInfo, destination)==2); 1453 connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE)); 1454 assertTrue(countMessagesInQueue(connection, connectionInfo, destination)==2); 1455 connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); 1456 assertTrue(countMessagesInQueue(connection, connectionInfo, destination)==1); 1457 1458 } 1459 1460 public void initCombosForTestSelectorSkipsMessages() { 1461 addCombinationValues( "destination", new Object []{ 1462 new ActiveMQTopic("TEST_TOPIC"), 1463 new ActiveMQQueue("TEST_QUEUE")} ); 1464 addCombinationValues( "destinationType", new Object []{ 1465 new Byte (ActiveMQDestination.QUEUE_TYPE), 1466 new Byte (ActiveMQDestination.TOPIC_TYPE), 1467 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 1468 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE)} ); 1469 } 1470 1471 public void testSelectorSkipsMessages() throws Exception { 1472 1473 StubConnection connection = createConnection(); 1475 ConnectionInfo connectionInfo = createConnectionInfo(); 1476 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 1477 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 1478 connection.send(connectionInfo); 1479 connection.send(sessionInfo); 1480 connection.send(producerInfo); 1481 1482 destination = createDestinationInfo(connection, connectionInfo, destinationType); 1483 1484 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 1485 consumerInfo.setSelector("JMSType='last'"); 1486 connection.send(consumerInfo); 1487 1488 Message message1 = createMessage(producerInfo, destination, deliveryMode); 1489 message1.setType("first"); 1490 Message message2 = createMessage(producerInfo, destination, deliveryMode); 1491 message2.setType("last"); 1492 connection.send(message1); 1493 connection.send(message2); 1494 1495 Message m = receiveMessage(connection); 1497 assertNotNull(m); assertEquals(m.getMessageId(), message2.getMessageId()); 1498 connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); 1499 connection.send(closeConsumerInfo(consumerInfo)); 1500 1501 assertNoMessagesLeft(connection); 1502 } 1503 1504 public void initCombosForTestAddConsumerThenSend() { 1505 addCombinationValues( "deliveryMode", new Object []{ 1506 new Integer (DeliveryMode.NON_PERSISTENT), 1507 new Integer (DeliveryMode.PERSISTENT)} ); 1508 addCombinationValues( "destinationType", new Object []{ 1509 new Byte (ActiveMQDestination.QUEUE_TYPE), 1510 new Byte (ActiveMQDestination.TOPIC_TYPE), 1511 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 1512 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE)} ); 1513 } 1514 1515 public void testAddConsumerThenSend() throws Exception { 1516 1517 StubConnection connection = createConnection(); 1519 ConnectionInfo connectionInfo = createConnectionInfo(); 1520 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 1521 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 1522 connection.send(connectionInfo); 1523 connection.send(sessionInfo); 1524 connection.send(producerInfo); 1525 1526 destination = createDestinationInfo(connection, connectionInfo, destinationType); 1527 1528 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 1529 connection.send(consumerInfo); 1530 1531 connection.send(createMessage(producerInfo, destination, deliveryMode)); 1532 1533 Message m = receiveMessage(connection); 1535 assertNotNull(m); 1536 } 1537 1538 public void initCombosForTestConsumerPrefetchAtOne() { 1539 addCombinationValues( "deliveryMode", new Object []{ 1540 new Integer (DeliveryMode.NON_PERSISTENT), 1541 new Integer (DeliveryMode.PERSISTENT)} ); 1542 addCombinationValues( "destinationType", new Object []{ 1543 new Byte (ActiveMQDestination.QUEUE_TYPE), 1544 new Byte (ActiveMQDestination.TOPIC_TYPE), 1545 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 1546 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE)} ); 1547 } 1548 1549 public void testConsumerPrefetchAtOne() throws Exception { 1550 1551 StubConnection connection = createConnection(); 1553 ConnectionInfo connectionInfo = createConnectionInfo(); 1554 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 1555 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 1556 connection.send(connectionInfo); 1557 connection.send(sessionInfo); 1558 connection.send(producerInfo); 1559 1560 destination = createDestinationInfo(connection, connectionInfo, destinationType); 1561 1562 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 1563 consumerInfo.setPrefetchSize(1); 1564 connection.send(consumerInfo); 1565 1566 connection.send(createMessage(producerInfo, destination, deliveryMode)); 1568 connection.send(createMessage(producerInfo, destination, deliveryMode)); 1569 1570 Message m = receiveMessage(connection); 1572 assertNotNull(m); 1573 assertNoMessagesLeft(connection); 1574 1575 } 1576 1577 public void initCombosForTestConsumerPrefetchAtTwo() { 1578 addCombinationValues( "deliveryMode", new Object []{ 1579 new Integer (DeliveryMode.NON_PERSISTENT), 1580 new Integer (DeliveryMode.PERSISTENT)} ); 1581 addCombinationValues( "destinationType", new Object []{ 1582 new Byte (ActiveMQDestination.QUEUE_TYPE), 1583 new Byte (ActiveMQDestination.TOPIC_TYPE), 1584 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 1585 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE)} ); 1586 } 1587 1588 public void testConsumerPrefetchAtTwo() throws Exception { 1589 1590 StubConnection connection = createConnection(); 1592 ConnectionInfo connectionInfo = createConnectionInfo(); 1593 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 1594 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 1595 connection.send(connectionInfo); 1596 connection.send(sessionInfo); 1597 connection.send(producerInfo); 1598 1599 destination = createDestinationInfo(connection, connectionInfo, destinationType); 1600 1601 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 1602 consumerInfo.setPrefetchSize(2); 1603 connection.send(consumerInfo); 1604 1605 connection.send(createMessage(producerInfo, destination, deliveryMode)); 1607 connection.send(createMessage(producerInfo, destination, deliveryMode)); 1608 connection.send(createMessage(producerInfo, destination, deliveryMode)); 1609 1610 Message m = receiveMessage(connection); 1612 assertNotNull(m); 1613 m = receiveMessage(connection); 1614 assertNotNull(m); 1615 assertNoMessagesLeft(connection); 1616 1617 } 1618 1619 public void initCombosForTestConsumerPrefetchAndDeliveredAck() { 1620 addCombinationValues( "deliveryMode", new Object []{ 1621 new Integer (DeliveryMode.NON_PERSISTENT), 1622 new Integer (DeliveryMode.PERSISTENT)} ); 1623 addCombinationValues( "destinationType", new Object []{ 1624 new Byte (ActiveMQDestination.QUEUE_TYPE), 1625 new Byte (ActiveMQDestination.TOPIC_TYPE), 1626 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 1627 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE)} ); 1628 } 1629 1630 public void testConsumerPrefetchAndDeliveredAck() throws Exception { 1631 1632 StubConnection connection = createConnection(); 1634 ConnectionInfo connectionInfo = createConnectionInfo(); 1635 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 1636 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 1637 connection.send(connectionInfo); 1638 connection.send(sessionInfo); 1639 connection.send(producerInfo); 1640 1641 destination = createDestinationInfo(connection, connectionInfo, destinationType); 1642 1643 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 1644 consumerInfo.setPrefetchSize(1); 1645 connection.send(consumerInfo); 1646 1647 connection.send(createMessage(producerInfo, destination, deliveryMode)); 1649 connection.send(createMessage(producerInfo, destination, deliveryMode)); 1650 connection.send(createMessage(producerInfo, destination, deliveryMode)); 1651 1652 Message m1 = receiveMessage(connection); 1654 assertNotNull(m1); 1655 1656 assertNoMessagesLeft(connection); 1657 1658 connection.send(createAck(consumerInfo, m1, 1, MessageAck.DELIVERED_ACK_TYPE)); 1660 1661 Message m2 = receiveMessage(connection); 1662 assertNotNull(m2); 1663 connection.send(createAck(consumerInfo, m2, 1, MessageAck.DELIVERED_ACK_TYPE)); 1664 1665 Message m3 = receiveMessage(connection); 1666 assertNotNull(m3); 1667 connection.send(createAck(consumerInfo, m3, 1, MessageAck.DELIVERED_ACK_TYPE)); 1668 } 1669 1670 public static Test suite() { 1671 return suite(BrokerTest.class); 1672 } 1673 1674 public static void main(String [] args) { 1675 junit.textui.TestRunner.run(suite()); 1676 } 1677 1678} 1679 | Popular Tags |