KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > BrokerTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker;
19
20 import javax.jms.DeliveryMode JavaDoc;
21 import javax.jms.JMSException JavaDoc;
22
23 import junit.framework.Test;
24
25 import org.apache.activemq.command.ActiveMQDestination;
26 import org.apache.activemq.command.ActiveMQQueue;
27 import org.apache.activemq.command.ActiveMQTopic;
28 import org.apache.activemq.command.ConnectionInfo;
29 import org.apache.activemq.command.ConsumerInfo;
30 import org.apache.activemq.command.DestinationInfo;
31 import org.apache.activemq.command.LocalTransactionId;
32 import org.apache.activemq.command.Message;
33 import org.apache.activemq.command.MessageAck;
34 import org.apache.activemq.command.ProducerInfo;
35 import org.apache.activemq.command.SessionInfo;
36
37 import java.util.concurrent.TimeUnit JavaDoc;
38
39 public class BrokerTest extends BrokerTestSupport {
40     
41     public ActiveMQDestination destination;
42     public int deliveryMode;
43     public int prefetch;
44     public byte destinationType;
45     public boolean durableConsumer;
46     
47     public void initCombosForTestConsumerPrefetchAndStandardAck() {
48         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
49 // new Integer(DeliveryMode.NON_PERSISTENT),
50
new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
51         addCombinationValues( "destinationType", new Object JavaDoc[]{
52                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
53                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
54                 new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
55                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE)
56                 } );
57     }
58     
59     public void testConsumerPrefetchAndStandardAck() throws Exception JavaDoc {
60         
61         // Start a producer and consumer
62
StubConnection connection = createConnection();
63         ConnectionInfo connectionInfo = createConnectionInfo();
64         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
65         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
66         connection.send(connectionInfo);
67         connection.send(sessionInfo);
68         connection.send(producerInfo);
69
70         destination = createDestinationInfo(connection, connectionInfo, destinationType);
71         
72         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
73         consumerInfo.setPrefetchSize(1);
74         connection.send(consumerInfo);
75         
76         // Send 3 messages to the broker.
77
connection.send(createMessage(producerInfo, destination, deliveryMode));
78         connection.send(createMessage(producerInfo, destination, deliveryMode));
79         connection.send(createMessage(producerInfo, destination, deliveryMode));
80         
81         // Make sure only 1 message was delivered.
82
Message m1 = receiveMessage(connection);
83         assertNotNull(m1);
84         assertNoMessagesLeft(connection);
85         
86         // Acknowledge the first message. This should cause the next message to get dispatched.
87
connection.send(createAck(consumerInfo, m1, 1, MessageAck.STANDARD_ACK_TYPE));
88         
89         Message m2 = receiveMessage(connection);
90         assertNotNull(m2);
91         connection.send(createAck(consumerInfo, m2, 1, MessageAck.STANDARD_ACK_TYPE));
92         
93         Message m3 = receiveMessage(connection);
94         assertNotNull(m3);
95         connection.send(createAck(consumerInfo, m3, 1, MessageAck.STANDARD_ACK_TYPE));
96         
97         connection.send(closeConnectionInfo(connectionInfo));
98     }
99
100
101     public void initCombosForTestTransactedAckWithPrefetchOfOne() {
102         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
103                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
104                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
105         addCombinationValues( "destinationType", new Object JavaDoc[]{
106                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
107                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
108                 new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
109                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE)
110                 } );
111     }
112
113     public void testTransactedAckWithPrefetchOfOne() throws Exception JavaDoc {
114         
115         // Setup a first connection
116
StubConnection connection1 = createConnection();
117         ConnectionInfo connectionInfo1 = createConnectionInfo();
118         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
119         ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
120         connection1.send(connectionInfo1);
121         connection1.send(sessionInfo1);
122         connection1.send(producerInfo1);
123
124         destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
125
126         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
127         consumerInfo1.setPrefetchSize(1);
128         connection1.send(consumerInfo1);
129
130         // Send the messages
131
for( int i=0; i < 4 ; i++ ) {
132             Message message = createMessage(producerInfo1, destination, deliveryMode);
133             connection1.send(message);
134         }
135
136         // Begin the transaction.
137
LocalTransactionId txid = createLocalTransaction(sessionInfo1);
138         connection1.send(createBeginTransaction(connectionInfo1, txid));
139         
140         // Now get the messages.
141
for( int i=0; i < 4 ; i++ ) {
142             Message m1 = receiveMessage(connection1);
143             assertNotNull(m1);
144             MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
145             ack.setTransactionId(txid);
146             connection1.send(ack);
147         }
148         
149         // Commit the transaction.
150
connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
151         
152         assertNoMessagesLeft(connection1);
153     }
154
155     public void initCombosForTestTransactedSend() {
156         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
157                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
158                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
159         addCombinationValues( "destinationType", new Object JavaDoc[]{
160                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
161                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
162                 new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
163                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
164     }
165     public void testTransactedSend() throws Exception JavaDoc {
166         
167         // Setup a first connection
168
StubConnection connection1 = createConnection();
169         ConnectionInfo connectionInfo1 = createConnectionInfo();
170         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
171         ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
172         connection1.send(connectionInfo1);
173         connection1.send(sessionInfo1);
174         connection1.send(producerInfo1);
175         
176         destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
177         
178         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
179         consumerInfo1.setPrefetchSize(100);
180         connection1.send(consumerInfo1);
181
182         // Begin the transaction.
183
LocalTransactionId txid = createLocalTransaction(sessionInfo1);
184         connection1.send(createBeginTransaction(connectionInfo1, txid));
185         
186         // Send the messages
187
for( int i=0; i < 4 ; i++ ) {
188             Message message = createMessage(producerInfo1, destination, deliveryMode);
189             message.setTransactionId(txid);
190             connection1.send(message);
191         }
192         
193         // The point of this test is that message should not be delivered until
194
// send is committed.
195
assertNull(receiveMessage(connection1));
196
197         // Commit the transaction.
198
connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
199         
200         // Now get the messages.
201
for( int i=0; i < 4 ; i++ ) {
202             Message m1 = receiveMessage(connection1);
203             assertNotNull(m1);
204         }
205         
206         assertNoMessagesLeft(connection1);
207     }
208
209     public void initCombosForTestQueueTransactedAck() {
210         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
211                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
212                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
213         addCombinationValues( "destinationType", new Object JavaDoc[]{
214                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
215                 new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
216         } );
217     }
218     
219     public void testQueueTransactedAck() throws Exception JavaDoc {
220         
221         // Setup a first connection
222
StubConnection connection1 = createConnection();
223         ConnectionInfo connectionInfo1 = createConnectionInfo();
224         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
225         ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
226         connection1.send(connectionInfo1);
227         connection1.send(sessionInfo1);
228         connection1.send(producerInfo1);
229         
230         destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
231
232         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
233         consumerInfo1.setPrefetchSize(100);
234         connection1.send(consumerInfo1);
235
236         // Send the messages
237
for( int i=0; i < 4 ; i++ ) {
238             Message message = createMessage(producerInfo1, destination, deliveryMode);
239             connection1.send(message);
240         }
241
242         // Begin the transaction.
243
LocalTransactionId txid = createLocalTransaction(sessionInfo1);
244         connection1.send(createBeginTransaction(connectionInfo1, txid));
245         
246         // Acknowledge the first 2 messages.
247
for( int i=0; i < 2 ; i++ ) {
248             Message m1 = receiveMessage(connection1);
249             assertNotNull("m1 is null for index: " + i, m1);
250             MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
251             ack.setTransactionId(txid);
252             connection1.request(ack);
253         }
254             
255         // Commit the transaction.
256
connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
257         
258         // The queue should now only have the remaining 2 messages
259
assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination));
260     }
261
262     public void initCombosForTestConsumerCloseCausesRedelivery() {
263         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
264                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
265                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
266         addCombinationValues( "destination", new Object JavaDoc[]{
267                 new ActiveMQQueue("TEST")} );
268     }
269     
270     public void testConsumerCloseCausesRedelivery() throws Exception JavaDoc {
271         
272         // Setup a first connection
273
StubConnection connection1 = createConnection();
274         ConnectionInfo connectionInfo1 = createConnectionInfo();
275         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
276         ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
277         connection1.send(connectionInfo1);
278         connection1.send(sessionInfo1);
279         connection1.send(producerInfo1);
280         
281         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
282         consumerInfo1.setPrefetchSize(100);
283         connection1.request(consumerInfo1);
284
285         // Send the messages
286
connection1.send(createMessage(producerInfo1, destination, deliveryMode));
287         connection1.send(createMessage(producerInfo1, destination, deliveryMode));
288         connection1.send(createMessage(producerInfo1, destination, deliveryMode));
289         connection1.send(createMessage(producerInfo1, destination, deliveryMode));
290
291         // Receive the messages.
292
for( int i=0; i < 4 ; i++ ) {
293             Message m1 = receiveMessage(connection1);
294             assertNotNull("m1 is null for index: " + i, m1);
295             assertFalse(m1.isRedelivered());
296         }
297         
298         // Close the consumer without acking.. this should cause re-delivery of the messages.
299
connection1.send(consumerInfo1.createRemoveCommand());
300         
301         // Create another consumer that should get the messages again.
302
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo1, destination);
303         consumerInfo2.setPrefetchSize(100);
304         connection1.request(consumerInfo2);
305         
306         // Receive the messages.
307
for( int i=0; i < 4 ; i++ ) {
308             Message m1 = receiveMessage(connection1);
309             assertNotNull("m1 is null for index: " + i, m1);
310             assertTrue(m1.isRedelivered());
311         }
312         assertNoMessagesLeft(connection1);
313         
314     }
315
316     public void testTopicDurableSubscriptionCanBeRestored() throws Exception JavaDoc {
317         
318         ActiveMQDestination destination = new ActiveMQTopic("TEST");
319         
320         // Setup a first connection
321
StubConnection connection1 = createConnection();
322         ConnectionInfo connectionInfo1 = createConnectionInfo();
323         connectionInfo1.setClientId("clientid1");
324         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
325         ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
326         connection1.send(connectionInfo1);
327         connection1.send(sessionInfo1);
328         connection1.send(producerInfo1);
329         
330         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
331         consumerInfo1.setPrefetchSize(100);
332         consumerInfo1.setSubscriptionName("test");
333         connection1.send(consumerInfo1);
334
335         // Send the messages
336
connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
337         connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
338         connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
339         connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
340
341         // Get the messages
342
Message m=null;
343         for( int i=0; i < 2 ; i++ ) {
344             m = receiveMessage(connection1);
345             assertNotNull(m);
346         }
347         // Ack the last message.
348
connection1.send(createAck(consumerInfo1, m, 2, MessageAck.STANDARD_ACK_TYPE));
349         // Close the connection.
350
connection1.request(closeConnectionInfo(connectionInfo1));
351         connection1.stop();
352         
353         // Setup a second connection
354
StubConnection connection2 = createConnection();
355         ConnectionInfo connectionInfo2 = createConnectionInfo();
356         connectionInfo2.setClientId("clientid1");
357         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
358         ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
359         consumerInfo2.setPrefetchSize(100);
360         consumerInfo2.setSubscriptionName("test");
361         
362         connection2.send(connectionInfo2);
363         connection2.send(sessionInfo2);
364         connection2.send(consumerInfo2);
365
366         // Get the rest of the messages
367
for( int i=0; i < 2 ; i++ ) {
368             Message m1 = receiveMessage(connection2);
369             assertNotNull("m1 is null for index: " + i, m1);
370         }
371         assertNoMessagesLeft(connection2);
372     }
373
374
375     public void initCombosForTestGroupedMessagesDeliveredToOnlyOneConsumer() {
376         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
377                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
378                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
379     }
380     public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception JavaDoc {
381         
382         ActiveMQDestination destination = new ActiveMQQueue("TEST");
383         
384         // Setup a first connection
385
StubConnection connection1 = createConnection();
386         ConnectionInfo connectionInfo1 = createConnectionInfo();
387         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
388         ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
389         connection1.send(connectionInfo1);
390         connection1.send(sessionInfo1);
391         connection1.send(producerInfo);
392
393         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
394         consumerInfo1.setPrefetchSize(1);
395         connection1.send(consumerInfo1);
396         
397         // Send the messages.
398
for( int i=0; i < 4 ; i++ ) {
399             Message message = createMessage(producerInfo, destination, deliveryMode);
400             message.setGroupID("TEST-GROUP");
401             message.setGroupSequence(i+1);
402             connection1.request(message);
403         }
404
405         // Setup a second connection
406
StubConnection connection2 = createConnection();
407         ConnectionInfo connectionInfo2 = createConnectionInfo();
408         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
409         connection2.send(connectionInfo2);
410         connection2.send(sessionInfo2);
411
412         ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
413         consumerInfo2.setPrefetchSize(1);
414         connection2.send(consumerInfo2);
415         
416         // All the messages should have been sent down connection 1.. just get the first 3
417
for( int i=0; i < 3 ; i++ ) {
418             Message m1 = receiveMessage(connection1);
419             assertNotNull("m1 is null for index: " + i, m1);
420             connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
421         }
422         
423         // Close the first consumer.
424
connection1.send(closeConsumerInfo(consumerInfo1));
425
426         // The last messages should now go the the second consumer.
427
for( int i=0; i < 1 ; i++ ) {
428             Message m1 = receiveMessage(connection2);
429             assertNotNull("m1 is null for index: " + i, m1);
430             connection2.send(createAck(consumerInfo2, m1, 1, MessageAck.STANDARD_ACK_TYPE));
431         }
432         
433         assertNoMessagesLeft(connection2);
434     }
435
436     public void initCombosForTestTopicConsumerOnlySeeMessagesAfterCreation() {
437         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
438                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
439                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
440         addCombinationValues( "durableConsumer", new Object JavaDoc[]{
441                 Boolean.TRUE,
442                 Boolean.FALSE});
443     }
444     
445     public void testTopicConsumerOnlySeeMessagesAfterCreation() throws Exception JavaDoc {
446         
447         ActiveMQDestination destination = new ActiveMQTopic("TEST");
448         
449         // Setup a first connection
450
StubConnection connection1 = createConnection();
451         ConnectionInfo connectionInfo1 = createConnectionInfo();
452         connectionInfo1.setClientId("A");
453         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
454         ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
455         connection1.send(connectionInfo1);
456         connection1.send(sessionInfo1);
457         connection1.send(producerInfo1);
458
459         // Send the 1st message
460
connection1.send(createMessage(producerInfo1, destination, deliveryMode));
461         
462         // Create the durable subscription.
463
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
464         if( durableConsumer ) {
465             consumerInfo1.setSubscriptionName("test");
466         }
467         consumerInfo1.setPrefetchSize(100);
468         connection1.send(consumerInfo1);
469         
470         Message m = createMessage(producerInfo1, destination, deliveryMode);
471         connection1.send(m);
472         connection1.send(createMessage(producerInfo1, destination, deliveryMode));
473         
474         // Subscription should skip over the first message
475
Message m2 = receiveMessage(connection1);
476         assertNotNull(m2);
477         assertEquals(m.getMessageId(), m2.getMessageId());
478         m2 = receiveMessage(connection1);
479         assertNotNull(m2);
480         
481         assertNoMessagesLeft(connection1);
482     }
483     
484     public void initCombosForTestTopicRetroactiveConsumerSeeMessagesBeforeCreation() {
485         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
486                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
487                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
488         addCombinationValues( "durableConsumer", new Object JavaDoc[]{
489                 Boolean.TRUE,
490                 Boolean.FALSE});
491     }
492     
493     public void testTopicRetroactiveConsumerSeeMessagesBeforeCreation() throws Exception JavaDoc {
494                 
495         ActiveMQDestination destination = new ActiveMQTopic("TEST");
496         
497         // Setup a first connection
498
StubConnection connection1 = createConnection();
499         ConnectionInfo connectionInfo1 = createConnectionInfo();
500         connectionInfo1.setClientId("A");
501         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
502         ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
503         connection1.send(connectionInfo1);
504         connection1.send(sessionInfo1);
505         connection1.send(producerInfo1);
506
507         // Send the messages
508
Message m = createMessage(producerInfo1, destination, deliveryMode);
509         connection1.send(m);
510         
511         // Create the durable subscription.
512
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
513         if( durableConsumer ) {
514             consumerInfo1.setSubscriptionName("test");
515         }
516         consumerInfo1.setPrefetchSize(100);
517         consumerInfo1.setRetroactive(true);
518         connection1.send(consumerInfo1);
519         
520         connection1.send(createMessage(producerInfo1, destination, deliveryMode));
521         connection1.send(createMessage(producerInfo1, destination, deliveryMode));
522
523         // the behavior is VERY dependent on the recovery policy used.
524
// But the default broker settings try to make it as consistent as possible
525

526         // Subscription should see all messages sent.
527
Message m2 = receiveMessage(connection1);
528         assertNotNull(m2);
529         assertEquals(m.getMessageId(), m2.getMessageId());
530         for( int i=0; i < 2 ; i++ ) {
531             m2 = receiveMessage(connection1);
532             assertNotNull(m2);
533         }
534             
535         assertNoMessagesLeft(connection1);
536     }
537
538     
539 //
540
// TODO: need to reimplement this since we don't fail when we send to a non-existent
541
// destination. But if we can access the Region directly then we should be able to
542
// check that if the destination was removed.
543
//
544
// public void initCombosForTestTempDestinationsRemovedOnConnectionClose() {
545
// addCombinationValues( "deliveryMode", new Object[]{
546
// new Integer(DeliveryMode.NON_PERSISTENT),
547
// new Integer(DeliveryMode.PERSISTENT)} );
548
// addCombinationValues( "destinationType", new Object[]{
549
// new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
550
// new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
551
// }
552
//
553
// public void testTempDestinationsRemovedOnConnectionClose() throws Exception {
554
//
555
// // Setup a first connection
556
// StubConnection connection1 = createConnection();
557
// ConnectionInfo connectionInfo1 = createConnectionInfo();
558
// SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
559
// ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
560
// connection1.send(connectionInfo1);
561
// connection1.send(sessionInfo1);
562
// connection1.send(producerInfo1);
563
//
564
// destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
565
//
566
// StubConnection connection2 = createConnection();
567
// ConnectionInfo connectionInfo2 = createConnectionInfo();
568
// SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
569
// ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
570
// connection2.send(connectionInfo2);
571
// connection2.send(sessionInfo2);
572
// connection2.send(producerInfo2);
573
//
574
// // Send from connection2 to connection1's temp destination. Should succeed.
575
// connection2.send(createMessage(producerInfo2, destination, deliveryMode));
576
//
577
// // Close connection 1
578
// connection1.request(closeConnectionInfo(connectionInfo1));
579
//
580
// try {
581
// // Send from connection2 to connection1's temp destination. Should not succeed.
582
// connection2.request(createMessage(producerInfo2, destination, deliveryMode));
583
// fail("Expected JMSException.");
584
// } catch ( JMSException success ) {
585
// }
586
//
587
// }
588

