KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > test > jbossmessaging > test > JBossSessionRecoverUnitTestCase


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.test.jbossmessaging.test;
23
24 import javax.jms.JMSException JavaDoc;
25 import javax.jms.Message JavaDoc;
26 import javax.jms.MessageListener JavaDoc;
27 import javax.jms.ObjectMessage JavaDoc;
28 import javax.jms.Queue JavaDoc;
29 import javax.jms.QueueConnection JavaDoc;
30 import javax.jms.QueueConnectionFactory JavaDoc;
31 import javax.jms.QueueReceiver JavaDoc;
32 import javax.jms.QueueSender JavaDoc;
33 import javax.jms.QueueSession JavaDoc;
34 import javax.jms.Session JavaDoc;
35 import javax.naming.Context JavaDoc;
36 import javax.naming.InitialContext JavaDoc;
37
38 import org.jboss.test.jbossmessaging.JMSTestCase;
39
40 /**
41  * JBossSessionRecoverUnitTestCase.java
42  *
43  * a simple session.recover test of JBossMQ
44  *
45  * @author Seth Sites
46  * @version $Revision: 37406 $
47  */

48
49 public class JBossSessionRecoverUnitTestCase extends JMSTestCase
50 {
51    String JavaDoc QUEUE_FACTORY = "ConnectionFactory";
52    String JavaDoc TEST_QUEUE = "queue/testQueue";
53
54    Context JavaDoc context;
55    QueueConnection JavaDoc queueConnection;
56    QueueSession JavaDoc session;
57    int counter=0;
58    Exception JavaDoc exception=null;
59
60    public JBossSessionRecoverUnitTestCase(String JavaDoc name) throws Exception JavaDoc
61    {
62       super(name);
63    }
64
65    protected void setUp()
66       throws Exception JavaDoc
67    {
68        // call setUp() in superclass
69
super.setUp() ;
70
71       this.getLog().debug("JBossSessionRecoverUnitTestCase, ConnectionFactory started");
72    }
73
74    protected void tearDown() throws Exception JavaDoc
75    {
76       this.getLog().debug("JBossSessionRecoverUnitTestCase, ConnectionFactory done");
77
78       // call tearDown() in superclass
79
super.tearDown() ;
80    }
81
82    // Emptys out all the messages in a queue
83
private void drainQueue() throws Exception JavaDoc
84    {
85       QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
86       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
87
88       QueueReceiver JavaDoc receiver = session.createReceiver(queue);
89       Message JavaDoc message = receiver.receive( 1000 );
90
91       int c=0;
92       while( message != null )
93       {
94          message = receiver.receive( 1000 );
95          c++;
96       }
97
98       if( c!=0 )
99          getLog().debug(" Drained "+c+" messages from the queue");
100
101       session.close();
102    }
103
104    static public void main ( String JavaDoc []args )
105    {
106       String JavaDoc newArgs[] = { "org.jboss.test.jbossmq.test.JBossSessionRecoverUnitTestCase" };
107       junit.swingui.TestRunner.main(newArgs);
108    }
109
110    protected void connect() throws Exception JavaDoc
111    {
112       if( context == null )
113       {
114          context = new InitialContext JavaDoc();
115       }
116
117       QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc) context.lookup(QUEUE_FACTORY);
118       queueConnection = queueFactory.createQueueConnection();
119
120       getLog().debug("Connection to JBossMQ established.");
121    }
122
123    /**
124     * Test that session.recover works with a message listener
125     */

126    public void testQueueSessionRecovermessageListener() throws Exception JavaDoc
127    {
128       counter = 0;
129       getLog().debug("Starting session.recover() Message Listener test");
130
131       connect();
132       queueConnection.start();
133       drainQueue();
134
135       session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
136       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
137       QueueSender JavaDoc sender = session.createSender(queue);
138
139       // send 20 messages to the queue
140
for ( int i=0; i<20; i++ )
141       {
142          sender.send(session.createObjectMessage(new Integer JavaDoc(i)));
143       }
144
145       //close the session, so we can start one with CLIENT_ACKNOWLEDGE
146
session.close();
147       queueConnection.stop();
148       session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE );
149
150       //create our receiver
151
QueueReceiver JavaDoc receiver = session.createReceiver( queue );
152       MessageListener JavaDoc messagelistener = new MessageListener JavaDoc()
153       {
154          public void onMessage(Message JavaDoc message)
155          {
156             processMessage( message );
157          }
158       };
159
160       receiver.setMessageListener( messagelistener );
161       queueConnection.start();
162
163       //since we put in 20 messages and recovered after receiving 20 we should receive those 20
164
//back and get 40 total
165
while ( counter < 40 && exception == null )
166       {
167          try
168          {
169             Thread.sleep( 500 );
170          }
171          catch ( InterruptedException JavaDoc ie )
172          {
173          }
174       }
175
176       if ( exception != null )
177       {
178          queueConnection.close();
179          throw exception;
180       }
181
182       queueConnection.close();
183       getLog().debug("session.recover() Message Listener passed");
184    }
185
186    private void processMessage ( Message JavaDoc message )
187    {
188       try
189       {
190          if ( message instanceof ObjectMessage JavaDoc )
191          {
192             counter++;
193             ObjectMessage JavaDoc objectmessage = (ObjectMessage JavaDoc)message;
194             Integer JavaDoc integer = (Integer JavaDoc)objectmessage.getObject();
195             int mynumber = integer.intValue();
196             getLog().debug("message object " + integer + " counter=" + counter );
197
198             if ( mynumber == 19 )
199             {
200                if (counter == 20)
201                {
202                   session.recover();
203                }
204                else
205                {
206                   message.acknowledge();
207                }
208             }
209          }
210       }
211       catch ( JMSException JavaDoc e )
212       {
213          exception = e;
214       }
215    }
216
217    class Synch
218    {
219        boolean waiting = false;
220        public synchronized void doWait(long timeout)
221           throws InterruptedException JavaDoc
222        {
223           waiting = true;
224           this.wait(timeout);
225        }
226        public synchronized void doNotify()
227           throws InterruptedException JavaDoc
228        {
229           while (waiting == false)
230              wait(100);
231           this.notifyAll();
232        }
233    }
234
235
236    /**
237     * Test that session.recover delivers messages in the correct orer
238     */

