KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > test > jbossmessaging > test > Jms11UnitTest


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.test.jbossmessaging.test;
23
24 import java.util.Enumeration JavaDoc;
25 import javax.jms.Connection JavaDoc;
26 import javax.jms.ConnectionFactory JavaDoc;
27 import javax.jms.DeliveryMode JavaDoc;
28 import javax.jms.InvalidDestinationException JavaDoc;
29 import javax.jms.JMSException JavaDoc;
30 import javax.jms.Message JavaDoc;
31 import javax.jms.MessageConsumer JavaDoc;
32 import javax.jms.MessageListener JavaDoc;
33 import javax.jms.MessageProducer JavaDoc;
34 import javax.jms.Queue JavaDoc;
35 import javax.jms.QueueBrowser JavaDoc;
36 import javax.jms.ServerSession JavaDoc;
37 import javax.jms.ServerSessionPool JavaDoc;
38 import javax.jms.Session JavaDoc;
39 import javax.jms.TemporaryQueue JavaDoc;
40 import javax.jms.TemporaryTopic JavaDoc;
41 import javax.jms.TextMessage JavaDoc;
42 import javax.jms.Topic JavaDoc;
43 import javax.jms.TopicConnection JavaDoc;
44 import javax.jms.TopicConnectionFactory JavaDoc;
45 import javax.jms.TopicSubscriber JavaDoc;
46 import javax.naming.Context JavaDoc;
47 import javax.naming.InitialContext JavaDoc;
48
49 import EDU.oswego.cs.dl.util.concurrent.CountDown;
50 import org.jboss.logging.Logger;
51 import org.jboss.test.jbossmessaging.JMSTestCase;
52
53 /**
54  * Basic tests using the jms 1.1 producer/consumer apis.
55  *
56  * @author Scott.Stark@jboss.org
57  * @version $Revision: 37406 $
58  */