589     
590 // public void initCombosForTestTempDestinationsAreNotAutoCreated() {
591
// addCombinationValues( "deliveryMode", new Object[]{
592
// new Integer(DeliveryMode.NON_PERSISTENT),
593
// new Integer(DeliveryMode.PERSISTENT)} );
594
// addCombinationValues( "destinationType", new Object[]{
595
// new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
596
// new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
597
// }
598
//
599
//
600

601     
602 // We create temp destination on demand now so this test case is no longer
603
// valid.
604
//
605
// public void testTempDestinationsAreNotAutoCreated() throws Exception {
606
//
607
// // Setup a first connection
608
// StubConnection connection1 = createConnection();
609
// ConnectionInfo connectionInfo1 = createConnectionInfo();
610
// SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
611
// ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
612
// connection1.send(connectionInfo1);
613
// connection1.send(sessionInfo1);
614
// connection1.send(producerInfo1);
615
//
616
// destination = ActiveMQDestination.createDestination(connectionInfo1.getConnectionId()+":1", destinationType);
617
//
618
// // Should not be able to send to a non-existent temp destination.
619
// try {
620
// connection1.request(createMessage(producerInfo1, destination, deliveryMode));
621
// fail("Expected JMSException.");
622
// } catch ( JMSException success ) {
623
// }
624
//
625
// }
626

