KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tctest > WaitNotifySystemTestApp


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
3  */

4 package com.tctest;
5
6 import com.tc.logging.TCLogger;
7 import com.tc.logging.TCLogging;
8 import com.tc.object.config.ConfigVisitor;
9 import com.tc.object.config.DSOClientConfigHelper;
10 import com.tc.simulator.app.ApplicationConfig;
11 import com.tc.simulator.listener.ListenerProvider;
12 import com.tctest.runner.AbstractTransparentApp;
13
14 import java.util.ArrayList JavaDoc;
15 import java.util.Arrays JavaDoc;
16 import java.util.HashSet JavaDoc;
17 import java.util.Iterator JavaDoc;
18 import java.util.LinkedList JavaDoc;
19 import java.util.List JavaDoc;
20 import java.util.Random JavaDoc;
21 import java.util.Set JavaDoc;
22
23 public class WaitNotifySystemTestApp extends AbstractTransparentApp {
24   private static final TCLogger logger = TCLogging.getTestingLogger(WaitNotifySystemTestApp.class);
25
26   // roots
27
private final List JavaDoc queue = new LinkedList JavaDoc();
28   private final Set JavaDoc takers = new HashSet JavaDoc();
29   private final Set JavaDoc putters = new HashSet JavaDoc();
30   private final Set JavaDoc workers = new HashSet JavaDoc();
31   private final List JavaDoc takeCounts = new LinkedList JavaDoc();
32   private final Flag first = new Flag();
33
34   private static final int PUTS = 50;
35   private static final boolean debug = true;
36   private Random JavaDoc random;
37
38   public WaitNotifySystemTestApp(String JavaDoc globalId, ApplicationConfig cfg, ListenerProvider listenerProvider) {
39     super(globalId, cfg, listenerProvider);
40
41     if (getParticipantCount() < 3) { throw new RuntimeException JavaDoc("Must have at least 3 participants to run this test"); }
42
43   }
44
45   public static void visitL1DSOConfig(ConfigVisitor visitor, DSOClientConfigHelper config) {
46     String JavaDoc testClassName = WaitNotifySystemTestApp.class.getName();
47
48     List JavaDoc roots = Arrays.asList(new Object JavaDoc[] { "queue", "takers", "putters", "workers", "takeCounts", "first" });
49     for (Iterator JavaDoc iter = roots.iterator(); iter.hasNext();) {
50       String JavaDoc root = (String JavaDoc) iter.next();
51       config.addRoot(testClassName, root, root + "Lock", true);
52       System.err.println("Adding root for " + testClassName + "." + root);
53     }
54
55     String JavaDoc methodExpression = "* " + testClassName + "*.*(..)";
56     System.err.println("Adding autolock for: " + methodExpression);
57     config.addWriteAutolock(methodExpression);
58
59     config.addIncludePattern(Flag.class.getName());
60     config.addIncludePattern(WorkItem.class.getName());
61   }
62
63   public void run() {
64     random = new Random JavaDoc(new Random JavaDoc(System.currentTimeMillis() + getApplicationId().hashCode()).nextLong());
65
66     try {
67       run0();
68       notifyResult(Boolean.TRUE);
69     } catch (Throwable JavaDoc t) {
70       notifyError(t);
71       notifyResult(Boolean.FALSE);
72     }
73   }
74
75   public void run0() throws Throwable JavaDoc {
76     final long id = new Long JavaDoc(getApplicationId()).longValue();
77
78     if (first.attemptSet()) {
79       // I am the master, bow before me
80
runMaster(getParticipantCount() - 1);
81     } else {
82       if (random.nextBoolean()) {
83         runPutter(id);
84       } else {
85         runTaker(id);
86       }
87     }
88   }
89
90   private void runTaker(long id) throws InterruptedException JavaDoc {
91     Thread.currentThread().setName("TAKER-" + id);
92
93     Long JavaDoc myID = new Long JavaDoc(id);
94     synchronized (takers) {
95       takers.add(myID);
96     }
97
98     synchronized (workers) {
99       workers.add(myID);
100       workers.notify();
101     }
102
103     log("STARTED");
104
105     long count = 0;
106     try {
107       synchronized (queue) {
108         while (true) {
109           if (queue.size() > 0) {
110             WorkItem wi = (WorkItem) queue.remove(0);
111             // log("TOOK " + wi);
112
if (wi.isStop()) {
113               log("took a stop item");
114               return;
115             }
116             count++;
117           } else {
118             if (random.nextBoolean()) {
119               long millis = random.nextInt(10000);
120               if (random.nextBoolean()) {
121                 int nanos = random.nextInt(10000);
122                 // log("wait(" + millis + ", " + nanos + ")");
123
queue.wait(millis, nanos);
124               } else {
125                 // log("wait(" + millis + ")");
126
queue.wait(millis);
127               }
128             } else {
129               // log("wait()");
130
queue.wait();
131             }
132           }
133         }
134       }
135
136     } finally {
137       log("adding to takeCount");
138       synchronized (takeCounts) {
139         takeCounts.add(new Long JavaDoc(count));
140       }
141
142       log("removing self from takers set");
143       synchronized (takers) {
144         takers.remove(myID);
145         takers.notify();
146       }
147
148       log("ENDED");
149     }
150   }
151
152   private void runPutter(long id) {
153     Thread.currentThread().setName("PUTTER-" + id);
154
155     Long JavaDoc myID = new Long JavaDoc(id);
156     synchronized (putters) {
157       putters.add(myID);
158     }
159
160     synchronized (workers) {
161       workers.add(myID);
162       workers.notify();
163     }
164
165     log("STARTED");
166
167     try {
168       for (int i = 0; i < PUTS; i++) {
169         synchronized (queue) {
170           WorkItem newWork = new WorkItem(myID.toString() + "-" + i);
171
172           // log("PUTTING new work: " + newWork);
173

174           queue.add(newWork);
175
176           if (random.nextBoolean()) {
177             // log("notify all");
178
queue.notifyAll();
179           } else {
180             // log("notify");
181
queue.notify();
182           }
183         }
184       }
185     } finally {
186       log("removing self from putters set");
187       synchronized (putters) {
188         putters.remove(myID);
189         putters.notify();
190       }
191
192       log("ENDED");
193     }
194   }
195
196   private void runMaster(int workerCount) throws InterruptedException JavaDoc {
197     Thread.currentThread().setName("MASTER");
198
199     waitForAllWorkers(workerCount);
200
201     log("All worker nodes started");
202
203     final Long JavaDoc workerIDs[];
204     synchronized (workers) {
205       workerIDs = (Long JavaDoc[]) workers.toArray(new Long JavaDoc[] {});
206     }
207
208     final long extraTakerID = getUniqueId(workerIDs);
209     final List JavaDoc next = new ArrayList JavaDoc(workers);
210     next.add(new Long JavaDoc(extraTakerID));
211     final long extraPutterID = getUniqueId((Long JavaDoc[]) next.toArray(new Long JavaDoc[] {}));
212
213     // start up another taker and putter locally. Do this for two reasons:
214
// 1) Taker/Putter choice is made randomly, thus it is possible that there are only putters or takers out there
215
// 2) To force some wait/notify in the intra-VM context
216
Thread JavaDoc extraTaker = new Thread JavaDoc( new Runnable JavaDoc() {
217       public void run() {
218         try {
219           runTaker(extraTakerID);
220         } catch (Throwable JavaDoc t) {
221           WaitNotifySystemTestApp.this.notifyError(t);
222         }
223       }
224     });
225     extraTaker.start();
226
227     Thread JavaDoc extraPutter = new Thread JavaDoc(new Runnable JavaDoc() {
228       public void run() {
229         try {
230           runPutter(extraPutterID);
231         } catch (Throwable JavaDoc t) {
232           WaitNotifySystemTestApp.this.notifyError(t);
233         }
234       }
235     });
236     extraPutter.start();
237
238     workerCount += 2;
239     waitForAllWorkers(workerCount);
240
241     log("Extra workers started");
242
243     final int numTakers;
244     synchronized (takers) {
245       numTakers = takers.size();
246     }
247     log("takers count = " + numTakers);
248
249     final int numPutters = workerCount - numTakers;
250     log("putters count = " + numPutters);
251
252     // wait for all putters to finish
253
synchronized (putters) {
254       while (putters.size() > 0) {
255         log("waiting for putters: " + putters.size());
256         putters.wait();
257       }
258     }
259
260     log("All putters done");
261
262     // tell the takers to stop
263
synchronized (queue) {
264       for (int i = 0; i < numTakers; i++) {
265         queue.add(WorkItem.STOP);
266       }
267       queue.notifyAll();
268     }
269
270     log("Takers told to stop");
271
272     // wait for all the takers to finish
273
synchronized (takers) {
274       while (takers.size() > 0) {
275         log("waiting for takers: " + takers.size());
276         takers.wait();
277       }
278     }
279
280     log("Takers all done");
281
282     // total up the work items each taker saw
283
long total = 0;
284     synchronized (takeCounts) {
285       log("Collecting take counts");
286
287       if (takeCounts.size() != numTakers) {
288         // shouldn't happen, but if something is wrong, it might be useful to know how many take counts were there
289
throw new RuntimeException JavaDoc("Wrong number of take counts: " + takeCounts.size() + " != " + numTakers);
290       }
291
292       for (Iterator JavaDoc iter = takeCounts.iterator(); iter.hasNext();) {
293         Long JavaDoc count = (Long JavaDoc) iter.next();
294         total += count.longValue();
295       }
296     }
297
298     // verify the results
299
final int expectedTotal = numPutters * PUTS;
300     if (total != expectedTotal) { throw new RuntimeException JavaDoc("Expected " + expectedTotal + ", but we got " + total); }
301   }
302
303   private void waitForAllWorkers(int workerCount) throws InterruptedException JavaDoc {
304     // wait for everyone to start
305
synchronized (workers) {
306       while (workers.size() < workerCount) {
307         final int lastCount = workers.size();
308         log("waiting for workers " + workers.size());
309         workers.wait();
310         if (lastCount == workers.size()) { throw new Error JavaDoc("Size didn't change!!!"); }
311       }
312     }
313   }
314
315   private long getUniqueId(Long JavaDoc[] workerIDs) {
316     while (true) {
317       final long candidate = random.nextInt(Integer.MAX_VALUE);
318       boolean okay = true;
319       for (int i = 0; i < workerIDs.length; i++) {
320         if (workerIDs[i].longValue() == candidate) {
321           okay = false;
322           break;
323         }
324
325         if (okay) { return candidate; }
326       }
327     }
328   }
329
330   private static void log(String JavaDoc msg) {
331     if (debug) logger.info(msg);
332   }
333
334   private static class Flag {
335     private boolean set = false;
336
337     synchronized boolean attemptSet() {
338       if (!set) {
339         set = true;
340         return true;
341       }
342       return false;
343     }
344   }
345
346   private static class WorkItem {
347     static final WorkItem STOP = new WorkItem("STOP");
348
349     private final String JavaDoc name;
350
351     WorkItem(String JavaDoc name) {
352       this.name = name;
353     }
354
355     boolean isStop() {
356       return STOP.name.equals(name);
357     }
358
359     public String JavaDoc toString() {
360       return this.name;
361     }
362   }
363
364 }
Popular Tags