KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > test > jbossmq > test > JBossMQUnitTest


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.test.jbossmq.test;
23
24 import java.util.Enumeration JavaDoc;
25
26 import javax.jms.DeliveryMode JavaDoc;
27 import javax.jms.InvalidDestinationException JavaDoc;
28 import javax.jms.JMSException JavaDoc;
29 import javax.jms.Message JavaDoc;
30 import javax.jms.MessageListener JavaDoc;
31 import javax.jms.Queue JavaDoc;
32 import javax.jms.QueueBrowser JavaDoc;
33 import javax.jms.QueueConnection JavaDoc;
34 import javax.jms.QueueConnectionFactory JavaDoc;
35 import javax.jms.QueueReceiver JavaDoc;
36 import javax.jms.QueueRequestor JavaDoc;
37 import javax.jms.QueueSender JavaDoc;
38 import javax.jms.QueueSession JavaDoc;
39 import javax.jms.ServerSession JavaDoc;
40 import javax.jms.ServerSessionPool JavaDoc;
41 import javax.jms.Session JavaDoc;
42 import javax.jms.TemporaryQueue JavaDoc;
43 import javax.jms.TemporaryTopic JavaDoc;
44 import javax.jms.TextMessage JavaDoc;
45 import javax.jms.Topic JavaDoc;
46 import javax.jms.TopicConnection JavaDoc;
47 import javax.jms.TopicConnectionFactory JavaDoc;
48 import javax.jms.TopicPublisher JavaDoc;
49 import javax.jms.TopicSession JavaDoc;
50 import javax.jms.TopicSubscriber JavaDoc;
51 import javax.naming.Context JavaDoc;
52 import javax.naming.InitialContext JavaDoc;
53
54 import org.jboss.logging.Logger;
55 import org.jboss.test.JBossTestCase;
56
57 import EDU.oswego.cs.dl.util.concurrent.CountDown;
58
59 /**
60  * Basic jms tests.
61  *
62  * @author Scott.Stark@jboss.org
63  * @version $Revision: 58115 $
64  */

65 public class JBossMQUnitTest extends JBossTestCase
66 {
67    /** The default TopicFactory jndi name */
68    static String JavaDoc TOPIC_FACTORY = "ConnectionFactory";
69    /** The default QueueFactory jndi name */
70    static String JavaDoc QUEUE_FACTORY = "ConnectionFactory";
71
72    static String JavaDoc TEST_QUEUE = "queue/testQueue";
73    static String JavaDoc TEST_TOPIC = "topic/testTopic";
74    static String JavaDoc TEST_DURABLE_TOPIC = "topic/testDurableTopic";
75
76    //JMSProviderAdapter providerAdapter;
77
static Context JavaDoc context;
78    static QueueConnection JavaDoc queueConnection;
79    static TopicConnection JavaDoc topicConnection;
80
81    public JBossMQUnitTest(String JavaDoc name) throws Exception JavaDoc
82    {
83       super(name);
84    }
85
86    // Emptys out all the messages in a queue
87
protected void drainQueue() throws Exception JavaDoc
88    {
89       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
90       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
91
92       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
93       Message JavaDoc message = receiver.receive(50);
94       int c = 0;
95       while (message != null)
96       {
97          message = receiver.receive(50);
98          c++;
99       }
100
101       if (c != 0)
102          getLog().debug(" Drained " + c + " messages from the queue");
103
104       session.close();
105    }
106
107    protected void connect() throws Exception JavaDoc
108    {
109
110       if (context == null)
111       {
112
113          context = new InitialContext JavaDoc();
114
115       }
116       QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc) context.lookup(QUEUE_FACTORY);
117       queueConnection = queueFactory.createQueueConnection();
118
119       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
120       topicConnection = topicFactory.createTopicConnection();
121
122       getLog().debug("Connection to spyderMQ established.");
123
124    }
125
126    protected void disconnect() throws Exception JavaDoc
127    {
128       queueConnection.close();
129       topicConnection.close();
130    }
131
132    /**
133     * Test that messages are ordered by message arrival and priority.
134     * This also tests :
135     * Using a non-transacted AUTO_ACKNOWLEDGE session
136     * Using a QueueReceiver
137     * Using a QueueSender
138     * Sending PERSITENT and NON_PERSISTENT text messages.
139     * Using a QueueBrowser
140     */