627
628
629     
630     public void initCombosForTestTempDestinationsOnlyAllowsLocalConsumers() {
631         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
632                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
633                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
634         addCombinationValues( "destinationType", new Object JavaDoc[]{
635                 new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
636                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
637     }
638     
639     public void testTempDestinationsOnlyAllowsLocalConsumers() throws Exception JavaDoc {
640         
641         // Setup a first connection
642
StubConnection connection1 = createConnection();
643         ConnectionInfo connectionInfo1 = createConnectionInfo();
644         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
645         connection1.send(connectionInfo1);
646         connection1.send(sessionInfo1);
647
648         DestinationInfo destinationInfo = createTempDestinationInfo(connectionInfo1, destinationType);
649         connection1.request(destinationInfo);
650         destination = destinationInfo.getDestination();
651
652         // Setup a second connection
653
StubConnection connection2 = createConnection();
654         ConnectionInfo connectionInfo2 = createConnectionInfo();
655         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
656         connection2.send(connectionInfo2);
657         connection2.send(sessionInfo2);
658         
659         // Only consumers local to the temp destination should be allowed to subscribe.
660
try {
661             ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
662             connection2.request(consumerInfo2);
663             fail("Expected JMSException.");
664         } catch ( JMSException JavaDoc success ) {
665         }
666         
667         // This should succeed since it's local.
668
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
669         connection1.send(consumerInfo1);
670     }
671
672     public void initCombosForTestExclusiveQueueDeliversToOnlyOneConsumer() {
673         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
674                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
675                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
676     }
677     public void testExclusiveQueueDeliversToOnlyOneConsumer() throws Exception JavaDoc {
678         
679         ActiveMQDestination destination = new ActiveMQQueue("TEST");
680         
681         // Setup a first connection
682
StubConnection connection1 = createConnection();
683         ConnectionInfo connectionInfo1 = createConnectionInfo();
684         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
685         ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
686         connection1.send(connectionInfo1);
687         connection1.send(sessionInfo1);
688         connection1.send(producerInfo);
689
690         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
691         consumerInfo1.setPrefetchSize(1);
692         consumerInfo1.setExclusive(true);
693         connection1.send(consumerInfo1);
694         
695         // Send a message.. this should make consumer 1 the exclusive owner.
696
connection1.request(createMessage(producerInfo, destination, deliveryMode));
697
698         // Setup a second connection
699
StubConnection connection2 = createConnection();
700         ConnectionInfo connectionInfo2 = createConnectionInfo();
701         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
702         ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
703         consumerInfo2.setPrefetchSize(1);
704         consumerInfo2.setExclusive(true);
705         connection2.send(connectionInfo2);
706         connection2.send(sessionInfo2);
707         connection2.request(consumerInfo2);
708         
709         // Second message should go to consumer 1 even though consumer 2 is ready
710
// for dispatch.
711
connection1.send(createMessage(producerInfo, destination, deliveryMode));
712         connection1.send(createMessage(producerInfo, destination, deliveryMode));
713
714         // Acknowledge the first 2 messages
715
for( int i=0; i < 2 ; i++ ) {
716             Message m1 = receiveMessage(connection1);
717             assertNotNull(m1);
718             connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
719         }
720         
721         // Close the first consumer.
722
connection1.send(closeConsumerInfo(consumerInfo1));
723
724         // The last two messages should now go the the second consumer.
725
connection1.send(createMessage(producerInfo, destination, deliveryMode));
726         
727         for( int i=0; i < 2 ; i++ ) {
728             Message m1 = receiveMessage(connection2);
729             assertNotNull(m1);
730             connection2.send(createAck(consumerInfo2, m1, 1, MessageAck.STANDARD_ACK_TYPE));
731         }
732         
733         assertNoMessagesLeft(connection2);
734     }
735
736     public void initCombosForTestWildcardConsume() {
737         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
738                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
739                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
740         addCombinationValues( "destinationType", new Object JavaDoc[]{
741                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
742                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE)} );
743     }
744     
745     public void testWildcardConsume() throws Exception JavaDoc {
746         
747         // Setup a first connection
748
StubConnection connection1 = createConnection();
749         ConnectionInfo connectionInfo1 = createConnectionInfo();
750         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
751         ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
752         connection1.send(connectionInfo1);
753         connection1.send(sessionInfo1);
754         connection1.send(producerInfo1);
755         
756         // setup the wildcard consumer.
757
ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("WILD.*.TEST", destinationType);
758         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, compositeDestination);
759         consumerInfo1.setPrefetchSize(100);
760         connection1.send(consumerInfo1);
761
762         // These two message should NOT match the wild card.
763
connection1.send(createMessage(producerInfo1, ActiveMQDestination.createDestination("WILD.CARD", destinationType), deliveryMode));
764         connection1.send(createMessage(producerInfo1, ActiveMQDestination.createDestination("WILD.TEST", destinationType), deliveryMode));
765
766         // These two message should match the wild card.
767
ActiveMQDestination d1 = ActiveMQDestination.createDestination("WILD.CARD.TEST", destinationType);
768         connection1.send(createMessage(producerInfo1, d1, deliveryMode));
769         ActiveMQDestination d2 = ActiveMQDestination.createDestination("WILD.FOO.TEST", destinationType);
770         connection1.send(createMessage(producerInfo1, d2, deliveryMode));
771
772         Message m = receiveMessage(connection1);
773         assertNotNull(m);
774         assertEquals(d1,m.getDestination());
775         m = receiveMessage(connection1);
776         assertNotNull(m);
777         assertEquals(d2,m.getDestination());
778
779         assertNoMessagesLeft(connection1);
780         connection1.send(closeConnectionInfo(connectionInfo1));
781     }
782
783     public void initCombosForTestCompositeConsume() {
784         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
785                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
786                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
787         addCombinationValues( "destinationType", new Object JavaDoc[]{
788                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
789                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE)} );
790     }
791     
792     public void testCompositeConsume() throws Exception JavaDoc {
793         
794         // Setup a first connection
795
StubConnection connection1 = createConnection();
796         ConnectionInfo connectionInfo1 = createConnectionInfo();
797         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
798         ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
799         connection1.send(connectionInfo1);
800         connection1.send(sessionInfo1);
801         connection1.send(producerInfo1);
802         
803         // setup the composite consumer.
804
ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B", destinationType);
805         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, compositeDestination);
806         consumerInfo1.setRetroactive(true);
807         consumerInfo1.setPrefetchSize(100);
808         connection1.send(consumerInfo1);
809
810         // Publish to the two destinations
811
ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType);
812         ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", destinationType);
813
814         // Send a message to each destination .
815
connection1.send(createMessage(producerInfo1, destinationA, deliveryMode));
816         connection1.send(createMessage(producerInfo1, destinationB, deliveryMode));
817
818         // The consumer should get both messages.
819
for( int i=0; i < 2 ; i++ ) {
820             Message m1 = receiveMessage(connection1);
821             assertNotNull(m1);
822         }
823
824         assertNoMessagesLeft(connection1);
825         connection1.send(closeConnectionInfo(connectionInfo1));
826     }
827
828     public void initCombosForTestCompositeSend() {
829         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
830                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
831                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
832         addCombinationValues( "destinationType", new Object JavaDoc[]{
833                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
834                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE)} );
835     }
836     
837     public void testCompositeSend() throws Exception JavaDoc {
838         
839         // Setup a first connection
840
StubConnection connection1 = createConnection();
841         ConnectionInfo connectionInfo1 = createConnectionInfo();
842         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
843         ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
844         connection1.send(connectionInfo1);
845         connection1.send(sessionInfo1);
846         connection1.send(producerInfo1);
847         
848         ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType);
849         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destinationA);
850         consumerInfo1.setRetroactive(true);
851         consumerInfo1.setPrefetchSize(100);
852         connection1.send(consumerInfo1);
853
854         // Setup a second connection
855
StubConnection connection2 = createConnection();
856         ConnectionInfo connectionInfo2 = createConnectionInfo();
857         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
858         connection2.send(connectionInfo2);
859         connection2.send(sessionInfo2);
860
861         ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", destinationType);
862         ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationB);
863         consumerInfo2.setRetroactive(true);
864         consumerInfo2.setPrefetchSize(100);
865         connection2.send(consumerInfo2);
866
867         // Send the messages to the composite destination.
868
ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B", destinationType);
869         for( int i=0; i < 4 ; i++ ) {
870             connection1.send(createMessage(producerInfo1, compositeDestination, deliveryMode));
871         }
872
873         // The messages should have been delivered to both the A and B destination.
874
for( int i=0; i < 4 ; i++ ) {
875             Message m1 = receiveMessage(connection1);
876             Message m2 = receiveMessage(connection2);
877
878             assertNotNull(m1);
879             assertNotNull(m2);
880             
881             assertEquals( m1.getMessageId(), m2.getMessageId() );
882             assertEquals( compositeDestination, m1.getOriginalDestination());
883             assertEquals( compositeDestination, m2.getOriginalDestination());
884             
885             connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
886             connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
887
888         }
889
890         assertNoMessagesLeft(connection1);
891         assertNoMessagesLeft(connection2);
892         
893         connection1.send(closeConnectionInfo(connectionInfo1));
894         connection2.send(closeConnectionInfo(connectionInfo2));
895     }
896
897     public void initCombosForTestConnectionCloseCascades() {
898         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
899                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
900                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
901         addCombinationValues( "destination", new Object JavaDoc[]{
902                 new ActiveMQTopic("TEST"),
903                 new ActiveMQQueue("TEST")} );
904     }
905     
906     public void testConnectionCloseCascades() throws Exception JavaDoc {
907         
908         // Setup a first connection
909
StubConnection connection1 = createConnection();
910         ConnectionInfo connectionInfo1 = createConnectionInfo();
911         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
912         ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
913         connection1.send(connectionInfo1);
914         connection1.send(sessionInfo1);
915         connection1.send(producerInfo1);
916         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
917         consumerInfo1.setPrefetchSize(100);
918         consumerInfo1.setNoLocal(true);
919         connection1.request(consumerInfo1);
920
921         // Setup a second connection
922
StubConnection connection2 = createConnection();
923         ConnectionInfo connectionInfo2 = createConnectionInfo();
924         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
925         ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
926         connection2.send(connectionInfo2);
927         connection2.send(sessionInfo2);
928         connection2.send(producerInfo2);
929         
930         // Send the messages
931
connection2.send(createMessage(producerInfo2, destination, deliveryMode));
932         connection2.send(createMessage(producerInfo2, destination, deliveryMode));
933         connection2.send(createMessage(producerInfo2, destination, deliveryMode));
934         connection2.send(createMessage(producerInfo2, destination, deliveryMode));
935         
936         for( int i=0; i < 4 ; i++ ) {
937             Message m1 = receiveMessage(connection1);
938             assertNotNull(m1);
939             connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
940         }
941         
942         // Close the connection, this should in turn close the consumer.
943
connection1.request(closeConnectionInfo(connectionInfo1));
944         
945         // Send another message, connection1 should not get the message.
946
connection2.send(createMessage(producerInfo2, destination, deliveryMode));
947         
948         assertNull(connection1.getDispatchQueue().poll(MAX_WAIT, TimeUnit.MILLISECONDS));
949     }
950     
951     public void initCombosForTestSessionCloseCascades() {
952         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
953                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
954                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
955         addCombinationValues( "destination", new Object JavaDoc[]{
956                 new ActiveMQTopic("TEST"),
957                 new ActiveMQQueue("TEST")} );
958     }
959     
960     public void testSessionCloseCascades() throws Exception JavaDoc {
961         
962         // Setup a first connection
963
StubConnection connection1 = createConnection();
964         ConnectionInfo connectionInfo1 = createConnectionInfo();
965         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
966         ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
967         connection1.send(connectionInfo1);
968         connection1.send(sessionInfo1);
969         connection1.send(producerInfo1);
970         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
971         consumerInfo1.setPrefetchSize(100);
972         consumerInfo1.setNoLocal(true);
973         connection1.request(consumerInfo1);
974
975         // Setup a second connection
976
StubConnection connection2 = createConnection();
977         ConnectionInfo connectionInfo2 = createConnectionInfo();
978         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
979         ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
980         connection2.send(connectionInfo2);
981         connection2.send(sessionInfo2);
982         connection2.send(producerInfo2);
983         
984         // Send the messages
985
connection2.send(createMessage(producerInfo2, destination, deliveryMode));
986         connection2.send(createMessage(producerInfo2, destination, deliveryMode));
987         connection2.send(createMessage(producerInfo2, destination, deliveryMode));
988         connection2.send(createMessage(producerInfo2, destination, deliveryMode));
989         
990         for( int i=0; i < 4 ; i++ ) {
991             Message m1 = receiveMessage(connection1);
992             assertNotNull(m1);
993             connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
994         }
995         
996         // Close the session, this should in turn close the consumer.
997
connection1.request(closeSessionInfo(sessionInfo1));
998         
999         // Send another message, connection1 should not get the message.
1000
connection2.send(createMessage(producerInfo2, destination, deliveryMode));
1001        
1002        assertNull(connection1.getDispatchQueue().poll(MAX_WAIT, TimeUnit.MILLISECONDS));
1003    }
1004    
1005    public void initCombosForTestConsumerClose() {
1006        addCombinationValues( "deliveryMode", new Object JavaDoc[]{
1007                new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
1008                new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
1009        addCombinationValues( "destination", new Object JavaDoc[]{
1010                new ActiveMQTopic("TEST"),
1011                new ActiveMQQueue("TEST")} );
1012    }
1013    
1014    public void testConsumerClose() throws Exception JavaDoc {
1015        
1016        // Setup a first connection
1017
StubConnection connection1 = createConnection();
1018        ConnectionInfo connectionInfo1 = createConnectionInfo();
1019        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
1020        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
1021        connection1.send(connectionInfo1);
1022        connection1.send(sessionInfo1);
1023        connection1.send(producerInfo1);
1024        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
1025        consumerInfo1.setPrefetchSize(100);
1026        consumerInfo1.setNoLocal(true);
1027        connection1.request(consumerInfo1);
1028
1029        // Setup a second connection
1030
StubConnection connection2 = createConnection();
1031        ConnectionInfo connectionInfo2 = createConnectionInfo();
1032        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
1033        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
1034        connection2.send(connectionInfo2);
1035        connection2.send(sessionInfo2);
1036        connection2.send(producerInfo2);
1037        
1038        // Send the messages
1039
connection2.send(createMessage(producerInfo2, destination, deliveryMode));
1040        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
1041        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
1042        connection2.send(createMessage(producerInfo2, destination, deliveryMode));
1043        
1044        for( int i=0; i < 4 ; i++ ) {
1045            Message m1 = receiveMessage(connection1);
1046            assertNotNull(m1);
1047            connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
1048        }
1049        
1050        // Close the consumer.
1051
connection1.request(closeConsumerInfo(consumerInfo1));
1052        
1053        // Send another message, connection1 should not get the message.
1054
connection2.send(createMessage(producerInfo2, destination, deliveryMode));
1055        
1056        assertNull(connection1.getDispatchQueue().poll(MAX_WAIT, TimeUnit.MILLISECONDS));
1057    }
1058    public void initCombosForTestTopicNoLocal() {
1059        addCombinationValues( "deliveryMode", new Object JavaDoc[]{
1060                new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
1061                new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
1062    }
1063    
1064    public void testTopicNoLocal() throws Exception JavaDoc {
1065        
1066        ActiveMQDestination destination = new ActiveMQTopic("TEST");
1067        
1068        // Setup a first connection
1069
StubConnection connection1 = createConnection();
1070        ConnectionInfo connectionInfo1 = createConnectionInfo();
1071        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
1072        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
1073        connection1.send(connectionInfo1);
1074        connection1.send(sessionInfo1);
1075        connection1.send(producerInfo1);
1076
1077        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
1078        consumerInfo1.setRetroactive(true);
1079        consumerInfo1.setPrefetchSize(100);
1080        consumerInfo1.setNoLocal(true);
1081        connection1.send(consumerInfo1);
1082
1083        // Setup a second connection
1084
StubConnection connection2 = createConnection();
1085        ConnectionInfo connectionInfo2 = createConnectionInfo();
1086        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
1087        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
1088        connection2.send(connectionInfo2);
1089        connection2.send(sessionInfo2);
1090        connection2.send(producerInfo2);
1091        
1092        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
1093        consumerInfo2.setRetroactive(true);
1094        consumerInfo2.setPrefetchSize(100);
1095        consumerInfo2.setNoLocal(true);
1096        connection2.send(consumerInfo2);
1097
1098        // Send the messages
1099
connection1.send(createMessage(producerInfo1, destination, deliveryMode));
1100        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
1101        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
1102        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
1103        
1104        // The 2nd connection should get the messages.
1105
for( int i=0; i < 4 ; i++ ) {
1106            Message m1 = receiveMessage(connection2);
1107            assertNotNull(m1);
1108        }
1109        
1110        // Send a message with the 2nd connection
1111
Message message = createMessage(producerInfo2, destination, deliveryMode);
1112        connection2.send(message);
1113        
1114        // The first connection should not see the initial 4 local messages sent but should
1115
// see the messages from connection 2.
1116
Message m = receiveMessage(connection1);
1117        assertNotNull(m);
1118        assertEquals(message.getMessageId(), m.getMessageId());
1119        
1120        assertNoMessagesLeft(connection1);
1121        assertNoMessagesLeft(connection2);
1122    }
1123
1124    
1125    public void initCombosForTopicDispatchIsBroadcast() {
1126        addCombinationValues( "deliveryMode", new Object JavaDoc[]{
1127                new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
1128                new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
1129    }
1130    
1131    public void testTopicDispatchIsBroadcast() throws Exception JavaDoc {
1132        
1133        ActiveMQDestination destination = new ActiveMQTopic("TEST");
1134        
1135        // Setup a first connection
1136
StubConnection connection1 = createConnection();
1137        ConnectionInfo connectionInfo1 = createConnectionInfo();
1138        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
1139        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
1140        connection1.send(connectionInfo1);
1141        connection1.send(sessionInfo1);
1142        connection1.send(producerInfo1);
1143
1144        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
1145        consumerInfo1.setRetroactive(true);
1146        consumerInfo1.setPrefetchSize(100);
1147        connection1.send(consumerInfo1);
1148
1149        // Setup a second connection
1150
StubConnection connection2 = createConnection();
1151        ConnectionInfo connectionInfo2 = createConnectionInfo();
1152        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
1153        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
1154        consumerInfo2.setRetroactive(true);
1155        consumerInfo2.setPrefetchSize(100);
1156        connection2.send(connectionInfo2);
1157        connection2.send(sessionInfo2);
1158        connection2.send(consumerInfo2);
1159
1160        // Send the messages
1161
connection1.send(createMessage(producerInfo1, destination, deliveryMode));
1162        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
1163        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
1164        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
1165        
1166        // Get the messages
1167
for( int i=0; i < 4 ; i++ ) {
1168            Message m1 = receiveMessage(connection1);
1169            assertNotNull(m1);
1170            m1 = receiveMessage(connection2);
1171            assertNotNull(m1);
1172        }
1173    }
1174    
1175    public void initCombosForTestQueueDispatchedAreRedeliveredOnConsumerClose() {
1176        addCombinationValues( "deliveryMode", new Object JavaDoc[]{
1177                new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
1178                new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
1179        addCombinationValues( "destinationType", new Object JavaDoc[]{
1180                new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
1181                new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
1182                } );
1183    }
1184    
1185    public void testQueueDispatchedAreRedeliveredOnConsumerClose() throws Exception JavaDoc {
1186        
1187        // Setup a first connection
1188
StubConnection connection1 = createConnection();
1189        ConnectionInfo connectionInfo1 = createConnectionInfo();
1190        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
1191        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
1192        connection1.send(connectionInfo1);
1193        connection1.send(sessionInfo1);
1194        connection1.send(producerInfo);
1195        
1196        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
1197
1198        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
1199        consumerInfo1.setPrefetchSize(100);
1200        connection1.send(consumerInfo1);
1201        
1202        // Send the messages
1203
connection1.send(createMessage(producerInfo, destination, deliveryMode));
1204        connection1.send(createMessage(producerInfo, destination, deliveryMode));
1205        connection1.send(createMessage(producerInfo, destination, deliveryMode));
1206        connection1.send(createMessage(producerInfo, destination, deliveryMode));
1207        
1208        // Get the messages
1209
for( int i=0; i < 4 ; i++ ) {
1210            Message m1 = receiveMessage(connection1);
1211            assertNotNull(m1);
1212            assertFalse(m1.isRedelivered());
1213        }
1214        // Close the consumer without sending any ACKS.
1215
connection1.send(closeConsumerInfo(consumerInfo1));
1216
1217        // Drain any in flight messages..
1218
while(connection1.getDispatchQueue().poll(0, TimeUnit.MILLISECONDS)!=null){
1219        }
1220
1221        // Add the second consumer
1222
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo1, destination);
1223        consumerInfo2.setPrefetchSize(100);
1224        connection1.send(consumerInfo2);
1225
1226        // Make sure the messages were re delivered to the 2nd consumer.
1227
for( int i=0; i < 4 ; i++ ) {
1228            Message m1 = receiveMessage(connection1);
1229            assertNotNull(m1);
1230            assertTrue(m1.isRedelivered());
1231        }
1232    }
1233    
1234    public void initCombosForTestQueueBrowseMessages() {
1235        addCombinationValues( "deliveryMode", new Object JavaDoc[]{
1236                new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
1237                new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
1238        addCombinationValues( "destinationType", new Object JavaDoc[]{
1239                new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
1240                new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
1241                } );
1242    }
1243    public void testQueueBrowseMessages() throws Exception JavaDoc {
1244                
1245        // Start a producer and consumer
1246
StubConnection connection = createConnection();
1247        ConnectionInfo connectionInfo = createConnectionInfo();
1248        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
1249        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
1250        connection.send(connectionInfo);
1251        connection.send(sessionInfo);
1252        connection.send(producerInfo);
1253        
1254        destination = createDestinationInfo(connection, connectionInfo, destinationType);
1255
1256        connection.send(createMessage(producerInfo, destination, deliveryMode));
1257        connection.send(createMessage(producerInfo, destination, deliveryMode));
1258        connection.send(createMessage(producerInfo, destination, deliveryMode));
1259        connection.send(createMessage(producerInfo, destination, deliveryMode));
1260        
1261        // Use selector to skip first message.
1262
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
1263        consumerInfo.setBrowser(true);
1264        connection.send(consumerInfo);
1265        
1266        for( int i=0; i < 4; i++ ) {
1267            Message m = receiveMessage(connection);
1268            assertNotNull(m);
1269            connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE));
1270        }
1271        
1272        assertNoMessagesLeft(connection);
1273    }
1274
1275    public void initCombosForTestQueuBrowserWith2Consumers() {
1276        addCombinationValues( "deliveryMode", new Object JavaDoc[]{
1277                new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
1278                new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
1279    }
1280    
1281    public void testQueueBrowserWith2Consumers() throws Exception JavaDoc {
1282        
1283        ActiveMQDestination destination = new ActiveMQQueue("TEST");
1284        
1285        // Setup a first connection
1286
StubConnection connection1 = createConnection();
1287        ConnectionInfo connectionInfo1 = createConnectionInfo();
1288        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
1289        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
1290        connection1.send(connectionInfo1);
1291        connection1.send(sessionInfo1);
1292        connection1.send(producerInfo);
1293
1294        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
1295        consumerInfo1.setPrefetchSize(1);
1296        connection1.request(consumerInfo1);
1297        
1298        // Send the messages
1299
connection1.send(createMessage(producerInfo, destination, deliveryMode));
1300        connection1.send(createMessage(producerInfo, destination, deliveryMode));
1301        connection1.send(createMessage(producerInfo, destination, deliveryMode));
1302        connection1.send(createMessage(producerInfo, destination, deliveryMode));
1303
1304        // Setup a second connection with a queue browser.
1305
StubConnection connection2 = createConnection();
1306        ConnectionInfo connectionInfo2 = createConnectionInfo();
1307        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
1308        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
1309        consumerInfo2.setPrefetchSize(1);
1310        consumerInfo2.setBrowser(true);
1311        connection2.send(connectionInfo2);
1312        connection2.send(sessionInfo2);
1313        connection2.request(consumerInfo2);
1314
1315        for( int i=0; i < 4; i++ ) {
1316            Message m1 = receiveMessage(connection1);
1317            Message m2 = receiveMessage(connection2);
1318            assertNotNull("m1 is null for index: " + i, m1);
1319            assertNotNull("m2 is null for index: " + i, m2);
1320            assertEquals(m1.getMessageId(), m2.getMessageId());
1321            connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
1322            connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.DELIVERED_ACK_TYPE));
1323        }
1324        
1325        assertNoMessagesLeft(connection1);
1326        assertNoMessagesLeft(connection2);
1327    }
1328     
1329    public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() {
1330        addCombinationValues( "deliveryMode", new Object JavaDoc[]{
1331                new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
1332                new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
1333    }
1334    public void testQueueOnlyOnceDeliveryWith2Consumers() throws Exception JavaDoc {
1335        
1336        ActiveMQDestination destination = new ActiveMQQueue("TEST");
1337        
1338        // Setup a first connection
1339
StubConnection connection1 = createConnection();
1340        ConnectionInfo connectionInfo1 = createConnectionInfo();
1341        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
1342        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
1343        connection1.send(connectionInfo1);
1344        connection1.send(sessionInfo1);
1345        connection1.send(producerInfo);
1346
1347        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
1348        consumerInfo1.setPrefetchSize(1);
1349        connection1.send(consumerInfo1);
1350
1351        // Setup a second connection
1352
StubConnection connection2 = createConnection();
1353        ConnectionInfo connectionInfo2 = createConnectionInfo();
1354        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
1355        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
1356        consumerInfo2.setPrefetchSize(1);
1357        connection2.send(connectionInfo2);
1358        connection2.send(sessionInfo2);
1359        connection2.send(consumerInfo2);
1360        
1361        // Send the messages
1362
connection1.send(createMessage(producerInfo, destination, deliveryMode));
1363        connection1.send(createMessage(producerInfo, destination, deliveryMode));
1364        connection1.send(createMessage(producerInfo, destination, deliveryMode));
1365        connection1.send(createMessage(producerInfo, destination, deliveryMode));
1366        
1367        for( int i=0; i < 2 ; i++ ) {
1368            Message m1 = receiveMessage(connection1);
1369            Message m2 = receiveMessage(connection2);
1370            
1371            assertNotNull("m1 is null for index: " + i, m1);
1372            assertNotNull("m2 is null for index: " + i, m2);
1373
1374            assertNotSame(m1.getMessageId(), m2.getMessageId());
1375            connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
1376            connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
1377        }
1378        
1379        assertNoMessagesLeft(connection1);
1380        assertNoMessagesLeft(connection2);
1381    }
1382    
1383    public void initCombosForTestQueueSendThenAddConsumer() {
1384        addCombinationValues( "deliveryMode", new Object JavaDoc[]{
1385                new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
1386                new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
1387        addCombinationValues( "destinationType", new Object JavaDoc[]{
1388                new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
1389                new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
1390                } );
1391    }
1392    public void testQueueSendThenAddConsumer() throws Exception JavaDoc {
1393        
1394        // Start a producer
1395
StubConnection connection = createConnection();
1396        ConnectionInfo connectionInfo = createConnectionInfo();
1397        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
1398        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
1399        connection.send(connectionInfo);
1400        connection.send(sessionInfo);
1401        connection.send(producerInfo);
1402        
1403        destination = createDestinationInfo(connection, connectionInfo, destinationType);
1404        
1405        // Send a message to the broker.
1406
connection.send(createMessage(producerInfo, destination, deliveryMode));
1407        
1408        // Start the consumer
1409
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
1410        connection.send(consumerInfo);
1411
1412        // Make sure the message was delivered.
1413
Message m = receiveMessage(connection);
1414        assertNotNull(m);
1415        
1416    }
1417    
1418    public void initCombosForTestQueueAckRemovesMessage() {
1419        addCombinationValues( "deliveryMode", new Object JavaDoc[]{
1420                new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
1421                new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
1422        addCombinationValues( "destinationType", new Object JavaDoc[]{
1423                new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
1424                new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
1425                } );
1426    }
1427    
1428    public void testQueueAckRemovesMessage() throws Exception JavaDoc {
1429        
1430        // Start a producer and consumer
1431
StubConnection connection = createConnection();
1432        ConnectionInfo connectionInfo = createConnectionInfo();
1433        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
1434        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
1435        connection.send(connectionInfo);
1436        connection.send(sessionInfo);
1437        connection.send(producerInfo);
1438        
1439        destination = createDestinationInfo(connection, connectionInfo, destinationType);
1440        
1441        Message message1 = createMessage(producerInfo, destination, deliveryMode);
1442        Message message2 = createMessage(producerInfo, destination, deliveryMode);
1443        connection.send(message1);
1444        connection.send(message2);
1445        
1446        // Make sure the message was delivered.
1447
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
1448        connection.request(consumerInfo);
1449        Message m = receiveMessage(connection);
1450        assertNotNull(m); assertEquals(m.getMessageId(), message1.getMessageId());
1451        
1452        assertTrue(countMessagesInQueue(connection, connectionInfo, destination)==2);
1453        connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE));
1454        assertTrue(countMessagesInQueue(connection, connectionInfo, destination)==2);
1455        connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
1456        assertTrue(countMessagesInQueue(connection, connectionInfo, destination)==1);
1457        
1458    }
1459
1460    public void initCombosForTestSelectorSkipsMessages() {
1461        addCombinationValues( "destination", new Object JavaDoc[]{
1462            new ActiveMQTopic("TEST_TOPIC"),
1463            new ActiveMQQueue("TEST_QUEUE")} );
1464        addCombinationValues( "destinationType", new Object JavaDoc[]{
1465                new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
1466                new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
1467                new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
1468                new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
1469    }
1470    
1471    public void testSelectorSkipsMessages() throws Exception JavaDoc {
1472        
1473        // Start a producer and consumer
1474
StubConnection connection = createConnection();
1475        ConnectionInfo connectionInfo = createConnectionInfo();
1476        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
1477        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
1478        connection.send(connectionInfo);
1479        connection.send(sessionInfo);
1480        connection.send(producerInfo);
1481        
1482        destination = createDestinationInfo(connection, connectionInfo, destinationType);
1483
1484        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
1485        consumerInfo.setSelector("JMSType='last'");
1486        connection.send(consumerInfo);
1487        
1488        Message message1 = createMessage(producerInfo, destination, deliveryMode);
1489        message1.setType("first");
1490        Message message2 = createMessage(producerInfo, destination, deliveryMode);
1491        message2.setType("last");
1492        connection.send(message1);
1493        connection.send(message2);
1494        
1495        // Use selector to skip first message.
1496
Message m = receiveMessage(connection);
1497        assertNotNull(m); assertEquals(m.getMessageId(), message2.getMessageId());
1498        connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
1499        connection.send(closeConsumerInfo(consumerInfo));
1500        
1501        assertNoMessagesLeft(connection);
1502    }
1503    
1504    public void initCombosForTestAddConsumerThenSend() {
1505        addCombinationValues( "deliveryMode", new Object JavaDoc[]{
1506                new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
1507                new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
1508        addCombinationValues( "destinationType", new Object JavaDoc[]{
1509                new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
1510                new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
1511                new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
1512                new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
1513    }
1514    
1515    public void testAddConsumerThenSend() throws Exception JavaDoc {
1516        
1517        // Start a producer and consumer
1518
StubConnection connection = createConnection();
1519        ConnectionInfo connectionInfo = createConnectionInfo();
1520        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
1521        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
1522        connection.send(connectionInfo);
1523        connection.send(sessionInfo);
1524        connection.send(producerInfo);
1525
1526        destination = createDestinationInfo(connection, connectionInfo, destinationType);
1527
1528        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
1529        connection.send(consumerInfo);
1530        
1531        connection.send(createMessage(producerInfo, destination, deliveryMode));
1532        
1533        // Make sure the message was delivered.
1534
Message m = receiveMessage(connection);
1535        assertNotNull(m);
1536    }
1537
1538    public void initCombosForTestConsumerPrefetchAtOne() {
1539        addCombinationValues( "deliveryMode", new Object JavaDoc[]{
1540                new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
1541                new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
1542        addCombinationValues( "destinationType", new Object JavaDoc[]{
1543                new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
1544                new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
1545                new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
1546                new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
1547    }
1548    
1549    public void testConsumerPrefetchAtOne() throws Exception JavaDoc {
1550        
1551        // Start a producer and consumer
1552
StubConnection connection = createConnection();
1553        ConnectionInfo connectionInfo = createConnectionInfo();
1554        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
1555        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
1556        connection.send(connectionInfo);
1557        connection.send(sessionInfo);
1558        connection.send(producerInfo);
1559        
1560        destination = createDestinationInfo(connection, connectionInfo, destinationType);
1561
1562        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
1563        consumerInfo.setPrefetchSize(1);
1564        connection.send(consumerInfo);
1565        
1566        // Send 2 messages to the broker.
1567
connection.send(createMessage(producerInfo, destination, deliveryMode));
1568        connection.send(createMessage(producerInfo, destination, deliveryMode));
1569        
1570        // Make sure only 1 message was delivered.
1571
Message m = receiveMessage(connection);
1572        assertNotNull(m);
1573        assertNoMessagesLeft(connection);
1574        
1575    }
1576    
1577    public void initCombosForTestConsumerPrefetchAtTwo() {
1578        addCombinationValues( "deliveryMode", new Object JavaDoc[]{
1579                new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
1580                new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
1581        addCombinationValues( "destinationType", new Object JavaDoc[]{
1582                new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
1583                new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
1584                new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
1585                new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
1586    }
1587
1588    public void testConsumerPrefetchAtTwo() throws Exception JavaDoc {
1589        
1590        // Start a producer and consumer
1591
StubConnection connection = createConnection();
1592        ConnectionInfo connectionInfo = createConnectionInfo();
1593        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
1594        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
1595        connection.send(connectionInfo);
1596        connection.send(sessionInfo);
1597        connection.send(producerInfo);
1598        
1599        destination = createDestinationInfo(connection, connectionInfo, destinationType);
1600        
1601        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
1602        consumerInfo.setPrefetchSize(2);
1603        connection.send(consumerInfo);
1604        
1605        // Send 3 messages to the broker.
1606
connection.send(createMessage(producerInfo, destination, deliveryMode));
1607        connection.send(createMessage(producerInfo, destination, deliveryMode));
1608        connection.send(createMessage(producerInfo, destination, deliveryMode));
1609        
1610        // Make sure only 1 message was delivered.
1611
Message m = receiveMessage(connection);
1612        assertNotNull(m);
1613        m = receiveMessage(connection);
1614        assertNotNull(m);
1615        assertNoMessagesLeft(connection);
1616        
1617    }
1618    
1619    public void initCombosForTestConsumerPrefetchAndDeliveredAck() {
1620        addCombinationValues( "deliveryMode", new Object JavaDoc[]{
1621            new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
1622            new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
1623        addCombinationValues( "destinationType", new Object JavaDoc[]{
1624                new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
1625                new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
1626                new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
1627                new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
1628    }
1629    
1630    public void testConsumerPrefetchAndDeliveredAck() throws Exception JavaDoc {
1631        
1632        // Start a producer and consumer
1633
StubConnection connection = createConnection();
1634        ConnectionInfo connectionInfo = createConnectionInfo();
1635        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
1636        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
1637        connection.send(connectionInfo);
1638        connection.send(sessionInfo);
1639        connection.send(producerInfo);
1640
1641        destination = createDestinationInfo(connection, connectionInfo, destinationType);
1642        
1643        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
1644        consumerInfo.setPrefetchSize(1);
1645        connection.send(consumerInfo);
1646        
1647        // Send 3 messages to the broker.
1648
connection.send(createMessage(producerInfo, destination, deliveryMode));
1649        connection.send(createMessage(producerInfo, destination, deliveryMode));
1650        connection.send(createMessage(producerInfo, destination, deliveryMode));
1651        
1652        // Make sure only 1 message was delivered.
1653
Message m1 = receiveMessage(connection);
1654        assertNotNull(m1);
1655        
1656        assertNoMessagesLeft(connection);
1657        
1658        // Acknowledge the first message. This should cause the next message to get dispatched.
1659
connection.send(createAck(consumerInfo, m1, 1, MessageAck.DELIVERED_ACK_TYPE));
1660        
1661        Message m2 = receiveMessage(connection);
1662        assertNotNull(m2);
1663        connection.send(createAck(consumerInfo, m2, 1, MessageAck.DELIVERED_ACK_TYPE));
1664        
1665        Message m3 = receiveMessage(connection);
1666        assertNotNull(m3);
1667        connection.send(createAck(consumerInfo, m3, 1, MessageAck.DELIVERED_ACK_TYPE));
1668    }
1669
1670    public static Test suite() {
1671        return suite(BrokerTest.class);
1672    }
1673    
1674    public static void main(String JavaDoc[] args) {
1675        junit.textui.TestRunner.run(suite());
1676    }
1677
1678}
1679
Popular Tags