KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > FIFOMessageQueueTest


1 package org.jgroups.tests;
2
3 import junit.framework.TestCase;
4 import junit.framework.TestSuite;
5 import org.jgroups.Address;
6 import org.jgroups.stack.IpAddress;
7 import org.jgroups.util.FIFOMessageQueue;
8 import org.jgroups.util.Util;
9
10 import java.util.Collections JavaDoc;
11 import java.util.LinkedList JavaDoc;
12 import java.util.concurrent.BrokenBarrierException JavaDoc;
13 import java.util.concurrent.CyclicBarrier JavaDoc;
14
15 /**
16  * @author Bela Ban
17  * @version $Id: FIFOMessageQueueTest.java,v 1.1 2007/07/04 07:29:34 belaban Exp $
18  */

19 public class FIFOMessageQueueTest extends TestCase {
20     FIFOMessageQueue<String JavaDoc,Integer JavaDoc> queue;
21     String JavaDoc s1="s1", s2="s2", s3="s3";
22     private static final Address a1, a2;
23
24     static {
25         a1=new IpAddress(5000);
26         a2=new IpAddress(6000);
27     }
28
29     public void setUp() throws Exception JavaDoc {
30         super.setUp();
31         queue=new FIFOMessageQueue<String JavaDoc,Integer JavaDoc>();
32     }
33
34     public void tearDown() throws Exception JavaDoc {
35         super.tearDown();
36     }
37
38
39     public void testPollFromEmptyQueue() throws InterruptedException JavaDoc {
40         assertEquals(0, queue.size());
41         Integer JavaDoc ret=queue.poll(5);
42         assertNull(ret);
43         assertEquals("queue.size() should be 0, but is " + queue.size(), 0, queue.size());
44     }
45
46
47     public void testPutTwoTakeTwo() throws InterruptedException JavaDoc {
48         queue.put(a1, s1, 1); // 1 is available immediately
49
queue.put(a1, s1, 2); // 2 is queued
50
Integer JavaDoc ret=queue.poll(5);
51         assertNotNull(ret);
52         queue.done(a1, s1); // 2 is made available (moved into 'queue')
53
queue.done(a1, s1); // done() by the first putter
54
ret=queue.poll(5);
55         assertNotNull(ret);
56         assertEquals(0, queue.size());
57         queue.put(a1, s1, 3);
58         assertEquals(1, queue.size());
59         ret=queue.poll(5); // 3 should be available because queue for a1/s1 was empty
60
assertNotNull(ret);
61     }
62
63
64     public void testTakeFollowedByPut() throws InterruptedException JavaDoc {
65         assertEquals(0, queue.size());
66
67         new Thread JavaDoc() {
68
69             public void run() {
70                 Util.sleep(1000);
71                 try {
72                     queue.put(a1, s1, 1);
73                 }
74                 catch(InterruptedException JavaDoc e) {
75
76                 }
77             }
78         }.start();
79
80         Integer JavaDoc ret=queue.take();
81         assertNotNull(ret);
82         assertEquals(1, ret.intValue());
83         assertEquals("queue.size() should be 0, but is " + queue.size(), 0, queue.size());
84     }
85
86
87     public void testMultipleTakersOnePutter() throws Exception JavaDoc {
88         final CyclicBarrier JavaDoc barrier=new CyclicBarrier JavaDoc(11);
89         for(int i=0; i < 10; i++) {
90             new Thread JavaDoc() {
91                 public void run() {
92                     try {
93                         barrier.await();
94                         queue.take();
95
96                     }
97                     catch(Exception JavaDoc e) {
98                     }
99                 }
100             }.start();
101         }
102         barrier.await();
103         for(int i=0; i < 10; i++) {
104             queue.put(a1, s1, i);
105             queue.done(a1, s1);
106         }
107         Util.sleep(100);
108         assertEquals(0, queue.size());
109     }
110
111
112     public void testConcurrentPutsAndTakes() throws InterruptedException JavaDoc {
113         final int NUM=10000;
114         final int print=NUM / 10;
115
116         Thread JavaDoc putter=new Thread JavaDoc() {
117
118             public void run() {
119                 setName("Putter");
120                 int cnt=0;
121                 for(int i=0; i < NUM; i++) {
122                     try {
123                         queue.put(a1, s1, i);
124                         cnt++;
125                         if(cnt % print == 0) {
126                             System.out.println("Putter: " + cnt);
127                         }
128                         queue.done(a1, s1);
129                     }
130                     catch(InterruptedException JavaDoc e) {
131                         e.printStackTrace();
132                     }
133                 }
134             }
135         };
136
137         Thread JavaDoc taker=new Thread JavaDoc() {
138
139             public void run() {
140                 setName("Taker");
141                 int cnt=0;
142                 for(int i=0; i < NUM; i++) {
143                     try {
144                         queue.take();
145                         cnt++;
146                         if(cnt % print == 0) {
147                             System.out.println("Taker: " + cnt);
148                         }
149                     }
150                     catch(InterruptedException JavaDoc e) {
151                         e.printStackTrace();
152                     }
153                 }
154             }
155         };
156
157         System.out.println("starting threads");
158         taker.start();
159         putter.start();
160
161         new Thread JavaDoc() {
162
163             public void run() {
164                 Util.sleep(3000);
165                 System.out.println("queue:\n" + queue);
166             }
167         }.start();
168
169         putter.join();
170         taker.join();
171
172         assertEquals(0, queue.size());
173     }
174
175
176     public void testNullAddress() throws InterruptedException JavaDoc {
177         queue.put(null, s1, 1);
178         queue.put(a1, s1, 2);
179         queue.put(a1, s1, 3);
180         queue.put(null, s1, 4);
181         System.out.println("queue:\n" + queue);
182
183         Integer JavaDoc ret=queue.poll(5);
184         assertNotNull(ret);
185         assertEquals(1, ret.intValue());
186
187         ret=queue.poll(5);
188         assertNotNull(ret);
189         assertEquals(2, ret.intValue());
190
191         ret=queue.poll(5);
192         assertNotNull(ret);
193         assertEquals(4, ret.intValue());
194
195         ret=queue.poll(5);
196         assertNull(ret);
197
198         queue.done(a1, s1);
199         ret=queue.poll(5);
200         assertNotNull(ret);
201         assertEquals(3, ret.intValue());
202
203         ret=queue.poll(5);
204         assertNull(ret);
205         assertEquals(0, queue.size());
206     }
207
208
209     public void testSimplePutAndTake() throws InterruptedException JavaDoc {
210         queue.put(a1, s1, 1);
211         assertEquals(1, queue.size());
212         int ret=queue.take();
213         assertEquals(1, ret);
214         assertEquals(0, queue.size());
215     }
216
217     public void testSimplePutAndTakeMultipleSenders() throws InterruptedException JavaDoc {
218         queue.put(a1, s1, 1);
219         queue.put(a2, s1, 2);
220         System.out.println("queue is:\n" + queue);
221         assertEquals(2, queue.size());
222         int ret=queue.take();
223         assertEquals(1, ret);
224         assertEquals(1, queue.size());
225         ret=queue.take();
226         assertEquals(2, ret);
227         assertEquals(0, queue.size());
228     }
229
230     public void testMultiplePutsAndTakes() throws InterruptedException JavaDoc {
231         for(int i=1; i <= 5; i++)
232             queue.put(a1, s1, i);
233         System.out.println("queue is " + queue);
234         assertEquals(5, queue.size());
235         for(int i=1; i <= 5; i++) {
236             int ret=queue.take();
237             assertEquals(i, ret);
238             assertEquals(5-i, queue.size());
239             queue.done(a1, s1);
240         }
241         assertEquals(0, queue.size());
242     }
243
244
245     /**
246      * Sender A sends M1 to S1 and M2 to S1. M2 should wait until M1 is done
247      */

248     public void testSameSenderSameDestination() throws InterruptedException JavaDoc {
249         queue.put(a1, s1, 1);
250         queue.put(a1, s1, 2);
251         queue.put(a1, s1, 3);
252         System.out.println("queue:\n" + queue);
253
254         assertEquals(3, queue.size());
255         int ret=queue.take();
256
257         assertEquals(1, ret);
258         Integer JavaDoc retval=queue.poll(100);
259         assertNull(retval);
260         queue.done(a1, s1);
261         System.out.println("queue:\n" + queue);
262         ret=queue.take();
263         assertEquals(2, ret);
264         queue.done(a1, s1);
265         System.out.println("queue:\n" + queue);
266         ret=queue.take();
267         System.out.println("queue:\n" + queue);
268         assertEquals(3, ret);
269     }
270
271
272
273     /**
274      * Sender A sends M1 to S1 and M2 to S2. M2 should get processed immediately and not have
275      * to wait for M1 to complete
276      */

277     public void testSameSenderMultipleDestinations() throws InterruptedException JavaDoc {
278         queue.put(a1, s1, 10);
279         queue.put(a1, s1, 11);
280         queue.put(a1, s1, 12);
281
282         queue.put(a1, s2, 20);
283         queue.put(a1, s2, 21);
284         queue.put(a1, s2, 22);
285
286         queue.put(a1, s3, 30);
287         queue.put(a1, s3, 31);
288         queue.put(a1, s3, 32);
289         System.out.println("queue:\n" + queue);
290         Integer JavaDoc ret=queue.poll(5);
291         assertNotNull(ret);
292         assertEquals(10, ret.intValue());
293
294         ret=queue.poll(5);
295         assertNotNull(ret);
296         assertEquals(20, ret.intValue());
297
298         ret=queue.poll(5);
299         assertNotNull(ret);
300         assertEquals(30, ret.intValue());
301
302         ret=queue.poll(5);
303         assertNull(ret);
304
305         queue.done(a1, s3);
306         queue.done(a1, s1);
307         queue.done(a1, s2);
308
309         ret=queue.poll(5);
310         assertNotNull(ret);
311         assertEquals(31, ret.intValue());
312
313         ret=queue.poll(5);
314         assertNotNull(ret);
315         assertEquals(11, ret.intValue());
316
317         ret=queue.poll(5);
318         assertNotNull(ret);
319         assertEquals(21, ret.intValue());
320
321         ret=queue.poll(5);
322         assertNull(ret);
323
324         assertEquals(3, queue.size());
325
326         ret=queue.poll(5);
327         assertNull(ret);
328
329         queue.done(a1, s1);
330         queue.done(a1, s3);
331         queue.done(a1, s2);
332
333         ret=queue.poll(5);
334         assertNotNull(ret);
335         assertEquals(12, ret.intValue());
336
337         ret=queue.poll(5);
338         assertNotNull(ret);
339         assertEquals(32, ret.intValue());
340
341         ret=queue.poll(5);
342         assertNotNull(ret);
343         assertEquals(22, ret.intValue());
344
345         ret=queue.poll(5);
346         assertNull(ret);
347
348         assertEquals(0, queue.size());
349     }
350
351
352     /**
353      * Sender A sends M1 to S1 and sender B sends M2 to S1. M2 should get processed concurrently to M1 and
354      * should not have to wait for M1's completion
355      */

356     public void testDifferentSendersSameDestination() throws InterruptedException JavaDoc {
357         queue.put(a1, s1, 10);
358         queue.put(a2, s1, 20);
359         queue.put(a1, s1, 11);
360         queue.put(a2, s1, 21);
361         System.out.println("queue:\n" + queue);
362         assertEquals(4, queue.size());
363
364         Integer JavaDoc ret=queue.poll(5);
365         assertNotNull(ret);
366         assertEquals(10, ret.intValue());
367
368         ret=queue.poll(5);
369         assertNotNull(ret);
370         assertEquals(20, ret.intValue());
371
372         queue.done(a1, s1);
373         ret=queue.poll(5);
374         assertNotNull(ret);
375         assertEquals(11, ret.intValue());
376
377         queue.done(a2, s1);
378         ret=queue.poll(5);
379         assertNotNull(ret);
380         assertEquals(21, ret.intValue());
381
382         ret=queue.poll(5);
383         assertNull(ret);
384         assertEquals(0, queue.size());
385     }
386
387
388
389     /**
390      * Sender A sends M1 to S1 and sender B sends M2 to S2. M1 and M2 should get processed concurrently
391      */

392     public void testDifferentSendersDifferentDestinations() throws Exception JavaDoc {
393         queue.put(a1, s1, 1);
394         queue.put(a2, s2, 2);
395         queue.put(a1, s2, 3);
396         queue.put(a2, s1, 4);
397         System.out.println("queue:\n" + queue);
398         assertEquals(4, queue.size());
399
400         Integer JavaDoc ret=queue.poll(5);
401         assertNotNull(ret);
402         assertEquals(1, ret.intValue());
403
404         ret=queue.poll(5);
405         assertNotNull(ret);
406         assertEquals(2, ret.intValue());
407
408         ret=queue.poll(5);
409         assertNotNull(ret);
410         assertEquals(3, ret.intValue());
411
412         ret=queue.poll(5);
413         assertNotNull(ret);
414         assertEquals(4, ret.intValue());
415
416         ret=queue.poll(5);
417         assertNull(ret);
418         assertEquals(0, queue.size());
419
420     }
421
422
423
424     public void testDifferentSendersDifferentDestinationsMultipleMessages() throws Exception JavaDoc {
425         queue.put(a1, s1, 1);
426         queue.put(a2, s2, 2);
427         queue.put(a1, s2, 3);
428         queue.put(a2, s1, 4);
429
430         queue.put(a1, s1, 5);
431         queue.put(a2, s2, 6);
432         queue.put(a1, s2, 7);
433         queue.put(a2, s1, 8);
434
435         System.out.println("queue:\n" + queue);
436         assertEquals(8, queue.size());
437
438         Integer JavaDoc ret=queue.poll(5);
439         assertNotNull(ret);
440         assertEquals(1, ret.intValue());
441
442         ret=queue.poll(5);
443         assertNotNull(ret);
444         assertEquals(2, ret.intValue());
445
446         ret=queue.poll(5);
447         assertNotNull(ret);
448         assertEquals(3, ret.intValue());
449
450         ret=queue.poll(5);
451         assertNotNull(ret);
452         assertEquals(4, ret.intValue());
453
454
455         queue.done(a1, s1);
456         ret=queue.poll(5);
457         assertNotNull(ret);
458         assertEquals(5, ret.intValue());
459
460         queue.done(a2, s2);
461         ret=queue.poll(5);
462         assertNotNull(ret);
463         assertEquals(6, ret.intValue());
464
465         queue.done(a1, s2);
466         ret=queue.poll(5);
467         assertNotNull(ret);
468         assertEquals(7, ret.intValue());
469
470         queue.done(a2, s1);
471         ret=queue.poll(5);
472         assertNotNull(ret);
473         assertEquals(8, ret.intValue());
474     }
475     
476
477
478     public void testOrdering() throws InterruptedException JavaDoc {
479         for(int i=1; i <= 3; i++)
480             queue.put(a1, s1, i);
481         assertEquals(3, queue.size());
482
483         int ret=queue.take();
484         assertEquals(1, ret);
485         assertEquals(2, queue.size());
486
487         queue.done(a1, s1);
488         queue.put(a1, s1, 4);
489         queue.put(a1, s1, 5);
490         System.out.println("queue: " + queue);
491
492         for(int i=2; i <= 5; i++) {
493             ret=queue.take();
494             assertEquals(i, ret);
495             assertEquals(5-i, queue.size());
496             queue.done(a1, s1);
497         }
498         assertEquals(0, queue.size());
499     }
500
501
502     public void testOrderingMultipleThreads() throws BrokenBarrierException JavaDoc, InterruptedException JavaDoc {
503         CyclicBarrier JavaDoc barrier=new CyclicBarrier JavaDoc(4);
504         int NUM=500;
505         Producer p1=new Producer(queue, "s1", 1, NUM, barrier);
506         Producer p2=new Producer(queue, "s2", 1001, NUM, barrier);
507         Producer p3=new Producer(queue, "s3", 2001, NUM, barrier);
508
509         p1.start();
510         p2.start();
511         p3.start();
512         Util.sleep(100);
513         barrier.await(); // starts all 3 threads
514

515         p1.join();
516         p2.join();
517         p3.join();
518         System.out.println("queue: " + queue.size() + " elements");
519         assertEquals(NUM * 3, queue.size());
520     }
521
522     public void testOrderingMultipleThreadsWithTakes() throws BrokenBarrierException JavaDoc, InterruptedException JavaDoc {
523         testOrderingMultipleThreads();
524         int ret;
525         LinkedList JavaDoc<Integer JavaDoc> list=new LinkedList JavaDoc<Integer JavaDoc>();
526
527         int size=queue.size();
528         for(int i=0; i < size; i++) {
529             ret=queue.take();
530             list.add(ret);
531             queue.done(a1, "s1");
532             queue.done(a1, "s2");
533             queue.done(a1, "s3");
534         }
535
536         System.out.println("analyzing returned values for correct ordering");
537         LinkedList JavaDoc<Integer JavaDoc> one=new LinkedList JavaDoc<Integer JavaDoc>(), two=new LinkedList JavaDoc<Integer JavaDoc>(), three=new LinkedList JavaDoc<Integer JavaDoc>();
538         for(int val: list) {
539             if(val < 1000) {
540                 one.add(val);
541                 continue;
542             }
543             if(val > 1000 && val <= 2000) {
544                 two.add(val);
545                 continue;
546             }
547             if(val > 2000) {
548                 three.add(val);
549             }
550         }
551
552         int len=one.size();
553         assertEquals(len, two.size());
554         assertEquals(len, three.size());
555
556
557         LinkedList JavaDoc<Integer JavaDoc> sorted_one=new LinkedList JavaDoc<Integer JavaDoc>(one);
558         Collections.sort(sorted_one);
559         assertEquals("one: " + one + ", sorted: " + sorted_one, one, sorted_one);
560
561         LinkedList JavaDoc<Integer JavaDoc> sorted_two=new LinkedList JavaDoc<Integer JavaDoc>(two);
562         Collections.sort(sorted_two);
563         assertEquals("two: " + two + ", sorted: " + sorted_two, two, sorted_two);
564
565         LinkedList JavaDoc<Integer JavaDoc> sorted_three=new LinkedList JavaDoc<Integer JavaDoc>(three);
566         Collections.sort(sorted_three);
567         assertEquals("three: " + three + ", sorted: " + sorted_three, three, sorted_three);
568
569         System.out.println("OK - all 3 collections are ordered");
570     }
571
572
573
574     private static class Producer extends Thread JavaDoc {
575         private FIFOMessageQueue<String JavaDoc,Integer JavaDoc> queue;
576         private String JavaDoc key;
577         private int num_msgs;
578         private CyclicBarrier JavaDoc barrier;
579         private int start_num;
580
581         private Producer(FIFOMessageQueue<String JavaDoc,Integer JavaDoc> queue, String JavaDoc key, int start_num, int num_msgs, CyclicBarrier JavaDoc barrier) {
582             this.queue=queue;
583             this.key=key;
584             this.start_num=start_num;
585             this.num_msgs=num_msgs;
586             this.barrier=barrier;
587         }
588
589
590         public void run() {
591             try {
592                 barrier.await();
593             }
594             catch(Exception JavaDoc e) {
595                 e.printStackTrace();
596             }
597             for(int i=start_num; i <= num_msgs+start_num-1; i++) {
598                 try {
599                     // Util.sleepRandom(50);
600
queue.put(a1, key, i);
601                 }
602                 catch(InterruptedException JavaDoc e) {
603                     e.printStackTrace();
604                 }
605             }
606         }
607     }
608
609
610     public static junit.framework.Test suite() {
611         return new TestSuite(FIFOMessageQueueTest.class);
612     }
613
614     public static void main(String JavaDoc[] args) {
615         junit.textui.TestRunner.run(suite());
616     }
617 }
618
Popular Tags