KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > test > jbossmq > test > JBossMQUnitTest


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.test.jbossmq.test;
23
24 import java.util.Enumeration JavaDoc;
25
26 import javax.jms.DeliveryMode JavaDoc;
27 import javax.jms.InvalidDestinationException JavaDoc;
28 import javax.jms.JMSException JavaDoc;
29 import javax.jms.Message JavaDoc;
30 import javax.jms.MessageListener JavaDoc;
31 import javax.jms.Queue JavaDoc;
32 import javax.jms.QueueBrowser JavaDoc;
33 import javax.jms.QueueConnection JavaDoc;
34 import javax.jms.QueueConnectionFactory JavaDoc;
35 import javax.jms.QueueReceiver JavaDoc;
36 import javax.jms.QueueRequestor JavaDoc;
37 import javax.jms.QueueSender JavaDoc;
38 import javax.jms.QueueSession JavaDoc;
39 import javax.jms.ServerSession JavaDoc;
40 import javax.jms.ServerSessionPool JavaDoc;
41 import javax.jms.Session JavaDoc;
42 import javax.jms.TemporaryQueue JavaDoc;
43 import javax.jms.TemporaryTopic JavaDoc;
44 import javax.jms.TextMessage JavaDoc;
45 import javax.jms.Topic JavaDoc;
46 import javax.jms.TopicConnection JavaDoc;
47 import javax.jms.TopicConnectionFactory JavaDoc;
48 import javax.jms.TopicPublisher JavaDoc;
49 import javax.jms.TopicSession JavaDoc;
50 import javax.jms.TopicSubscriber JavaDoc;
51 import javax.naming.Context JavaDoc;
52 import javax.naming.InitialContext JavaDoc;
53
54 import org.jboss.logging.Logger;
55 import org.jboss.test.JBossTestCase;
56
57 import EDU.oswego.cs.dl.util.concurrent.CountDown;
58
59 /**
60  * Basic jms tests.
61  *
62  * @author Scott.Stark@jboss.org
63  * @version $Revision: 58115 $
64  */

65 public class JBossMQUnitTest extends JBossTestCase
66 {
67    /** The default TopicFactory jndi name */
68    static String JavaDoc TOPIC_FACTORY = "ConnectionFactory";
69    /** The default QueueFactory jndi name */
70    static String JavaDoc QUEUE_FACTORY = "ConnectionFactory";
71
72    static String JavaDoc TEST_QUEUE = "queue/testQueue";
73    static String JavaDoc TEST_TOPIC = "topic/testTopic";
74    static String JavaDoc TEST_DURABLE_TOPIC = "topic/testDurableTopic";
75
76    //JMSProviderAdapter providerAdapter;
77
static Context JavaDoc context;
78    static QueueConnection JavaDoc queueConnection;
79    static TopicConnection JavaDoc topicConnection;
80
81    public JBossMQUnitTest(String JavaDoc name) throws Exception JavaDoc
82    {
83       super(name);
84    }
85
86    // Emptys out all the messages in a queue
87
protected void drainQueue() throws Exception JavaDoc
88    {
89       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
90       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
91
92       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
93       Message JavaDoc message = receiver.receive(50);
94       int c = 0;
95       while (message != null)
96       {
97          message = receiver.receive(50);
98          c++;
99       }
100
101       if (c != 0)
102          getLog().debug(" Drained " + c + " messages from the queue");
103
104       session.close();
105    }
106
107    protected void connect() throws Exception JavaDoc
108    {
109
110       if (context == null)
111       {
112
113          context = new InitialContext JavaDoc();
114
115       }
116       QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc) context.lookup(QUEUE_FACTORY);
117       queueConnection = queueFactory.createQueueConnection();
118
119       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
120       topicConnection = topicFactory.createTopicConnection();
121
122       getLog().debug("Connection to spyderMQ established.");
123
124    }
125
126    protected void disconnect() throws Exception JavaDoc
127    {
128       queueConnection.close();
129       topicConnection.close();
130    }
131
132    /**
133     * Test that messages are ordered by message arrival and priority.
134     * This also tests :
135     * Using a non-transacted AUTO_ACKNOWLEDGE session
136     * Using a QueueReceiver
137     * Using a QueueSender
138     * Sending PERSITENT and NON_PERSISTENT text messages.
139     * Using a QueueBrowser
140     */

