KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > test > jbossmq > test > JBossSessionRecoverUnitTestCase


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.test.jbossmq.test;
23
24 import javax.jms.JMSException JavaDoc;
25 import javax.jms.Message JavaDoc;
26 import javax.jms.MessageListener JavaDoc;
27 import javax.jms.ObjectMessage JavaDoc;
28 import javax.jms.Queue JavaDoc;
29 import javax.jms.QueueConnection JavaDoc;
30 import javax.jms.QueueConnectionFactory JavaDoc;
31 import javax.jms.QueueReceiver JavaDoc;
32 import javax.jms.QueueSender JavaDoc;
33 import javax.jms.QueueSession JavaDoc;
34 import javax.jms.Session JavaDoc;
35 import javax.naming.Context JavaDoc;
36 import javax.naming.InitialContext JavaDoc;
37
38 import org.jboss.test.JBossTestCase;
39
40
41
42
43
/**
 * JBossSessionRecoverUnitTestCase.java
 *
 * A simple session.recover test of JBossMQ.
 *
 * @author Seth Sites
 * @version $Revision: 37406 $
 */

59
60 public class JBossSessionRecoverUnitTestCase
61
62    extends JBossTestCase
63
64 {
65
66    String JavaDoc QUEUE_FACTORY = "ConnectionFactory";
67
68    String JavaDoc TEST_QUEUE = "queue/testQueue";
69
70
71
72    Context JavaDoc context;
73
74    QueueConnection JavaDoc queueConnection;
75
76    QueueSession JavaDoc session;
77
78
79
80    int counter=0;
81
82    Exception JavaDoc exception=null;
83
84
85
86    public JBossSessionRecoverUnitTestCase(String JavaDoc name) throws Exception JavaDoc
87
88    {
89
90       super(name);
91
92    }
93
94
95
96    protected void setUp()
97
98       throws Exception JavaDoc
99
100    {
101
102       this.getLog().debug("JBossSessionRecoverUnitTestCase, ConnectionFactory started");
103
104    }
105
106
107
108    protected void tearDown() throws Exception JavaDoc
109
110    {
111
112       this.getLog().debug("JBossSessionRecoverUnitTestCase, ConnectionFactory done");
113
114    }
115
116
117
118    // Emptys out all the messages in a queue
119

120    private void drainQueue() throws Exception JavaDoc
121
122    {
123
124       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
125
126       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
127
128
129
130       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
131
132       Message JavaDoc message = receiver.receive( 1000 );
133
134       int c=0;
135
136       while( message != null )
137
138       {
139
140          message = receiver.receive( 1000 );
141
142          c++;
143
144       }
145
146
147
148       if( c!=0 )
149
150          getLog().debug(" Drained "+c+" messages from the queue");
151
152
153
154       session.close();
155
156    }
157
158
159
160    static public void main ( String JavaDoc []args )
161
162    {
163
164       String JavaDoc newArgs[] = { "org.jboss.test.jbossmq.test.JBossSessionRecoverUnitTestCase" };
165
166       junit.swingui.TestRunner.main(newArgs);
167
168    }
169
170
171
172    protected void connect() throws Exception JavaDoc
173
174    {
175
176       if( context == null )
177
178       {
179
180          context = new InitialContext JavaDoc();
181
182       }
183
184       QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc) context.lookup(QUEUE_FACTORY);
185
186       queueConnection = queueFactory.createQueueConnection();
187
188
189
190       getLog().debug("Connection to JBossMQ established.");
191
192    }
193
194
195
196    /**
197
198     * Test that session.recover works with a message listener
199
200     */

201
202    public void testQueueSessionRecovermessageListener() throws Exception JavaDoc
203
204    {
205
206       counter = 0;
207
208       getLog().debug("Starting session.recover() Message Listener test");
209
210
211
212       connect();
213
214
215
216       queueConnection.start();
217
218
219
220       drainQueue();
221
222
223
224       session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
225
226       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
227
228       QueueSender JavaDoc sender = session.createSender(queue);
229
230
231
232       // send 20 messages to the queue
233

234       for ( int i=0; i<20; i++ )
235
236       {
237
238          sender.send(session.createObjectMessage(new Integer JavaDoc(i)));
239
240       }
241
242
243
244       //close the session, so we can start one with CLIENT_ACKNOWLEDGE
245

246       session.close();
247
248       queueConnection.stop();
249
250       session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE );
251
252
253
254       //create our receiver
255