59 public class Jms11UnitTest extends JMSTestCase
60 {
61    /** The default TopicFactory jndi name */
62    static String JavaDoc TOPIC_FACTORY = "ConnectionFactory";
63    /** The default QueueFactory jndi name */
64    static String JavaDoc QUEUE_FACTORY = "ConnectionFactory";
65
66    static String JavaDoc TEST_QUEUE = "queue/testQueue";
67    static String JavaDoc TEST_TOPIC = "topic/testTopic";
68    static String JavaDoc TEST_DURABLE_TOPIC = "topic/testDurableTopic";
69
70    //JMSProviderAdapter providerAdapter;
71
static Context JavaDoc context;
72    static Connection JavaDoc queueConnection;
73    static Connection JavaDoc topicConnection;
74
/**
 * Creates the test case.
 *
 * @param name the test name, handed unchanged to the superclass constructor
 * @throws Exception if the superclass constructor fails
 */
public Jms11UnitTest(String name) throws Exception
{
   super(name);
}
79
80    // Emptys out all the messages in a queue
81
protected void drainQueue() throws Exception JavaDoc
82    {
83       Session JavaDoc session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
84       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
85
86       MessageConsumer JavaDoc receiver = session.createConsumer(queue);
87       Message JavaDoc message = receiver.receive(50);
88       int c = 0;
89       while (message != null)
90       {
91          message = receiver.receive(50);
92          c++;
93       }
94
95       if (c != 0)
96          getLog().debug(" Drained " + c + " messages from the queue");
97
98       session.close();
99    }
100
101    protected void connect() throws Exception JavaDoc
102    {
103       if (context == null)
104       {
105          context = new InitialContext JavaDoc();
106       }
107       ConnectionFactory JavaDoc queueFactory = (ConnectionFactory JavaDoc) context.lookup(QUEUE_FACTORY);
108       queueConnection = queueFactory.createConnection();
109
110       ConnectionFactory JavaDoc topicFactory = (ConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
111       topicConnection = topicFactory.createConnection();
112       getLog().debug("Connection to JBossMQ established.");
113    }
114
115    protected void disconnect() throws Exception JavaDoc
116    {
117       queueConnection.close();
118       topicConnection.close();
119    }
120
121    /**
122     * Test that messages are ordered by message arrival and priority.
123     * This also tests :
124     * Using a non-transacted AUTO_ACKNOWLEDGE session
125     * Using a MessageConsumer
126     * Using a QueueSender
127     * Sending PERSITENT and NON_PERSISTENT text messages.
128     * Using a QueueBrowser
129     */

130    public void testQueueMessageOrder() throws Exception JavaDoc
131    {
132
133       getLog().debug("Starting QueueMessageOrder test");
134
135       connect();
136
137       queueConnection.start();
138
139       drainQueue();
140
141       Session JavaDoc session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
142       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
143       MessageProducer JavaDoc sender = session.createProducer(queue);
144
145       TextMessage JavaDoc message = session.createTextMessage();
146       message.setText("Normal message");
147       sender.send(message, DeliveryMode.NON_PERSISTENT, 4, 0);
148       message.setText("Persistent message");
149       sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
150       message.setText("High Priority Persistent message");
151       sender.send(message, DeliveryMode.PERSISTENT, 10, 0);
152
153       QueueBrowser JavaDoc browser = session.createBrowser(queue);
154       Enumeration JavaDoc i = browser.getEnumeration();
155       getLog().debug(message.getText());
156
157       message = (TextMessage JavaDoc) i.nextElement();
158       getLog().debug(message.getText());
159
160       message = (TextMessage JavaDoc) i.nextElement();
161       getLog().debug(message.getText());
162
163       disconnect();
164       getLog().debug("QueueMessageOrder passed");
165    }
166
167    /**
168     * Test that temporary queues can be deleted.
169     */

170    public void testTemporaryQueueDelete() throws Exception JavaDoc
171    {
172
173       getLog().debug("Starting TemporaryQueueDelete test");
174       connect();
175
176       Session JavaDoc session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
177       TemporaryQueue JavaDoc queue = session.createTemporaryQueue();
178
179       queue.delete();
180
181       disconnect();
182
183       getLog().debug("TemporaryQueueDelete passed");
184    }
185
186    /**
187     * Test that temporary topics can be deleted.
188     */

189    public void testTemporaryTopicDelete() throws Exception JavaDoc
190    {
191
192       getLog().debug("Starting TemporaryTopicDelete test");
193       connect();
194
195       Session JavaDoc session = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
196       TemporaryTopic JavaDoc topic = session.createTemporaryTopic();
197
198       topic.delete();
199
200       disconnect();
201
202       getLog().debug("TemporaryTopicDelete passed");
203    }
204
205    /**
206     * Test invalid destination trying to send a message.
207     */

208    public void testInvalidDestinationQueueSend() throws Exception JavaDoc
209    {
210
211       getLog().debug("Starting InvaidDestinationQueueSend test");
212       connect();
213
214       Session JavaDoc session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
215       TemporaryQueue JavaDoc queue = session.createTemporaryQueue();
216       MessageProducer JavaDoc sender = session.createProducer(queue);
217       queue.delete();
218
219       TextMessage JavaDoc message = session.createTextMessage("hello");
220       boolean caught = false;
221       try
222       {
223          sender.send(message);
224       }
225       catch (InvalidDestinationException JavaDoc expected)
226       {
227          caught = true;
228       }
229
230       disconnect();
231
232       assertTrue("Expected an InvalidDestinationException", caught);
233
234       getLog().debug("InvaldDestinationQueueSend passed");
235    }
236
237    /**
238     * Test invalid destination trying to browse a message.
239     */

240    public void testInvalidDestinationQueueBrowse() throws Exception JavaDoc
241    {
242
243       getLog().debug("Starting InvalidDestinationQueueBrowse test");
244       connect();
245
246       Session JavaDoc session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
247       TemporaryQueue JavaDoc queue = session.createTemporaryQueue();
248       QueueBrowser JavaDoc browser = session.createBrowser(queue);
249       queue.delete();
250
251       boolean caught = false;
252       try
253       {
254          browser.getEnumeration();
255       }
256       catch (InvalidDestinationException JavaDoc expected)
257       {
258          caught = true;
259       }
260
261       disconnect();
262
263       assertTrue("Expected an InvalidDestinationException", caught);
264
265       getLog().debug("InvalidDestinationQueueBrowse passed");
266    }
267
268    /**
269     * Test invalid destination trying to send a message.
270     */

271    public void testInvalidDestinationTopicPublish() throws Exception JavaDoc
272    {
273
274       getLog().debug("Starting InvaidDestinationTopicPublish test");
275       connect();
276
277       Session JavaDoc session = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
278       TemporaryTopic JavaDoc topic = session.createTemporaryTopic();
279       MessageProducer JavaDoc publisher = session.createProducer(topic);
280       topic.delete();
281
282       TextMessage JavaDoc message = session.createTextMessage("hello");
283       boolean caught = false;
284       try
285       {
286          publisher.send(message);
287       }
288       catch (InvalidDestinationException JavaDoc expected)
289       {
290          caught = true;
291       }
292
293       disconnect();
294
295       assertTrue("Expected an InvalidDestinationException", caught);
296
297       getLog().debug("InvaldDestinationTopicPublish passed");
298    }
299
300    /**
301     * Test errors trying on topic subscribe.
302     */

303    public void testErrorsTopicSubscribe() throws Exception JavaDoc
304    {
305
306       getLog().debug("Starting InvalidDestinationTopicSubscribe test");
307       connect();
308
309       try
310       {
311          Session JavaDoc session = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
312          Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
313          TemporaryTopic JavaDoc temp = session.createTemporaryTopic();
314
315          boolean caught = false;
316          try
317          {
318             session.createConsumer(null);
319          }
320          catch (InvalidDestinationException JavaDoc expected)
321          {
322             caught = true;
323          }
324          assertTrue("Expected an InvalidDestinationException for a null topic", caught);
325
326          caught = false;
327          try
328          {
329             session.createConsumer(null, null, true);
330          }
331          catch (InvalidDestinationException JavaDoc expected)
332          {
333             caught = true;
334          }
335          assertTrue("Expected an InvalidDestinationException for a null topic", caught);
336
337          caught = false;
338          try
339          {
340             session.createDurableSubscriber(null, "NotUsed");
341          }
342          catch (InvalidDestinationException JavaDoc expected)
343          {
344             caught = true;
345          }
346          assertTrue("Expected an InvalidDestinationException for a null topic", caught);
347
348          caught = false;
349          try
350          {
351             session.createDurableSubscriber(temp, "NotUsed");
352          }
353          catch (InvalidDestinationException JavaDoc expected)
354          {
355             caught = true;
356          }
357          assertTrue("Expected an InvalidDestinationException for a temporary topic", caught);
358
359          caught = false;
360          try
361          {
362             session.createDurableSubscriber(null, "NotUsed", null, true);
363          }
364          catch (InvalidDestinationException JavaDoc expected)
365          {
366             caught = true;
367          }
368          assertTrue("Expected an InvalidDestinationException for a null topic", caught);
369
370          caught = false;
371          try
372          {
373             session.createDurableSubscriber(temp, "NotUsed", null, true);
374          }
375          catch (InvalidDestinationException JavaDoc expected)
376          {
377             caught = true;
378          }
379          assertTrue("Expected an InvalidDestinationException for a temporary topic", caught);
380
381          caught = false;
382          try
383          {
384             session.createDurableSubscriber(topic, null);
385          }
386          catch (Exception JavaDoc expected)
387          {
388             caught = true;
389          }
390          assertTrue("Expected a Exception for a null subscription", caught);
391
392          caught = false;
393          try
394          {
395             session.createDurableSubscriber(topic, null, null, false);
396          }
397          catch (Exception JavaDoc expected)
398          {
399             caught = true;
400          }
401          assertTrue("Expected a Exception for a null subscription", caught);
402
403          caught = false;
404          try
405          {
406             session.createDurableSubscriber(topic, " ");
407          }
408          catch (Exception JavaDoc expected)
409          {
410             caught = true;
411          }
412          assertTrue("Expected a Exception for an empty subscription", caught);
413
414          caught = false;
415          try
416          {
417             session.createDurableSubscriber(topic, " ", null, false);
418          }
419          catch (Exception JavaDoc expected)
420          {
421             caught = true;
422          }
423          assertTrue("Expected a Exception for an empty subscription", caught);
424       }
425       finally
426       {
427          disconnect();
428       }
429
430       getLog().debug("InvalidDestinationTopicSubscriber passed");
431    }
432
433    /**
434     * Test create queue.
435     */

436    public void testCreateQueue() throws Exception JavaDoc
437    {
438
439       getLog().debug("Starting create queue test");
440       connect();
441
442       Session JavaDoc session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
443
444       Queue JavaDoc jndiQueue = (Queue JavaDoc) getInitialContext().lookup("queue/testQueue");
445       Queue JavaDoc createQueue = session.createQueue(jndiQueue.getQueueName());
446       assertTrue("Failed for " + QUEUE_FACTORY, jndiQueue.equals(createQueue));
447
448       getLog().debug("InvalidDestinationTopicSubscriber passed");
449    }
450
451    public void testMessageListener() throws Exception JavaDoc
452    {
453       getLog().debug("Starting create queue test");
454
455       connect();
456       queueConnection.start();
457       drainQueue();
458       final CountDown counter1 = new CountDown(3);
459
460       Session JavaDoc session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
461       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
462
463       MessageConsumer JavaDoc receiver = session.createConsumer(queue);
464       receiver.setMessageListener(new MessageListener JavaDoc()
465       {
466          public void onMessage(Message JavaDoc msg)
467          {
468             Logger log = Logger.getLogger(getClass().getName());
469             log.debug("ML");
470             try
471             {
472                if (msg instanceof TextMessage JavaDoc)
473                {
474                   log.debug(((TextMessage JavaDoc) msg).getText());
475                   counter1.release();
476                }
477             }
478             catch (Exception JavaDoc e)
479             {
480             }
481          }
482       });
483
484       MessageProducer JavaDoc sender = session.createProducer(queue);
485
486       TextMessage JavaDoc message = session.createTextMessage();
487       message.setText("Normal message");
488       sender.send(message, DeliveryMode.NON_PERSISTENT, 4, 0);
489       //sender.send(queue, message, DeliveryMode.NON_PERSISTENT, 4, 0);
490
message.setText("Persistent message");
491       sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
492       //sender.send(queue, message, DeliveryMode.PERSISTENT, 4, 0);
493
message.setText("High Priority Persistent message");
494       sender.send(message, DeliveryMode.PERSISTENT, 10, 0);
495       //sender.send(queue, message, DeliveryMode.PERSISTENT, 10, 0);
496

497       // Wait for the msgs to be received
498
counter1.acquire();
499       log.debug("MessageListener1 received the TMs sent");
500
501       final CountDown counter2 = new CountDown(2);
502       receiver.setMessageListener(new MessageListener JavaDoc()
503       {
504          public void onMessage(Message JavaDoc msg)
505          {
506             Logger log = Logger.getLogger(getClass().getName());
507             log.debug("ML 2");
508             try
509             {
510                if (msg instanceof TextMessage JavaDoc)
511                {
512                   log.debug(((TextMessage JavaDoc) msg).getText());
513                   counter2.release();
514                }
515             }
516             catch (Exception JavaDoc e)
517             {
518             }
519          }
520       });
521
522       message.setText("Persistent message");
523       sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
524       //sender.send(queue, message, DeliveryMode.PERSISTENT, 4, 0);
525
message.setText("High Priority Persistent message");
526       sender.send(message, DeliveryMode.PERSISTENT, 10, 0);
527       //sender.send(queue, message, DeliveryMode.PERSISTENT, 10, 0);
528

529       // Wait for the msgs to be received
530
counter2.acquire();
531       log.debug("MessageListener2 received the TMs sent");
532
533       receiver.setMessageListener(null);
534
535       message.setText("Persistent message");
536       sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
537       //sender.send(queue, message, DeliveryMode.PERSISTENT, 4, 0);
538
message.setText("High Priority Persistent message");
539       sender.send(message, DeliveryMode.PERSISTENT, 10, 0);
540       //sender.send(queue, message, DeliveryMode.PERSISTENT, 10, 0);
541

542       sender.close();
543       drainQueue();
544       disconnect();
545       getLog().debug("MessageListener test passed");
546    }
547
548    public void testApplicationServerStuff() throws Exception JavaDoc
549    {
550       getLog().debug("Starting testing app server stuff");
551       connect();
552
553       Queue JavaDoc testQueue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
554       final Session JavaDoc session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
555
556       session.setMessageListener(new MessageListener JavaDoc()
557       {
558          public void onMessage(Message JavaDoc mess)
559          {
560             Logger log = Logger.getLogger(getClass().getName());
561             log.debug("Processing message");
562             try
563             {
564                if (mess instanceof TextMessage JavaDoc)
565                   log.debug(((TextMessage JavaDoc) mess).getText());
566             }
567             catch (Exception JavaDoc e)
568             {
569                log.error("Error", e);
570             }
571          }
572       });
573
574       MessageProducer JavaDoc sender = session.createProducer(testQueue);
575       sender.send(session.createTextMessage("Hi"));
576       sender.send(session.createTextMessage("There"));
577       sender.send(session.createTextMessage("Guys"));
578       queueConnection.createConnectionConsumer(testQueue, null, new ServerSessionPool JavaDoc()
579       {
580          public ServerSession JavaDoc getServerSession()
581          {
582             Logger.getLogger(getClass().getName()).debug("Getting server session.");
583             return new ServerSession JavaDoc()
584             {
585                public Session JavaDoc getSession()
586                {
587                   return session;
588                }
589                public void start()
590                {
591                   Logger.getLogger(getClass().getName()).debug("Starting server session.");
592                   session.run();
593                }
594             };
595          }
596       }, 10);
597
598       queueConnection.start();
599
600       try
601       {
602          Thread.sleep(5 * 1000);
603       }
604       catch (Exception JavaDoc e)
605       {
606       }
607
608       disconnect();
609       getLog().debug("Testing app server stuff passed");
610    }
611
612    private void drainMessagesForTopic(MessageConsumer JavaDoc sub) throws JMSException JavaDoc
613    {
614       Message JavaDoc msg = sub.receive(50);
615       int c = 0;
616       while (msg != null)
617       {
618          c++;
619          if (msg instanceof TextMessage JavaDoc)
620             getLog().debug(((TextMessage JavaDoc) msg).getText());
621          msg = sub.receive(50);
622       }
623       getLog().debug("Received " + c + " messages from topic.");
624    }
625
626    public void testTopics() throws Exception JavaDoc
627    {
628       getLog().debug("Starting Topic test");
629       connect();
630
631       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
632       topicConnection = topicFactory.createTopicConnection("john", "needle");
633
634       topicConnection.start();
635
636       //set up some subscribers to the topic
637
Session JavaDoc session = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
638       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
639
640       TopicSubscriber JavaDoc sub1 = session.createDurableSubscriber(topic, "sub1");
641       MessageConsumer JavaDoc sub2 = session.createConsumer(topic);
642       MessageConsumer JavaDoc sub3 = session.createConsumer(topic);
643
644       //Now a sender
645
MessageProducer JavaDoc sender = session.createProducer(topic);
646
647       //send some messages
648
sender.send(session.createTextMessage("Message 1"));
649       sender.send(session.createTextMessage("Message 2"));
650       sender.send(session.createTextMessage("Message 3"));
651       drainMessagesForTopic(sub1);
652       drainMessagesForTopic(sub2);
653       drainMessagesForTopic(sub3);
654
655       //close some subscribers
656
sub1.close();
657       sub2.close();
658
659       //send some more messages
660
sender.send(session.createTextMessage("Message 4"));
661       sender.send(session.createTextMessage("Message 5"));
662       sender.send(session.createTextMessage("Message 6"));
663
664       //give time for message 4 to be negatively acked (as it will be cause last receive timed out)
665
try
666       {
667          Thread.sleep(5 * 1000);
668       }
669       catch (InterruptedException JavaDoc e)
670       {
671       }
672
673       drainMessagesForTopic(sub3);
674
675       //open subscribers again.
676
sub1 = session.createDurableSubscriber(topic, "sub1");
677       sub2 = session.createConsumer(topic);
678
679       //Send a final message
680
sender.send(session.createTextMessage("Final message"));
681       sender.close();
682
683       drainMessagesForTopic(sub1);
684       drainMessagesForTopic(sub2);
685       drainMessagesForTopic(sub3);
686
687       sub1.close();
688       sub2.close();
689       sub3.close();
690
691       session.unsubscribe("sub1");
692
693       topicConnection.stop();
694       topicConnection.close();
695
696       disconnect();
697       getLog().debug("Topic test passed");
698    }
699
700    /**
701     * Test to seeif the NoLocal feature of topics works.
702     * Messages sended from the same connection should not
703     * be received by Subscribers on the same connection.
704     */

705    public void testTopicNoLocal() throws Exception JavaDoc
706    {
707       getLog().debug("Starting TopicNoLocal test");
708       connect();
709
710       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
711       TopicConnection JavaDoc topicConnection1 = topicFactory.createTopicConnection();
712       topicConnection1.start();
713       TopicConnection JavaDoc topicConnection2 = topicFactory.createTopicConnection();
714       topicConnection2.start();
715
716       // We don't want local messages on this topic.
717
Session JavaDoc session1 = topicConnection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
718       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
719       MessageConsumer JavaDoc subscriber1 = session1.createConsumer(topic, null, true);
720       MessageProducer JavaDoc sender1 = session1.createProducer(topic);
721
722       //Now a sender
723
Session JavaDoc session2 = topicConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
724       MessageProducer JavaDoc sender2 = session2.createProducer(topic);
725
726       drainMessagesForTopic(subscriber1);
727
728       //send some messages
729
sender1.send(session1.createTextMessage("Local Message"));
730       sender2.send(session2.createTextMessage("Remote Message"));
731
732       // Get the messages, we should get the remote message
733
// but not the local message
734
TextMessage JavaDoc msg1 = (TextMessage JavaDoc) subscriber1.receive(2000);
735       if (msg1 == null)
736       {
737          fail("Did not get any messages");
738       }
739       else
740       {
741          getLog().debug("Got message: " + msg1);
742          if (msg1.getText().equals("Local Message"))
743          {
744             fail("Got a local message");
745          }
746          TextMessage JavaDoc msg2 = (TextMessage JavaDoc) subscriber1.receive(2000);
747          if (msg2 != null)
748          {
749             getLog().debug("Got message: " + msg2);
750             fail("Got an extra message. msg1:" + msg1 + ", msg2:" + msg2);
751          }
752       }
753
754       topicConnection1.stop();
755       topicConnection1.close();
756       topicConnection2.stop();
757       topicConnection2.close();
758
759       disconnect();
760       getLog().debug("TopicNoLocal test passed");
761    }
762
763    /**
764     * Test to see whether no local works if a message
765     * was created somewhere else.
766     */

767    public void testTopicNoLocalBounce() throws Exception JavaDoc
768    {
769       getLog().debug("Starting TopicNoLocalBounce test");
770       connect();
771
772       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
773       TopicConnection JavaDoc topicConnection1 = topicFactory.createTopicConnection();
774       topicConnection1.start();
775       TopicConnection JavaDoc topicConnection2 = topicFactory.createTopicConnection();
776       topicConnection2.start();
777
778       // Session 1
779
Session JavaDoc session1 = topicConnection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
780       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
781       MessageConsumer JavaDoc subscriber1 = session1.createConsumer(topic, null, true);
782       MessageProducer JavaDoc sender1 = session1.createProducer(topic);
783
784       // Session 2
785
Session JavaDoc session2 = topicConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
786       MessageConsumer JavaDoc subscriber2 = session2.createConsumer(topic, null, true);
787       MessageProducer JavaDoc sender2 = session2.createProducer(topic);
788
789       drainMessagesForTopic(subscriber1);
790       drainMessagesForTopic(subscriber2);
791
792       //send the message
793
sender1.send(session1.createTextMessage("Message"));
794
795       assertTrue("Subscriber1 should not get a message", subscriber1.receiveNoWait() == null);
796       TextMessage JavaDoc msg = (TextMessage JavaDoc) subscriber2.receive(2000);
797       assertTrue("Subscriber2 should get a message, got " + msg, msg != null && msg.getText().equals("Message"));
798
799       //send it back
800
sender2.send(msg);
801
802       msg = (TextMessage JavaDoc) subscriber1.receive(2000);
803       assertTrue("Subscriber1 should get a message, got " + msg, msg != null && msg.getText().equals("Message"));
804       assertTrue("Subscriber2 should not get a message", subscriber2.receiveNoWait() == null);
805
806       topicConnection1.stop();
807       topicConnection1.close();
808       topicConnection2.stop();
809       topicConnection2.close();
810
811       disconnect();
812       getLog().debug("TopicNoLocalBounce test passed");
813    }
814
815    /**
816     * Test subscribing to a topic with one selector, then changing to another
817     */

818    public void testTopicSelectorChange() throws Exception JavaDoc
819    {
820       getLog().debug("Starting TopicSelectorChange test");
821
822       getLog().debug("Create topic connection");
823       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
824       topicConnection = topicFactory.createTopicConnection("john", "needle");
825       topicConnection.start();
826
827       try
828       {
829          getLog().debug("Retrieving Topic");
830          Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_DURABLE_TOPIC);
831
832          getLog().debug("Creating a send session");
833          Session JavaDoc sendSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
834          MessageProducer JavaDoc sender = sendSession.createProducer(topic);
835
836          getLog().debug("Clearing the topic");
837          Session JavaDoc subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
838          MessageConsumer JavaDoc subscriber = subSession.createDurableSubscriber(topic, "test");
839          Message JavaDoc message = subscriber.receive(50);
840          while (message != null)
841             message = subscriber.receive(50);
842          subSession.close();
843
844          getLog().debug("Subscribing to topic, looking for Value = 'A'");
845          subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
846          subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'A'", false);
847
848          getLog().debug("Send some messages");
849          message = sendSession.createTextMessage("Message1");
850          message.setStringProperty("Value", "A");
851          sender.send(message);
852          message = sendSession.createTextMessage("Message2");
853          message.setStringProperty("Value", "A");
854          sender.send(message);
855          message = sendSession.createTextMessage("Message3");
856          message.setStringProperty("Value", "B");
857          sender.send(message);
858
859          getLog().debug("Retrieving the A messages");
860          message = subscriber.receive(2000);
861          assertTrue("Expected message 1", message != null);
862          assertTrue("Should get an A", message.getStringProperty("Value").equals("A"));
863          message = subscriber.receive(2000);
864          assertTrue("Expected message 2", message != null);
865          assertTrue("Should get a second A", message.getStringProperty("Value").equals("A"));
866          assertTrue("That should be it for A", subscriber.receive(2000) == null);
867
868          getLog().debug("Closing the subscriber without acknowledgement");
869          subSession.close();
870
871          getLog().debug("Subscribing to topic, looking for Value = 'B'");
872          subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
873          subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'B'", false);
874
875          getLog().debug("Retrieving the non-existent B messages");
876          assertTrue("B should not be there", subscriber.receive(2000) == null);
877
878          getLog().debug("Closing the subscriber.");
879          subSession.close();
880
881          getLog().debug("Subscribing to topic, looking for those Value = 'A'");
882          subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
883          subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'A'", false);
884          assertTrue("Should not be any A the subscription was changed", subscriber.receive(2000) == null);
885          subSession.close();
886
887          getLog().debug("Subscribing to topic, looking for everything");
888          subSession = topicConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
889          subscriber = subSession.createDurableSubscriber(topic, "test", null, false);
890
891          message = sendSession.createTextMessage("Message4");
892          message.setStringProperty("Value", "A");
893          sender.send(message);
894
895          message = subscriber.receive(2000);
896          assertTrue("Expected message 4", message != null);
897          assertTrue("Should be an A which we don't acknowledge", message.getStringProperty("Value").equals("A"));
898          subSession.close();
899
900          getLog().debug("Subscribing to topic, looking for the Value = 'A'");
901          subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
902          subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'A'", false);
903          assertTrue(
904             "Should not be any A, the subscription was changed. Even though the old and new selectors match the message",
905             subscriber.receive(2000) == null);
906          subSession.close();
907
908          getLog().debug("Closing the send session");
909          sendSession.close();
910
911          getLog().debug("Removing the subscription");
912          subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
913          subSession.unsubscribe("test");
914
915       }
916       finally
917       {
918          getLog().debug("Closing the connection");
919          topicConnection.close();
920       }
921
922       getLog().debug("TopicSelectorChange test passed");
923    }
924
925    /**
926     * Test subscribing to a topic with a null and empty selector
927     */

928    public void testTopicSelectorNullOrEmpty() throws Exception JavaDoc
929    {
930       getLog().debug("Starting TopicSelectorNullOrEmpty test");
931
932       getLog().debug("Create topic connection");
933       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
934       topicConnection = topicFactory.createTopicConnection("john", "needle");
935       topicConnection.start();
936
937       try
938       {
939          getLog().debug("Retrieving Topic");
940          Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_DURABLE_TOPIC);
941
942          getLog().debug("Creating a send session");
943          Session JavaDoc sendSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
944          MessageProducer JavaDoc sender = sendSession.createProducer(topic);
945
946          getLog().debug("Clearing the topic");
947          Session JavaDoc subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
948          MessageConsumer JavaDoc subscriber = subSession.createDurableSubscriber(topic, "test");
949          TextMessage JavaDoc message = (TextMessage JavaDoc) subscriber.receive(50);
950          while (message != null)
951             message = (TextMessage JavaDoc) subscriber.receive(50);
952          subSession.close();
953
954          getLog().debug("Subscribing to topic, with null selector");
955          subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
956          subscriber = subSession.createDurableSubscriber(topic, "test", null, false);
957
958          getLog().debug("Send a message");
959          message = sendSession.createTextMessage("Message1");
960          sender.send(message);
961
962          getLog().debug("Retrieving the message");
963          message = (TextMessage JavaDoc) subscriber.receive(2000);
964          assertTrue("Expected message 1", message != null);
965          assertTrue("Should get Message1", message.getText().equals("Message1"));
966          getLog().debug("Closing the subscriber");
967          subSession.close();
968
969          getLog().debug("Subscribing to topic, with an empty selector");
970          subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
971          subscriber = subSession.createDurableSubscriber(topic, "test", " ", false);
972
973          getLog().debug("Send a message");
974          message = sendSession.createTextMessage("Message2");
975          sender.send(message);
976
977          getLog().debug("Retrieving the message");
978          message = (TextMessage JavaDoc) subscriber.receive(2000);
979          assertTrue("Expected message 2", message != null);
980          assertTrue("Should get Message2", message.getText().equals("Message2"));
981          getLog().debug("Closing the subscriber");
982
983          getLog().debug("Removing the subscription");
984          subSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
985          subSession.unsubscribe("test");
986          subSession.close();
987
988       }
989       finally
990       {
991          getLog().debug("Closing the connection");
992          topicConnection.close();
993       }
994
995       getLog().debug("TopicSelectorNullOrEmpty test passed");
996    }
997
998    /**
999     * Test sending/receiving an outdated message
1000    */