141    public void testQueueMessageOrder() throws Exception JavaDoc
142    {
143
144       getLog().debug("Starting QueueMessageOrder test");
145
146       connect();
147
148       queueConnection.start();
149
150       drainQueue();
151
152       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
153       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
154       QueueSender JavaDoc sender = session.createSender(queue);
155
156       TextMessage JavaDoc message = session.createTextMessage();
157       message.setText("Normal message");
158       sender.send(message, DeliveryMode.NON_PERSISTENT, 4, 0);
159       //sender.send(queue, message, DeliveryMode.NON_PERSISTENT, 4, 0);
160
message.setText("Persistent message");
161       sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
162       //sender.send(queue, message, DeliveryMode.PERSISTENT, 4, 0);
163
message.setText("High Priority Persistent message");
164       sender.send(message, DeliveryMode.PERSISTENT, 10, 0);
165       //sender.send(queue, message, DeliveryMode.PERSISTENT, 10, 0);
166

167       //message.setText("Expiring Persistent message");
168
//sender.send(queue, message, DeliveryMode.NON_PERSISTENT, 4, 1);
169

170       QueueBrowser JavaDoc browser = session.createBrowser(queue);
171       Enumeration JavaDoc i = browser.getEnumeration();
172       //message = (TextMessage)enum.nextElement();
173
//if( !message.getText().equals("High Priority Persistent message") )
174
// throw new Exception("Queue is not prioritizing messages correctly. Unexpected Message:"+message);
175
getLog().debug(message.getText());
176
177       message = (TextMessage JavaDoc) i.nextElement();
178       //if( !message.getText().equals("Normal message") )
179
// throw new Exception("Queue is not ordering messages correctly. Unexpected Message:"+message);
180
getLog().debug(message.getText());
181
182       message = (TextMessage JavaDoc) i.nextElement();
183       //if( !message.getText().equals("Persistent message") )
184
// throw new Exception("Queue is not ordering messages correctly. Unexpected Message:"+message);
185
getLog().debug(message.getText());
186
187       // if( enum.hasMoreElements() )
188
// throw new Exception("Queue does not expire messages correctly. Unexpected Message:"+enum.nextElement());
189

190       disconnect();
191       getLog().debug("QueueMessageOrder passed");
192    }
193
194    /**
195     * Test that a using QueueRequestor works.
196     * this also tests that :
197     * temporary queues work.
198     */

199    public void testRequestReplyQueue() throws Exception JavaDoc
200    {
201
202       getLog().debug("Starting RequestReplyQueue test");
203       connect();
204
205       {
206          queueConnection.start();
207          drainQueue();
208       }
209
210       Thread JavaDoc serverThread = new Thread JavaDoc()
211       {
212          public void run()
213          {
214             Logger log = Logger.getLogger(getClass().getName());
215             try
216             {
217                log.debug("Server Thread Started");
218                QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
219                Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
220
221                QueueReceiver JavaDoc queueReceiver = session.createReceiver(queue);
222
223                boolean done = false;
224                while (!done)
225                {
226                   TextMessage JavaDoc message = (TextMessage JavaDoc) queueReceiver.receive();
227                   Queue JavaDoc tempQueue = (Queue JavaDoc) message.getJMSReplyTo();
228
229                   QueueSender JavaDoc replySender = session.createSender(tempQueue);
230                   TextMessage JavaDoc reply = session.createTextMessage();
231                   reply.setText("Request Processed");
232                   reply.setJMSCorrelationID(message.getJMSMessageID());
233                   replySender.send(reply);
234
235                   if (message.getText().equals("Quit"))
236                      done = true;
237                }
238
239                session.close();
240                log.debug("Server Thread Finished");
241
242             }
243             catch (Exception JavaDoc e)
244             {
245                log.error("Error", e);
246             }
247          }
248       };
249
250       serverThread.start();
251
252       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
253       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
254
255       QueueRequestor JavaDoc queueRequestor = new QueueRequestor JavaDoc(session, queue);
256       TextMessage JavaDoc message = session.createTextMessage();
257       message.setText("Request Test");
258
259       for (int i = 0; i < 5; i++)
260       {
261
262          getLog().debug("Making client request #" + i);
263          TextMessage JavaDoc reply = (TextMessage JavaDoc) queueRequestor.request(message);
264          String JavaDoc replyID = new String JavaDoc(reply.getJMSCorrelationID());
265          if (!replyID.equals(message.getJMSMessageID()))
266             throw new Exception JavaDoc("REQUEST: ERROR: Reply does not match sent message");
267
268       }
269
270       getLog().debug("Making client request to shut server down.");
271       message.setText("Quit");
272       queueRequestor.request(message);
273
274       serverThread.join();
275       disconnect();
276
277       getLog().debug("RequestReplyQueue passed");
278    }
279
280    /**
281     * Test that temporary queues can be deleted.
282     */