256       QueueReceiver JavaDoc receiver = session.createReceiver( queue );
257
258       MessageListener JavaDoc messagelistener = new MessageListener JavaDoc()
259
260       {
261
262          public void onMessage(Message JavaDoc message)
263
264          {
265
266             processMessage( message );
267
268          }
269
270       };
271
272       receiver.setMessageListener( messagelistener );
273
274       queueConnection.start();
275
276
277
278       //since we put in 20 messages and recovered after receiving 20 we should receive those 20
279

280       //back and get 40 total
281

282       while ( counter < 40 && exception == null )
283
284       {
285
286          try
287
288          {
289
290             Thread.sleep( 500 );
291
292          }
293
294          catch ( InterruptedException JavaDoc ie )
295
296          {
297
298          }
299
300       }
301
302
303
304       if ( exception != null )
305
306       {
307
308          queueConnection.close();
309
310          throw exception;
311
312       }
313
314
315
316       queueConnection.close();
317
318       getLog().debug("session.recover() Message Listener passed");
319
320    }
321
322
323
324    private void processMessage ( Message JavaDoc message )
325
326    {
327
328       try
329
330       {
331
332          if ( message instanceof ObjectMessage JavaDoc )
333
334          {
335
336             counter++;
337
338             ObjectMessage JavaDoc objectmessage = (ObjectMessage JavaDoc)message;
339
340             Integer JavaDoc integer = (Integer JavaDoc)objectmessage.getObject();
341
342             int mynumber = integer.intValue();
343
344             getLog().debug("message object " + integer + " counter=" + counter );
345
346             if ( mynumber == 19 )
347
348             {
349
350                if (counter == 20)
351
352                {
353
354                   session.recover();
355
356                }
357
358                else
359
360                {
361
362                   message.acknowledge();
363
364                }
365
366             }
367
368          }
369
370       }
371
372       catch ( JMSException JavaDoc e )
373
374       {
375
376          exception = e;
377
378       }
379
380    }
381
382
383    class Synch
384    {
385        boolean waiting = false;
386        public synchronized void doWait(long timeout)
387           throws InterruptedException JavaDoc
388        {
389           waiting = true;
390           this.wait(timeout);
391        }
392        public synchronized void doNotify()
393           throws InterruptedException JavaDoc
394        {
395           while (waiting == false)
396              wait(100);
397           this.notifyAll();
398        }
399    }
400
401
402    /**
403
404     * Test that session.recover delivers messages in the correct orer
405
406     */

407
408    public void testQueueSessionRecoverMessageListenerOrder()
409       throws Exception JavaDoc
410
411    {
412
413       counter = 0;
414
415       exception = null;
416       getLog().debug("Starting session.recover() Message Listener Order test");
417
418
419
420       connect();
421
422
423
424       queueConnection.start();
425
426
427
428       drainQueue();
429
430
431
432       session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
433
434       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
435
436       QueueSender JavaDoc sender = session.createSender(queue);
437
438
439
440       // send 4 messages to the queue
441

442       for (int i=0; i<4; ++i)
443
444       {
445
446          sender.send(session.createObjectMessage(new Integer JavaDoc(i)));
447
448       }
449
450
451
452       //create our receiver
453

454       QueueReceiver JavaDoc receiver = session.createReceiver( queue );
455
456       final Synch synch = new Synch();
457       MessageListener JavaDoc messagelistener = new MessageListener JavaDoc()
458
459       {
460
461          public void onMessage(Message JavaDoc message)
462
463          {
464
465             checkMessagesInOrder(session, message, synch);
466
467          }
468
469       };
470
471       receiver.setMessageListener( messagelistener );
472
473       queueConnection.start();
474
475
476       synch.doWait(10000);
477
478
479       if ( exception != null )
480
481       {
482
483          queueConnection.close();
484
485          throw exception;
486
487       }
488
489
490
491       queueConnection.close();
492
493       getLog().debug("session.recover() Message Listener Order passed");
494
495    }
496
497
498
499    private void checkMessagesInOrder(Session JavaDoc session, Message JavaDoc message, Synch synch)
500
501    {
502
503       try
504
505       {
506
507          ObjectMessage JavaDoc objectmessage = (ObjectMessage JavaDoc)message;
508
509          Integer JavaDoc integer = (Integer JavaDoc)objectmessage.getObject();
510
511          int mynumber = integer.intValue();
512
513          if (message.getJMSRedelivered() == false)
514          {
515             log.debug("Recovering " + mynumber);
516             session.recover();
517             return;
518          }
519
520          log.debug("Checking " + mynumber);
521          assertTrue("Expected messages in order", mynumber == counter);
522          counter++;
523          if (counter == 4)
524             synch.doNotify();
525       }
526
527       catch (Exception JavaDoc e)
528
529       {
530
531          exception = e;
532
533       }
534
535    }
536
537
538
539    /**
540
541     * Test that session.recover works with receive
542
543     */