1001   public void testSendReceiveOutdated() throws Exception JavaDoc
1002   {
1003      getLog().debug("Starting SendReceiveOutdated test");
1004
1005      connect();
1006      try
1007      {
1008         queueConnection.start();
1009         drainQueue();
1010         queueConnection.stop();
1011
1012         Session JavaDoc session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
1013         Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
1014         MessageProducer JavaDoc sender = session.createProducer(queue);
1015         MessageConsumer JavaDoc receiver = session.createConsumer(queue);
1016
1017         // Send a message that has expired
1018
TextMessage JavaDoc message = session.createTextMessage("Outdated");
1019         sender.send(message, DeliveryMode.PERSISTENT, 4, 1);
1020         Thread.sleep(100);
1021
1022         // Send a message that has not expired
1023
message = session.createTextMessage("OK");
1024         sender.send(message);
1025
1026         // Try to receive the message the not expired message
1027
queueConnection.start();
1028         message = (TextMessage JavaDoc) receiver.receiveNoWait();
1029         assertEquals("OK", message.getText());
1030
1031         // Should be no more
1032
assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null);
1033      }
1034      finally
1035      {
1036         disconnect();
1037      }
1038
1039      getLog().debug("SendReceiveOutdated test passed");
1040   }
1041
1042   public void testSendReceiveExpired() throws Exception JavaDoc
1043   {
1044      getLog().debug("Starting testSendReceiveExpired test");
1045
1046      connect();
1047      try
1048      {
1049         queueConnection.start();
1050         drainQueue();
1051
1052         Session JavaDoc session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
1053         Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
1054         MessageProducer JavaDoc sender = session.createProducer(queue);
1055         MessageConsumer JavaDoc receiver = session.createConsumer(queue);
1056
1057         // Send a message that expires in 5 seconds
1058
TextMessage JavaDoc message = session.createTextMessage("5 Second Expiration");
1059         sender.send(message, DeliveryMode.PERSISTENT, 4, 5*1000);
1060         // Send a message that has not expired
1061
message = session.createTextMessage("OK");
1062         sender.send(message);
1063         // Sleep 6 seconds
1064
Thread.sleep(6*1000);
1065         // Try to receive the OK message
1066
message = (TextMessage JavaDoc) receiver.receiveNoWait();
1067         assertEquals("OK", message.getText());
1068
1069         // Should be no more
1070
assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null);
1071
1072         // Send a message that expires in 10 seconds
1073
message = session.createTextMessage("10 Second Expiration");
1074         sender.send(message, DeliveryMode.PERSISTENT, 4, 10*1000);
1075         // Send a message that has not expired
1076
message = session.createTextMessage("OK");
1077         sender.send(message);
1078         // Sleep 1 seconds
1079
Thread.sleep(1*1000);
1080         // Try to receive the messages
1081
message = (TextMessage JavaDoc) receiver.receiveNoWait();
1082         assertEquals("10 Second Expiration", message.getText());
1083         message = (TextMessage JavaDoc) receiver.receiveNoWait();
1084         assertEquals("OK", message.getText());
1085
1086         // Should be no more
1087
assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null);
1088         
1089         // Test that JMSExpiration has no affect
1090
message = session.createTextMessage("5 Second Expiration");
1091         message.setJMSExpiration(System.currentTimeMillis() + 5*1000);
1092         sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
1093         // Send a message that has not expired
1094
message = session.createTextMessage("OK");
1095         sender.send(message);
1096         // Sleep 6 seconds
1097
Thread.sleep(6*1000);
1098         // Try to receive the OK message
1099
message = (TextMessage JavaDoc) receiver.receiveNoWait();
1100         assertEquals("5 Second Expiration", message.getText());
1101         message = (TextMessage JavaDoc) receiver.receiveNoWait();
1102         assertEquals("OK", message.getText());
1103         assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null);
1104      }
1105      finally
1106      {
1107         disconnect();
1108      }
1109   }
1110
/**
 * Small hand-off object between a message listener thread and the test
 * thread: the listener stores a text and calls {@link #doNotify()}, the
 * test thread blocks in {@link #doWait(long)} and then reads the text.
 *
 * A notification is remembered in a flag, so it is not lost when it
 * arrives before the waiter, and the notifier never blocks. All state is
 * accessed under this object's monitor.
 */
