1 22 package org.jboss.test.jbossmessaging.test; 23 24 import javax.jms.JMSException ; 25 import javax.jms.Message ; 26 import javax.jms.MessageListener ; 27 import javax.jms.ObjectMessage ; 28 import javax.jms.Queue ; 29 import javax.jms.QueueConnection ; 30 import javax.jms.QueueConnectionFactory ; 31 import javax.jms.QueueReceiver ; 32 import javax.jms.QueueSender ; 33 import javax.jms.QueueSession ; 34 import javax.jms.Session ; 35 import javax.naming.Context ; 36 import javax.naming.InitialContext ; 37 38 import org.jboss.test.jbossmessaging.JMSTestCase; 39 40 48 49 public class JBossSessionRecoverUnitTestCase extends JMSTestCase 50 { 51 String QUEUE_FACTORY = "ConnectionFactory"; 52 String TEST_QUEUE = "queue/testQueue"; 53 54 Context context; 55 QueueConnection queueConnection; 56 QueueSession session; 57 int counter=0; 58 Exception exception=null; 59 60 public JBossSessionRecoverUnitTestCase(String name) throws Exception 61 { 62 super(name); 63 } 64 65 protected void setUp() 66 throws Exception 67 { 68 super.setUp() ; 70 71 this.getLog().debug("JBossSessionRecoverUnitTestCase, ConnectionFactory started"); 72 } 73 74 protected void tearDown() throws Exception 75 { 76 this.getLog().debug("JBossSessionRecoverUnitTestCase, ConnectionFactory done"); 77 78 super.tearDown() ; 80 } 81 82 private void drainQueue() throws Exception 84 { 85 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 86 Queue queue = (Queue )context.lookup(TEST_QUEUE); 87 88 QueueReceiver receiver = session.createReceiver(queue); 89 Message message = receiver.receive( 1000 ); 90 91 int c=0; 92 while( message != null ) 93 { 94 message = receiver.receive( 1000 ); 95 c++; 96 } 97 98 if( c!=0 ) 99 getLog().debug(" Drained "+c+" messages from the queue"); 100 101 session.close(); 102 } 103 104 static public void main ( String []args ) 105 { 106 String newArgs[] = { "org.jboss.test.jbossmq.test.JBossSessionRecoverUnitTestCase" }; 107 junit.swingui.TestRunner.main(newArgs); 108 } 109 110 protected void connect() throws Exception 111 { 112 if( context == null ) 113 { 114 context = new InitialContext (); 115 } 116 117 QueueConnectionFactory queueFactory = (QueueConnectionFactory ) context.lookup(QUEUE_FACTORY); 118 queueConnection = queueFactory.createQueueConnection(); 119 120 getLog().debug("Connection to JBossMQ established."); 121 } 122 123 126 public void testQueueSessionRecovermessageListener() throws Exception 127 { 128 counter = 0; 129 getLog().debug("Starting session.recover() Message Listener test"); 130 131 connect(); 132 queueConnection.start(); 133 drainQueue(); 134 135 session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 136 Queue queue = (Queue )context.lookup(TEST_QUEUE); 137 QueueSender sender = session.createSender(queue); 138 139 for ( int i=0; i<20; i++ ) 141 { 142 sender.send(session.createObjectMessage(new Integer (i))); 143 } 144 145 session.close(); 147 queueConnection.stop(); 148 session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE ); 149 150 QueueReceiver receiver = session.createReceiver( queue ); 152 MessageListener messagelistener = new MessageListener () 153 { 154 public void onMessage(Message message) 155 { 156 processMessage( message ); 157 } 158 }; 159 160 receiver.setMessageListener( messagelistener ); 161 queueConnection.start(); 162 163 while ( counter < 40 && exception == null ) 166 { 167 try 168 { 169 Thread.sleep( 500 ); 170 } 171 catch ( InterruptedException ie ) 172 { 173 } 174 } 175 176 if ( exception != null ) 177 { 178 queueConnection.close(); 179 throw exception; 180 } 181 182 queueConnection.close(); 183 getLog().debug("session.recover() Message Listener passed"); 184 } 185 186 private void processMessage ( Message message ) 187 { 188 try 189 { 190 if ( message instanceof ObjectMessage ) 191 { 192 counter++; 193 ObjectMessage objectmessage = (ObjectMessage )message; 194 Integer integer = (Integer )objectmessage.getObject(); 195 int mynumber = integer.intValue(); 196 getLog().debug("message object " + integer + " counter=" + counter ); 197 198 if ( mynumber == 19 ) 199 { 200 if (counter == 20) 201 { 202 session.recover(); 203 } 204 else 205 { 206 message.acknowledge(); 207 } 208 } 209 } 210 } 211 catch ( JMSException e ) 212 { 213 exception = e; 214 } 215 } 216 217 class Synch 218 { 219 boolean waiting = false; 220 public synchronized void doWait(long timeout) 221 throws InterruptedException 222 { 223 waiting = true; 224 this.wait(timeout); 225 } 226 public synchronized void doNotify() 227 throws InterruptedException 228 { 229 while (waiting == false) 230 wait(100); 231 this.notifyAll(); 232 } 233 } 234 235 236 239 public void testQueueSessionRecoverMessageListenerOrder() 240 throws Exception 241 { 242 counter = 0; 243 exception = null; 244 getLog().debug("Starting session.recover() Message Listener Order test"); 245 246 connect(); 247 queueConnection.start(); 248 drainQueue(); 249 250 session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 251 Queue queue = (Queue )context.lookup(TEST_QUEUE); 252 QueueSender sender = session.createSender(queue); 253 254 for (int i=0; i<4; ++i) 256 { 257 sender.send(session.createObjectMessage(new Integer (i))); 258 } 259 260 QueueReceiver receiver = session.createReceiver( queue ); 262 final Synch synch = new Synch(); 263 MessageListener messagelistener = new MessageListener () 264 { 265 public void onMessage(Message message) 266 { 267 checkMessagesInOrder(session, message, synch); 268 } 269 }; 270 271 receiver.setMessageListener( messagelistener ); 272 queueConnection.start(); 273 synch.doWait(10000); 274 275 if ( exception != null ) 276 { 277 queueConnection.close(); 278 throw exception; 279 } 280 281 queueConnection.close(); 282 getLog().debug("session.recover() Message Listener Order passed"); 283 } 284 285 private void checkMessagesInOrder(Session session, Message message, Synch synch) 286 { 287 try 288 { 289 ObjectMessage objectmessage = (ObjectMessage )message; 290 Integer integer = (Integer )objectmessage.getObject(); 291 int mynumber = integer.intValue(); 292 293 if (message.getJMSRedelivered() == false) 294 { 295 log.debug("Recovering " + mynumber); 296 session.recover(); 297 return; 298 } 299 300 log.debug("Checking " + mynumber); 301 assertTrue("Expected messages in order", mynumber == counter); 302 counter++; 303 if (counter == 4) 304 synch.doNotify(); 305 } 306 catch (Exception e) 307 { 308 exception = e; 309 } 310 } 311 312 313 314 317 public void testQueueSessionRecoverReceive() throws Exception 318 { 319 counter = 0; 320 getLog().debug("Starting session.recover() receive test"); 321 322 connect(); 323 queueConnection.start(); 324 drainQueue(); 325 326 session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 327 Queue queue = (Queue )context.lookup(TEST_QUEUE); 328 QueueSender sender = session.createSender(queue); 329 330 for ( int i=0; i<20; i++ ) 332 { 333 sender.send(session.createObjectMessage(new Integer (i))); 334 } 335 336 session.close(); 338 queueConnection.stop(); 339 session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE ); 340 341 QueueReceiver receiver = session.createReceiver( queue ); 343 queueConnection.start(); 344 345 Message message = receiver.receive( 1000 ); 346 int messagecounter=0; 347 while( message != null ) 348 { 349 message = receiver.receive( 1000 ); 350 messagecounter++; 351 } 352 353 if ( messagecounter != 20 ) 354 { 355 throw new Exception ( "Not all sent messages were delivered! messagecounter=" + messagecounter ); 356 } 357 358 session.recover(); 360 message = receiver.receive(); 361 messagecounter=0; 362 363 while( message != null ) 364 { 365 if ( !message.getJMSRedelivered() ) 366 { 367 throw new Exception ( "Message was not marked as redelivered! messagecounter=" + messagecounter ); 368 } 369 370 message.acknowledge(); 371 messagecounter++; 372 373 if ( messagecounter < 15 ) 375 { 376 message = receiver.receive(); 377 } 378 else 379 { 380 message = receiver.receive ( 1000 ); 381 } 382 } 383 384 if ( messagecounter != 20 ) 385 { 386 throw new Exception ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter ); 387 } 388 389 queueConnection.close(); 390 getLog().debug("session.recover() receive passed"); 391 } 392 393 396 public void testQueueSessionRecoverReceiveTimeout() throws Exception 397 { 398 counter = 0; 399 getLog().debug("Starting session.recover() receive(timeout) test"); 400 401 connect(); 402 queueConnection.start(); 403 drainQueue(); 404 405 406 407 session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 408 Queue queue = (Queue )context.lookup(TEST_QUEUE); 409 QueueSender sender = session.createSender(queue); 410 411 for ( int i=0; i<20; i++ ) 413 { 414 sender.send(session.createObjectMessage(new Integer (i))); 415 } 416 417 session.close(); 419 queueConnection.stop(); 420 session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE ); 421 422 QueueReceiver receiver = session.createReceiver( queue ); 424 queueConnection.start(); 425 426 Message message = receiver.receive( 1000 ); 427 int messagecounter=0; 428 429 while( message != null ) 430 { 431 message = receiver.receive( 1000 ); 432 messagecounter++; 433 } 434 435 if ( messagecounter != 20 ) 436 { 437 throw new Exception ( "Not all sent messages were delivered! messagecounter=" + messagecounter ); 438 } 439 440 session.recover(); 442 message = receiver.receive(1000); 443 messagecounter=0; 444 445 while( message != null ) 446 { 447 if ( !message.getJMSRedelivered() ) 448 { 449 throw new Exception ( "Message was not marked as redelivered! messagecounter=" + messagecounter ); 450 } 451 452 message.acknowledge(); 453 messagecounter++; 454 message = receiver.receive( 1000 ); 455 } 456 457 if ( messagecounter != 20 ) 458 { 459 throw new Exception ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter ); 460 } 461 462 queueConnection.close(); 463 getLog().debug("session.recover() receive(timeout) passed"); 464 } 465 466 469 public void testQueueSessionRecoverReceiveNoWait() throws Exception 470 { 471 counter = 0; 472 473 getLog().debug("Starting session.recover() receiveNoWait test"); 474 475 476 477 connect(); 478 479 480 queueConnection.start(); 481 drainQueue(); 482 483 session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 484 Queue queue = (Queue )context.lookup(TEST_QUEUE); 485 QueueSender sender = session.createSender(queue); 486 487 for ( int i=0; i<20; i++ ) 489 { 490 sender.send(session.createObjectMessage(new Integer (i))); 491 } 492 493 session.close(); 495 queueConnection.stop(); 496 session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE ); 497 498 QueueReceiver receiver = session.createReceiver( queue ); 500 queueConnection.start(); 501 502 Message message = receiver.receiveNoWait(); 503 int messagecounter=0; 504 505 while( message != null ) 506 { 507 message = receiver.receiveNoWait(); 508 messagecounter++; 509 } 510 511 if ( messagecounter != 20 ) 512 { 513 throw new Exception ( "Not all sent messages were delivered! messagecounter=" + messagecounter ); 514 } 515 516 session.recover(); 518 519 message = receiver.receiveNoWait(); 520 messagecounter=0; 521 522 while( message != null ) 523 { 524 if ( !message.getJMSRedelivered() ) 525 { 526 throw new Exception ( "Message was not marked as redelivered! messagecounter=" + messagecounter ); 527 } 528 529 message.acknowledge(); 530 messagecounter++; 531 message = receiver.receiveNoWait(); 532 } 533 534 if ( messagecounter != 20 ) 535 { 536 throw new Exception ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter ); 537 } 538 539 queueConnection.close(); 540 getLog().debug("session.recover() receiveNoWait passed"); 541 } 542 543 public static junit.framework.Test suite() throws Exception 544 { 545 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 546 String resourceName = getJMSResourceRelativePathname("test-destinations-service.xml") ; 547 548 return getDeploySetup(JBossSessionRecoverUnitTestCase.class, 549 loader.getResource(resourceName).toString()); 550 } 551 } 552 | Popular Tags |