544
545    public void testQueueSessionRecoverReceive() throws Exception JavaDoc
546
547    {
548
549       counter = 0;
550
551       getLog().debug("Starting session.recover() receive test");
552
553
554
555       connect();
556
557
558
559       queueConnection.start();
560
561
562
563       drainQueue();
564
565
566
567       session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
568
569       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
570
571       QueueSender JavaDoc sender = session.createSender(queue);
572
573
574
575       // send 20 messages to the queue
576

577       for ( int i=0; i<20; i++ )
578
579       {
580
581          sender.send(session.createObjectMessage(new Integer JavaDoc(i)));
582
583       }
584
585
586
587       //close the session, so we can start one with CLIENT_ACKNOWLEDGE
588

589       session.close();
590
591       queueConnection.stop();
592
593       session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE );
594
595
596
597       //create our receiver
598

599       QueueReceiver JavaDoc receiver = session.createReceiver( queue );
600
601       queueConnection.start();
602
603
604
605       Message JavaDoc message = receiver.receive( 1000 );
606
607       int messagecounter=0;
608
609       while( message != null )
610
611       {
612
613          message = receiver.receive( 1000 );
614
615          messagecounter++;
616
617       }
618
619
620
621       if ( messagecounter != 20 )
622
623       {
624
625          throw new Exception JavaDoc ( "Not all sent messages were delivered! messagecounter=" + messagecounter );
626
627       }
628
629
630
631       //we got all of our messages, let's recover
632

633       session.recover();
634
635
636
637       message = receiver.receive();
638
639       messagecounter=0;
640
641       while( message != null )
642
643       {
644
645          if ( !message.getJMSRedelivered() )
646
647          {
648
649             throw new Exception JavaDoc ( "Message was not marked as redelivered! messagecounter=" + messagecounter );
650
651          }
652
653          message.acknowledge();
654
655
656
657          messagecounter++;
658
659          //workaround to keep from timing out since there are no more message on the server
660

661          if ( messagecounter < 15 )
662
663          {
664
665             message = receiver.receive();
666
667          }
668
669          else
670
671          {
672
673             message = receiver.receive ( 1000 );
674
675          }
676
677       }
678
679
680
681       if ( messagecounter != 20 )
682
683       {
684
685          throw new Exception JavaDoc ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter );
686
687       }
688
689
690
691       queueConnection.close();
692
693       getLog().debug("session.recover() receive passed");
694
695    }
696
697
698
699    /**
700
701     * Test that session.recover works with receive(timeout)
702
703     */

704
705    public void testQueueSessionRecoverReceiveTimeout() throws Exception JavaDoc
706
707    {
708
709       counter = 0;
710
711       getLog().debug("Starting session.recover() receive(timeout) test");
712
713
714
715       connect();
716
717
718
719       queueConnection.start();
720
721
722
723       drainQueue();
724
725
726
727       session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
728
729       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
730
731       QueueSender JavaDoc sender = session.createSender(queue);
732
733
734
735       // send 20 messages to the queue
736

737       for ( int i=0; i<20; i++ )
738
739       {
740
741          sender.send(session.createObjectMessage(new Integer JavaDoc(i)));
742
743       }
744
745
746
747       //close the session, so we can start one with CLIENT_ACKNOWLEDGE
748

749       session.close();
750
751       queueConnection.stop();
752
753       session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE );
754
755
756
757       //create our receiver
758

