1 package org.jgroups.tests; 2 3 import junit.framework.TestCase; 4 import junit.framework.TestSuite; 5 import org.jgroups.Address; 6 import org.jgroups.stack.IpAddress; 7 import org.jgroups.util.FIFOMessageQueue; 8 import org.jgroups.util.Util; 9 10 import java.util.Collections ; 11 import java.util.LinkedList ; 12 import java.util.concurrent.BrokenBarrierException ; 13 import java.util.concurrent.CyclicBarrier ; 14 15 19 public class FIFOMessageQueueTest extends TestCase { 20 FIFOMessageQueue<String ,Integer > queue; 21 String s1="s1", s2="s2", s3="s3"; 22 private static final Address a1, a2; 23 24 static { 25 a1=new IpAddress(5000); 26 a2=new IpAddress(6000); 27 } 28 29 public void setUp() throws Exception { 30 super.setUp(); 31 queue=new FIFOMessageQueue<String ,Integer >(); 32 } 33 34 public void tearDown() throws Exception { 35 super.tearDown(); 36 } 37 38 39 public void testPollFromEmptyQueue() throws InterruptedException { 40 assertEquals(0, queue.size()); 41 Integer ret=queue.poll(5); 42 assertNull(ret); 43 assertEquals("queue.size() should be 0, but is " + queue.size(), 0, queue.size()); 44 } 45 46 47 public void testPutTwoTakeTwo() throws InterruptedException { 48 queue.put(a1, s1, 1); queue.put(a1, s1, 2); Integer ret=queue.poll(5); 51 assertNotNull(ret); 52 queue.done(a1, s1); queue.done(a1, s1); ret=queue.poll(5); 55 assertNotNull(ret); 56 assertEquals(0, queue.size()); 57 queue.put(a1, s1, 3); 58 assertEquals(1, queue.size()); 59 ret=queue.poll(5); assertNotNull(ret); 61 } 62 63 64 public void testTakeFollowedByPut() throws InterruptedException { 65 assertEquals(0, queue.size()); 66 67 new Thread () { 68 69 public void run() { 70 Util.sleep(1000); 71 try { 72 queue.put(a1, s1, 1); 73 } 74 catch(InterruptedException e) { 75 76 } 77 } 78 }.start(); 79 80 Integer ret=queue.take(); 81 assertNotNull(ret); 82 assertEquals(1, ret.intValue()); 83 assertEquals("queue.size() should be 0, but is " + queue.size(), 0, queue.size()); 84 } 85 86 87 public void testMultipleTakersOnePutter() throws Exception { 88 final CyclicBarrier barrier=new CyclicBarrier (11); 89 for(int i=0; i < 10; i++) { 90 new Thread () { 91 public void run() { 92 try { 93 barrier.await(); 94 queue.take(); 95 96 } 97 catch(Exception e) { 98 } 99 } 100 }.start(); 101 } 102 barrier.await(); 103 for(int i=0; i < 10; i++) { 104 queue.put(a1, s1, i); 105 queue.done(a1, s1); 106 } 107 Util.sleep(100); 108 assertEquals(0, queue.size()); 109 } 110 111 112 public void testConcurrentPutsAndTakes() throws InterruptedException { 113 final int NUM=10000; 114 final int print=NUM / 10; 115 116 Thread putter=new Thread () { 117 118 public void run() { 119 setName("Putter"); 120 int cnt=0; 121 for(int i=0; i < NUM; i++) { 122 try { 123 queue.put(a1, s1, i); 124 cnt++; 125 if(cnt % print == 0) { 126 System.out.println("Putter: " + cnt); 127 } 128 queue.done(a1, s1); 129 } 130 catch(InterruptedException e) { 131 e.printStackTrace(); 132 } 133 } 134 } 135 }; 136 137 Thread taker=new Thread () { 138 139 public void run() { 140 setName("Taker"); 141 int cnt=0; 142 for(int i=0; i < NUM; i++) { 143 try { 144 queue.take(); 145 cnt++; 146 if(cnt % print == 0) { 147 System.out.println("Taker: " + cnt); 148 } 149 } 150 catch(InterruptedException e) { 151 e.printStackTrace(); 152 } 153 } 154 } 155 }; 156 157 System.out.println("starting threads"); 158 taker.start(); 159 putter.start(); 160 161 new Thread () { 162 163 public void run() { 164 Util.sleep(3000); 165 System.out.println("queue:\n" + queue); 166 } 167 }.start(); 168 169 putter.join(); 170 taker.join(); 171 172 assertEquals(0, queue.size()); 173 } 174 175 176 public void testNullAddress() throws InterruptedException { 177 queue.put(null, s1, 1); 178 queue.put(a1, s1, 2); 179 queue.put(a1, s1, 3); 180 queue.put(null, s1, 4); 181 System.out.println("queue:\n" + queue); 182 183 Integer ret=queue.poll(5); 184 assertNotNull(ret); 185 assertEquals(1, ret.intValue()); 186 187 ret=queue.poll(5); 188 assertNotNull(ret); 189 assertEquals(2, ret.intValue()); 190 191 ret=queue.poll(5); 192 assertNotNull(ret); 193 assertEquals(4, ret.intValue()); 194 195 ret=queue.poll(5); 196 assertNull(ret); 197 198 queue.done(a1, s1); 199 ret=queue.poll(5); 200 assertNotNull(ret); 201 assertEquals(3, ret.intValue()); 202 203 ret=queue.poll(5); 204 assertNull(ret); 205 assertEquals(0, queue.size()); 206 } 207 208 209 public void testSimplePutAndTake() throws InterruptedException { 210 queue.put(a1, s1, 1); 211 assertEquals(1, queue.size()); 212 int ret=queue.take(); 213 assertEquals(1, ret); 214 assertEquals(0, queue.size()); 215 } 216 217 public void testSimplePutAndTakeMultipleSenders() throws InterruptedException { 218 queue.put(a1, s1, 1); 219 queue.put(a2, s1, 2); 220 System.out.println("queue is:\n" + queue); 221 assertEquals(2, queue.size()); 222 int ret=queue.take(); 223 assertEquals(1, ret); 224 assertEquals(1, queue.size()); 225 ret=queue.take(); 226 assertEquals(2, ret); 227 assertEquals(0, queue.size()); 228 } 229 230 public void testMultiplePutsAndTakes() throws InterruptedException { 231 for(int i=1; i <= 5; i++) 232 queue.put(a1, s1, i); 233 System.out.println("queue is " + queue); 234 assertEquals(5, queue.size()); 235 for(int i=1; i <= 5; i++) { 236 int ret=queue.take(); 237 assertEquals(i, ret); 238 assertEquals(5-i, queue.size()); 239 queue.done(a1, s1); 240 } 241 assertEquals(0, queue.size()); 242 } 243 244 245 248 public void testSameSenderSameDestination() throws InterruptedException { 249 queue.put(a1, s1, 1); 250 queue.put(a1, s1, 2); 251 queue.put(a1, s1, 3); 252 System.out.println("queue:\n" + queue); 253 254 assertEquals(3, queue.size()); 255 int ret=queue.take(); 256 257 assertEquals(1, ret); 258 Integer retval=queue.poll(100); 259 assertNull(retval); 260 queue.done(a1, s1); 261 System.out.println("queue:\n" + queue); 262 ret=queue.take(); 263 assertEquals(2, ret); 264 queue.done(a1, s1); 265 System.out.println("queue:\n" + queue); 266 ret=queue.take(); 267 System.out.println("queue:\n" + queue); 268 assertEquals(3, ret); 269 } 270 271 272 273 277 public void testSameSenderMultipleDestinations() throws InterruptedException { 278 queue.put(a1, s1, 10); 279 queue.put(a1, s1, 11); 280 queue.put(a1, s1, 12); 281 282 queue.put(a1, s2, 20); 283 queue.put(a1, s2, 21); 284 queue.put(a1, s2, 22); 285 286 queue.put(a1, s3, 30); 287 queue.put(a1, s3, 31); 288 queue.put(a1, s3, 32); 289 System.out.println("queue:\n" + queue); 290 Integer ret=queue.poll(5); 291 assertNotNull(ret); 292 assertEquals(10, ret.intValue()); 293 294 ret=queue.poll(5); 295 assertNotNull(ret); 296 assertEquals(20, ret.intValue()); 297 298 ret=queue.poll(5); 299 assertNotNull(ret); 300 assertEquals(30, ret.intValue()); 301 302 ret=queue.poll(5); 303 assertNull(ret); 304 305 queue.done(a1, s3); 306 queue.done(a1, s1); 307 queue.done(a1, s2); 308 309 ret=queue.poll(5); 310 assertNotNull(ret); 311 assertEquals(31, ret.intValue()); 312 313 ret=queue.poll(5); 314 assertNotNull(ret); 315 assertEquals(11, ret.intValue()); 316 317 ret=queue.poll(5); 318 assertNotNull(ret); 319 assertEquals(21, ret.intValue()); 320 321 ret=queue.poll(5); 322 assertNull(ret); 323 324 assertEquals(3, queue.size()); 325 326 ret=queue.poll(5); 327 assertNull(ret); 328 329 queue.done(a1, s1); 330 queue.done(a1, s3); 331 queue.done(a1, s2); 332 333 ret=queue.poll(5); 334 assertNotNull(ret); 335 assertEquals(12, ret.intValue()); 336 337 ret=queue.poll(5); 338 assertNotNull(ret); 339 assertEquals(32, ret.intValue()); 340 341 ret=queue.poll(5); 342 assertNotNull(ret); 343 assertEquals(22, ret.intValue()); 344 345 ret=queue.poll(5); 346 assertNull(ret); 347 348 assertEquals(0, queue.size()); 349 } 350 351 352 356 public void testDifferentSendersSameDestination() throws InterruptedException { 357 queue.put(a1, s1, 10); 358 queue.put(a2, s1, 20); 359 queue.put(a1, s1, 11); 360 queue.put(a2, s1, 21); 361 System.out.println("queue:\n" + queue); 362 assertEquals(4, queue.size()); 363 364 Integer ret=queue.poll(5); 365 assertNotNull(ret); 366 assertEquals(10, ret.intValue()); 367 368 ret=queue.poll(5); 369 assertNotNull(ret); 370 assertEquals(20, ret.intValue()); 371 372 queue.done(a1, s1); 373 ret=queue.poll(5); 374 assertNotNull(ret); 375 assertEquals(11, ret.intValue()); 376 377 queue.done(a2, s1); 378 ret=queue.poll(5); 379 assertNotNull(ret); 380 assertEquals(21, ret.intValue()); 381 382 ret=queue.poll(5); 383 assertNull(ret); 384 assertEquals(0, queue.size()); 385 } 386 387 388 389 392 public void testDifferentSendersDifferentDestinations() throws Exception { 393 queue.put(a1, s1, 1); 394 queue.put(a2, s2, 2); 395 queue.put(a1, s2, 3); 396 queue.put(a2, s1, 4); 397 System.out.println("queue:\n" + queue); 398 assertEquals(4, queue.size()); 399 400 Integer ret=queue.poll(5); 401 assertNotNull(ret); 402 assertEquals(1, ret.intValue()); 403 404 ret=queue.poll(5); 405 assertNotNull(ret); 406 assertEquals(2, ret.intValue()); 407 408 ret=queue.poll(5); 409 assertNotNull(ret); 410 assertEquals(3, ret.intValue()); 411 412 ret=queue.poll(5); 413 assertNotNull(ret); 414 assertEquals(4, ret.intValue()); 415 416 ret=queue.poll(5); 417 assertNull(ret); 418 assertEquals(0, queue.size()); 419 420 } 421 422 423 424 public void testDifferentSendersDifferentDestinationsMultipleMessages() throws Exception { 425 queue.put(a1, s1, 1); 426 queue.put(a2, s2, 2); 427 queue.put(a1, s2, 3); 428 queue.put(a2, s1, 4); 429 430 queue.put(a1, s1, 5); 431 queue.put(a2, s2, 6); 432 queue.put(a1, s2, 7); 433 queue.put(a2, s1, 8); 434 435 System.out.println("queue:\n" + queue); 436 assertEquals(8, queue.size()); 437 438 Integer ret=queue.poll(5); 439 assertNotNull(ret); 440 assertEquals(1, ret.intValue()); 441 442 ret=queue.poll(5); 443 assertNotNull(ret); 444 assertEquals(2, ret.intValue()); 445 446 ret=queue.poll(5); 447 assertNotNull(ret); 448 assertEquals(3, ret.intValue()); 449 450 ret=queue.poll(5); 451 assertNotNull(ret); 452 assertEquals(4, ret.intValue()); 453 454 455 queue.done(a1, s1); 456 ret=queue.poll(5); 457 assertNotNull(ret); 458 assertEquals(5, ret.intValue()); 459 460 queue.done(a2, s2); 461 ret=queue.poll(5); 462 assertNotNull(ret); 463 assertEquals(6, ret.intValue()); 464 465 queue.done(a1, s2); 466 ret=queue.poll(5); 467 assertNotNull(ret); 468 assertEquals(7, ret.intValue()); 469 470 queue.done(a2, s1); 471 ret=queue.poll(5); 472 assertNotNull(ret); 473 assertEquals(8, ret.intValue()); 474 } 475 476 477 478 public void testOrdering() throws InterruptedException { 479 for(int i=1; i <= 3; i++) 480 queue.put(a1, s1, i); 481 assertEquals(3, queue.size()); 482 483 int ret=queue.take(); 484 assertEquals(1, ret); 485 assertEquals(2, queue.size()); 486 487 queue.done(a1, s1); 488 queue.put(a1, s1, 4); 489 queue.put(a1, s1, 5); 490 System.out.println("queue: " + queue); 491 492 for(int i=2; i <= 5; i++) { 493 ret=queue.take(); 494 assertEquals(i, ret); 495 assertEquals(5-i, queue.size()); 496 queue.done(a1, s1); 497 } 498 assertEquals(0, queue.size()); 499 } 500 501 502 public void testOrderingMultipleThreads() throws BrokenBarrierException , InterruptedException { 503 CyclicBarrier barrier=new CyclicBarrier (4); 504 int NUM=500; 505 Producer p1=new Producer(queue, "s1", 1, NUM, barrier); 506 Producer p2=new Producer(queue, "s2", 1001, NUM, barrier); 507 Producer p3=new Producer(queue, "s3", 2001, NUM, barrier); 508 509 p1.start(); 510 p2.start(); 511 p3.start(); 512 Util.sleep(100); 513 barrier.await(); 515 p1.join(); 516 p2.join(); 517 p3.join(); 518 System.out.println("queue: " + queue.size() + " elements"); 519 assertEquals(NUM * 3, queue.size()); 520 } 521 522 public void testOrderingMultipleThreadsWithTakes() throws BrokenBarrierException , InterruptedException { 523 testOrderingMultipleThreads(); 524 int ret; 525 LinkedList <Integer > list=new LinkedList <Integer >(); 526 527 int size=queue.size(); 528 for(int i=0; i < size; i++) { 529 ret=queue.take(); 530 list.add(ret); 531 queue.done(a1, "s1"); 532 queue.done(a1, "s2"); 533 queue.done(a1, "s3"); 534 } 535 536 System.out.println("analyzing returned values for correct ordering"); 537 LinkedList <Integer > one=new LinkedList <Integer >(), two=new LinkedList <Integer >(), three=new LinkedList <Integer >(); 538 for(int val: list) { 539 if(val < 1000) { 540 one.add(val); 541 continue; 542 } 543 if(val > 1000 && val <= 2000) { 544 two.add(val); 545 continue; 546 } 547 if(val > 2000) { 548 three.add(val); 549 } 550 } 551 552 int len=one.size(); 553 assertEquals(len, two.size()); 554 assertEquals(len, three.size()); 555 556 557 LinkedList <Integer > sorted_one=new LinkedList <Integer >(one); 558 Collections.sort(sorted_one); 559 assertEquals("one: " + one + ", sorted: " + sorted_one, one, sorted_one); 560 561 LinkedList <Integer > sorted_two=new LinkedList <Integer >(two); 562 Collections.sort(sorted_two); 563 assertEquals("two: " + two + ", sorted: " + sorted_two, two, sorted_two); 564 565 LinkedList <Integer > sorted_three=new LinkedList <Integer >(three); 566 Collections.sort(sorted_three); 567 assertEquals("three: " + three + ", sorted: " + sorted_three, three, sorted_three); 568 569 System.out.println("OK - all 3 collections are ordered"); 570 } 571 572 573 574 private static class Producer extends Thread { 575 private FIFOMessageQueue<String ,Integer > queue; 576 private String key; 577 private int num_msgs; 578 private CyclicBarrier barrier; 579 private int start_num; 580 581 private Producer(FIFOMessageQueue<String ,Integer > queue, String key, int start_num, int num_msgs, CyclicBarrier barrier) { 582 this.queue=queue; 583 this.key=key; 584 this.start_num=start_num; 585 this.num_msgs=num_msgs; 586 this.barrier=barrier; 587 } 588 589 590 public void run() { 591 try { 592 barrier.await(); 593 } 594 catch(Exception e) { 595 e.printStackTrace(); 596 } 597 for(int i=start_num; i <= num_msgs+start_num-1; i++) { 598 try { 599 queue.put(a1, key, i); 601 } 602 catch(InterruptedException e) { 603 e.printStackTrace(); 604 } 605 } 606 } 607 } 608 609 610 public static junit.framework.Test suite() { 611 return new TestSuite(FIFOMessageQueueTest.class); 612 } 613 614 public static void main(String [] args) { 615 junit.textui.TestRunner.run(suite()); 616 } 617 } 618 | Popular Tags |