class Synch
{
   /** Set to true once a thread has entered {@link #doWait(long)}; never reset. */
   boolean waiting = false;
   /** True between a doNotify() and the doWait() that consumes it. */
   private boolean notified = false;
   String text;

   /**
    * Blocks until {@link #doNotify()} has been called or the timeout
    * elapses, whichever comes first. Returns immediately if a notification
    * is already pending.
    *
    * @param timeout maximum time to wait in milliseconds; 0 waits
    *        without a time limit
    * @throws InterruptedException if the waiting thread is interrupted
    * @throws IllegalArgumentException if timeout is negative
    */
   public synchronized void doWait(long timeout) throws InterruptedException
   {
      if (timeout < 0)
         throw new IllegalArgumentException("timeout value is negative");
      waiting = true;
      if (timeout == 0)
      {
         // Loop so that a spurious wakeup does not end the wait early
         while (!notified)
            this.wait();
      }
      else
      {
         long start = System.currentTimeMillis();
         long remaining = timeout;
         while (!notified && remaining > 0)
         {
            this.wait(remaining);
            // Recompute from elapsed time; avoids overflow for huge timeouts
            remaining = timeout - (System.currentTimeMillis() - start);
         }
      }
      // Consume the notification so the object can be reused
      notified = false;
   }