141    public void testQueueMessageOrder() throws Exception JavaDoc
142    {
143
144       getLog().debug("Starting QueueMessageOrder test");
145
146       connect();
147
148       queueConnection.start();
149
150       drainQueue();
151
152       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
153       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
154       QueueSender JavaDoc sender = session.createSender(queue);
155
156       TextMessage JavaDoc message = session.createTextMessage();
157       message.setText("Normal message");
158       sender.send(message, DeliveryMode.NON_PERSISTENT, 4, 0);
159       //sender.send(queue, message, DeliveryMode.NON_PERSISTENT, 4, 0);
160
message.setText("Persistent message");
161       sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
162       //sender.send(queue, message, DeliveryMode.PERSISTENT, 4, 0);
163
message.setText("High Priority Persistent message");
164       sender.send(message, DeliveryMode.PERSISTENT, 10, 0);
165       //sender.send(queue, message, DeliveryMode.PERSISTENT, 10, 0);
166

167       //message.setText("Expiring Persistent message");
168
//sender.send(queue, message, DeliveryMode.NON_PERSISTENT, 4, 1);
169

170       QueueBrowser JavaDoc browser = session.createBrowser(queue);
171       Enumeration JavaDoc i = browser.getEnumeration();
172       //message = (TextMessage)enum.nextElement();
173
//if( !message.getText().equals("High Priority Persistent message") )
174
// throw new Exception("Queue is not prioritizing messages correctly. Unexpected Message:"+message);
175
getLog().debug(message.getText());
176
177       message = (TextMessage JavaDoc) i.nextElement();
178       //if( !message.getText().equals("Normal message") )
179
// throw new Exception("Queue is not ordering messages correctly. Unexpected Message:"+message);
180
getLog().debug(message.getText());
181
182       message = (TextMessage JavaDoc) i.nextElement();
183       //if( !message.getText().equals("Persistent message") )
184
// throw new Exception("Queue is not ordering messages correctly. Unexpected Message:"+message);
185
getLog().debug(message.getText());
186
187       // if( enum.hasMoreElements() )
188
// throw new Exception("Queue does not expire messages correctly. Unexpected Message:"+enum.nextElement());
189

190       disconnect();
191       getLog().debug("QueueMessageOrder passed");
192    }
193
194    /**
195     * Test that a using QueueRequestor works.
196     * this also tests that :
197     * temporary queues work.
198     */

199    public void testRequestReplyQueue() throws Exception JavaDoc
200    {
201
202       getLog().debug("Starting RequestReplyQueue test");
203       connect();
204
205       {
206          queueConnection.start();
207          drainQueue();
208       }
209
210       Thread JavaDoc serverThread = new Thread JavaDoc()
211       {
212          public void run()
213          {
214             Logger log = Logger.getLogger(getClass().getName());
215             try
216             {
217                log.debug("Server Thread Started");
218                QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
219                Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
220
221                QueueReceiver JavaDoc queueReceiver = session.createReceiver(queue);
222
223                boolean done = false;
224                while (!done)
225                {
226                   TextMessage JavaDoc message = (TextMessage JavaDoc) queueReceiver.receive();
227                   Queue JavaDoc tempQueue = (Queue JavaDoc) message.getJMSReplyTo();
228
229                   QueueSender JavaDoc replySender = session.createSender(tempQueue);
230                   TextMessage JavaDoc reply = session.createTextMessage();
231                   reply.setText("Request Processed");
232                   reply.setJMSCorrelationID(message.getJMSMessageID());
233                   replySender.send(reply);
234
235                   if (message.getText().equals("Quit"))
236                      done = true;
237                }
238
239                session.close();
240                log.debug("Server Thread Finished");
241
242             }
243             catch (Exception JavaDoc e)
244             {
245                log.error("Error", e);
246             }
247          }
248       };
249
250       serverThread.start();
251
252       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
253       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
254
255       QueueRequestor JavaDoc queueRequestor = new QueueRequestor JavaDoc(session, queue);
256       TextMessage JavaDoc message = session.createTextMessage();
257       message.setText("Request Test");
258
259       for (int i = 0; i < 5; i++)
260       {
261
262          getLog().debug("Making client request #" + i);
263          TextMessage JavaDoc reply = (TextMessage JavaDoc) queueRequestor.request(message);
264          String JavaDoc replyID = new String JavaDoc(reply.getJMSCorrelationID());
265          if (!replyID.equals(message.getJMSMessageID()))
266             throw new Exception JavaDoc("REQUEST: ERROR: Reply does not match sent message");
267
268       }
269
270       getLog().debug("Making client request to shut server down.");
271       message.setText("Quit");
272       queueRequestor.request(message);
273
274       serverThread.join();
275       disconnect();
276
277       getLog().debug("RequestReplyQueue passed");
278    }
279
280    /**
281     * Test that temporary queues can be deleted.
282     */

283    public void testTemporaryQueueDelete() throws Exception JavaDoc
284    {
285
286       getLog().debug("Starting TemporaryQueueDelete test");
287       connect();
288
289       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
290       TemporaryQueue JavaDoc queue = session.createTemporaryQueue();
291
292       queue.delete();
293
294       disconnect();
295
296       getLog().debug("TemporaryQueueDelete passed");
297    }
298
299    /**
300     * Test that temporary topics can be deleted.
301     */

302    public void testTemporaryTopicDelete() throws Exception JavaDoc
303    {
304
305       getLog().debug("Starting TemporaryTopicDelete test");
306       connect();
307
308       TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
309       TemporaryTopic JavaDoc topic = session.createTemporaryTopic();
310
311       topic.delete();
312
313       disconnect();
314
315       getLog().debug("TemporaryTopicDelete passed");
316    }
317
318    /**
319     * Test invalid destination trying to send a message.
320     */

321    public void testInvalidDestinationQueueSend() throws Exception JavaDoc
322    {
323
324       getLog().debug("Starting InvaidDestinationQueueSend test");
325       connect();
326
327       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
328       TemporaryQueue JavaDoc queue = session.createTemporaryQueue();
329       QueueSender JavaDoc sender = session.createSender(queue);
330       queue.delete();
331
332       TextMessage JavaDoc message = session.createTextMessage("hello");
333       boolean caught = false;
334       try
335       {
336          sender.send(message);
337       }
338       catch (InvalidDestinationException JavaDoc expected)
339       {
340          caught = true;
341       }
342
343       disconnect();
344
345       assertTrue("Expected an InvalidDestinationException", caught);
346
347       getLog().debug("InvaldDestinationQueueSend passed");
348    }
349
350    /**
351     * Test invalid destination trying to browse a message.
352     */