283    public void testTemporaryQueueDelete() throws Exception JavaDoc
284    {
285
286       getLog().debug("Starting TemporaryQueueDelete test");
287       connect();
288
289       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
290       TemporaryQueue JavaDoc queue = session.createTemporaryQueue();
291
292       queue.delete();
293
294       disconnect();
295
296       getLog().debug("TemporaryQueueDelete passed");
297    }
298
299    /**
300     * Test that temporary topics can be deleted.
301     */

302    public void testTemporaryTopicDelete() throws Exception JavaDoc
303    {
304
305       getLog().debug("Starting TemporaryTopicDelete test");
306       connect();
307
308       TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
309       TemporaryTopic JavaDoc topic = session.createTemporaryTopic();
310
311       topic.delete();
312
313       disconnect();
314
315       getLog().debug("TemporaryTopicDelete passed");
316    }
317
318    /**
319     * Test invalid destination trying to send a message.
320     */

321    public void testInvalidDestinationQueueSend() throws Exception JavaDoc
322    {
323
324       getLog().debug("Starting InvaidDestinationQueueSend test");
325       connect();
326
327       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
328       TemporaryQueue JavaDoc queue = session.createTemporaryQueue();
329       QueueSender JavaDoc sender = session.createSender(queue);
330       queue.delete();
331
332       TextMessage JavaDoc message = session.createTextMessage("hello");
333       boolean caught = false;
334       try
335       {
336          sender.send(message);
337       }
338       catch (InvalidDestinationException JavaDoc expected)
339       {
340          caught = true;
341       }
342
343       disconnect();
344
345       assertTrue("Expected an InvalidDestinationException", caught);
346
347       getLog().debug("InvaldDestinationQueueSend passed");
348    }
349
350    /**
351     * Test invalid destination trying to browse a message.
352     */

353    public void testInvalidDestinationQueueBrowse() throws Exception JavaDoc
354    {
355
356       getLog().debug("Starting InvalidDestinationQueueBrowse test");
357       connect();
358
359       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
360       TemporaryQueue JavaDoc queue = session.createTemporaryQueue();
361       QueueBrowser JavaDoc browser = session.createBrowser(queue);
362       queue.delete();
363
364       boolean caught = false;
365       try
366       {
367          browser.getEnumeration();
368       }
369       catch (InvalidDestinationException JavaDoc expected)
370       {
371          caught = true;
372       }
373
374       disconnect();
375
376       assertTrue("Expected an InvalidDestinationException", caught);
377
378       getLog().debug("InvalidDestinationQueueBrowse passed");
379    }
380
381    /**
382     * Test invalid destination trying to send a message.
383     */

384    public void testInvalidDestinationTopicPublish() throws Exception JavaDoc
385    {
386
387       getLog().debug("Starting InvaidDestinationTopicPublish test");
388       connect();
389
390       TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
391       TemporaryTopic JavaDoc topic = session.createTemporaryTopic();
392       TopicPublisher JavaDoc publisher = session.createPublisher(topic);
393       topic.delete();
394
395       TextMessage JavaDoc message = session.createTextMessage("hello");
396       boolean caught = false;
397       try
398       {
399          publisher.publish(message);
400       }
401       catch (InvalidDestinationException JavaDoc expected)
402       {
403          caught = true;
404       }
405
406       disconnect();
407
408       assertTrue("Expected an InvalidDestinationException", caught);
409
410       getLog().debug("InvaldDestinationTopicPublish passed");
411    }
412
413    /**
414     * Test errors trying on topic subscribe.
415     */