   /**
    * Records a notification and wakes up any thread blocked in
    * {@link #doWait(long)}. Does not block when nobody is waiting yet.
    *
    * @throws InterruptedException never thrown by this implementation;
    *         declared to keep the signature callers compile against
    */
   public synchronized void doNotify() throws InterruptedException
   {
      notified = true;
      this.notifyAll();
   }

   /**
    * @return the text last passed to {@link #setText(String)}, or null
    */
   public synchronized String getText()
   {
      return text;
   }

   /**
    * @param text the text to hand over to the waiting thread
    */
   public synchronized void setText(String text)
   {
      this.text = text;
   }
}
1135
1136   /**
1137    * Test sending/listening an outdated message
1138    */

1139   public void testSendListenOutdated() throws Exception JavaDoc
1140   {
1141      getLog().debug("Starting SendListenOutdated test");
1142
1143      connect();
1144      try
1145      {
1146         queueConnection.start();
1147         drainQueue();
1148         queueConnection.stop();
1149
1150         Session JavaDoc session = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
1151         Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
1152         MessageProducer JavaDoc sender = session.createProducer(queue);
1153         MessageConsumer JavaDoc receiver = session.createConsumer(queue);
1154
1155         // Send a message that has expired
1156
TextMessage JavaDoc message = session.createTextMessage("Outdated");
1157         sender.send(message, DeliveryMode.PERSISTENT, 4, 1);
1158         Thread.sleep(100);
1159
1160         // Send a message that has not expired
1161
message = session.createTextMessage("OK");
1162         sender.send(message);
1163
1164         // Try to receive the message the not expired message
1165
final Synch synch = new Synch();
1166         MessageListener JavaDoc messagelistener = new MessageListener JavaDoc()
1167         {
1168            public void onMessage(Message JavaDoc message)
1169            {
1170               listenOutdated(message, synch);
1171            }
1172         };
1173         receiver.setMessageListener(messagelistener);
1174         queueConnection.start();
1175
1176         synch.doWait(10000);
1177         assertEquals("OK", synch.getText());
1178      }
1179      finally
1180      {
1181         disconnect();
1182      }
1183
1184      getLog().debug("SendListenOutdated test passed");
1185   }
1186
1187   private void listenOutdated(Message JavaDoc message, Synch synch)
1188   {
1189      try
1190      {
1191         synch.setText(((TextMessage JavaDoc) message).getText());
1192      }
1193      catch (Throwable JavaDoc t)
1194      {
1195         log.error("Error:", t);
1196      }
1197      finally
1198      {
1199         try
1200         {
1201            synch.doNotify();
1202         }
1203         catch (Throwable JavaDoc t)
1204         {
1205            log.error("Error:", t);
1206         }
1207      }
1208   }
1209}
1210
Popular Tags