353    public void testInvalidDestinationQueueBrowse() throws Exception JavaDoc
354    {
355
356       getLog().debug("Starting InvalidDestinationQueueBrowse test");
357       connect();
358
359       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
360       TemporaryQueue JavaDoc queue = session.createTemporaryQueue();
361       QueueBrowser JavaDoc browser = session.createBrowser(queue);
362       queue.delete();
363
364       boolean caught = false;
365       try
366       {
367          browser.getEnumeration();
368       }
369       catch (InvalidDestinationException JavaDoc expected)
370       {
371          caught = true;
372       }
373
374       disconnect();
375
376       assertTrue("Expected an InvalidDestinationException", caught);
377
378       getLog().debug("InvalidDestinationQueueBrowse passed");
379    }
380
381    /**
382     * Test invalid destination trying to send a message.
383     */

384    public void testInvalidDestinationTopicPublish() throws Exception JavaDoc
385    {
386
387       getLog().debug("Starting InvaidDestinationTopicPublish test");
388       connect();
389
390       TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
391       TemporaryTopic JavaDoc topic = session.createTemporaryTopic();
392       TopicPublisher JavaDoc publisher = session.createPublisher(topic);
393       topic.delete();
394
395       TextMessage JavaDoc message = session.createTextMessage("hello");
396       boolean caught = false;
397       try
398       {
399          publisher.publish(message);
400       }
401       catch (InvalidDestinationException JavaDoc expected)
402       {
403          caught = true;
404       }
405
406       disconnect();
407
408       assertTrue("Expected an InvalidDestinationException", caught);
409
410       getLog().debug("InvaldDestinationTopicPublish passed");
411    }
412
413    /**
414     * Test errors trying on topic subscribe.
415     */

416    public void testErrorsTopicSubscribe() throws Exception JavaDoc
417    {
418
419       getLog().debug("Starting InvalidDestinationTopicSubscribe test");
420       connect();
421
422       try
423       {
424          TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
425          Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
426          TemporaryTopic JavaDoc temp = session.createTemporaryTopic();
427
428          boolean caught = false;
429          try
430          {
431             session.createSubscriber(null);
432          }
433          catch (InvalidDestinationException JavaDoc expected)
434          {
435             caught = true;
436          }
437          assertTrue("Expected an InvalidDestinationException for a null topic", caught);
438
439          caught = false;
440          try
441          {
442             session.createSubscriber(null, null, true);
443          }
444          catch (InvalidDestinationException JavaDoc expected)
445          {
446             caught = true;
447          }
448          assertTrue("Expected an InvalidDestinationException for a null topic", caught);
449
450          caught = false;
451          try
452          {
453             session.createDurableSubscriber(null, "NotUsed");
454          }
455          catch (InvalidDestinationException JavaDoc expected)
456          {
457             caught = true;
458          }
459          assertTrue("Expected an InvalidDestinationException for a null topic", caught);
460
461          caught = false;
462          try
463          {
464             session.createDurableSubscriber(temp, "NotUsed");
465          }
466          catch (InvalidDestinationException JavaDoc expected)
467          {
468             caught = true;
469          }
470          assertTrue("Expected an InvalidDestinationException for a temporary topic", caught);
471
472          caught = false;
473          try
474          {
475             session.createDurableSubscriber(null, "NotUsed", null, true);
476          }
477          catch (InvalidDestinationException JavaDoc expected)
478          {
479             caught = true;
480          }
481          assertTrue("Expected an InvalidDestinationException for a null topic", caught);
482
483          caught = false;
484          try
485          {
486             session.createDurableSubscriber(temp, "NotUsed", null, true);
487          }
488          catch (InvalidDestinationException JavaDoc expected)
489          {
490             caught = true;
491          }
492          assertTrue("Expected an InvalidDestinationException for a temporary topic", caught);
493
494          caught = false;
495          try
496          {
497             session.createDurableSubscriber(topic, null);
498          }
499          catch (Exception JavaDoc expected)
500          {
501             caught = true;
502          }
503          assertTrue("Expected a Exception for a null subscription", caught);
504
505          caught = false;
506          try
507          {
508             session.createDurableSubscriber(topic, null, null, false);
509          }
510          catch (Exception JavaDoc expected)
511          {
512             caught = true;
513          }
514          assertTrue("Expected a Exception for a null subscription", caught);
515
516          caught = false;
517          try
518          {
519             session.createDurableSubscriber(topic, " ");
520          }
521          catch (Exception JavaDoc expected)
522          {
523             caught = true;
524          }
525          assertTrue("Expected a Exception for an empty subscription", caught);
526
527          caught = false;
528          try
529          {
530             session.createDurableSubscriber(topic, " ", null, false);
531          }
532          catch (Exception JavaDoc expected)
533          {
534             caught = true;
535          }
536          assertTrue("Expected a Exception for an empty subscription", caught);
537       }
538       finally
539       {
540          disconnect();
541       }
542
543       getLog().debug("InvalidDestinationTopicSubscriber passed");
544    }
545
546    /**
547     * Test create queue.
548     */