759       QueueReceiver JavaDoc receiver = session.createReceiver( queue );
760
761       queueConnection.start();
762
763
764
765       Message JavaDoc message = receiver.receive( 1000 );
766
767       int messagecounter=0;
768
769       while( message != null )
770
771       {
772
773          message = receiver.receive( 1000 );
774
775          messagecounter++;
776
777       }
778
779
780
781       if ( messagecounter != 20 )
782
783       {
784
785          throw new Exception JavaDoc ( "Not all sent messages were delivered! messagecounter=" + messagecounter );
786
787       }
788
789
790
791       //we got all of our messages, let's recover
792

793       session.recover();
794
795
796
797       message = receiver.receive(1000);
798
799       messagecounter=0;
800
801       while( message != null )
802
803       {
804
805          if ( !message.getJMSRedelivered() )
806
807          {
808
809             throw new Exception JavaDoc ( "Message was not marked as redelivered! messagecounter=" + messagecounter );
810
811          }
812
813          message.acknowledge();
814
815
816
817          messagecounter++;
818
819          message = receiver.receive( 1000 );
820
821       }
822
823
824
825       if ( messagecounter != 20 )
826
827       {
828
829          throw new Exception JavaDoc ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter );
830
831       }
832
833
834
835       queueConnection.close();
836
837       getLog().debug("session.recover() receive(timeout) passed");
838
839    }
840
841
842
843       /**
844
845     * Test that session.recover works with receiveNoWait
846
847     */

848
849    public void testQueueSessionRecoverReceiveNoWait() throws Exception JavaDoc
850
851    {
852
853       counter = 0;
854
855       getLog().debug("Starting session.recover() receiveNoWait test");
856
857
858
859       connect();
860
861
862
863       queueConnection.start();
864
865
866
867       drainQueue();
868
869
870
871       session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
872
873       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
874
875       QueueSender JavaDoc sender = session.createSender(queue);
876
877
878
879       // send 20 messages to the queue
880

881       for ( int i=0; i<20; i++ )
882
883       {
884
885          sender.send(session.createObjectMessage(new Integer JavaDoc(i)));
886
887       }
888
889
890
891       //close the session, so we can start one with CLIENT_ACKNOWLEDGE
892

893       session.close();
894
895       queueConnection.stop();
896
897       session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE );
898
899
900
901       //create our receiver
902

903       QueueReceiver JavaDoc receiver = session.createReceiver( queue );
904
905       queueConnection.start();
906
907
908
909       Message JavaDoc message = receiver.receiveNoWait();
910
911       int messagecounter=0;
912
913       while( message != null )
914
915       {
916
917          message = receiver.receiveNoWait();
918
919          messagecounter++;
920
921       }
922
923
924
925       if ( messagecounter != 20 )
926
927       {
928
929          throw new Exception JavaDoc ( "Not all sent messages were delivered! messagecounter=" + messagecounter );
930
931       }
932
933
934
935       //we got all of our messages, let's recover
936

937       session.recover();
938
939
940
941       message = receiver.receiveNoWait();
942
943       messagecounter=0;
944
945       while( message != null )
946
947       {
948
949          if ( !message.getJMSRedelivered() )
950
951          {
952
953             throw new Exception JavaDoc ( "Message was not marked as redelivered! messagecounter=" + messagecounter );
954
955          }
956
957          message.acknowledge();
958
959
960
961          messagecounter++;
962
963          message = receiver.receiveNoWait();
964
965       }
966
967
968
969       if ( messagecounter != 20 )
970
971       {
972
973          throw new Exception JavaDoc ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter );
974
975       }
976
977
978
979       queueConnection.close();
980
981       getLog().debug("session.recover() receiveNoWait passed");
982
983    }
984
985    public static junit.framework.Test suite() throws Exception JavaDoc
986    {
987        ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
988        return getDeploySetup(JBossSessionRecoverUnitTestCase.class,
989                loader.getResource("messaging/test-destinations-service.xml").toString());
990    }
991
992 }
Popular Tags