package net.walend.somnifugi.test;

import java.util.List;
import java.util.ArrayList;
import java.util.Hashtable;

import javax.naming.InitialContext;
import javax.naming.Context;
import javax.naming.NamingException;

import javax.jms.Connection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.QueueSender;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.BytesMessage;
import javax.jms.QueueReceiver;
import javax.jms.Session;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.QueueRequestor;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import junit.framework.TestSuite;
import junit.framework.Test;

import net.walend.toolkit.junit.TestCase;

import net.walend.somnifugi.SomniInterruptedException;

/**
 * Long-running tests that pair a producer sending as fast as it can with a
 * deliberately slow consumer, on destinations looked up through the SomniFugi
 * JNDI context factories.
 *
 * <p>Each test lets its worker threads run for {@link #RUN_MILLIS}, stops them,
 * and fails if any worker recorded an exception. The tests make no assertion
 * about message counts; progress is only printed to {@code System.out}.
 */
public class FlowLimitTests extends TestCase
{
    /** How long each test lets its worker threads run before stopping them. */
    private static final long RUN_MILLIS = 30000;

    /** How long each test waits for a stopped worker thread to finish. */
    private static final long JOIN_MILLIS = 1000;

    /** Size of the byte array carried by every produced message. */
    private static final int PAYLOAD_BYTES = 1000;

    /** A progress line is printed once per this many messages. */
    private static final int REPORT_INTERVAL = 100;

    /** Timeout handed to {@code receive(long)} by the polling consumers. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 100;

    /** Pause the consumers take after each message so that they lag the producer. */
    private static final long CONSUMER_DELAY_MILLIS = 100;

    private static final String QUEUE_CONTEXT_FACTORY = "net.walend.somnifugi.SomniQueueContextFactory";
    private static final String TOPIC_CONTEXT_FACTORY = "net.walend.somnifugi.SomniTopicContextFactory";

    public FlowLimitTests(String testName)
    {
        super(testName);
    }

    /**
     * Shared bookkeeping for the worker Runnables used by these tests: remembers
     * the thread executing {@link #run()}, supports a cooperative
     * {@link #stop()}, and keeps any unexpected exception for the test thread
     * to inspect.
     *
     * <p>All three fields are volatile because they are written by one thread
     * and read by another, and the test thread only performs a timed join on
     * the worker.
     */
    protected abstract class StoppableWorker
        implements Runnable
    {
        private volatile Exception exception = null;
        private volatile Thread thread = null;
        private volatile boolean stopRequested = false;

        /**
         * Runs {@link #work()} on the calling thread. Interruption is treated
         * as a normal shutdown; any other exception is stored for
         * {@link #getException()}.
         */
        public void run()
        {
            thread = Thread.currentThread();
            try
            {
                work();
            }
            catch(InterruptedException ignored)
            {
                // Expected: stop() interrupted a sleep. The thread is about to end.
            }
            catch(SomniInterruptedException ignored)
            {
                // Expected: stop() interrupted a blocking SomniFugi call.
            }
            catch(Exception e)
            {
                exception = e;
            }
        }

        /**
         * Asks the worker to finish. Safe to call before {@link #run()} has
         * started: the stop flag is set first, so a worker that starts later
         * sees it and exits without doing any work.
         */
        public void stop()
        {
            stopRequested = true;
            Thread running = thread;
            if ( running != null ) {
                running.interrupt();
            }
        }

        /**
         * @return the exception that ended the worker, or null if it has not
         *         failed (interruption is not reported as a failure)
         */
        public Exception getException()
        {
            return exception;
        }

        /** @return true until stop() has been called or the current thread is interrupted */
        protected boolean keepRunning()
        {
            return !stopRequested && !Thread.currentThread().isInterrupted();
        }

        /** Sleeps for the given time; does nothing when millis is zero or negative. */
        protected void pause(long millis)
            throws InterruptedException
        {
            if ( millis > 0 ) {
                Thread.sleep(millis);
            }
        }

        /** The worker's loop; implementations should poll {@link #keepRunning()}. */
        protected abstract void work()
            throws Exception;
    }