549    public void testCreateQueue() throws Exception JavaDoc
550    {
551
552       getLog().debug("Starting create queue test");
553       connect();
554
555       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
556
557       Queue JavaDoc jndiQueue = (Queue JavaDoc) getInitialContext().lookup("queue/testQueue");
558       Queue JavaDoc createQueue = session.createQueue(jndiQueue.getQueueName());
559       assertTrue("Failed for " + QUEUE_FACTORY, jndiQueue.equals(createQueue));
560
561       getLog().debug("InvalidDestinationTopicSubscriber passed");
562    }
563
564    public void testMessageListener() throws Exception JavaDoc
565    {
566       getLog().debug("Starting create queue test");
567
568       connect();
569       queueConnection.start();
570       drainQueue();
571       final CountDown counter1 = new CountDown(3);
572
573       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
574       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
575
576       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
577       receiver.setMessageListener(new MessageListener JavaDoc()
578       {
579          public void onMessage(Message JavaDoc msg)
580          {
581             Logger log = Logger.getLogger(getClass().getName());
582             log.debug("ML");
583             try
584             {
585                if (msg instanceof TextMessage JavaDoc)
586                {
587                   log.debug(((TextMessage JavaDoc) msg).getText());
588                   counter1.release();
589                }
590             }
591             catch (Exception JavaDoc e)
592             {
593             }
594          }
595       });
596
597       QueueSender JavaDoc sender = session.createSender(queue);
598
599       TextMessage JavaDoc message = session.createTextMessage();
600       message.setText("Normal message");
601       sender.send(message, DeliveryMode.NON_PERSISTENT, 4, 0);
602       //sender.send(queue, message, DeliveryMode.NON_PERSISTENT, 4, 0);
603
message.setText("Persistent message");
604       sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
605       //sender.send(queue, message, DeliveryMode.PERSISTENT, 4, 0);
606
message.setText("High Priority Persistent message");
607       sender.send(message, DeliveryMode.PERSISTENT, 10, 0);
608       //sender.send(queue, message, DeliveryMode.PERSISTENT, 10, 0);
609

610       // Wait for the msgs to be received
611
counter1.acquire();
612       log.debug("MessageListener1 received the TMs sent");
613
614       final CountDown counter2 = new CountDown(2);
615       receiver.setMessageListener(new MessageListener JavaDoc()
616       {
617          public void onMessage(Message JavaDoc msg)
618          {
619             Logger log = Logger.getLogger(getClass().getName());
620             log.debug("ML 2");
621             try
622             {
623                if (msg instanceof TextMessage JavaDoc)
624                {
625                   log.debug(((TextMessage JavaDoc) msg).getText());
626                   counter2.release();
627                }
628             }
629             catch (Exception JavaDoc e)
630             {
631             }
632          }
633       });
634
635       message.setText("Persistent message");
636       sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
637       //sender.send(queue, message, DeliveryMode.PERSISTENT, 4, 0);
638
message.setText("High Priority Persistent message");
639       sender.send(message, DeliveryMode.PERSISTENT, 10, 0);
640       //sender.send(queue, message, DeliveryMode.PERSISTENT, 10, 0);
641

642       // Wait for the msgs to be received
643
counter2.acquire();
644       log.debug("MessageListener2 received the TMs sent");
645
646       receiver.setMessageListener(null);
647
648       message.setText("Persistent message");
649       sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
650       //sender.send(queue, message, DeliveryMode.PERSISTENT, 4, 0);
651
message.setText("High Priority Persistent message");
652       sender.send(message, DeliveryMode.PERSISTENT, 10, 0);
653       //sender.send(queue, message, DeliveryMode.PERSISTENT, 10, 0);
654

655       sender.close();
656       drainQueue();
657       disconnect();
658       getLog().debug("MessageListener test passed");
659    }
660
661    public void testApplicationServerStuff() throws Exception JavaDoc
662    {
663       getLog().debug("Starting testing app server stuff");
664       connect();
665
666       Queue JavaDoc testQueue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
667       final QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
668
669       session.setMessageListener(new MessageListener JavaDoc()
670       {
671          public void onMessage(Message JavaDoc mess)
672          {
673             Logger log = Logger.getLogger(getClass().getName());
674             log.debug("Processing message");
675             try
676             {
677                if (mess instanceof TextMessage JavaDoc)
678                   log.debug(((TextMessage JavaDoc) mess).getText());
679             }
680             catch (Exception JavaDoc e)
681             {
682                log.error("Error", e);
683             }
684          }
685       });
686
687       QueueSender JavaDoc sender = session.createSender(testQueue);
688       sender.send(session.createTextMessage("Hi"));
689       sender.send(session.createTextMessage("There"));
690       sender.send(session.createTextMessage("Guys"));
691       queueConnection.createConnectionConsumer(testQueue, null, new ServerSessionPool JavaDoc()
692       {
693          public ServerSession JavaDoc getServerSession()
694          {
695             Logger.getLogger(getClass().getName()).debug("Getting server session.");
696             return new ServerSession JavaDoc()
697             {
698                public Session JavaDoc getSession()
699                {
700                   return session;
701                }
702                public void start()
703                {
704                   Logger.getLogger(getClass().getName()).debug("Starting server session.");
705                   session.run();
706                }
707             };
708          }
709       }, 10);
710
711       queueConnection.start();
712
713       try
714       {
715          Thread.sleep(5 * 1000);
716       }
717       catch (Exception JavaDoc e)
718       {
719       }
720
721       disconnect();
722       getLog().debug("Testing app server stuff passed");
723    }
724
725    //simply put a few messages on the test queue for next time.
726
/* public void testPM() throws Exception
727       {
728          getLog().debug("Starting testing pm");
729          connect();
730          
731          Queue testQueue = (Queue)context.lookup(TEST_QUEUE);
732          QueueSession session = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
733          QueueSender sender = session.createSender(testQueue);
734          sender.send(session.createTextMessage("From last time"));
735          sender.send(session.createTextMessage("From last time"));
736          sender.send(session.createTextMessage("From last time"));
737          sender.close();
738          session.close();
739          disconnect();
740          getLog().debug("Testing pm stuff passed");
741       }
742    */

