KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tctest > restart > system > ObjectDataRestartTestApp


/*
 * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
 * notice. All rights reserved.
 */

5 package com.tctest.restart.system;
6
7 import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
8 import EDU.oswego.cs.dl.util.concurrent.LinkedNode;
9 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
10 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
11
12 import com.tc.exception.TCRuntimeException;
13 import com.tc.object.config.ConfigVisitor;
14 import com.tc.object.config.DSOClientConfigHelper;
15 import com.tc.object.config.TransparencyClassSpec;
16 import com.tc.object.config.spec.SynchronizedIntSpec;
17 import com.tc.simulator.app.ApplicationConfig;
18 import com.tc.simulator.listener.ListenerProvider;
19 import com.tc.simulator.listener.OutputListener;
20 import com.tc.util.Assert;
21 import com.tc.util.DebugUtil;
22 import com.tctest.runner.AbstractTransparentApp;
23
24 import java.util.ArrayList JavaDoc;
25 import java.util.Collection JavaDoc;
26 import java.util.Date JavaDoc;
27 import java.util.HashMap JavaDoc;
28 import java.util.HashSet JavaDoc;
29 import java.util.Iterator JavaDoc;
30 import java.util.List JavaDoc;
31 import java.util.Map JavaDoc;
32
33 public class ObjectDataRestartTestApp extends AbstractTransparentApp {
34   public static final String JavaDoc SYNCHRONOUS_WRITE = "synch-write";
35
36   private int threadCount = 10;
37   private int workSize = 1 * 100;
38   private int testObjectDepth = 1 * 50;
39   // I had to dial this down considerably because this takes a long time to run.
40
private int iterationCount = 1 * 2;
41   private List JavaDoc workQueue = new ArrayList JavaDoc();
42   private Collection JavaDoc resultSet = new HashSet JavaDoc();
43   private SynchronizedInt complete = new SynchronizedInt(0);
44   private OutputListener out;
45   private SynchronizedInt nodes = new SynchronizedInt(0);
46
47   public ObjectDataRestartTestApp(String JavaDoc appId, ApplicationConfig cfg, ListenerProvider listenerProvider) {
48     super(appId, cfg, listenerProvider);
49     this.out = listenerProvider.getOutputListener();
50   }
51
52   public void run() {
53     try {
54       // set up workers
55
WorkerFactory wf = new WorkerFactory(getApplicationId(), workQueue, resultSet, complete);
56
57       for (int i = 0; i < threadCount; i++) {
58         Worker worker = wf.newWorker();
59         new Thread JavaDoc(worker).start();
60       }
61
62       if (nodes.increment() == 1) {
63         // if we are the first participant, we control the work queue and do the verifying
64
// System.err.println("Populating work queue...");
65
populateWorkQueue(workSize, testObjectDepth, workQueue);
66         for (int i = 0; i < iterationCount; i++) {
67           synchronized (resultSet) {
68             while (resultSet.size() < workSize) {
69               try {
70                 resultSet.wait();
71               } catch (InterruptedException JavaDoc e) {
72                 throw new TCRuntimeException(e);
73               }
74             }
75           }
76           verify(i + 1, resultSet);
77           if (i != (iterationCount - 1)) {
78             synchronized (resultSet) {
79               for (Iterator JavaDoc iter = resultSet.iterator(); iter.hasNext();) {
80                 put(workQueue, iter.next());
81                 iter.remove();
82               }
83             }
84           }
85         }
86         for (int i = 0; i < wf.getGlobalWorkerCount(); i++) {
87           put(workQueue, "STOP");
88         }
89       }
90     } catch (Exception JavaDoc e) {
91       throw new TCRuntimeException(e);
92     }
93   }
94
95   private void populateWorkQueue(int size, int depth, List JavaDoc queue) {
96     System.err.println(" Thread - " + Thread.currentThread().getName() + " inside populateWorkQueue !");
97     for (int i = 0; i < size; i++) {
98       TestObject to = new TestObject("" + i);
99       to.populate(depth);
100       put(queue, to);
101     }
102   }
103
104   private void verify(int expectedValue, Collection JavaDoc results) {
105     synchronized (results) {
106       Assert.assertEquals(workSize, results.size());
107       for (Iterator JavaDoc i = results.iterator(); i.hasNext();) {
108         TestObject to = (TestObject) i.next();
109         if (!to.validate(expectedValue)) { throw new RuntimeException JavaDoc("Failed!"); }
110       }
111     }
112   }
113
114   public final void println(Object JavaDoc o) {
115     try {
116       out.println(o);
117     } catch (InterruptedException JavaDoc e) {
118       throw new TCRuntimeException(e);
119     }
120   }
121
122   public static void visitL1DSOConfig(ConfigVisitor visitor, DSOClientConfigHelper config) {
123     visitL1DSOConfig(visitor, config, new HashMap JavaDoc());
124   }
125
126   public static void visitL1DSOConfig(ConfigVisitor visitor, DSOClientConfigHelper config, Map JavaDoc optionalAttributes) {
127     DebugUtil.DEBUG = true;
128
129     boolean isSynchronousWrite = false;
130     if (optionalAttributes.size() > 0) {
131       isSynchronousWrite = Boolean.valueOf((String JavaDoc) optionalAttributes.get(ObjectDataRestartTestApp.SYNCHRONOUS_WRITE))
132           .booleanValue();
133     }
134
135     visitor.visit(config, Barriers.class);
136
137     String JavaDoc testClassName = ObjectDataRestartTestApp.class.getName();
138     TransparencyClassSpec spec = config.getOrCreateSpec(testClassName);
139
140     String JavaDoc idProviderClassname = IDProvider.class.getName();
141     config.addIncludePattern(idProviderClassname);
142
143     String JavaDoc linkedQueueClassname = LinkedQueue.class.getName();
144     config.addIncludePattern(linkedQueueClassname);
145
146     String JavaDoc linkedNodeClassname = LinkedNode.class.getName();
147     config.addIncludePattern(linkedNodeClassname);
148     //
149
// String syncIntClassname = SynchronizedInt.class.getName();
150
// config.addIncludeClass(syncIntClassname);
151
//
152
// String syncVarClassname = SynchronizedVariable.class.getName();
153
// config.addIncludeClass(syncVarClassname);
154

155     String JavaDoc testObjectClassname = TestObject.class.getName();
156     config.addIncludePattern(testObjectClassname);
157
158     String JavaDoc workerClassname = Worker.class.getName();
159     config.addIncludePattern(workerClassname);
160
161     // Create Roots
162
spec.addRoot("workQueue", testClassName + ".workQueue");
163     spec.addRoot("resultSet", testClassName + ".resultSet");
164     spec.addRoot("complete", testClassName + ".complete");
165     spec.addRoot("nodes", testClassName + ".nodes");
166
167     String JavaDoc workerFactoryClassname = WorkerFactory.class.getName();
168     config.addIncludePattern(workerFactoryClassname);
169     TransparencyClassSpec workerFactorySpec = config.getOrCreateSpec(workerFactoryClassname);
170     workerFactorySpec.addRoot("globalWorkerCount", workerFactoryClassname + ".globalWorkerCount");
171
172     // Create locks
173
String JavaDoc verifyExpression = "* " + testClassName + ".verify(..)";
174     addWriteAutolock(config, isSynchronousWrite, verifyExpression);
175
176     String JavaDoc runExpression = "* " + testClassName + ".run(..)";
177     addWriteAutolock(config, isSynchronousWrite, runExpression);
178
179     String JavaDoc populateWorkQueueExpression = "* " + testClassName + ".populateWorkQueue(..)";
180     addWriteAutolock(config, isSynchronousWrite, populateWorkQueueExpression);
181
182     String JavaDoc putExpression = "* " + testClassName + ".put(..)";
183     addWriteAutolock(config, isSynchronousWrite, putExpression);
184
185     String JavaDoc takeExpression = "* " + testClassName + ".take(..)";
186     addWriteAutolock(config, isSynchronousWrite, takeExpression);
187
188     // TestObject config
189
String JavaDoc incrementExpression = "* " + testObjectClassname + ".increment(..)";
190     addWriteAutolock(config, isSynchronousWrite, incrementExpression);
191
192     String JavaDoc populateExpression = "* " + testObjectClassname + ".populate(..)";
193     addWriteAutolock(config, isSynchronousWrite, populateExpression);
194
195     String JavaDoc validateExpression = "* " + testObjectClassname + ".validate(..)";
196     config.addReadAutolock(validateExpression);
197
198     // Worker factory config
199
String JavaDoc workerFactoryExpression = "* " + workerFactoryClassname + ".*(..)";
200     addWriteAutolock(config, isSynchronousWrite, workerFactoryExpression);
201
202     // Worker config
203
String JavaDoc workerRunExpression = "* " + workerClassname + ".run(..)";
204     addWriteAutolock(config, isSynchronousWrite, workerRunExpression);
205
206     new SynchronizedIntSpec().visit(visitor, config);
207
208     // IDProvider config
209
String JavaDoc nextIDExpression = "* " + idProviderClassname + ".nextID(..)";
210     addWriteAutolock(config, isSynchronousWrite, nextIDExpression);
211
212     DebugUtil.DEBUG = false;
213   }
214
215   private static void addWriteAutolock(DSOClientConfigHelper config, boolean isSynchronousWrite, String JavaDoc methodPattern) {
216     if (isSynchronousWrite) {
217       config.addSynchronousWriteAutolock(methodPattern);
218       debugPrintln("***** doing a synchronous write");
219     } else {
220       config.addWriteAutolock(methodPattern);
221     }
222   }
223
224   private static void debugPrintln(String JavaDoc s) {
225     if (DebugUtil.DEBUG) {
226       System.err.println(s);
227     }
228   }
229
230   public static final class WorkerFactory {
231     private int localWorkerCount = 0;
232     private final SynchronizedInt globalWorkerCount; // = new SynchronizedInt(0);
233
private final List JavaDoc workQueue;
234     private final Collection JavaDoc results;
235     private final Collection JavaDoc localWorkers = new HashSet JavaDoc();
236     private final SynchronizedInt complete;
237     private final String JavaDoc appId;
238
239     public WorkerFactory(String JavaDoc appId, List JavaDoc workQueue, Collection JavaDoc results, SynchronizedInt complete) {
240       this.appId = appId;
241       this.workQueue = workQueue;
242       this.results = results;
243       this.complete = complete;
244       this.globalWorkerCount = new SynchronizedInt(0);
245     }
246
247     public Worker newWorker() {
248       localWorkerCount++;
249       int globalWorkerID = globalWorkerCount.increment();
250       // System.err.println("Worker: " + globalWorkerID);
251
Worker rv = new Worker("(" + appId + ") : Worker " + globalWorkerID + "," + localWorkerCount, this.workQueue,
252                              this.results, this.complete);
253       this.localWorkers.add(rv);
254       return rv;
255     }
256
257     public int getGlobalWorkerCount() {
258       return globalWorkerCount.get();
259     }
260   }
261
262   protected static final class Worker implements Runnable JavaDoc {
263
264     private final String JavaDoc name;
265     private final List JavaDoc workQueue;
266     private final Collection JavaDoc results;
267     private final SynchronizedInt workCompletedCount = new SynchronizedInt(0);
268     private final SynchronizedInt objectChangeCount = new SynchronizedInt(0);
269
270     public Worker(String JavaDoc name, List JavaDoc workQueue, Collection JavaDoc results, SynchronizedInt complete) {
271       this.name = name;
272       this.workQueue = workQueue;
273       this.results = results;
274     }
275
276     public void run() {
277       Thread.currentThread().setName(name);
278       try {
279         while (true) {
280           TestObject to;
281           Object JavaDoc o = take(workQueue);
282           if (o instanceof TestObject) {
283             to = (TestObject) o;
284             System.err.println(name + " : Got : " + to);
285           } else if ("STOP".equals(o)) {
286             return;
287           } else {
288             throw new RuntimeException JavaDoc("Unexpected task: " + o);
289           }
290           objectChangeCount.add(to.increment());
291           synchronized (results) {
292             results.add(to);
293             results.notifyAll();
294           }
295           workCompletedCount.increment();
296         }
297       } catch (Exception JavaDoc e) {
298         throw new TCRuntimeException(e);
299       }
300     }
301
302     public int getWorkCompletedCount() {
303       return this.workCompletedCount.get();
304     }
305
306     public int getObjectChangeCount() {
307       return this.objectChangeCount.get();
308     }
309   }
310
311   public static final class Barriers {
312     private final Map JavaDoc barriers;
313     private final int nodeCount;
314
315     public static void visitL1DSOConfig(ConfigVisitor visitor, DSOClientConfigHelper config) {
316       String JavaDoc classname = Barriers.class.getName();
317       TransparencyClassSpec spec = config.getOrCreateSpec(classname);
318       spec.addRoot("barriers", classname + ".barriers");
319       String JavaDoc barriersExpression = "* " + classname + ".*(..)";
320       config.addWriteAutolock(barriersExpression);
321
322       String JavaDoc cyclicBarrierClassname = CyclicBarrier.class.getName();
323       config.addIncludePattern(cyclicBarrierClassname);
324
325       // CyclicBarrier config
326
String JavaDoc cyclicBarrierExpression = "* " + cyclicBarrierClassname + ".*(..)";
327       config.addWriteAutolock(cyclicBarrierExpression);
328     }
329
330     public Barriers(int nodeCount) {
331       this.barriers = new HashMap JavaDoc();
332       this.nodeCount = nodeCount;
333     }
334
335     public int barrier(int barrierID) {
336       try {
337         return getOrCreateBarrier(barrierID).barrier();
338       } catch (InterruptedException JavaDoc e) {
339         throw new TCRuntimeException(e);
340       }
341     }
342
343     private CyclicBarrier getOrCreateBarrier(int barrierID) {
344       synchronized (barriers) {
345         Integer JavaDoc key = new Integer JavaDoc(barrierID);
346         CyclicBarrier rv = (CyclicBarrier) barriers.get(key);
347         if (rv == null) {
348           rv = new CyclicBarrier(this.nodeCount);
349           this.barriers.put(key, rv);
350         }
351         return rv;
352       }
353     }
354
355   }
356
357   protected static final class TestObject {
358     private TestObject child;
359     private int counter;
360     private List JavaDoc activity = new ArrayList JavaDoc();
361     private String JavaDoc id;
362
363     public TestObject(String JavaDoc id) {
364       this.id = id;
365     }
366
367     private synchronized void addActivity(Object JavaDoc msg) {
368       activity.add(msg + "\n");
369     }
370
371     public void populate(int count) {
372       TestObject to = this;
373       for (int i = 0; i < count; i++) {
374         synchronized (to) {
375           addActivity(this + ": Populated : (i,count) = (" + i + "," + count + ") @ " + new Date JavaDoc() + " by thread "
376                       + Thread.currentThread().getName());
377           to.child = new TestObject(id + "," + i);
378         }
379         to = to.child;
380       }
381     }
382
383     public int increment() {
384       TestObject to = this;
385       int currentValue = Integer.MIN_VALUE;
386       int changeCounter = 0;
387       do {
388         synchronized (to) {
389           // XXX: This synchronization is here to provide transaction boundaries, not because other threads will be
390
// fussing with this object.
391
if (currentValue == Integer.MIN_VALUE) {
392             currentValue = to.counter;
393           }
394           if (currentValue != to.counter) { throw new RuntimeException JavaDoc("Expected current value=" + currentValue
395                                                                        + ", actual current value=" + to.counter); }
396           to.addActivity(this + ": increment <inside loop> : old value=" + to.counter + ", thread="
397                          + Thread.currentThread().getName() + " - " + to.counter + " @ " + new Date JavaDoc());
398           to.counter++;
399           changeCounter++;
400         }
401       } while ((to = to.getChild()) != null);
402       return changeCounter;
403     }
404
405     public boolean validate(int expectedValue) {
406       TestObject to = this;
407       do {
408         // XXX: This synchronization is here to provide transaction boundaries, not because other threads will be
409
// fussing with this object.
410
synchronized (to) {
411           if (to.counter != expectedValue) {
412             System.err.println("Expected " + expectedValue + " but found: " + to.counter + " on Test Object : " + to);
413             System.err.println(" To Activities = " + to.activity);
414             System.err.println(" This Activities = " + activity);
415             return false;
416           }
417         }
418       } while ((to = to.getChild()) != null);
419       return true;
420     }
421
422     private synchronized TestObject getChild() {
423       return child;
424     }
425
426     public String JavaDoc toString() {
427       return "TestObject@" + System.identityHashCode(this) + "(" + id + ")={ counter = " + counter + " }";
428     }
429   }
430
431   protected static final class IDProvider {
432     private int current;
433
434     public synchronized Integer JavaDoc nextID() {
435       int rv = current++;
436       // System.err.println("Issuing new id: " + rv);
437
return new Integer JavaDoc(rv);
438     }
439
440     public synchronized Integer JavaDoc getCurrentID() {
441       return new Integer JavaDoc(current);
442     }
443   }
444
445   private static Object JavaDoc take(List JavaDoc workQueue2) {
446     synchronized (workQueue2) {
447       while (workQueue2.size() == 0) {
448         try {
449           System.err.println(Thread.currentThread().getName() + " : Going to sleep : Size = " + workQueue2.size());
450           workQueue2.wait();
451           System.err.println(Thread.currentThread().getName() + " : Waking from sleep : Size = " + workQueue2.size());
452         } catch (InterruptedException JavaDoc e) {
453           throw new RuntimeException JavaDoc(e);
454         }
455       }
456       return workQueue2.remove(0);
457     }
458   }
459
460   private static void put(List JavaDoc workQueue2, Object JavaDoc o) {
461     synchronized (workQueue2) {
462       workQueue2.add(o);
463       workQueue2.notify();
464       System.err.println(Thread.currentThread().getName() + " : notifying All : Size = " + workQueue2.size());
465     }
466   }
467 }
468
Popular Tags