KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > ConcurrentStackTest


package org.jgroups.tests;

import junit.framework.TestCase;
import org.jgroups.*;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Util;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

12 /**
13  * Tests the TLS
14  * @author Bela Ban
15  * @version $Id: ConcurrentStackTest.java,v 1.1 2006/12/27 10:04:08 belaban Exp $
16  */

17 public class ConcurrentStackTest extends TestCase {
18     String JavaDoc props="udp.xml";
19     JChannel ch1, ch2, ch3;
20     final static int NUM=10, EXPECTED=NUM * 3;
21     final static long SLEEPTIME=100;
22     CyclicBarrier barrier;
23
24     public ConcurrentStackTest(String JavaDoc name) {
25         super(name);
26     }
27
28     public void setUp() throws Exception JavaDoc {
29         super.setUp();
30         barrier=new CyclicBarrier(4);
31         ch1=new JChannel(props);
32         ch2=new JChannel(props);
33         ch3=new JChannel(props);
34     }
35
36     protected void tearDown() throws Exception JavaDoc {
37         if(ch3 != null) ch3.close();
38         if(ch2 != null) ch2.close();
39         if(ch1 != null) ch1.close();
40         barrier.reset();
41         super.tearDown();
42     }
43
44
45
46     public void testSequentialDelivery() throws Exception JavaDoc {
47         doIt(false);
48     }
49
50     public void testConcurrentDelivery() throws Exception JavaDoc {
51         doIt(true);
52     }
53
54
55     private void doIt(boolean threadless) throws Exception JavaDoc {
56         long start, stop, diff;
57         setThreadless(ch1, threadless);
58         setThreadless(ch2, threadless);
59         setThreadless(ch3, threadless);
60
61         MyReceiver r1=new MyReceiver("R1"), r2=new MyReceiver("R2"), r3=new MyReceiver("R3");
62         ch1.setReceiver(r1); ch2.setReceiver(r2); ch3.setReceiver(r3);
63
64         ch1.connect("test");
65         ch2.connect("test");
66         ch3.connect("test");
67         View v=ch3.getView();
68         assertEquals(3, v.size());
69
70         new Thread JavaDoc(new Sender(ch1)) {}.start();
71         new Thread JavaDoc(new Sender(ch2)) {}.start();
72         new Thread JavaDoc(new Sender(ch3)) {}.start();
73         barrier.await(); // start senders
74
start=System.currentTimeMillis();
75
76         Exception JavaDoc ex=null;
77
78         try {
79             barrier.await((long)(EXPECTED * SLEEPTIME * 1.3), TimeUnit.MILLISECONDS); // wait for all receivers
80
}
81         catch(java.util.concurrent.TimeoutException JavaDoc e) {
82             ex=e;
83         }
84
85
86         stop=System.currentTimeMillis();
87         diff=stop - start;
88
89         System.out.println("Total time: " + diff + " ms\n");
90
91         checkFIFO(r1);
92         checkFIFO(r2);
93         checkFIFO(r3);
94         checkTime(diff, threadless);
95
96         if(ex != null)
97             throw ex;
98     }
99
100     private void checkFIFO(MyReceiver r) {
101         List<Pair<Address,Integer JavaDoc>> msgs=r.getMessages();
102         Map<Address,List<Integer JavaDoc>> map=new HashMap();
103         for(Pair<Address,Integer JavaDoc> p: msgs) {
104             Address sender=p.key;
105             List<Integer JavaDoc> list=map.get(sender);
106             if(list == null) {
107                 list=new LinkedList();
108                 map.put(sender, list);
109             }
110             list.add(p.val);
111         }
112
113         boolean fifo=true;
114         List<Address> incorrect_receivers=new LinkedList();
115         System.out.println("Checking FIFO for " + r.getName() + ":");
116         for(Address addr: map.keySet()) {
117             List<Integer JavaDoc> list=map.get(addr);
118             print(addr, list);
119             if(!verifyFIFO(list)) {
120                 fifo=false;
121                 incorrect_receivers.add(addr);
122             }
123         }
124         System.out.print("\n");
125
126         if(!fifo)
127             fail("The following receivers didn't receive all messages in FIFO order: " + incorrect_receivers);
128     }
129
130
131     private boolean verifyFIFO(List<Integer JavaDoc> list) {
132         List<Integer JavaDoc> list2=new LinkedList(list);
133         Collections.sort(list2);
134         return list.equals(list2);
135     }
136
137     private void print(Address addr, List<Integer JavaDoc> list) {
138         StringBuilder JavaDoc sb=new StringBuilder JavaDoc();
139         sb.append(addr).append(": ");
140         for(Integer JavaDoc i: list)
141             sb.append(i).append(" ");
142         System.out.println(sb);
143     }
144
145     /** Accepts up to 30% over */
146     private void checkTime(long time, boolean threadless) {
147         long min_time, max_time;
148
149         if(threadless) {
150             min_time=NUM * SLEEPTIME;
151         }
152         else {
153             min_time=EXPECTED * SLEEPTIME;
154         }
155         max_time=(long)(min_time * 1.3); // 30%
156

157         assertTrue("time (" + time + "ms) is out of bounds (min=" + min_time + "ms, max=" + max_time + "ms)",
158                    min_time <= time && time <= max_time);
159     }
160
161
162     private void setThreadless(JChannel ch1, boolean threadless) {
163         Protocol tp=ch1.getProtocolStack().findProtocol("UDP");
164         if(tp == null)
165             throw new IllegalStateException JavaDoc("Protocol UDP not found in properties");
166         Properties p=new Properties();
167         p.setProperty("use_concurrent_stack", String.valueOf(threadless));
168         p.setProperty("thread_pool.min_threads", "1");
169         p.setProperty("thread_pool.max_threads", "100");
170         p.setProperty("thread_pool.queue_enabled", "false");
171         // p.setProperty("loopback", "true");
172
tp.setProperties(p);
173     }
174
175
176     private class Sender implements Runnable JavaDoc {
177         Channel ch;
178         Address local_addr;
179
180         public Sender(Channel ch) {
181             this.ch=ch;
182             local_addr=ch.getLocalAddress();
183         }
184
185         public void run() {
186             Message msg;
187             try {
188                 barrier.await();
189             }
190             catch(Throwable JavaDoc t) {
191                 return;
192             }
193
194             for(int i=1; i <= NUM; i++) {
195                 msg=new Message(null, null, new Integer JavaDoc(i));
196                 try {
197                     // System.out.println(local_addr + ": sending " + i);
198
ch.send(msg);
199                 }
200                 catch(Exception JavaDoc e) {
201                     e.printStackTrace();
202                 }
203             }
204         }
205     }
206
207
208     private class Pair<K,V> {
209         K key;
210         V val;
211
212         public Pair(K key, V val) {
213             this.key=key;
214             this.val=val;
215         }
216
217         public String JavaDoc toString() {
218             return key + "::" + val;
219         }
220     }
221
222     private class MyReceiver extends ReceiverAdapter {
223         String JavaDoc name;
224         final List<Pair<Address,Integer JavaDoc>> msgs=new LinkedList();
225         AtomicInteger JavaDoc count=new AtomicInteger JavaDoc(0);
226
227         public MyReceiver(String JavaDoc name) {
228             this.name=name;
229         }
230
231         public void receive(Message msg) {
232             Util.sleep(SLEEPTIME);
233             Pair pair=new Pair<Address,Integer JavaDoc>(msg.getSrc(), (Integer JavaDoc)msg.getObject());
234             // System.out.println(name + ": received " + pair);
235
synchronized(msgs) {
236                 msgs.add(pair);
237             }
238             if(count.incrementAndGet() >= EXPECTED) {
239                 try {
240                     barrier.await();
241                 }
242                 catch(Exception JavaDoc e) {
243                     e.printStackTrace();
244                 }
245             }
246         }
247
248         public List getMessages() {return msgs;}
249
250         public String JavaDoc getName() {
251             return name;
252         }
253     }
254
255
256     public static void main(String JavaDoc[] args) {
257         String JavaDoc[] testCaseName={ConcurrentStackTest.class.getName()};
258         junit.textui.TestRunner.main(testCaseName);
259     }
260
261 }
262
Popular Tags