743    private void drainMessagesForTopic(TopicSubscriber JavaDoc sub) throws JMSException JavaDoc
744    {
745       Message JavaDoc msg = sub.receive(50);
746       int c = 0;
747       while (msg != null)
748       {
749          c++;
750          if (msg instanceof TextMessage JavaDoc)
751             getLog().debug(((TextMessage JavaDoc) msg).getText());
752          msg = sub.receive(50);
753       }
754       getLog().debug("Received " + c + " messages from topic.");
755    }
756
757    public void testTopics() throws Exception JavaDoc
758    {
759       getLog().debug("Starting Topic test");
760       connect();
761
762       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
763       topicConnection = topicFactory.createTopicConnection("john", "needle");
764
765       topicConnection.start();
766
767       //set up some subscribers to the topic
768
TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
769       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
770
771       TopicSubscriber JavaDoc sub1 = session.createDurableSubscriber(topic, "sub1");
772       TopicSubscriber JavaDoc sub2 = session.createSubscriber(topic);
773       TopicSubscriber JavaDoc sub3 = session.createSubscriber(topic);
774
775       //Now a sender
776
TopicPublisher JavaDoc sender = session.createPublisher(topic);
777
778       //send some messages
779
sender.publish(session.createTextMessage("Message 1"));
780       sender.publish(session.createTextMessage("Message 2"));
781       sender.publish(session.createTextMessage("Message 3"));
782       drainMessagesForTopic(sub1);
783       drainMessagesForTopic(sub2);
784       drainMessagesForTopic(sub3);
785
786       //close some subscribers
787
sub1.close();
788       sub2.close();
789
790       //send some more messages
791
sender.publish(session.createTextMessage("Message 4"));
792       sender.publish(session.createTextMessage("Message 5"));
793       sender.publish(session.createTextMessage("Message 6"));
794
795       //give time for message 4 to be negatively acked (as it will be cause last receive timed out)
796
try
797       {
798          Thread.sleep(5 * 1000);
799       }
800       catch (InterruptedException JavaDoc e)
801       {
802       }
803
804       drainMessagesForTopic(sub3);
805
806       //open subscribers again.
807
sub1 = session.createDurableSubscriber(topic, "sub1");
808       sub2 = session.createSubscriber(topic);
809
810       //Send a final message
811
sender.publish(session.createTextMessage("Final message"));
812       sender.close();
813
814       drainMessagesForTopic(sub1);
815       drainMessagesForTopic(sub2);
816       drainMessagesForTopic(sub3);
817
818       sub1.close();
819       sub2.close();
820       sub3.close();
821
822       session.unsubscribe("sub1");
823
824       topicConnection.stop();
825       topicConnection.close();
826
827       disconnect();
828       getLog().debug("Topic test passed");
829    }
830
831    /**
832     * Test to seeif the NoLocal feature of topics works.
833     * Messages published from the same connection should not
834     * be received by Subscribers on the same connection.
835     */

836    public void testTopicNoLocal() throws Exception JavaDoc
837    {
838       getLog().debug("Starting TopicNoLocal test");
839       connect();
840
841       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
842       TopicConnection JavaDoc topicConnection1 = topicFactory.createTopicConnection();
843       topicConnection1.start();
844       TopicConnection JavaDoc topicConnection2 = topicFactory.createTopicConnection();
845       topicConnection2.start();
846
847       // We don't want local messages on this topic.
848
TopicSession JavaDoc session1 = topicConnection1.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
849       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
850       TopicSubscriber JavaDoc subscriber1 = session1.createSubscriber(topic, null, true);
851       TopicPublisher JavaDoc sender1 = session1.createPublisher(topic);
852
853       //Now a sender
854
TopicSession JavaDoc session2 = topicConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
855       TopicPublisher JavaDoc sender2 = session2.createPublisher(topic);
856
857       drainMessagesForTopic(subscriber1);
858
859       //send some messages
860
sender1.publish(session1.createTextMessage("Local Message"));
861       sender2.publish(session2.createTextMessage("Remote Message"));
862
863       // Get the messages, we should get the remote message
864
// but not the local message
865
TextMessage JavaDoc msg1 = (TextMessage JavaDoc) subscriber1.receive(2000);
866       if (msg1 == null)
867       {
868          fail("Did not get any messages");
869       }
870       else
871       {
872          getLog().debug("Got message: " + msg1);
873          if (msg1.getText().equals("Local Message"))
874          {
875             fail("Got a local message");
876          }
877          TextMessage JavaDoc msg2 = (TextMessage JavaDoc) subscriber1.receive(2000);
878          if (msg2 != null)
879          {
880             getLog().debug("Got message: " + msg2);
881             fail("Got an extra message. msg1:" + msg1 + ", msg2:" + msg2);
882          }
883       }
884
885       topicConnection1.stop();
886       topicConnection1.close();
887       topicConnection2.stop();
888       topicConnection2.close();
889
890       disconnect();
891       getLog().debug("TopicNoLocal test passed");
892    }
893
894    /**
895     * Test to see whether no local works if a message
896     * was created somewhere else.
897     */