239    public void testQueueSessionRecoverMessageListenerOrder()
240       throws Exception JavaDoc
241    {
242       counter = 0;
243       exception = null;
244       getLog().debug("Starting session.recover() Message Listener Order test");
245
246       connect();
247       queueConnection.start();
248       drainQueue();
249
250       session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
251       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
252       QueueSender JavaDoc sender = session.createSender(queue);
253
254       // send 4 messages to the queue
255
for (int i=0; i<4; ++i)
256       {
257          sender.send(session.createObjectMessage(new Integer JavaDoc(i)));
258       }
259
260       //create our receiver
261
QueueReceiver JavaDoc receiver = session.createReceiver( queue );
262       final Synch synch = new Synch();
263       MessageListener JavaDoc messagelistener = new MessageListener JavaDoc()
264       {
265          public void onMessage(Message JavaDoc message)
266          {
267             checkMessagesInOrder(session, message, synch);
268          }
269       };
270
271       receiver.setMessageListener( messagelistener );
272       queueConnection.start();
273       synch.doWait(10000);
274
275       if ( exception != null )
276       {
277          queueConnection.close();
278          throw exception;
279       }
280
281       queueConnection.close();
282       getLog().debug("session.recover() Message Listener Order passed");
283    }
284
285    private void checkMessagesInOrder(Session JavaDoc session, Message JavaDoc message, Synch synch)
286    {
287       try
288       {
289          ObjectMessage JavaDoc objectmessage = (ObjectMessage JavaDoc)message;
290          Integer JavaDoc integer = (Integer JavaDoc)objectmessage.getObject();
291          int mynumber = integer.intValue();
292
293          if (message.getJMSRedelivered() == false)
294          {
295             log.debug("Recovering " + mynumber);
296             session.recover();
297             return;
298          }
299
300          log.debug("Checking " + mynumber);
301          assertTrue("Expected messages in order", mynumber == counter);
302          counter++;
303          if (counter == 4)
304             synch.doNotify();
305       }
306       catch (Exception JavaDoc e)
307       {
308          exception = e;
309       }
310    }
311
312
313
314    /**
315     * Test that session.recover works with receive
316     */

317    public void testQueueSessionRecoverReceive() throws Exception JavaDoc
318    {
319       counter = 0;
320       getLog().debug("Starting session.recover() receive test");
321
322       connect();
323       queueConnection.start();
324       drainQueue();
325
326       session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
327       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
328       QueueSender JavaDoc sender = session.createSender(queue);
329
330       // send 20 messages to the queue
331
for ( int i=0; i<20; i++ )
332       {
333          sender.send(session.createObjectMessage(new Integer JavaDoc(i)));
334       }
335
336       //close the session, so we can start one with CLIENT_ACKNOWLEDGE
337
session.close();
338       queueConnection.stop();
339       session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE );
340
341       //create our receiver
342
QueueReceiver JavaDoc receiver = session.createReceiver( queue );
343       queueConnection.start();
344
345       Message JavaDoc message = receiver.receive( 1000 );
346       int messagecounter=0;
347       while( message != null )
348       {
349          message = receiver.receive( 1000 );
350          messagecounter++;
351       }
352
353       if ( messagecounter != 20 )
354       {
355          throw new Exception JavaDoc ( "Not all sent messages were delivered! messagecounter=" + messagecounter );
356       }
357
358       //we got all of our messages, let's recover
359
session.recover();
360       message = receiver.receive();
361       messagecounter=0;
362
363       while( message != null )
364       {
365          if ( !message.getJMSRedelivered() )
366          {
367             throw new Exception JavaDoc ( "Message was not marked as redelivered! messagecounter=" + messagecounter );
368          }
369
370          message.acknowledge();
371          messagecounter++;
372
373          //workaround to keep from timing out since there are no more message on the server
374
if ( messagecounter < 15 )
375          {
376             message = receiver.receive();
377          }
378          else
379          {
380             message = receiver.receive ( 1000 );
381          }
382       }
383
384       if ( messagecounter != 20 )
385       {
386          throw new Exception JavaDoc ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter );
387       }
388
389       queueConnection.close();
390       getLog().debug("session.recover() receive passed");
391    }
392
393    /**
394     * Test that session.recover works with receive(timeout)
395     */