    /** Sends object messages to a queue in a loop until stopped. */
    protected class ObjectSender
        extends StoppableWorker
    {
        private final int delay;
        private final QueueSession session;
        private final QueueSender sender;

        /**
         * @param session session used to create the messages
         * @param sender  sender the messages are sent with
         * @param delay   pause in milliseconds after each send; no pause when not positive
         */
        public ObjectSender(QueueSession session, QueueSender sender,int delay)
        {
            this.sender = sender;
            this.session = session;
            this.delay = delay;
        }

        protected void work()
            throws Exception
        {
            int sent = 0;
            while(keepRunning())
            {
                byte[] bytes = new byte[PAYLOAD_BYTES];
                if ( sent++ % REPORT_INTERVAL == 0 ) {
                    System.out.println("Sent : " + sent );
                }
                Message message = session.createObjectMessage(bytes);
                sender.send(message);
                pause(delay);
            }
        }
    }

    /** Publishes object messages to a topic in a loop until stopped. */
    protected class ObjectPublisher
        extends StoppableWorker
    {
        private final int delay;
        private final TopicSession session;
        private final TopicPublisher sender;

        /**
         * @param session session used to create the messages
         * @param sender  publisher the messages are published with
         * @param delay   pause in milliseconds after each publish; no pause when not positive
         */
        public ObjectPublisher(TopicSession session, TopicPublisher sender,int delay)
        {
            this.sender = sender;
            this.session = session;
            this.delay = delay;
        }

        protected void work()
            throws Exception
        {
            int sent = 0;
            while(keepRunning())
            {
                byte[] bytes = new byte[PAYLOAD_BYTES];
                if ( sent++ % REPORT_INTERVAL == 0 ) {
                    System.out.println("Sent : " + sent );
                }
                Message message = session.createObjectMessage(bytes);
                sender.publish(message);
                pause(delay);
            }
        }
    }

    /** Polls a queue receiver in a loop until stopped, pausing after each message. */
    protected class ObjectReceiver
        extends StoppableWorker
    {
        private final QueueSession session;
        private final QueueReceiver receiver;
        private final long delay;

        /**
         * @param session  session the receiver belongs to (kept but not otherwise used here)
         * @param receiver receiver to poll
         * @param delay    pause in milliseconds after each received message; no pause when not positive
         */
        public ObjectReceiver(QueueSession session,QueueReceiver receiver,long delay)
        {
            this.session = session;
            this.receiver = receiver;
            this.delay = delay;
        }

        protected void work()
            throws Exception
        {
            int received = 0;
            while(keepRunning())
            {
                ObjectMessage message = (ObjectMessage )receiver.receive(RECEIVE_TIMEOUT_MILLIS);
                if ( message == null ) {
                    System.out.println("Receive timeout");
                } else {
                    if ( received++ % REPORT_INTERVAL == 0 ) {
                        System.out.println("Received : " + received );
                    }
                    pause(delay);
                }
            }
        }
    }

    /** Polls a topic subscriber in a loop until stopped, pausing after each message. */
    protected class ObjectSubscriber
        extends StoppableWorker
    {
        private final TopicSession session;
        private final TopicSubscriber receiver;
        private final long delay;

        /**
         * @param session  session the subscriber belongs to (kept but not otherwise used here)
         * @param receiver subscriber to poll
         * @param delay    pause in milliseconds after each received message; no pause when not positive
         */
        public ObjectSubscriber(TopicSession session,TopicSubscriber receiver,long delay)
        {
            this.session = session;
            this.receiver = receiver;
            this.delay = delay;
        }

        protected void work()
            throws Exception
        {
            int received = 0;
            while(keepRunning())
            {
                ObjectMessage message = (ObjectMessage )receiver.receive(RECEIVE_TIMEOUT_MILLIS);
                if ( message == null ) {
                    System.out.println("Receive timeout");
                } else {
                    if ( received++ % REPORT_INTERVAL == 0 ) {
                        System.out.println("Received : " + received );
                    }
                    pause(delay);
                }
            }
        }
    }