898    public void testTopicNoLocalBounce() throws Exception JavaDoc
899    {
900       getLog().debug("Starting TopicNoLocalBounce test");
901       connect();
902
903       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
904       TopicConnection JavaDoc topicConnection1 = topicFactory.createTopicConnection();
905       topicConnection1.start();
906       TopicConnection JavaDoc topicConnection2 = topicFactory.createTopicConnection();
907       topicConnection2.start();
908
909       // Session 1
910
TopicSession JavaDoc session1 = topicConnection1.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
911       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
912       TopicSubscriber JavaDoc subscriber1 = session1.createSubscriber(topic, null, true);
913       TopicPublisher JavaDoc sender1 = session1.createPublisher(topic);
914
915       // Session 2
916
TopicSession JavaDoc session2 = topicConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
917       TopicSubscriber JavaDoc subscriber2 = session2.createSubscriber(topic, null, true);
918       TopicPublisher JavaDoc sender2 = session2.createPublisher(topic);
919
920       drainMessagesForTopic(subscriber1);
921       drainMessagesForTopic(subscriber2);
922
923       //send the message
924
sender1.publish(session1.createTextMessage("Message"));
925
926       assertTrue("Subscriber1 should not get a message", subscriber1.receiveNoWait() == null);
927       TextMessage JavaDoc msg = (TextMessage JavaDoc) subscriber2.receive(2000);
928       assertTrue("Subscriber2 should get a message, got " + msg, msg != null && msg.getText().equals("Message"));
929
930       //send it back
931
sender2.publish(msg);
932
933       msg = (TextMessage JavaDoc) subscriber1.receive(2000);
934       assertTrue("Subscriber1 should get a message, got " + msg, msg != null && msg.getText().equals("Message"));
935       assertTrue("Subscriber2 should not get a message", subscriber2.receiveNoWait() == null);
936
937       topicConnection1.stop();
938       topicConnection1.close();
939       topicConnection2.stop();
940       topicConnection2.close();
941
942       disconnect();
943       getLog().debug("TopicNoLocalBounce test passed");
944    }
945
946    /**
947     * Test subscribing to a topic with one selector, then changing to another
948     */

949    public void testTopicSelectorChange() throws Exception JavaDoc
950    {
951       getLog().debug("Starting TopicSelectorChange test");
952
953       getLog().debug("Create topic connection");
954       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
955       topicConnection = topicFactory.createTopicConnection("john", "needle");
956       topicConnection.start();
957
958       try
959       {
960          getLog().debug("Retrieving Topic");
961          Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_DURABLE_TOPIC);
962
963          getLog().debug("Creating a send session");
964          TopicSession JavaDoc sendSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
965          TopicPublisher JavaDoc sender = sendSession.createPublisher(topic);
966
967          getLog().debug("Clearing the topic");
968          TopicSession JavaDoc subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
969          TopicSubscriber JavaDoc subscriber = subSession.createDurableSubscriber(topic, "test");
970          Message JavaDoc message = subscriber.receive(50);
971          while (message != null)
972             message = subscriber.receive(50);
973          subSession.close();
974
975          getLog().debug("Subscribing to topic, looking for Value = 'A'");
976          subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
977          subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'A'", false);
978
979          getLog().debug("Send some messages");
980          message = sendSession.createTextMessage("Message1");
981          message.setStringProperty("Value", "A");
982          sender.publish(message);
983          message = sendSession.createTextMessage("Message2");
984          message.setStringProperty("Value", "A");
985          sender.publish(message);
986          message = sendSession.createTextMessage("Message3");
987          message.setStringProperty("Value", "B");
988          sender.publish(message);
989
990          getLog().debug("Retrieving the A messages");
991          message = subscriber.receive(2000);
992          assertTrue("Expected message 1", message != null);
993          assertTrue("Should get an A", message.getStringProperty("Value").equals("A"));
994          message = subscriber.receive(2000);
995          assertTrue("Expected message 2", message != null);
996          assertTrue("Should get a second A", message.getStringProperty("Value").equals("A"));
997          assertTrue("That should be it for A", subscriber.receive(2000) == null);
998
999          getLog().debug("Closing the subscriber without acknowledgement");
1000         subSession.close();
1001
1002         getLog().debug("Subscribing to topic, looking for Value = 'B'");
1003         subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
1004         subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'B'", false);
1005
1006         getLog().debug("Retrieving the non-existent B messages");
1007         assertTrue("B should not be there", subscriber.receive(2000) == null);
1008
1009         getLog().debug("Closing the subscriber.");
1010         subSession.close();
1011
1012         getLog().debug("Subscribing to topic, looking for those Value = 'A'");
1013         subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
1014         subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'A'", false);
1015         assertTrue("Should not be any A the subscription was changed", subscriber.receive(2000) == null);
1016         subSession.close();
1017
1018         getLog().debug("Subscribing to topic, looking for everything");
1019         subSession = topicConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
1020         subscriber = subSession.createDurableSubscriber(topic, "test", null, false);
1021
1022         message = sendSession.createTextMessage("Message4");
1023         message.setStringProperty("Value", "A");
1024         sender.publish(message);
1025
1026         message = subscriber.receive(2000);
1027         assertTrue("Expected message 4", message != null);
1028         assertTrue("Should be an A which we don't acknowledge", message.getStringProperty("Value").equals("A"));
1029         subSession.close();
1030
1031         getLog().debug("Subscribing to topic, looking for the Value = 'A'");
1032         subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
1033         subscriber = subSession.createDurableSubscriber(topic, "test", "Value = 'A'", false);
1034         assertTrue(
1035            "Should not be any A, the subscription was changed. Even though the old and new selectors match the message",
1036            subscriber.receive(2000) == null);
1037         subSession.close();
1038
1039         getLog().debug("Closing the send session");
1040         sendSession.close();
1041
1042         getLog().debug("Removing the subscription");
1043         subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
1044         subSession.unsubscribe("test");
1045
1046      }
1047      finally
1048      {
1049         getLog().debug("Closing the connection");
1050         topicConnection.close();
1051      }
1052
1053      getLog().debug("TopicSelectorChange test passed");
1054   }
1055
1056   /**
1057    * Test subscribing to a topic with a null and empty selector
1058    */

