1 3 package org.jgroups.tests; 4 5 6 import junit.framework.TestCase; 7 import org.jgroups.TimeoutException; 8 import org.jgroups.util.Queue; 9 import org.jgroups.util.QueueClosedException; 10 import org.jgroups.util.Util; 11 12 import java.util.LinkedList ; 13 import java.util.ArrayList ; 14 15 16 public class QueueTest extends TestCase { 17 private Queue queue=null; 18 19 public QueueTest(String Name_) { 20 super(Name_); 21 } 22 23 public void setUp() throws Exception { 24 super.setUp(); 25 queue=new Queue(); 26 } 27 28 29 public void tearDown() throws Exception { 30 super.tearDown(); 31 if(queue != null) { 32 queue.reset(); 33 } 34 } 35 36 37 public void testQueue() { 38 try { 39 queue.add("Q1"); 40 queue.add("Q2"); 41 queue.add("Q3"); 42 43 assertEquals("Q1", queue.peek()); 44 assertEquals("Q1", queue.remove()); 45 46 assertEquals("Q2", queue.peek()); 47 assertEquals("Q2", queue.remove()); 48 49 queue.addAtHead("Q4"); 50 queue.add("Q5"); 51 assertEquals("Q4", queue.peek()); 52 assertEquals("Q4", queue.remove()); 53 54 queue.close(true); 55 56 try { 57 queue.add("Q6"); 58 fail("should not get here"); 59 } 60 catch(org.jgroups.util.QueueClosedException qc) { 61 assertTrue(true); 62 } 63 64 int size=queue.size(); 65 queue.removeElement("Q5"); 66 assertEquals((size - 1), queue.size()); 67 68 assertEquals("Q3", queue.peek()); 69 assertEquals("Q3", queue.remove()); 70 assertTrue(queue.closed()); 71 System.out.println("Everything is ok"); 72 } 73 catch(Exception x) { 74 System.out.println(x); 75 fail(); 76 } 77 } 78 79 80 public void testCloseWithoutFlush() { 81 queue.close(false); 82 try { 83 queue.remove(); 84 fail("we should have gotten a QueueClosedException trying to remove an element from a closed queue"); 85 } 86 catch(QueueClosedException e) { 87 assertTrue("queue is closed, this is okay", queue.closed()); 88 } 89 } 90 91 92 public void testCloseWithFlush() { 93 queue.close(true); 94 try { 95 queue.remove(); 96 fail("we should have gotten a QueueClosedException trying to remove an element from a closed queue"); 97 } 98 catch(QueueClosedException e) { 99 assertTrue("queue is closed, this is okay", queue.closed()); 100 } 101 } 102 103 104 public void testCloseWithFlush2() throws QueueClosedException { 105 queue.add(new Integer (1)); 106 queue.add(new Integer (2)); 107 queue.add(new Integer (3)); 108 queue.close(true); 109 try { 110 for(int i=1; i <= 3; i++) { 111 Object obj=queue.remove(); 112 assertNotNull(obj); 113 assertEquals(obj, new Integer (i)); 114 } 115 queue.remove(); 116 fail("we should have gotten a QueueClosedException trying to remove an element from a closed queue"); 117 } 118 catch(QueueClosedException e) { 119 assertTrue("queue is closed, this is okay", queue.closed()); 120 } 121 } 122 123 124 public void testValues() throws QueueClosedException { 125 queue.add(new Integer (1)); 126 queue.add(new Integer (3)); 127 queue.add(new Integer (99)); 128 queue.add(new Integer (8)); 129 System.out.println("queue: " + Util.dumpQueue(queue)); 130 int size=queue.size(); 131 assertEquals(4, size); 132 LinkedList values=queue.values(); 133 assertEquals(size, values.size()); 134 } 135 136 137 public void testLargeInsertion() { 138 String element="MyElement"; 139 long start, stop; 140 141 try { 142 System.out.println("Inserting 100000 elements"); 143 start=System.currentTimeMillis(); 144 for(int i=0; i < 100000; i++) 145 queue.add(element); 146 stop=System.currentTimeMillis(); 147 System.out.println("Took " + (stop - start) + " msecs"); 148 149 System.out.println("Removing 100000 elements"); 150 start=System.currentTimeMillis(); 151 while(queue.size() > 0) 152 queue.remove(); 153 stop=System.currentTimeMillis(); 154 System.out.println("Took " + (stop - start) + " msecs"); 155 } 156 catch(Exception ex) { 157 System.err.println(ex); 158 fail(); 159 } 160 } 161 162 163 public void testEmptyQueue() { 164 assertNull(queue.getFirst()); 165 assertNull(queue.getLast()); 166 assertEquals(queue.getFirst(), queue.getLast()); } 168 169 public void testAddAll() throws QueueClosedException { 170 ArrayList l=new ArrayList (); 171 l.add("one"); 172 l.add("two"); 173 l.add("three"); 174 queue.addAll(l); 175 System.out.println("queue is " + queue); 176 assertEquals(3, queue.size()); 177 assertEquals("one", queue.remove()); 178 assertEquals(2, queue.size()); 179 assertEquals("two", queue.remove()); 180 assertEquals(1, queue.size()); 181 assertEquals("three", queue.remove()); 182 assertEquals(0, queue.size()); 183 } 184 185 public void testInsertionAndRemoval() throws Exception { 186 String s1="Q1", s2="Q2"; 187 188 queue.add(s1); 189 assertTrue(queue.getFirst() != null); 190 assertTrue(queue.getLast() != null); 191 assertEquals(queue.getFirst(), queue.getLast()); 192 193 queue.add(s2); 194 assertTrue(queue.getFirst() != queue.getLast()); 195 196 Object o1=queue.peek(); 197 Object o2=queue.getFirst(); 198 199 System.out.println("o1=" + o1 + ", o2=" + o2 + ", o1.equals(o2)=" + o1.equals(o2)); 200 201 assertEquals(queue.peek(), queue.getFirst()); 202 queue.remove(); 203 204 assertEquals(1, queue.size()); 205 assertEquals(queue.getFirst(), queue.getLast()); 206 queue.remove(); 207 208 assertEquals(0, queue.size()); 209 assertTrue(queue.getFirst() == null); 210 assertTrue(queue.getLast() == null); 211 } 212 213 214 public void testWaitUntilClosed() { 215 queue.close(true); 216 queue.waitUntilClosed(0); 217 assertEquals(0, queue.size()); 218 } 219 220 public void testWaitUntilClosed2() { 221 queue.close(true); 222 try { 223 queue.peek(); 224 fail("peek() should throw a QueueClosedException"); 225 } 226 catch(QueueClosedException e) { 227 assertTrue(e != null); 228 } 229 assertEquals(0, queue.size()); 230 } 231 232 public void testWaitUntilClosed3() throws QueueClosedException { 233 queue.add("one"); 234 queue.close(true); 235 Object obj=queue.peek(); 236 assertEquals("one", obj); 237 assertEquals(1, queue.size()); 238 queue.remove(); 239 try { 240 queue.peek(); 241 fail("peek() should throw a QueueClosedException"); 242 } 243 catch(QueueClosedException e) { 244 assertTrue(e != null); 245 } 246 assertEquals(0, queue.size()); 247 } 248 249 public void testWaitUntilClosed4() throws QueueClosedException { 250 for(int i=0; i < 10; i++) 251 queue.add(new Integer (i)); 252 new Thread () { 253 public void run() { 254 while(!queue.closed()) { 255 try { 256 System.out.println("-- removed " + queue.remove()); 257 Util.sleep(200); 258 } 259 catch(QueueClosedException e) { 260 break; 261 } 262 } 263 } 264 }.start(); 265 queue.close(true); 266 queue.waitUntilClosed(0); 267 assertEquals(0, queue.size()); 268 } 269 270 271 public void testWaitUntilClosed5() throws QueueClosedException { 272 for(int i=0; i < 10; i++) 273 queue.add(new Integer (i)); 274 new Thread () { 275 public void run() { 276 while(!queue.closed()) { 277 try { 278 System.out.println("-- removed " + queue.remove()); 279 Util.sleep(200); 280 } 281 catch(QueueClosedException e) { 282 System.out.println("-- queue is closed, cannot remove element"); 283 break; 284 } 285 } 286 } 287 }.start(); 288 289 Util.sleep(600); 290 queue.close(false); 291 queue.waitUntilClosed(0); 292 assertTrue(queue.size() > 0); 293 } 294 295 296 297 public void testRemoveElementNoElement() { 298 String s1="Q1"; 299 300 try { 301 queue.removeElement(s1); 302 assertFalse(queue.closed()); 303 assertEquals(0, queue.size()); 304 } 305 catch(QueueClosedException ex) { 306 fail(ex.toString()); 307 } 308 } 309 310 311 public void testRemoveElementOneElement() { 312 String s1="Q1"; 313 314 try { 315 queue.add(s1); 316 queue.removeElement(s1); 317 assertEquals(0, queue.size()); 318 assertTrue(queue.getFirst() == null); 319 assertTrue(queue.getLast() == null); 320 } 321 catch(QueueClosedException ex) { 322 fail(ex.toString()); 323 } 324 } 325 326 public void testRemoveElementTwoElementsFirstFound() { 327 String s1="Q1", s2="Q2"; 328 329 try { 330 queue.add(s1); 331 queue.add(s2); 332 queue.removeElement(s1); 333 assertEquals(1, queue.size()); 334 assertEquals(queue.getFirst(), s2); 335 assertEquals(queue.getLast(), s2); 336 assertEquals(queue.getFirst(), queue.getLast()); 337 } 338 catch(QueueClosedException ex) { 339 fail(ex.toString()); 340 } 341 } 342 343 public void testRemoveElementTwoElementsSecondFound() { 344 String s1="Q1", s2="Q2"; 345 346 try { 347 queue.add(s1); 348 queue.add(s2); 349 queue.removeElement(s2); 350 assertEquals(1, queue.size()); 351 assertEquals(queue.getFirst(), s1); 352 assertEquals(queue.getLast(), s1); 353 assertEquals(queue.getFirst(), queue.getLast()); 354 } 355 catch(QueueClosedException ex) { 356 fail(ex.toString()); 357 } 358 } 359 360 public void testRemoveElementThreeElementsFirstFound() { 361 String s1="Q1", s2="Q2", s3="Q3"; 362 363 try { 364 queue.add(s1); 365 queue.add(s2); 366 queue.add(s3); 367 queue.removeElement(s1); 368 assertEquals(2, queue.size()); 369 assertEquals(queue.getFirst(), s2); 370 assertEquals(queue.getLast(), s3); 371 } 372 catch(QueueClosedException ex) { 373 fail(ex.toString()); 374 } 375 } 376 377 public void testRemoveElementThreeElementsSecondFound() { 378 String s1="Q1", s2="Q2", s3="Q3"; 379 380 try { 381 queue.add(s1); 382 queue.add(s2); 383 queue.add(s3); 384 queue.removeElement(s2); 385 assertEquals(2, queue.size()); 386 assertEquals(queue.getFirst(), s1); 387 assertEquals(queue.getLast(), s3); 388 } 389 catch(QueueClosedException ex) { 390 fail(ex.toString()); 391 } 392 } 393 394 public void testRemoveElementThreeElementsThirdFound() { 395 String s1="Q1", s2="Q2", s3="Q3"; 396 397 try { 398 queue.add(s1); 399 queue.add(s2); 400 queue.add(s3); 401 queue.removeElement(s3); 402 assertEquals(2, queue.size()); 403 assertEquals(queue.getFirst(), s1); 404 assertEquals(queue.getLast(), s2); 405 } 406 catch(QueueClosedException ex) { 407 fail(ex.toString()); 408 } 409 } 410 411 412 public void testRemoveAndClose() { 413 try { 414 new Thread () { 415 public void run() { 416 Util.sleep(1000); 417 queue.close(true); } 419 }.start(); 420 421 queue.remove(); 422 fail("we should not be able to remove an object from a closed queue"); 423 } 424 catch(QueueClosedException ex) { 425 assertTrue(ex instanceof QueueClosedException); } 427 } 428 429 430 public void testRemoveAndCloseWithTimeout() throws TimeoutException { 431 try { 432 new Thread () { 433 public void run() { 434 Util.sleep(1000); 435 queue.close(true); } 437 }.start(); 438 439 queue.remove(5000); 440 fail("we should not be able to remove an object from a closed queue"); 441 } 442 catch(QueueClosedException ex) { 443 assertTrue(ex instanceof QueueClosedException); } 445 catch(TimeoutException timeout) { 446 fail("we should not get a TimeoutException, but a QueueClosedException here"); 447 } 448 } 449 450 451 public void testInterruptAndRemove() throws QueueClosedException { 452 Thread.currentThread().interrupt(); 453 Object el=null; 454 try { 455 el=queue.remove(2000); 456 fail("we should not get here"); 457 } 458 catch(TimeoutException e) { 459 assertNull(el); 460 } 461 } 462 463 464 public void testRemoveAndInterrupt() { 465 466 Thread closer=new Thread () { 467 public void run() { 468 Util.sleep(1000); 469 System.out.println("-- closing queue"); 470 queue.close(false); 471 } 472 }; 473 closer.start(); 474 475 System.out.println("-- removing element"); 476 try { 477 queue.remove(); 478 fail("we should not get here, as the queue is closed"); 479 } 480 catch(QueueClosedException e) { 481 System.out.println("-- received queue closed exception - as expected"); 482 } 483 484 } 485 486 public void testClear() throws QueueClosedException { 487 queue.add("one"); 488 queue.add("two"); 489 assertEquals(2, queue.size()); 490 queue.close(true); 491 assertEquals(2, queue.size()); 492 queue.clear(); 493 assertEquals(0, queue.size()); 494 queue=new Queue(); 495 queue.add("one"); 496 queue.add("two"); 497 queue.clear(); 498 assertEquals(0, queue.size()); 499 queue.add("one"); 500 queue.add("two"); 501 assertEquals(2, queue.size()); 502 queue.clear(); 503 assertEquals(0, queue.size()); 504 } 505 506 507 598 599 601 public void testBarrier() { 602 RemoveOneItem[] removers=new RemoveOneItem[10]; 603 int num_dead=0; 604 605 for(int i=0; i < removers.length; i++) { 606 removers[i]=new RemoveOneItem(i); 607 removers[i].start(); 608 } 609 610 Util.sleep(1000); 611 612 System.out.println("-- adding element 99"); 613 try { 614 queue.add(new Long (99)); 615 } 616 catch(Exception ex) { 617 System.err.println(ex); 618 } 619 620 Util.sleep(5000); 621 System.out.println("-- adding element 100"); 622 try { 623 queue.add(new Long (100)); 624 } 625 catch(Exception ex) { 626 System.err.println(ex); 627 } 628 629 Util.sleep(1000); 630 631 for(int i=0; i < removers.length; i++) { 632 System.out.println("remover #" + i + " is " + (removers[i].isAlive() ? "alive" : "terminated")); 633 if(!removers[i].isAlive()) { 634 num_dead++; 635 } 636 } 637 638 assertEquals(2, num_dead); 639 } 640 641 643 public void testBarrierWithTimeOut() 644 { 645 RemoveOneItemWithTimeout[] removers = new RemoveOneItemWithTimeout[10]; 646 int num_dead = 0; 647 648 for (int i = 0; i < removers.length; i++) 649 { 650 removers[i] = new RemoveOneItemWithTimeout(i, 1000); 651 removers[i].start(); 652 } 653 654 Util.sleep(5000); 655 656 System.out.println("-- adding element 99"); 657 try 658 { 659 queue.add(new Long (99)); 660 } 661 catch (Exception ex) 662 { 663 System.err.println(ex); 664 } 665 666 Util.sleep(5000); 667 System.out.println("-- adding element 100"); 668 try 669 { 670 queue.add(new Long (100)); 671 } 672 catch (Exception ex) 673 { 674 System.err.println(ex); 675 } 676 677 Util.sleep(1000); 678 679 for (int i = 0; i < removers.length; i++) 680 { 681 System.out.println("remover #" + i + " is " + (removers[i].isAlive() ? "alive" : "terminated")); 682 if (!removers[i].isAlive()) 683 { 684 num_dead++; 685 } 686 } 687 688 assertEquals(2, num_dead); 689 690 queue.close(false); 692 Util.sleep(2000); 693 694 num_dead = 0; 695 for (int i = 0; i < removers.length; i++) 696 { 697 System.out.println("remover #" + i + " is " + (removers[i].isAlive() ? "alive" : "terminated")); 698 if (!removers[i].isAlive()) 699 { 700 num_dead++; 701 } 702 } 703 assertEquals(10, num_dead); 704 705 } 706 707 708 710 public void testMultipleWriterOneReader() 711 { 712 AddOneItem[] adders = new AddOneItem[10]; 713 int num_dead = 0; 714 int num_items = 0; 715 int items = 1000; 716 717 for (int i = 0; i < adders.length; i++) 718 { 719 adders[i] = new AddOneItem(i, items); 720 adders[i].start(); 721 } 722 723 while (num_items < (adders.length*items)) 724 { 725 try 726 { 727 queue.remove(); 728 num_items++; 729 } 730 catch (Exception ex) 731 { 732 System.err.println(ex); 733 } 734 } 735 736 Util.sleep(1000); 737 738 for (int i = 0; i < adders.length; i++) 739 { 740 System.out.println("adder #" + i + " is " + (adders[i].isAlive() ? "alive" : "terminated")); 741 if (!adders[i].isAlive()) 742 { 743 num_dead++; 744 } 745 } 746 747 assertEquals(10, num_dead); 748 749 queue.close(false); } 751 752 753 756 public void testConcurrentAddRemove() { 757 final long NUM=1000000; 758 long num_received=0; 759 Object ret; 760 long start, stop; 761 762 start=System.currentTimeMillis(); 763 764 new Thread () { 765 public void run() { 766 for(int i=0; i < NUM; i++) { 767 try { 768 queue.add(new Object ()); 769 } 770 catch(QueueClosedException e) { 771 } 772 } 773 } 774 }.start(); 775 776 while(num_received < NUM) { 777 try { 778 ret=queue.remove(); 779 if(ret != null) 780 num_received++; 781 } 782 catch(QueueClosedException e) { 783 e.printStackTrace(); 784 fail(); 785 } 786 } 787 assertEquals(NUM, num_received); 788 stop=System.currentTimeMillis(); 789 System.out.println("time to add/remove " + NUM + " elements: " + (stop-start)); 790 } 791 792 793 794 795 public void testConcurrentAccess() { 796 final int NUM_THREADS=10; 797 final int INTERVAL=20000; 798 799 Writer[] writers=new Writer[NUM_THREADS]; 800 Reader[] readers=new Reader[NUM_THREADS]; 801 int[] writes=new int[NUM_THREADS]; 802 int[] reads=new int[NUM_THREADS]; 803 long total_reads=0, total_writes=0; 804 805 806 for(int i=0; i < writers.length; i++) { 807 readers[i]=new Reader(i, reads); 808 readers[i].start(); 809 writers[i]=new Writer(i, writes); 810 writers[i].start(); 811 } 812 813 Util.sleep(INTERVAL); 814 815 System.out.println("current queue size=" + queue.size()); 816 817 for(int i=0; i < writers.length; i++) { 818 writers[i].stopThread(); 819 } 820 821 for(int i=0; i < readers.length; i++) { 822 readers[i].stopThread(); 823 } 824 825 queue.close(false); 827 System.out.println("current queue size=" + queue.size()); 828 829 for(int i=0; i < writers.length; i++) { 830 try { 831 writers[i].join(300); 832 readers[i].join(300); 833 } 834 catch(Exception ex) { 835 System.err.println(ex); 836 } 837 } 838 839 840 for(int i=0; i < writes.length; i++) { 841 System.out.println("Thread #" + i + ": " + writes[i] + " writes, " + reads[i] + " reads"); 842 total_writes+=writes[i]; 843 total_reads+=reads[i]; 844 } 845 System.out.println("total writes=" + total_writes + ", total_reads=" + total_reads + 846 ", diff=" + Math.abs(total_writes - total_reads)); 847 } 848 849 class AddOneItem extends Thread 850 { 851 Long retval = null; 852 int rank = 0; 853 int iteration = 0; 854 855 AddOneItem(int rank, int iteration) 856 { 857 super("AddOneItem thread #" + rank); 858 this.rank = rank; 859 this.iteration = iteration; 860 setDaemon(true); 861 } 862 863 public void run() 864 { 865 try 866 { 867 for (int i = 0; i < iteration; i++) 868 { 869 queue.add(new Long (rank)); 870 } 873 } 874 catch (QueueClosedException closed) 875 { 876 System.err.println("Thread #" + rank + ": queue was closed"); 877 } 878 } 879 880 } 881 882 class RemoveOneItem extends Thread { 883 Long retval=null; 884 int rank=0; 885 886 887 RemoveOneItem(int rank) { 888 super("RemoveOneItem thread #" + rank); 889 this.rank=rank; 890 setDaemon(true); 891 } 892 893 public void run() { 894 try { 895 retval=(Long )queue.remove(); 896 } 898 catch(QueueClosedException closed) { 899 System.err.println("Thread #" + rank + ": queue was closed"); 900 } 901 } 902 903 Long getRetval() { 904 return retval; 905 } 906 } 907 908 class RemoveOneItemWithTimeout extends Thread 909 { 910 Long retval = null; 911 int rank = 0; 912 long timeout = 0; 913 914 RemoveOneItemWithTimeout(int rank, long timeout) 915 { 916 super("RemoveOneItem thread #" + rank); 917 this.rank = rank; 918 this.timeout=timeout; 919 setDaemon(true); 920 } 921 922 public void run() 923 { 924 boolean finished = false; 925 while (!finished) 926 { 927 try 928 { 929 retval = (Long ) queue.remove(timeout); 930 finished = true; 932 } 933 catch (QueueClosedException closed) 934 { 935 System.err.println("Thread #" + rank + ": queue was closed"); 936 finished = true; 937 } 938 catch (TimeoutException e) 939 { 940 } 941 } 942 } 943 944 Long getRetval() 945 { 946 return retval; 947 } 948 } 949 950 951 952 953 class Writer extends Thread { 954 int rank=0; 955 int num_writes=0; 956 boolean running=true; 957 int[] writes=null; 958 959 Writer(int i, int[] writes) { 960 super("WriterThread"); 961 rank=i; 962 this.writes=writes; 963 setDaemon(true); 964 } 965 966 967 public void run() { 968 while(running) { 969 try { 970 queue.add(new Long (System.currentTimeMillis())); 971 num_writes++; 972 } 973 catch(QueueClosedException closed) { 974 running=false; 975 } 976 catch(Throwable t) { 977 System.err.println("QueueTest.Writer.run(): exception=" + t); 978 } 979 } 980 writes[rank]=num_writes; 981 } 982 983 void stopThread() { 984 running=false; 985 } 986 } 987 988 989 class Reader extends Thread { 990 int rank; 991 int num_reads=0; 992 int[] reads=null; 993 boolean running=true; 994 995 996 Reader(int i, int[] reads) { 997 super("ReaderThread"); 998 rank=i; 999 this.reads=reads; 1000 setDaemon(true); 1001 } 1002 1003 1004 public void run() { 1005 Long el; 1006 1007 while(running) { 1008 try { 1009 el=(Long )queue.remove(); 1010 if(el == null) { System.out.println("QueueTest.Reader.run(): peek() returned null element. " + 1012 "queue.size()=" + queue.size() + ", queue.closed()=" + queue.closed()); 1013 } 1014 assertNotNull(el); 1015 num_reads++; 1016 } 1017 catch(QueueClosedException closed) { 1018 running=false; 1019 } 1020 catch(Throwable t) { 1021 System.err.println("QueueTest.Reader.run(): exception=" + t); 1022 } 1023 } 1024 reads[rank]=num_reads; 1025 } 1026 1027 1028 void stopThread() { 1029 running=false; 1030 } 1031 1032 } 1033 1034 1035 public static void main(String [] args) { 1036 String [] testCaseName={QueueTest.class.getName()}; 1037 junit.textui.TestRunner.main(testCaseName); 1038 } 1039 1040} 1041 | Popular Tags |