416    public void testErrorsTopicSubscribe() throws Exception JavaDoc
417    {
418
419       getLog().debug("Starting InvalidDestinationTopicSubscribe test");
420       connect();
421
422       try
423       {
424          TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
425          Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
426          TemporaryTopic JavaDoc temp = session.createTemporaryTopic();
427
428          boolean caught = false;
429          try
430          {
431             session.createSubscriber(null);
432          }
433          catch (InvalidDestinationException JavaDoc expected)
434          {
435             caught = true;
436          }
437          assertTrue("Expected an InvalidDestinationException for a null topic", caught);
438
439          caught = false;
440          try
441          {
442             session.createSubscriber(null, null, true);
443          }
444          catch (InvalidDestinationException JavaDoc expected)
445          {
446             caught = true;
447          }
448          assertTrue("Expected an InvalidDestinationException for a null topic", caught);
449
450          caught = false;
451          try
452          {
453             session.createDurableSubscriber(null, "NotUsed");
454          }
455          catch (InvalidDestinationException JavaDoc expected)
456          {
457             caught = true;
458          }
459          assertTrue("Expected an InvalidDestinationException for a null topic", caught);
460
461          caught = false;
462          try
463          {
464             session.createDurableSubscriber(temp, "NotUsed");
465          }
466          catch (InvalidDestinationException JavaDoc expected)
467          {
468             caught = true;
469          }
470          assertTrue("Expected an InvalidDestinationException for a temporary topic", caught);
471
472          caught = false;
473          try
474          {
475             session.createDurableSubscriber(null, "NotUsed", null, true);
476          }
477          catch (InvalidDestinationException JavaDoc expected)
478          {
479             caught = true;
480          }
481          assertTrue("Expected an InvalidDestinationException for a null topic", caught);
482
483          caught = false;
484          try
485          {
486             session.createDurableSubscriber(temp, "NotUsed", null, true);
487          }
488          catch (InvalidDestinationException JavaDoc expected)
489          {
490             caught = true;
491          }
492          assertTrue("Expected an InvalidDestinationException for a temporary topic", caught);
493
494          caught = false;
495          try
496          {
497             session.createDurableSubscriber(topic, null);
498          }
499          catch (Exception JavaDoc expected)
500          {
501             caught = true;
502          }
503          assertTrue("Expected a Exception for a null subscription", caught);
504
505          caught = false;
506          try
507          {
508             session.createDurableSubscriber(topic, null, null, false);
509          }
510          catch (Exception JavaDoc expected)
511          {
512             caught = true;
513          }
514          assertTrue("Expected a Exception for a null subscription", caught);
515
516          caught = false;
517          try
518          {
519             session.createDurableSubscriber(topic, " ");
520          }
521          catch (Exception JavaDoc expected)
522          {
523             caught = true;
524          }
525          assertTrue("Expected a Exception for an empty subscription", caught);
526
527          caught = false;
528          try
529          {
530             session.createDurableSubscriber(topic, " ", null, false);
531          }
532          catch (Exception JavaDoc expected)
533          {
534             caught = true;
535          }
536          assertTrue("Expected a Exception for an empty subscription", caught);
537       }
538       finally
539       {
540          disconnect();
541       }
542
543       getLog().debug("InvalidDestinationTopicSubscriber passed");
544    }
545
546    /**
547     * Test create queue.
548     */