1059   public void testTopicSelectorNullOrEmpty() throws Exception JavaDoc
1060   {
1061      getLog().debug("Starting TopicSelectorNullOrEmpty test");
1062
1063      getLog().debug("Create topic connection");
1064      TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
1065      topicConnection = topicFactory.createTopicConnection("john", "needle");
1066      topicConnection.start();
1067
1068      try
1069      {
1070         getLog().debug("Retrieving Topic");
1071         Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_DURABLE_TOPIC);
1072
1073         getLog().debug("Creating a send session");
1074         TopicSession JavaDoc sendSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
1075         TopicPublisher JavaDoc sender = sendSession.createPublisher(topic);
1076
1077         getLog().debug("Clearing the topic");
1078         TopicSession JavaDoc subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
1079         TopicSubscriber JavaDoc subscriber = subSession.createDurableSubscriber(topic, "test");
1080         TextMessage JavaDoc message = (TextMessage JavaDoc) subscriber.receive(50);
1081         while (message != null)
1082            message = (TextMessage JavaDoc) subscriber.receive(50);
1083         subSession.close();
1084
1085         getLog().debug("Subscribing to topic, with null selector");
1086         subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
1087         subscriber = subSession.createDurableSubscriber(topic, "test", null, false);
1088
1089         getLog().debug("Send a message");
1090         message = sendSession.createTextMessage("Message1");
1091         sender.publish(message);
1092
1093         getLog().debug("Retrieving the message");
1094         message = (TextMessage JavaDoc) subscriber.receive(2000);
1095         assertTrue("Expected message 1", message != null);
1096         assertTrue("Should get Message1", message.getText().equals("Message1"));
1097         getLog().debug("Closing the subscriber");
1098         subSession.close();
1099
1100         getLog().debug("Subscribing to topic, with an empty selector");
1101         subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
1102         subscriber = subSession.createDurableSubscriber(topic, "test", " ", false);
1103
1104         getLog().debug("Send a message");
1105         message = sendSession.createTextMessage("Message2");
1106         sender.publish(message);
1107
1108         getLog().debug("Retrieving the message");
1109         message = (TextMessage JavaDoc) subscriber.receive(2000);
1110         assertTrue("Expected message 2", message != null);
1111         assertTrue("Should get Message2", message.getText().equals("Message2"));
1112         getLog().debug("Closing the subscriber");
1113
1114         getLog().debug("Removing the subscription");
1115         subSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
1116         subSession.unsubscribe("test");
1117         subSession.close();
1118
1119      }
1120      finally
1121      {
1122         getLog().debug("Closing the connection");
1123         topicConnection.close();
1124      }
1125
1126      getLog().debug("TopicSelectorNullOrEmpty test passed");
1127   }
1128
1129   /**
1130    * Test sending/receiving an outdated message
1131    */

