KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tctest > FutureTaskTestApp


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
3  */

4 package com.tctest;
5
6 import com.tc.exception.TCRuntimeException;
7 import com.tc.object.config.ConfigVisitor;
8 import com.tc.object.config.DSOClientConfigHelper;
9 import com.tc.object.config.TransparencyClassSpec;
10 import com.tc.simulator.app.ApplicationConfig;
11 import com.tc.simulator.listener.ListenerProvider;
12 import com.tc.util.Assert;
13 import com.tctest.runner.AbstractTransparentApp;
14
15 import java.util.ArrayList JavaDoc;
16 import java.util.List JavaDoc;
17 import java.util.concurrent.Callable JavaDoc;
18 import java.util.concurrent.CancellationException JavaDoc;
19 import java.util.concurrent.CyclicBarrier JavaDoc;
20 import java.util.concurrent.ExecutionException JavaDoc;
21 import java.util.concurrent.ExecutorService JavaDoc;
22 import java.util.concurrent.FutureTask JavaDoc;
23 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
24 import java.util.concurrent.ThreadPoolExecutor JavaDoc;
25 import java.util.concurrent.TimeUnit JavaDoc;
26 import java.util.concurrent.TimeoutException JavaDoc;
27
28 public class FutureTaskTestApp extends AbstractTransparentApp {
29
/** Barrier sized to the participant count in the constructor; every test phase waits on it. */
private final CyclicBarrier barrier;
/** Two-party barrier used by testSharedDuringRunning and GetUnSharedRunnable. */
private final CyclicBarrier barrier2 = new CyclicBarrier(2);

/** Number of tasks created by the queue-based and executor-based tests. */
private static final int NUM_OF_ITEMS = 500;
/** Holder for the task and the lists exchanged between participants. */
private final DataRoot root = new DataRoot();
/** Queue from which the consuming participants take tasks or the "STOP" marker. */
private final LinkedBlockingQueue workerQueue = new LinkedBlockingQueue();
/** Queue onto which executed tasks are put so that participant 0 can verify them. */
private final LinkedBlockingQueue resultQueue = new LinkedBlockingQueue();

/**
 * Creates the test application and sizes {@code barrier} to {@code getParticipantCount()}.
 *
 * @param appId forwarded unchanged to the superclass constructor
 * @param cfg forwarded unchanged to the superclass constructor
 * @param listenerProvider forwarded unchanged to the superclass constructor
 */
public FutureTaskTestApp(String appId, ApplicationConfig cfg, ListenerProvider listenerProvider) {
  super(appId, cfg, listenerProvider);
  barrier = new CyclicBarrier(getParticipantCount());
}
42
43   public void run() {
44     try {
45       int index = barrier.await();
46
47       testSharedDuringRunning(index);
48       testWithCallable(index);
49       testWithRunnable(index);
50       testWithMyFutureTask(index);
51       testWithLinkedBlockingQueue(index);
52       testWithExecutorService(index);
53
54     } catch (Throwable JavaDoc t) {
55       notifyError(t);
56     }
57   }
58
59   private void testSharedDuringRunning(int index) throws Exception JavaDoc {
60     if (index == 0) {
61       FutureTask JavaDoc task = new MyFutureTask(new MyCallable());
62       Thread JavaDoc thread1 = new Thread JavaDoc(new GetUnSharedRunnable(task));
63       thread1.start();
64       Thread.sleep(1000);
65       root.setTask(task);
66       task.run();
67       barrier2.await();
68     }
69
70     barrier.await();
71   }
72
73   private void testWithExecutorService(int index) throws Exception JavaDoc {
74     long startTime = System.currentTimeMillis();
75     if (index == 0) {
76       List JavaDoc list = new ArrayList JavaDoc();
77       for (int i = 0; i < NUM_OF_ITEMS; i++) {
78         Callable JavaDoc callable = new MyCallable(i);
79         list.add(callable);
80       }
81       root.setList(list);
82     }
83
84     barrier.await();
85
86     if (index == 1) {
87       ExecutorService JavaDoc service = new ThreadPoolExecutor JavaDoc(10, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue JavaDoc());
88       List JavaDoc futures = service.invokeAll(root.getList());
89       root.setTasksList(futures);
90     } else {
91       List JavaDoc tasksList = root.getTasksList();
92       while (tasksList == null) {
93         tasksList = root.getTasksList();
94       }
95       for (int i = 0; i < NUM_OF_ITEMS; i++) {
96         System.err.println("Getting Task " + i);
97         Assert.assertEquals(root, ((FutureTask JavaDoc) tasksList.get(i)).get());
98       }
99     }
100
101     long endTime = System.currentTimeMillis();
102     System.err.println("Elapsed time in ExecutorService: " + (endTime - startTime));
103
104     barrier.await();
105   }
106
107   private void testWithLinkedBlockingQueue(int index) throws Exception JavaDoc {
108     long startTime = System.currentTimeMillis();
109     if (index == 0) {
110       for (int i = 0; i < NUM_OF_ITEMS; i++) {
111         System.err.println("Putting task " + i);
112         FutureTask JavaDoc task = new MyFutureTask(new MyCallable(i));
113         workerQueue.put(task);
114       }
115       workerQueue.put("STOP");
116       workerQueue.put("STOP");
117     } else {
118       while (true) {
119         Object JavaDoc o = workerQueue.take();
120         if ("STOP".equals(o)) {
121           break;
122         } else {
123           FutureTask JavaDoc task = (FutureTask JavaDoc) o;
124           task.run();
125           resultQueue.put(task);
126         }
127       }
128     }
129     if (index == 0) {
130       for (int i = 0; i < NUM_OF_ITEMS; i++) {
131         FutureTask JavaDoc task = (FutureTask JavaDoc) resultQueue.take();
132         
133         Assert.assertEquals(root, task.get());
134       }
135     }
136
137     long endTime = System.currentTimeMillis();
138     System.err.println("Elapsed time in LinkedBlockingQueue: " + (endTime - startTime));
139
140     barrier.await();
141   }
142
143   private void testWithMyFutureTask(int index) throws Exception JavaDoc {
144     FutureTask JavaDoc task = new MyFutureTask(new MyCallable());
145     basicRunTask(index, task);
146
147     task = new MyFutureTask(new MyLongCallable());
148     basicCancelTask(index, task);
149
150     task = new MyFutureTask(new MySemiLongCallable());
151     basicCancelTaskWithCompletion(index, task);
152
153     task = new MyFutureTask(new MyLongCallable());
154     timeoutGetTask(index, task);
155
156     task = new MyFutureTask(new MyCallable());
157     basicSet(index, (MyFutureTask) task);
158
159     task = new MyFutureTask(new MyCallable());
160     basicSetException(index, (MyFutureTask) task);
161
162     task = new MyFutureTask(new MyCallable());
163     basicRunAndResetException(index, (MyFutureTask) task);
164   }
165
166   private void testWithRunnable(int index) throws Exception JavaDoc {
167     FutureTask JavaDoc task = new FutureTask JavaDoc(new MyRunnable(), root);
168     basicRunTask(index, task);
169
170     task = new FutureTask JavaDoc(new MyLongRunnable(), root);
171     basicCancelTask(index, task);
172
173     task = new FutureTask JavaDoc(new MySemiLongRunnable(), root);
174     basicCancelTaskWithCompletion(index, task);
175
176     task = new FutureTask JavaDoc(new MyLongRunnable(), root);
177     basicCancelTask(index, task);
178
179     task = new FutureTask JavaDoc(new MyLongRunnable(), root);
180     timeoutGetTask(index, task);
181   }
182
183   private void testWithCallable(int index) throws Exception JavaDoc {
184     FutureTask JavaDoc task = new FutureTask JavaDoc(new MyCallable());
185     basicRunTask(index, task);
186
187     task = new FutureTask JavaDoc(new MyLongCallable());
188     basicCancelTask(index, task);
189
190     task = new FutureTask JavaDoc(new MySemiLongCallable());
191     basicCancelTaskWithCompletion(index, task);
192
193     task = new FutureTask JavaDoc(new MyLongCallable());
194     timeoutGetTask(index, task);
195
196     task = new FutureTask JavaDoc(new MyThrowable());
197     basicRunTaskWithException(index, task);
198   }
199
200   private void basicSet(int index, MyFutureTask task) throws Exception JavaDoc {
201     if (index == 0) {
202       root.setTask(task);
203     }
204
205     barrier.await();
206
207     if (index == 1) {
208       ((MyFutureTask) root.getTask()).set(root);
209     }
210
211     Object JavaDoc o = root.getTask().get();
212     while (o == null) {
213       o = root.getTask().get();
214     }
215     Assert.assertEquals(root, o);
216
217     Assert.assertTrue(root.getTask().isDone());
218
219     barrier.await();
220   }
221
222   private void basicRunAndResetException(int index, MyFutureTask task) throws Exception JavaDoc {
223     if (index == 0) {
224       root.setTask(task);
225     }
226
227     barrier.await();
228
229     if (index == 1) {
230       boolean flag = ((MyFutureTask) root.getTask()).runAndReset();
231       Assert.assertTrue(flag);
232     }
233
234     barrier.await();
235
236     Assert.assertFalse(root.getTask().isDone());
237
238     barrier.await();
239   }
240
241   private void basicSetException(int index, MyFutureTask task) throws Exception JavaDoc {
242     final String JavaDoc exceptionMsg = "Test setting InterruptedException";
243     if (index == 0) {
244       root.setTask(task);
245     }
246
247     barrier.await();
248
249     if (index == 1) {
250       ((MyFutureTask) root.getTask()).setException(new InterruptedException JavaDoc(exceptionMsg));
251     }
252
253     barrier.await();
254
255     try {
256       root.getTask().get();
257       throw new AssertionError JavaDoc("Should have thrown an ExecutionException.");
258     } catch (ExecutionException JavaDoc e) {
259       Assert.assertEquals(exceptionMsg, e.getCause().getMessage());
260     }
261
262     barrier.await();
263   }
264
265   private void timeoutGetTask(int index, FutureTask JavaDoc longTask) throws Exception JavaDoc {
266     if (index == 0) {
267       root.setTask(longTask);
268     }
269
270     barrier.await();
271
272     if (index == 1) {
273       root.getTask().run();
274     } else if (index == 0) {
275       try {
276         root.getTask().get(10000, TimeUnit.MILLISECONDS);
277         throw new AssertionError JavaDoc("Should have thrown a TimeoutException.");
278       } catch (TimeoutException JavaDoc e) {
279         root.getTask().cancel(true);
280       }
281     }
282
283     barrier.await();
284
285     Assert.assertTrue(root.getTask().isCancelled());
286
287     Assert.assertTrue(root.getTask().isDone());
288
289     barrier.await();
290     
291   }
292
293   private void basicCancelTask(int index, FutureTask JavaDoc longTask) throws Exception JavaDoc {
294     if (index == 0) {
295       root.setTask(longTask);
296     }
297
298     barrier.await();
299
300     if (index == 1) {
301       root.getTask().run();
302     } else if (index == 0) {
303       root.getTask().cancel(true);
304     }
305
306     barrier.await();
307
308     Assert.assertTrue(root.getTask().isCancelled());
309
310     Assert.assertTrue(root.getTask().isDone());
311
312     try {
313       root.getTask().get();
314       throw new AssertionError JavaDoc("Could have thrown a CancellationException.");
315     } catch (CancellationException JavaDoc e) {
316       // Expected
317
}
318
319     barrier.await();
320     
321   }
322
323   private void basicCancelTaskWithCompletion(int index, FutureTask JavaDoc longTask) throws Exception JavaDoc {
324     if (index == 0) {
325       root.setTask(longTask);
326     }
327
328     barrier.await();
329
330     if (index == 1) {
331       root.getTask().run();
332     } else if (index == 0) {
333       root.getTask().cancel(false);
334     }
335
336     barrier.await();
337
338     Assert.assertTrue(root.getTask().isCancelled());
339
340     Assert.assertTrue(root.getTask().isDone());
341
342     try {
343       root.getTask().get();
344       throw new AssertionError JavaDoc("Could have thrown a CancellationException.");
345     } catch (CancellationException JavaDoc e) {
346       // Expected
347
}
348
349     barrier.await();
350
351   }
352
353   private void basicRunTaskWithException(int index, FutureTask JavaDoc task) throws Exception JavaDoc {
354     if (index == 0) {
355       root.setTask(task);
356     }
357
358     barrier.await();
359
360     if (index == 1) {
361       root.getTask().run();
362     }
363
364     try {
365       root.getTask().get();
366       throw new AssertionError JavaDoc("Should have thrown an ExecutionException");
367     } catch (ExecutionException JavaDoc e) {
368       Assert.assertTrue(e.getCause() instanceof InterruptedException JavaDoc);
369       Assert.assertEquals(MyThrowable.EXCEPTION_MSG, e.getCause().getMessage());
370     }
371
372     barrier.await();
373   }
374
375   private void basicRunTask(int index, FutureTask JavaDoc task) throws Exception JavaDoc {
376     if (index == 0) {
377       root.setTask(task);
378     }
379
380     barrier.await();
381
382     if (index == 1) {
383
384       root.getTask().run();
385     }
386
387     Assert.assertEquals(root, root.getTask().get());
388
389     Assert.assertTrue(root.getTask().isDone());
390
391     barrier.await();
392   }
393
394   public static void visitL1DSOConfig(ConfigVisitor visitor, DSOClientConfigHelper config) {
395     String JavaDoc testClass = FutureTaskTestApp.class.getName();
396     TransparencyClassSpec spec = config.getOrCreateSpec(testClass);
397
398     config.addIncludePattern(testClass + "$*");
399
400     String JavaDoc methodExpression = "* " + testClass + "*.*(..)";
401     config.addWriteAutolock(methodExpression);
402
403     spec.addRoot("barrier", "barrier");
404     spec.addRoot("barrier2", "barrier2");
405     spec.addRoot("root", "root");
406     spec.addRoot("workerQueue", "workerQueue");
407     spec.addRoot("resultQueue", "resultQueue");
408   }
409
410   private static class DataRoot {
411     private FutureTask JavaDoc task;
412     private List JavaDoc list;
413     private List JavaDoc tasksList;
414
415     public DataRoot() {
416       super();
417     }
418
419     public synchronized void setTask(FutureTask JavaDoc task) {
420       this.task = task;
421     }
422
423     public synchronized FutureTask JavaDoc getTask() {
424       return this.task;
425     }
426
427     public List JavaDoc getList() {
428       return list;
429     }
430
431     public synchronized void setList(List JavaDoc list) {
432       this.list = list;
433     }
434
435     public synchronized List JavaDoc getTasksList() {
436       return tasksList;
437     }
438
439     public synchronized void setTasksList(List JavaDoc tasksList) {
440       this.tasksList = tasksList;
441     }
442   }
443
444   private class MyLongCallable implements Callable JavaDoc {
445     public Object JavaDoc call() throws Exception JavaDoc {
446       while (true) {
447         if (Thread.interrupted()) {
448           throw new InterruptedException JavaDoc();
449         }
450         Thread.sleep(10000);
451       }
452     }
453   }
454
455   private class MySemiLongCallable implements Callable JavaDoc {
456     public Object JavaDoc call() throws Exception JavaDoc {
457       Thread.sleep(30000);
458       return root;
459     }
460   }
461
462   private class MyCallable implements Callable JavaDoc {
463     private Integer JavaDoc value;
464
465     public MyCallable(int i) {
466       this.value = new Integer JavaDoc(i);
467     }
468
469     public MyCallable() {
470       super();
471     }
472
473     public Object JavaDoc call() throws Exception JavaDoc {
474       if (value != null) {
475         System.err.println("Running call() in MyCallable: " + value);
476       }
477       return root;
478     }
479   }
480
481   private class MyThrowable implements Callable JavaDoc {
482     public static final String JavaDoc EXCEPTION_MSG = "Test InterruptException";
483
484     public Object JavaDoc call() throws Exception JavaDoc {
485       throw new InterruptedException JavaDoc(EXCEPTION_MSG);
486     }
487   }
488
489   private class MyLongRunnable implements Runnable JavaDoc {
490     public void run() {
491       try {
492         while (true) {
493           if (Thread.interrupted()) {
494             throw new InterruptedException JavaDoc();
495           }
496           Thread.sleep(10000);
497         }
498       } catch (Exception JavaDoc e) {
499         throw new TCRuntimeException(e);
500       }
501     }
502   }
503
504   private class MySemiLongRunnable implements Runnable JavaDoc {
505     public void run() {
506       try {
507         Thread.sleep(30000);
508       } catch (Exception JavaDoc e) {
509         throw new TCRuntimeException(e);
510       }
511     }
512   }
513
514   private class MyRunnable implements Runnable JavaDoc {
515     public void run() {
516       // do nothing
517
}
518   }
519
520   private class MyFutureTask extends FutureTask JavaDoc {
521     public MyFutureTask(Callable JavaDoc callable) {
522       super(callable);
523     }
524
525     public MyFutureTask(Runnable JavaDoc runnable, Object JavaDoc result) {
526       super(runnable, result);
527     }
528
529     public synchronized void set(Object JavaDoc v) {
530       super.set(v);
531     }
532
533     public synchronized void setException(Throwable JavaDoc t) {
534       super.setException(t);
535     }
536
537     public boolean runAndReset() {
538       return super.runAndReset();
539     }
540   }
541
542   private class GetUnSharedRunnable implements Runnable JavaDoc {
543     private FutureTask JavaDoc task;
544
545     public GetUnSharedRunnable(FutureTask JavaDoc task) {
546       this.task = task;
547     }
548
549     public void run() {
550       try {
551         Assert.assertEquals(root, task.get());
552         barrier2.await();
553       } catch (Exception JavaDoc e) {
554         throw new TCRuntimeException(e);
555       }
556     }
557   }
558
559 }
560
Popular Tags