396    public void testQueueSessionRecoverReceiveTimeout() throws Exception JavaDoc
397    {
398       counter = 0;
399       getLog().debug("Starting session.recover() receive(timeout) test");
400
401       connect();
402       queueConnection.start();
403       drainQueue();
404
405
406
407       session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
408       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
409       QueueSender JavaDoc sender = session.createSender(queue);
410
411       // send 20 messages to the queue
412
for ( int i=0; i<20; i++ )
413       {
414          sender.send(session.createObjectMessage(new Integer JavaDoc(i)));
415       }
416
417       //close the session, so we can start one with CLIENT_ACKNOWLEDGE
418
session.close();
419       queueConnection.stop();
420       session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE );
421
422       //create our receiver
423
QueueReceiver JavaDoc receiver = session.createReceiver( queue );
424       queueConnection.start();
425
426       Message JavaDoc message = receiver.receive( 1000 );
427       int messagecounter=0;
428
429       while( message != null )
430       {
431          message = receiver.receive( 1000 );
432          messagecounter++;
433       }
434
435       if ( messagecounter != 20 )
436       {
437          throw new Exception JavaDoc ( "Not all sent messages were delivered! messagecounter=" + messagecounter );
438       }
439
440       //we got all of our messages, let's recover
441
session.recover();
442       message = receiver.receive(1000);
443       messagecounter=0;
444
445       while( message != null )
446       {
447          if ( !message.getJMSRedelivered() )
448          {
449             throw new Exception JavaDoc ( "Message was not marked as redelivered! messagecounter=" + messagecounter );
450          }
451
452          message.acknowledge();
453          messagecounter++;
454          message = receiver.receive( 1000 );
455       }
456
457       if ( messagecounter != 20 )
458       {
459          throw new Exception JavaDoc ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter );
460       }
461
462       queueConnection.close();
463       getLog().debug("session.recover() receive(timeout) passed");
464    }
465
466    /**
467     * Test that session.recover works with receiveNoWait
468     */

469    public void testQueueSessionRecoverReceiveNoWait() throws Exception JavaDoc
470    {
471       counter = 0;
472
473       getLog().debug("Starting session.recover() receiveNoWait test");
474
475
476
477       connect();
478
479
480       queueConnection.start();
481       drainQueue();
482
483       session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
484       Queue JavaDoc queue = (Queue JavaDoc)context.lookup(TEST_QUEUE);
485       QueueSender JavaDoc sender = session.createSender(queue);
486
487       // send 20 messages to the queue
488
for ( int i=0; i<20; i++ )
489       {
490          sender.send(session.createObjectMessage(new Integer JavaDoc(i)));
491       }
492
493       //close the session, so we can start one with CLIENT_ACKNOWLEDGE
494
session.close();
495       queueConnection.stop();
496       session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE );
497
498       //create our receiver
499
QueueReceiver JavaDoc receiver = session.createReceiver( queue );
500       queueConnection.start();
501
502       Message JavaDoc message = receiver.receiveNoWait();
503       int messagecounter=0;
504
505       while( message != null )
506       {
507          message = receiver.receiveNoWait();
508          messagecounter++;
509       }
510
511       if ( messagecounter != 20 )
512       {
513          throw new Exception JavaDoc ( "Not all sent messages were delivered! messagecounter=" + messagecounter );
514       }
515
516       //we got all of our messages, let's recover
517
session.recover();
518
519       message = receiver.receiveNoWait();
520       messagecounter=0;
521
522       while( message != null )
523       {
524          if ( !message.getJMSRedelivered() )
525          {
526             throw new Exception JavaDoc ( "Message was not marked as redelivered! messagecounter=" + messagecounter );
527          }
528
529          message.acknowledge();
530          messagecounter++;
531          message = receiver.receiveNoWait();
532       }
533
534       if ( messagecounter != 20 )
535       {
536          throw new Exception JavaDoc ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter );
537       }
538
539       queueConnection.close();
540       getLog().debug("session.recover() receiveNoWait passed");
541    }
542
543    public static junit.framework.Test suite() throws Exception JavaDoc
544    {
545        ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
546        String JavaDoc resourceName = getJMSResourceRelativePathname("test-destinations-service.xml") ;
547
548        return getDeploySetup(JBossSessionRecoverUnitTestCase.class,
549                loader.getResource(resourceName).toString());
550    }
551 }
552
Popular Tags