1132   public void testSendReceiveOutdated() throws Exception JavaDoc
1133   {
1134      getLog().debug("Starting SendReceiveOutdated test");
1135
1136      connect();
1137      try
1138      {
1139         queueConnection.start();
1140         drainQueue();
1141         queueConnection.stop();
1142
1143         QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
1144         Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
1145         QueueSender JavaDoc sender = session.createSender(queue);
1146         QueueReceiver JavaDoc receiver = session.createReceiver(queue);
1147
1148         // Send a message that has expired
1149
TextMessage JavaDoc message = session.createTextMessage("Outdated");
1150         sender.send(message, DeliveryMode.PERSISTENT, 4, 1);
1151         Thread.sleep(100);
1152
1153         // Send a message that has not expired
1154
message = session.createTextMessage("OK");
1155         sender.send(message);
1156
1157         // Try to receive the message the not expired message
1158
queueConnection.start();
1159         message = (TextMessage JavaDoc) receiver.receiveNoWait();
1160         assertEquals("OK", message.getText());
1161
1162         // Should be no more
1163
assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null);
1164      }
1165      finally
1166      {
1167         disconnect();
1168      }
1169
1170      getLog().debug("SendReceiveOutdated test passed");
1171   }
1172
1173   public void testSendReceiveExpired() throws Exception JavaDoc
1174   {
1175      getLog().debug("Starting testSendReceiveExpired test");
1176
1177      connect();
1178      try
1179      {
1180         queueConnection.start();
1181         drainQueue();
1182
1183         QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
1184         Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
1185         QueueSender JavaDoc sender = session.createSender(queue);
1186         QueueReceiver JavaDoc receiver = session.createReceiver(queue);
1187
1188         // Send a message that expires in 5 seconds
1189
TextMessage JavaDoc message = session.createTextMessage("5 Second Expiration");
1190         sender.send(message, DeliveryMode.PERSISTENT, 4, 5*1000);
1191         // Send a message that has not expired
1192
message = session.createTextMessage("OK");
1193         sender.send(message);
1194         // Sleep 6 seconds
1195
Thread.sleep(6*1000);
1196         // Try to receive the OK message
1197
message = (TextMessage JavaDoc) receiver.receiveNoWait();
1198         assertEquals("OK", message.getText());
1199
1200         // Should be no more
1201
assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null);
1202
1203         // Send a message that expires in 10 seconds
1204
message = session.createTextMessage("10 Second Expiration");
1205         sender.send(message, DeliveryMode.PERSISTENT, 4, 10*1000);
1206         // Send a message that has not expired
1207
message = session.createTextMessage("OK");
1208         sender.send(message);
1209         // Sleep 1 seconds
1210
Thread.sleep(1*1000);
1211         // Try to receive the messages
1212
message = (TextMessage JavaDoc) receiver.receiveNoWait();
1213         assertEquals("10 Second Expiration", message.getText());
1214         message = (TextMessage JavaDoc) receiver.receiveNoWait();
1215         assertEquals("OK", message.getText());
1216
1217         // Should be no more
1218
assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null);
1219         
1220         // Test that JMSExpiration has no affect
1221
message = session.createTextMessage("5 Second Expiration");
1222         message.setJMSExpiration(System.currentTimeMillis() + 5*1000);
1223         sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
1224         // Send a message that has not expired
1225
message = session.createTextMessage("OK");
1226         sender.send(message);
1227         // Sleep 6 seconds
1228
Thread.sleep(6*1000);
1229         // Try to receive the OK message
1230
message = (TextMessage JavaDoc) receiver.receiveNoWait();
1231         assertEquals("5 Second Expiration", message.getText());
1232         message = (TextMessage JavaDoc) receiver.receiveNoWait();
1233         assertEquals("OK", message.getText());
1234         assertTrue("Didn't expect anymore messages", receiver.receiveNoWait() == null);
1235      }
1236      finally
1237      {
1238         disconnect();
1239      }
1240   }
1241
1242   class Synch
1243   {
1244      boolean waiting = false;
1245      String JavaDoc text;
1246      public synchronized void doWait(long timeout) throws InterruptedException JavaDoc
1247      {
1248         waiting = true;
1249         this.wait(timeout);
1250      }
1251      public synchronized void doNotify() throws InterruptedException JavaDoc
1252      {
1253         while (waiting == false)
1254            wait(100);
1255         this.notifyAll();
1256      }
1257      public String JavaDoc getText()
1258      {
1259         return text;
1260      }
1261      public void setText(String JavaDoc text)
1262      {
1263         this.text = text;
1264      }
1265   }
1266
1267   /**
1268    * Test sending/listening an outdated message
1269    */

1270   public void testSendListenOutdated() throws Exception JavaDoc
1271   {
1272      getLog().debug("Starting SendListenOutdated test");
1273
1274      connect();
1275      try
1276      {
1277         queueConnection.start();
1278         drainQueue();
1279         queueConnection.stop();
1280
1281         QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
1282         Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
1283         QueueSender JavaDoc sender = session.createSender(queue);
1284         QueueReceiver JavaDoc receiver = session.createReceiver(queue);
1285
1286         // Send a message that has expired
1287
TextMessage JavaDoc message = session.createTextMessage("Outdated");
1288         sender.send(message, DeliveryMode.PERSISTENT, 4, 1);
1289         Thread.sleep(100);
1290
1291         // Send a message that has not expired
1292
message = session.createTextMessage("OK");
1293         sender.send(message);
1294
1295         // Try to receive the message the not expired message
1296
final Synch synch = new Synch();
1297         MessageListener JavaDoc messagelistener = new MessageListener JavaDoc()
1298         {
1299            public void onMessage(Message JavaDoc message)
1300            {
1301               listenOutdated(message, synch);
1302            }
1303         };
1304         receiver.setMessageListener(messagelistener);
1305         queueConnection.start();
1306
1307         synch.doWait(10000);
1308         assertEquals("OK", synch.getText());
1309      }
1310      finally
1311      {
1312         disconnect();
1313      }
1314
1315      getLog().debug("SendListenOutdated test passed");
1316   }
1317
1318   private void listenOutdated(Message JavaDoc message, Synch synch)
1319   {
1320      try
1321      {
1322         synch.setText(((TextMessage JavaDoc) message).getText());
1323      }
1324      catch (Throwable JavaDoc t)
1325      {
1326         log.error("Error:", t);
1327      }
1328      finally
1329      {
1330         try
1331         {
1332            synch.doNotify();
1333         }
1334         catch (Throwable JavaDoc t)
1335         {
1336            log.error("Error:", t);
1337         }
1338      }
1339   }
1340}
1341
Popular Tags