    /** Listener that counts delivered messages and sleeps for a fixed time on each one. */
    protected class TestMessageListener
        implements MessageListener
    {
        int i = 0;
        long delay;

        /** @param delay time in milliseconds to sleep for every delivered message */
        protected TestMessageListener(long delay)
        {
            this.delay = delay;
        }

        public void onMessage(Message message)
        {
            // The cast is kept for its side effect: anything other than an
            // ObjectMessage raises a ClassCastException.
            ObjectMessage om = (ObjectMessage )message;

            if ( i++ % REPORT_INTERVAL == 0 ) {
                System.out.println("OnMessage : " + i );
            }
            try
            {
                Thread.sleep(delay);
            }
            catch(InterruptedException ie)
            {
                // Preserve the interrupt for whoever owns the delivering thread.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Builds the JNDI environment that selects the given initial context factory. */
    private static Hashtable <String ,String > createEnvironment(String contextFactoryClassName)
    {
        Hashtable <String ,String > env = new Hashtable <String ,String >(11);
        env.put(Context.INITIAL_CONTEXT_FACTORY,contextFactoryClassName);
        env.put(Context.PROVIDER_URL,"<not-used>");
        return env;
    }

    /**
     * Builds the JNDI environment for a destination with the channel factory,
     * timeout and capacity properties these tests use.
     * NOTE(review): presumably this yields a bounded channel whose blocked
     * operations time out; confirm against the SomniFugi channel factory docs.
     */
    private static Hashtable <String ,String > createFlowLimitEnvironment(String contextFactoryClassName,
                                                                          String destinationName)
    {
        Hashtable <String ,String > env = createEnvironment(contextFactoryClassName);
        env.put(destinationName + ".ChannelFactoryClassName", "net.walend.somnifugi.TimeoutChannelFactory");
        env.put(destinationName + ".timeout", "1000");
        env.put("wrapped-" + destinationName + ".ChannelFactoryClassName", "net.walend.somnifugi.juc.SimpleChannelFactory");
        env.put(destinationName + ".capacity", "100");
        return env;
    }

    /** Fails the running test if the worker recorded an exception. */
    private void failOnWorkerException(StoppableWorker worker)
    {
        Exception failure = worker.getException();
        if ( failure != null ) {
            fail(failure);
        }
    }

    /**
     * Last-resort cleanup for a finally block: closes the connection if it is
     * still open and only reports a close failure, so that the test's real
     * outcome is not masked. Per the JMS Connection.close() contract, closing
     * the connection also closes its sessions, producers and consumers.
     */
    private static void closeQuietly(Connection connection)
    {
        if ( connection == null ) {
            return;
        }
        try
        {
            connection.close();
        }
        catch(Exception e)
        {
            System.out.println("Could not close connection : " + e);
        }
    }

    /**
     * Runs an undelayed sender against a queue that has no consumer and no
     * flow-limit properties. Not added by {@link #suite()}.
     */
    public void testOverflow()
    {
        QueueConnection connection = null;
        try
        {
            Context ctx = new InitialContext (createEnvironment(QUEUE_CONTEXT_FACTORY));
            connection = (QueueConnection )ctx.lookup("Connection");
            connection.start();
            Queue queue = (Queue )ctx.lookup("testOverflowQueue");

            QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            QueueSender sender = session.createSender(queue);

            ObjectSender senderHelper = new ObjectSender(session, sender,0);
            Thread sendThread = new Thread (senderHelper);
            sendThread.start();

            Thread.sleep(RUN_MILLIS);
            senderHelper.stop();
            sendThread.join(JOIN_MILLIS);
            failOnWorkerException(senderHelper);

            sender.close();
            session.close();
            connection.close();
            connection = null; // closed cleanly; nothing left for the finally block
        }
        catch(Exception e)
        {
            fail(e);
        }
        finally
        {
            closeQuietly(connection);
        }
    }

    /** Undelayed queue sender against a polling receiver that pauses after each message. */
    public void testQueueFlowlimit()
    {
        QueueConnection connection = null;
        try
        {
            Context ctx = new InitialContext (createFlowLimitEnvironment(QUEUE_CONTEXT_FACTORY,"testFlowLimitQueue"));
            connection = (QueueConnection )ctx.lookup("Connection");
            connection.start();
            Queue queue = (Queue )ctx.lookup("testFlowLimitQueue");

            QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            QueueSender sender = session.createSender(queue);

            QueueSession session2 = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            QueueReceiver receiver = session2.createReceiver(queue);

            ObjectSender senderHelper = new ObjectSender(session, sender,0);
            Thread sendThread = new Thread (senderHelper);
            sendThread.start();

            ObjectReceiver receiverHelper = new ObjectReceiver(session2,receiver,CONSUMER_DELAY_MILLIS);
            Thread receiveThread = new Thread (receiverHelper);
            receiveThread.start();

            Thread.sleep(RUN_MILLIS);
            senderHelper.stop();
            receiverHelper.stop();
            sendThread.join(JOIN_MILLIS);
            receiveThread.join(JOIN_MILLIS);
            failOnWorkerException(senderHelper);
            failOnWorkerException(receiverHelper);

            sender.close();
            receiver.close();
            session.close();
            session2.close();
            connection.close();
            connection = null; // closed cleanly; nothing left for the finally block
        }
        catch(Exception e)
        {
            fail(e);
        }
        finally
        {
            closeQuietly(connection);
        }
    }

    /** Undelayed queue sender against a MessageListener that sleeps on every message. */
    public void testQueueFlowlimitMessageListener()
    {
        QueueConnection connection = null;
        try
        {
            Context ctx = new InitialContext (createFlowLimitEnvironment(QUEUE_CONTEXT_FACTORY,"testFlowLimitQueue"));
            connection = (QueueConnection )ctx.lookup("Connection");
            connection.start();
            Queue queue = (Queue )ctx.lookup("testFlowLimitQueue");

            QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            QueueSender sender = session.createSender(queue);

            TestMessageListener messageListener = new TestMessageListener(CONSUMER_DELAY_MILLIS);
            QueueSession session2 = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            QueueReceiver receiver = session2.createReceiver(queue);
            receiver.setMessageListener(messageListener);

            ObjectSender senderHelper = new ObjectSender(session, sender,0);
            Thread sendThread = new Thread (senderHelper);
            sendThread.start();

            Thread.sleep(RUN_MILLIS);
            senderHelper.stop();
            sendThread.join(JOIN_MILLIS);
            failOnWorkerException(senderHelper);

            sender.close();
            receiver.close();
            session.close();
            session2.close();
            connection.close();
            connection = null; // closed cleanly; nothing left for the finally block
        }
        catch(Exception e)
        {
            fail(e);
        }
        finally
        {
            closeQuietly(connection);
        }
    }

    /** Undelayed topic publisher against a polling subscriber that pauses after each message. */
    public void testTopicFlowlimit()
    {
        TopicConnection connection = null;
        try
        {
            Context ctx = new InitialContext (createFlowLimitEnvironment(TOPIC_CONTEXT_FACTORY,"testFlowLimitTopic"));
            connection = (TopicConnection )ctx.lookup("Connection");
            connection.start();
            Topic topic = (Topic )ctx.lookup("testFlowLimitTopic");

            TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
            TopicPublisher sender = session.createPublisher(topic);

            TopicSession session2 = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber subscriber = session2.createSubscriber(topic);

            ObjectPublisher senderHelper = new ObjectPublisher(session, sender,0);
            Thread sendThread = new Thread (senderHelper);
            sendThread.start();

            ObjectSubscriber receiverHelper = new ObjectSubscriber(session2,subscriber,CONSUMER_DELAY_MILLIS);
            Thread receiveThread = new Thread (receiverHelper);
            receiveThread.start();

            Thread.sleep(RUN_MILLIS);
            senderHelper.stop();
            receiverHelper.stop();
            sendThread.join(JOIN_MILLIS);
            receiveThread.join(JOIN_MILLIS);
            failOnWorkerException(senderHelper);
            failOnWorkerException(receiverHelper);

            sender.close();
            subscriber.close();
            session.close();
            session2.close();
            connection.close();
            connection = null; // closed cleanly; nothing left for the finally block
        }
        catch(Exception e)
        {
            fail(e);
        }
        finally
        {
            closeQuietly(connection);
        }
    }

    /**
     * Builds the suite of flow-limit tests. testOverflow is not included.
     *
     * @return a suite containing the queue, topic and message-listener tests
     */
    public static Test suite()
    {
        TestSuite suite = new TestSuite() ;

        suite.addTest(new FlowLimitTests("testQueueFlowlimit"));
        suite.addTest(new FlowLimitTests("testTopicFlowlimit"));
        suite.addTest(new FlowLimitTests("testQueueFlowlimitMessageListener"));

        return suite;
    }
}