549    public void testCreateQueue() throws Exception JavaDoc
550    {
551
552       getLog().debug("Starting create queue test");
553       connect();
554
555       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
556
557       Queue JavaDoc jndiQueue = (Queue JavaDoc) getInitialContext().lookup("queue/testQueue");
558       Queue JavaDoc createQueue = session.createQueue(jndiQueue.getQueueName());
559       assertTrue("Failed for " + QUEUE_FACTORY, jndiQueue.equals(createQueue));
560
561       getLog().debug("InvalidDestinationTopicSubscriber passed");
562    }
563
564    public void testMessageListener() throws Exception JavaDoc
565    {
566       getLog().debug("Starting create queue test");
567
568       connect();
569       queueConnection.start();
570       drainQueue();
571       final CountDown counter1 = new CountDown(3);
572
573       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
574       Queue JavaDoc queue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
575
576       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
577       receiver.setMessageListener(new MessageListener JavaDoc()
578       {
579          public void onMessage(Message JavaDoc msg)
580          {
581             Logger log = Logger.getLogger(getClass().getName());
582             log.debug("ML");
583             try
584             {
585                if (msg instanceof TextMessage JavaDoc)
586                {
587                   log.debug(((TextMessage JavaDoc) msg).getText());
588                   counter1.release();
589                }
590             }
591             catch (Exception JavaDoc e)
592             {
593             }
594          }
595       });
596
597       QueueSender JavaDoc sender = session.createSender(queue);
598
599       TextMessage JavaDoc message = session.createTextMessage();
600       message.setText("Normal message");
601       sender.send(message, DeliveryMode.NON_PERSISTENT, 4, 0);
602       //sender.send(queue, message, DeliveryMode.NON_PERSISTENT, 4, 0);
603
message.setText("Persistent message");
604       sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
605       //sender.send(queue, message, DeliveryMode.PERSISTENT, 4, 0);
606
message.setText("High Priority Persistent message");
607       sender.send(message, DeliveryMode.PERSISTENT, 10, 0);
608       //sender.send(queue, message, DeliveryMode.PERSISTENT, 10, 0);
609

610       // Wait for the msgs to be received
611
counter1.acquire();
612       log.debug("MessageListener1 received the TMs sent");
613
614       final CountDown counter2 = new CountDown(2);
615       receiver.setMessageListener(new MessageListener JavaDoc()
616       {
617          public void onMessage(Message JavaDoc msg)
618          {
619             Logger log = Logger.getLogger(getClass().getName());
620             log.debug("ML 2");
621             try
622             {
623                if (msg instanceof TextMessage JavaDoc)
624                {
625                   log.debug(((TextMessage JavaDoc) msg).getText());
626                   counter2.release();
627                }
628             }
629             catch (Exception JavaDoc e)
630             {
631             }
632          }
633       });
634
635       message.setText("Persistent message");
636       sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
637       //sender.send(queue, message, DeliveryMode.PERSISTENT, 4, 0);
638
message.setText("High Priority Persistent message");
639       sender.send(message, DeliveryMode.PERSISTENT, 10, 0);
640       //sender.send(queue, message, DeliveryMode.PERSISTENT, 10, 0);
641

642       // Wait for the msgs to be received
643
counter2.acquire();
644       log.debug("MessageListener2 received the TMs sent");
645
646       receiver.setMessageListener(null);
647
648       message.setText("Persistent message");
649       sender.send(message, DeliveryMode.PERSISTENT, 4, 0);
650       //sender.send(queue, message, DeliveryMode.PERSISTENT, 4, 0);
651
message.setText("High Priority Persistent message");
652       sender.send(message, DeliveryMode.PERSISTENT, 10, 0);
653       //sender.send(queue, message, DeliveryMode.PERSISTENT, 10, 0);
654

655       sender.close();
656       drainQueue();
657       disconnect();
658       getLog().debug("MessageListener test passed");
659    }
660
661    public void testApplicationServerStuff() throws Exception JavaDoc
662    {
663       getLog().debug("Starting testing app server stuff");
664       connect();
665
666       Queue JavaDoc testQueue = (Queue JavaDoc) context.lookup(TEST_QUEUE);
667       final QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
668
669       session.setMessageListener(new MessageListener JavaDoc()
670       {
671          public void onMessage(Message JavaDoc mess)
672          {
673             Logger log = Logger.getLogger(getClass().getName());
674             log.debug("Processing message");
675             try
676             {
677                if (mess instanceof TextMessage JavaDoc)
678                   log.debug(((TextMessage JavaDoc) mess).getText());
679             }
680             catch (Exception JavaDoc e)
681             {
682                log.error("Error", e);
683             }
684          }
685       });
686
687       QueueSender JavaDoc sender = session.createSender(testQueue);
688       sender.send(session.createTextMessage("Hi"));
689       sender.send(session.createTextMessage("There"));
690       sender.send(session.createTextMessage("Guys"));
691       queueConnection.createConnectionConsumer(testQueue, null, new ServerSessionPool JavaDoc()
692       {
693          public ServerSession JavaDoc getServerSession()
694          {
695             Logger.getLogger(getClass().getName()).debug("Getting server session.");
696             return new ServerSession JavaDoc()
697             {
698                public Session JavaDoc getSession()
699                {
700                   return session;
701                }
702                public void start()
703                {
704                   Logger.getLogger(getClass().getName()).debug("Starting server session.");
705                   session.run();
706                }
707             };
708          }
709       }, 10);
710
711       queueConnection.start();
712
713       try
714       {
715          Thread.sleep(5 * 1000);
716       }
717       catch (Exception JavaDoc e)
718       {
719       }
720
721       disconnect();
722       getLog().debug("Testing app server stuff passed");
723    }
724
725    //simply put a few messages on the test queue for next time.
726
/* public void testPM() throws Exception
727       {
728          getLog().debug("Starting testing pm");
729          connect();
730          
731          Queue testQueue = (Queue)context.lookup(TEST_QUEUE);
732          QueueSession session = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
733          QueueSender sender = session.createSender(testQueue);
734          sender.send(session.createTextMessage("From last time"));
735          sender.send(session.createTextMessage("From last time"));
736          sender.send(session.createTextMessage("From last time"));
737          sender.close();
738          session.close();
739          disconnect();
740          getLog().debug("Testing pm stuff passed");
741       }
742    */

