KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > JMSConsumerTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import javax.jms.BytesMessage JavaDoc;
21 import javax.jms.DeliveryMode JavaDoc;
22 import javax.jms.Message JavaDoc;
23 import javax.jms.MessageConsumer JavaDoc;
24 import javax.jms.MessageListener JavaDoc;
25 import javax.jms.MessageProducer JavaDoc;
26 import javax.jms.Session JavaDoc;
27 import javax.jms.TextMessage JavaDoc;
28 import javax.jms.Topic JavaDoc;
29
30 import junit.framework.Test;
31
32 import org.apache.activemq.ActiveMQConnection;
33 import org.apache.activemq.ActiveMQMessageConsumer;
34 import org.apache.activemq.command.ActiveMQDestination;
35 import org.apache.activemq.command.ActiveMQQueue;
36
37 import java.util.concurrent.CountDownLatch JavaDoc;
38 import java.util.concurrent.TimeUnit JavaDoc;
39 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
40
41 /**
42  * Test cases used to test the JMS message consumer.
43  *
44  * @version $Revision$
45  */

46 public class JMSConsumerTest extends JmsTestSupport {
47
48     private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
49             .getLog(JMSConsumerTest.class);
50     
51     public static Test suite() {
52         return suite(JMSConsumerTest.class);
53     }
54
55     public static void main(String JavaDoc[] args) {
56         junit.textui.TestRunner.run(suite());
57     }
58
59     public ActiveMQDestination destination;
60     public int deliveryMode;
61     public int prefetch;
62     public int ackMode;
63     public byte destinationType;
64     public boolean durableConsumer;
65     
66     public void initCombosForTestMessageListenerWithConsumerCanBeStopped() {
67         addCombinationValues("deliveryMode", new Object JavaDoc[] {
68                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
69                 new Integer JavaDoc(DeliveryMode.PERSISTENT) });
70         addCombinationValues("destinationType", new Object JavaDoc[] {
71                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
72                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
73                 new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
74                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE) });
75     }
76     public void testMessageListenerWithConsumerCanBeStopped() throws Exception JavaDoc {
77
78         final AtomicInteger JavaDoc counter = new AtomicInteger JavaDoc(0);
79         final CountDownLatch JavaDoc done1 = new CountDownLatch JavaDoc(1);
80         final CountDownLatch JavaDoc done2 = new CountDownLatch JavaDoc(1);
81         
82         // Receive a message with the JMS API
83
connection.start();
84         Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
85         destination = createDestination(session, destinationType);
86         ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination);
87         consumer.setMessageListener(new MessageListener JavaDoc() {
88             public void onMessage(Message m) {
89                 counter.incrementAndGet();
90                 if( counter.get()==1 )
91                     done1.countDown();
92                 if( counter.get()==2 )
93                     done2.countDown();
94             }
95         });
96
97         // Send a first message to make sure that the consumer dispatcher is running
98
sendMessages(session, destination, 1);
99         assertTrue(done1.await(1, TimeUnit.SECONDS));
100         assertEquals(1, counter.get());
101
102         // Stop the consumer.
103
consumer.stop();
104
105         // Send a message, but should not get delivered.
106
sendMessages(session, destination, 1);
107         assertFalse(done2.await(1, TimeUnit.SECONDS));
108         assertEquals(1, counter.get());
109         
110         // Start the consumer, and the message should now get delivered.
111
consumer.start();
112         assertTrue(done2.await(1, TimeUnit.SECONDS));
113         assertEquals(2, counter.get());
114     }
115     
116     public void initCombosForTestMutiReceiveWithPrefetch1() {
117         addCombinationValues("deliveryMode", new Object JavaDoc[] {
118                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
119                 new Integer JavaDoc(DeliveryMode.PERSISTENT) });
120         addCombinationValues("ackMode", new Object JavaDoc[] {
121                 new Integer JavaDoc(Session.AUTO_ACKNOWLEDGE),
122                 new Integer JavaDoc(Session.DUPS_OK_ACKNOWLEDGE),
123                 new Integer JavaDoc(Session.CLIENT_ACKNOWLEDGE) });
124         addCombinationValues("destinationType", new Object JavaDoc[] {
125                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
126                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
127                 new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
128                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE)
129                 });
130     }
131
132     public void testMutiReceiveWithPrefetch1() throws Exception JavaDoc {
133
134         // Set prefetch to 1
135
connection.getPrefetchPolicy().setAll(1);
136         connection.start();
137
138         // Use all the ack modes
139
Session JavaDoc session = connection.createSession(false, ackMode);
140         destination = createDestination(session, destinationType);
141         MessageConsumer JavaDoc consumer = session.createConsumer(destination);
142
143         // Send the messages
144
sendMessages(session, destination, 4);
145
146         // Make sure 4 messages were delivered.
147
Message message = null;
148         for (int i = 0; i < 4; i++) {
149             message = consumer.receive(1000);
150             assertNotNull(message);
151         }
152         assertNull(consumer.receiveNoWait());
153         message.acknowledge();
154     }
155
156     public void initCombosForTestDurableConsumerSelectorChange() {
157         addCombinationValues("deliveryMode", new Object JavaDoc[] {
158                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
159                 new Integer JavaDoc(DeliveryMode.PERSISTENT) });
160         addCombinationValues("destinationType", new Object JavaDoc[] {
161                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE)});
162     }
163     public void testDurableConsumerSelectorChange() throws Exception JavaDoc {
164
165         // Receive a message with the JMS API
166
connection.setClientID("test");
167         connection.start();
168         Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
169         destination = createDestination(session, destinationType);
170         MessageProducer JavaDoc producer = session.createProducer(destination);
171         producer.setDeliveryMode(deliveryMode);
172         MessageConsumer JavaDoc consumer = session.createDurableSubscriber((Topic JavaDoc)destination, "test", "color='red'", false);
173
174         // Send the messages
175
TextMessage JavaDoc message = session.createTextMessage("1st");
176         message.setStringProperty("color", "red");
177         producer.send(message);
178         
179         Message m = consumer.receive(1000);
180         assertNotNull(m);
181         assertEquals("1st", ((TextMessage JavaDoc)m).getText());
182
183         // Change the subscription.
184
consumer.close();
185         consumer = session.createDurableSubscriber((Topic JavaDoc)destination, "test", "color='blue'", false);
186         
187         message = session.createTextMessage("2nd");
188         message.setStringProperty("color", "red");
189         producer.send(message);
190         message = session.createTextMessage("3rd");
191         message.setStringProperty("color", "blue");
192         producer.send(message);
193
194         // Selector should skip the 2nd message.
195
m = consumer.receive(1000);
196         assertNotNull(m);
197         assertEquals("3rd", ((TextMessage JavaDoc)m).getText());
198         
199         assertNull(consumer.receiveNoWait());
200     }
201
202     public void initCombosForTestSendReceiveBytesMessage() {
203         addCombinationValues("deliveryMode", new Object JavaDoc[] { new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
204                 new Integer JavaDoc(DeliveryMode.PERSISTENT) });
205         addCombinationValues("destinationType", new Object JavaDoc[] { new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
206                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE), new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
207                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE) });
208     }
209
210     public void testSendReceiveBytesMessage() throws Exception JavaDoc {
211
212         // Receive a message with the JMS API
213
connection.start();
214         Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
215         destination = createDestination(session, destinationType);
216         MessageConsumer JavaDoc consumer = session.createConsumer(destination);
217         MessageProducer JavaDoc producer = session.createProducer(destination);
218         
219         BytesMessage JavaDoc message = session.createBytesMessage();
220         message.writeBoolean(true);
221         message.writeBoolean(false);
222         producer.send(message);
223         
224         // Make sure only 1 message was delivered.
225
BytesMessage JavaDoc m = (BytesMessage JavaDoc)consumer.receive(1000);
226         assertNotNull(m);
227         assertTrue(m.readBoolean());
228         assertFalse(m.readBoolean());
229         
230         assertNull(consumer.receiveNoWait());
231     }
232
233     
234     public void initCombosForTestSetMessageListenerAfterStart() {
235         addCombinationValues("deliveryMode", new Object JavaDoc[] {
236                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
237                 new Integer JavaDoc(DeliveryMode.PERSISTENT) });
238         addCombinationValues("destinationType", new Object JavaDoc[] {
239                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
240                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
241                 new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
242                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE) });
243     }
244     public void testSetMessageListenerAfterStart() throws Exception JavaDoc {
245
246         final AtomicInteger JavaDoc counter = new AtomicInteger JavaDoc(0);
247         final CountDownLatch JavaDoc done = new CountDownLatch JavaDoc(1);
248         
249         // Receive a message with the JMS API
250
connection.start();
251         Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
252         destination = createDestination(session, destinationType);
253         MessageConsumer JavaDoc consumer = session.createConsumer(destination);
254
255         // Send the messages
256
sendMessages(session, destination, 4);
257
258         // See if the message get sent to the listener
259
consumer.setMessageListener(new MessageListener JavaDoc() {
260             public void onMessage(Message m) {
261                 counter.incrementAndGet();
262                 if( counter.get()==4 )
263                     done.countDown();
264             }
265         });
266
267         assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
268         Thread.sleep(200);
269         
270         // Make sure only 4 messages were delivered.
271
assertEquals(4, counter.get());
272     }
273     
274     public void initCombosForTestMessageListenerUnackedWithPrefetch1StayInQueue() {
275         addCombinationValues("deliveryMode", new Object JavaDoc[] {
276                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
277                 new Integer JavaDoc(DeliveryMode.PERSISTENT)
278                 });
279         addCombinationValues("ackMode", new Object JavaDoc[] {
280                 new Integer JavaDoc(Session.AUTO_ACKNOWLEDGE),
281                 new Integer JavaDoc(Session.DUPS_OK_ACKNOWLEDGE),
282                 new Integer JavaDoc(Session.CLIENT_ACKNOWLEDGE)
283                 });
284         addCombinationValues("destinationType", new Object JavaDoc[] { new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE), });
285     }
286
287     public void testMessageListenerUnackedWithPrefetch1StayInQueue() throws Exception JavaDoc {
288
289         final AtomicInteger JavaDoc counter = new AtomicInteger JavaDoc(0);
290         final CountDownLatch JavaDoc sendDone = new CountDownLatch JavaDoc(1);
291         final CountDownLatch JavaDoc got2Done = new CountDownLatch JavaDoc(1);
292
293         // Set prefetch to 1
294
connection.getPrefetchPolicy().setAll(1);
295         // This test case does not work if optimized message dispatch is used as the main thread send block until the consumer receives the
296
// message. This test depends on thread decoupling so that the main thread can stop the consumer thread.
297
connection.setOptimizedMessageDispatch(false);
298         connection.start();
299
300         // Use all the ack modes
301
Session JavaDoc session = connection.createSession(false, ackMode);
302         destination = createDestination(session, destinationType);
303         MessageConsumer JavaDoc consumer = session.createConsumer(destination);
304         consumer.setMessageListener(new MessageListener JavaDoc() {
305             public void onMessage(Message m) {
306                 try {
307                     TextMessage JavaDoc tm = (TextMessage JavaDoc)m;
308                     log.info("Got in first listener: "+tm.getText());
309                     assertEquals( ""+counter.get(), tm.getText() );
310                     counter.incrementAndGet();
311                     m.acknowledge();
312                     if( counter.get()==2 ) {
313                         sendDone.await();
314                         connection.close();
315                         got2Done.countDown();
316                     }
317                 } catch (Throwable JavaDoc e) {
318                     e.printStackTrace();
319                 }
320             }
321         });
322
323         // Send the messages
324
sendMessages(session, destination, 4);
325         sendDone.countDown();
326         
327         // Wait for first 2 messages to arrive.
328
assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
329
330         // Re-start connection.
331
connection = (ActiveMQConnection) factory.createConnection();
332         connections.add(connection);
333         
334         connection.getPrefetchPolicy().setAll(1);
335         connection.start();
336
337         // Pickup the remaining messages.
338
final CountDownLatch JavaDoc done2 = new CountDownLatch JavaDoc(1);
339         session = connection.createSession(false, ackMode);
340         consumer = session.createConsumer(destination);
341         consumer.setMessageListener(new MessageListener JavaDoc() {
342             public void onMessage(Message m) {
343                 try {
344                     TextMessage JavaDoc tm = (TextMessage JavaDoc)m;
345                     log.info("Got in second listener: "+tm.getText());
346                     assertEquals( ""+counter.get(), tm.getText() );
347                     counter.incrementAndGet();
348                     if( counter.get()==4 )
349                         done2.countDown();
350                 } catch (Throwable JavaDoc e) {
351                     e.printStackTrace();
352                 }
353             }
354         });
355
356         assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
357         Thread.sleep(200);
358         
359         // Make sure only 4 messages were delivered.
360
assertEquals(4, counter.get());
361
362     }
363
364     
365     public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {
366         addCombinationValues("deliveryMode", new Object JavaDoc[] {
367                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
368                 new Integer JavaDoc(DeliveryMode.PERSISTENT) });
369         addCombinationValues("destinationType", new Object JavaDoc[] {
370                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
371                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
372                 new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
373                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE) });
374     }
375     public void testMessageListenerWithConsumerWithPrefetch1() throws Exception JavaDoc {
376
377         final AtomicInteger JavaDoc counter = new AtomicInteger JavaDoc(0);
378         final CountDownLatch JavaDoc done = new CountDownLatch JavaDoc(1);
379         
380         // Receive a message with the JMS API
381
connection.getPrefetchPolicy().setAll(1);
382         connection.start();
383         
384         Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
385         destination = createDestination(session, destinationType);
386         MessageConsumer JavaDoc consumer = session.createConsumer(destination);
387         consumer.setMessageListener(new MessageListener JavaDoc() {
388             public void onMessage(Message m) {
389                 counter.incrementAndGet();
390                 if( counter.get()==4 )
391                     done.countDown();
392             }
393         });
394
395         // Send the messages
396
sendMessages(session, destination, 4);
397
398         assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
399         Thread.sleep(200);
400         
401         // Make sure only 4 messages were delivered.
402
assertEquals(4, counter.get());
403     }
404
405     public void initCombosForTestMessageListenerWithConsumer() {
406         addCombinationValues("deliveryMode", new Object JavaDoc[] {
407                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
408                 new Integer JavaDoc(DeliveryMode.PERSISTENT) });
409         addCombinationValues("destinationType", new Object JavaDoc[] {
410                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
411                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
412                 new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
413                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE) });
414     }
415     public void testMessageListenerWithConsumer() throws Exception JavaDoc {
416
417         final AtomicInteger JavaDoc counter = new AtomicInteger JavaDoc(0);
418         final CountDownLatch JavaDoc done = new CountDownLatch JavaDoc(1);
419         
420         // Receive a message with the JMS API
421
connection.start();
422         Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
423         destination = createDestination(session, destinationType);
424         MessageConsumer JavaDoc consumer = session.createConsumer(destination);
425         consumer.setMessageListener(new MessageListener JavaDoc() {
426             public void onMessage(Message m) {
427                 counter.incrementAndGet();
428                 if( counter.get()==4 )
429                     done.countDown();
430             }
431         });
432
433         // Send the messages
434
sendMessages(session, destination, 4);
435
436         assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
437         Thread.sleep(200);
438         
439         // Make sure only 4 messages were delivered.
440
assertEquals(4, counter.get());
441     }
442
443     public void initCombosForTestUnackedWithPrefetch1StayInQueue() {
444         addCombinationValues("deliveryMode", new Object JavaDoc[] { new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
445                 new Integer JavaDoc(DeliveryMode.PERSISTENT) });
446         addCombinationValues("ackMode", new Object JavaDoc[] { new Integer JavaDoc(Session.AUTO_ACKNOWLEDGE),
447                 new Integer JavaDoc(Session.DUPS_OK_ACKNOWLEDGE), new Integer JavaDoc(Session.CLIENT_ACKNOWLEDGE) });
448         addCombinationValues("destinationType", new Object JavaDoc[] { new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE), });
449     }
450
451     public void testUnackedWithPrefetch1StayInQueue() throws Exception JavaDoc {
452
453         // Set prefetch to 1
454
connection.getPrefetchPolicy().setAll(1);
455         connection.start();
456
457         // Use all the ack modes
458
Session JavaDoc session = connection.createSession(false, ackMode);
459         destination = createDestination(session, destinationType);
460         MessageConsumer JavaDoc consumer = session.createConsumer(destination);
461
462         // Send the messages
463
sendMessages(session, destination, 4);
464
465         // Only pick up the first 2 messages.
466
Message message = null;
467         for (int i = 0; i < 2; i++) {
468             message = consumer.receive(1000);
469             assertNotNull(message);
470         }
471         message.acknowledge();
472
473         connection.close();
474         connection = (ActiveMQConnection) factory.createConnection();
475         connections.add(connection);
476         connection.getPrefetchPolicy().setAll(1);
477         connection.start();
478
479         // Use all the ack modes
480
session = connection.createSession(false, ackMode);
481         consumer = session.createConsumer(destination);
482
483         // Pickup the rest of the messages.
484
for (int i = 0; i < 2; i++) {
485             message = consumer.receive(1000);
486             assertNotNull(message);
487         }
488         message.acknowledge();
489         assertNull(consumer.receiveNoWait());
490
491     }
492     public void initCombosForTestPrefetch1MessageNotDispatched() {
493         addCombinationValues("deliveryMode", new Object JavaDoc[] { new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
494                 new Integer JavaDoc(DeliveryMode.PERSISTENT) });
495     }
496     
497     public void testPrefetch1MessageNotDispatched() throws Exception JavaDoc {
498
499         // Set prefetch to 1
500
connection.getPrefetchPolicy().setAll(1);
501         connection.start();
502
503         Session JavaDoc session = connection.createSession(true, 0);
504         destination = new ActiveMQQueue("TEST");
505         MessageConsumer JavaDoc consumer = session.createConsumer(destination);
506
507         // Send 2 messages to the destination.
508
sendMessages(session, destination, 2);
509         session.commit();
510
511         // Only pick up the first message.
512
Message message1 = consumer.receive(1000);
513         assertNotNull(message1);
514         
515         // Don't acknowledge yet. This should keep our prefetch full.
516
// Since prefetch is still full, the 2nd message should get dispatched to
517
// another consumer.. lets create the 2nd consumer test that it does make sure it does.
518
ActiveMQConnection connection2 = (ActiveMQConnection) factory.createConnection();
519         connections.add(connection2);
520         Session JavaDoc session2 = connection2.createSession(true, 0);
521         MessageConsumer JavaDoc consumer2 = session2.createConsumer(destination);
522         
523         // Only pick up the 2nd messages.
524
Message message2 = consumer.receive(1000);
525         assertNotNull(message2);
526         
527         session.commit();
528         session2.commit();
529
530         assertNull(consumer.receiveNoWait());
531
532     }
533     
534     public void initCombosForTestDontStart() {
535         addCombinationValues("deliveryMode", new Object JavaDoc[] { new Integer JavaDoc(DeliveryMode.NON_PERSISTENT), });
536         addCombinationValues("destinationType", new Object JavaDoc[] { new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
537                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE), });
538     }
539
540     public void testDontStart() throws Exception JavaDoc {
541
542         Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
543         destination = createDestination(session, destinationType);
544         MessageConsumer JavaDoc consumer = session.createConsumer(destination);
545
546         // Send the messages
547
sendMessages(session, destination, 1);
548
549         // Make sure no messages were delivered.
550
assertNull(consumer.receive(1000));
551     }
552
553     public void initCombosForTestStartAfterSend() {
554         addCombinationValues("deliveryMode", new Object JavaDoc[] { new Integer JavaDoc(DeliveryMode.NON_PERSISTENT), });
555         addCombinationValues("destinationType", new Object JavaDoc[] { new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
556                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE), });
557     }
558
559     public void testStartAfterSend() throws Exception JavaDoc {
560
561         Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
562         destination = createDestination(session, destinationType);
563         MessageConsumer JavaDoc consumer = session.createConsumer(destination);
564
565         // Send the messages
566
sendMessages(session, destination, 1);
567
568         // Start the conncection after the message was sent.
569
connection.start();
570
571         // Make sure only 1 message was delivered.
572
assertNotNull(consumer.receive(1000));
573         assertNull(consumer.receiveNoWait());
574     }
575
576     public void initCombosForTestReceiveMessageWithConsumer() {
577         addCombinationValues("deliveryMode", new Object JavaDoc[] { new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
578                 new Integer JavaDoc(DeliveryMode.PERSISTENT) });
579         addCombinationValues("destinationType", new Object JavaDoc[] { new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
580                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE), new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
581                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE) });
582     }
583
584     public void testReceiveMessageWithConsumer() throws Exception JavaDoc {
585
586         // Receive a message with the JMS API
587
connection.start();
588         Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
589         destination = createDestination(session, destinationType);
590         MessageConsumer JavaDoc consumer = session.createConsumer(destination);
591
592         // Send the messages
593
sendMessages(session, destination, 1);
594
595         // Make sure only 1 message was delivered.
596
Message m = consumer.receive(1000);
597         assertNotNull(m);
598         assertEquals("0", ((TextMessage JavaDoc)m).getText());
599         assertNull(consumer.receiveNoWait());
600     }
601
602 }
603
Popular Tags