743    private void drainMessagesForTopic(TopicSubscriber JavaDoc sub) throws JMSException JavaDoc
744    {
745       Message JavaDoc msg = sub.receive(50);
746       int c = 0;
747       while (msg != null)
748       {
749          c++;
750          if (msg instanceof TextMessage JavaDoc)
751             getLog().debug(((TextMessage JavaDoc) msg).getText());
752          msg = sub.receive(50);
753       }
754       getLog().debug("Received " + c + " messages from topic.");
755    }
756
757    public void testTopics() throws Exception JavaDoc
758    {
759       getLog().debug("Starting Topic test");
760       connect();
761
762       TopicConnectionFactory JavaDoc topicFactory = (TopicConnectionFactory JavaDoc) context.lookup(TOPIC_FACTORY);
763       topicConnection = topicFactory.createTopicConnection("john", "needle");
764
765       topicConnection.start();
766
767       //set up some subscribers to the topic
768
TopicSession JavaDoc session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
769       Topic JavaDoc topic = (Topic JavaDoc) context.lookup(TEST_TOPIC);
770
771       TopicSubscriber JavaDoc sub1 = session.createDurableSubscriber(topic, "sub1");
772       TopicSubscriber JavaDoc sub2 = session.createSubscriber(topic);
773       TopicSubscriber JavaDoc sub3 = session.createSubscriber(topic);
774
775       //Now a sender
776
TopicPublisher JavaDoc sender = session.createPublisher(topic);
777
778       //send some messages
779
sender.publish(session.createTextMessage("Message 1"));
780       sender.publish(session.createTextMessage("Message 2"));
781       sender.publish(session.createTextMessage("Message 3"));
782       drainMessagesForTopic(sub1);
783       drainMessagesForTopic(sub2);
784       drainMessagesForTopic(sub3);
785
786       //close some subscribers
787
sub1.close();
788       sub2.close();
789
790       //send some more messages
791
sender.publish(session.createTextMessage("Message 4"));
792       sender.publish(session.createTextMessage("Message 5"));
793       sender.publish(session.createTextMessage("Message 6"));
794
795       //give time for message 4 to be negatively acked (as it will be cause last receive timed out)
796
try
797       {
798          Thread.sleep(5 * 1000);
799       }
800       catch (InterruptedException JavaDoc e)
801       {
802       }
803
804       drainMessagesForTopic(sub3);
805
806       //open subscribers again.
807
sub1 = session.createDurableSubscriber(topic, "sub1");
808       sub2 = session.createSubscriber(topic);
809
810       //Send a final message
811
sender.publish(session.createTextMessage("Final message"));
812       sender.close();
813
814       drainMessagesForTopic(sub1);
815       drainMessagesForTopic(sub2);
816       drainMessagesForTopic(sub3);
817
818