KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > logicalcobwebs > concurrent > FJTaskRunnerGroup


1 /*
2   File: FJTaskRunnerGroup.java
3
4   Originally written by Doug Lea and released into the public domain.
5   This may be used for any purposes whatsoever without acknowledgment.
6   Thanks for the assistance and support of Sun Microsystems Labs,
7   and everyone contributing, testing, and using this code.
8
9   History:
10   Date Who What
11   7Jan1999 dl First public release
12   12Jan1999 dl made getActiveCount public; misc minor cleanup.
13   14Jan1999 dl Added executeTask
14   20Jan1999 dl Allow use of priorities; reformat stats
15   6Feb1999 dl Lazy thread starts
16   27Apr1999 dl Renamed
17 */

18
19 package org.logicalcobwebs.concurrent;
20
21 /**
22  * A stripped down analog of a ThreadGroup used for
23  * establishing and managing FJTaskRunner threads.
24  * FJTaskRunnerGroups serve as the control boundary separating
25  * the general world of normal threads from the specialized world
26  * of FJTasks.
27  * <p>
28  * By intent, this class does not subclass java.lang.ThreadGroup, and
29  * does not support most methods found in ThreadGroups, since they
30  * would make no sense for FJTaskRunner threads. In fact, the class
31  * does not deal with ThreadGroups at all. If you want to restrict
32  * a FJTaskRunnerGroup to a particular ThreadGroup, you can create
33  * it from within that ThreadGroup.
34  * <p>
35  * The main contextual parameter for a FJTaskRunnerGroup is
36  * the group size, established in the constructor.
37  * Groups must be of a fixed size.
38  * There is no way to dynamically increase or decrease the number
39  * of threads in an existing group.
40  * <p>
41  * In general, the group size should be equal to the number
42  * of CPUs on the system. (Unfortunately, there is no portable
43  * means of automatically detecting the number of CPUs on a JVM, so there is
44  * no good way to automate defaults.) In principle, when
45  * FJTasks are used for computation-intensive tasks, having only
46  * as many threads as CPUs should minimize bookkeeping overhead
47  * and contention, and so maximize throughput. However, because
48  * FJTaskRunners lie atop Java threads, and in turn operating system
49  * thread support and scheduling policies,
50  * it is very possible that using more threads
51  * than CPUs will improve overall throughput even though it adds
52  * to overhead. This will always be so if FJTasks are I/O bound.
53  * So it may pay to experiment a bit when tuning on particular platforms.
54  * You can also use <code>setRunPriorities</code> to either
55  * increase or decrease the priorities of active threads, which
56  * may interact with group size choice.
57  * <p>
58  * In any case, overestimating group sizes never
59  * seriously degrades performance (at least within reasonable bounds).
60  * You can also use a value
61  * less than the number of CPUs in order to reserve processing
62  * for unrelated threads.
63  * <p>
64  * There are two general styles for using a FJTaskRunnerGroup.
65  * You can create one group per entire program execution, for example
66  * as a static singleton, and use it for all parallel tasks:
67  * <pre>
68  * class Tasks {
69  * static FJTaskRunnerGroup group;
70  * public void initialize(int groupSize) {
71  * group = new FJTaskRunnerGroup(groupSize);
72  * }
73  * // ...
74  * }
75  * </pre>
76  * Alternatively, you can make new groups on the fly and use them only for
77  * particular task sets. This is more flexible,
78  * and leads to more controllable and deterministic execution patterns,
79  * but it encounters greater overhead on startup. Also, to reclaim
80  * system resources, you should
81  * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done
82  * using one-shot groups. Otherwise, because FJTaskRunners set
83  * <code>Thread.isDaemon</code>
84  * status, they will not normally be reclaimed until program termination.
85  * <p>
86  * The main supported methods are <code>execute</code>,
87  * which starts a task processed by FJTaskRunner threads,
88  * and <code>invoke</code>, which starts one and waits for completion.
89  * For example, you might extend the above <code>Tasks</code>
90  * class to support a task-based computation, say, the
91  * <code>Fib</code> class from the <code>FJTask</code> documentation:
92  * <pre>
93  * class Tasks { // continued
94  * // ...
95  * static int fib(int n) {
96  * try {
97  * Fib f = new Fib(n);
98  * group.invoke(f);
99  * return f.getAnswer();
100  * }
101  * catch (InterruptedException ex) {
102  * throw new Error("Interrupted during computation");
103  * }
104  * }
105  * }
106  * </pre>
107  * <p>
108  * Method <code>stats()</code> can be used to monitor performance.
109  * Both FJTaskRunnerGroup and FJTaskRunner may be compiled with
110  * the compile-time constant COLLECT_STATS set to false. In this
111  * case, various simple counts reported in stats() are not collected.
112  * On platforms tested,
113  * this leads to such a tiny performance improvement that there is
114  * very little motivation to bother.
115  *
116  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
117  * <p>
118  * @see FJTask
119  * @see FJTaskRunner
120  **/

121
122 public class FJTaskRunnerGroup implements Executor {
123
124     /** The threads in this group **/
125     protected final FJTaskRunner[] threads;
126
127     /** Group-wide queue for tasks entered via execute() **/
128     protected final LinkedQueue entryQueue = new LinkedQueue();
129
130     /** Number of threads that are not waiting for work **/
131     protected int activeCount = 0;
132
133     /** Number of threads that have been started. Used to avoid
134      unecessary contention during startup of task sets.
135      **/

136     protected int nstarted = 0;
137
138     /**
139      * Compile-time constant. If true, various counts of
140      * runs, waits, etc., are maintained. These are NOT
141      * updated with synchronization, so statistics reports
142      * might not be accurate.
143      **/

144
145     static final boolean COLLECT_STATS = true;
146     // static final boolean COLLECT_STATS = false;
147

148     // for stats
149

150     /** The time at which this ThreadRunnerGroup was constructed **/
151     long initTime = 0;
152
153     /** Total number of executes or invokes **/
154     int entries = 0;
155
156     static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY + 1;
157
/**
 * Create a FJTaskRunnerGroup with the indicated number
 * of FJTaskRunner threads. Normally, the best size to use is
 * the number of CPUs on the system.
 * <p>
 * The runner objects are created here but not started; they are
 * started lazily, one at a time, as tasks are signalled.
 * According to the original documentation the threads have their
 * isDaemon status set (done in FJTaskRunner, not in this class), so
 * they do not normally need to be shut down manually upon program
 * termination.
 *
 * @param groupSize the fixed number of threads in the group
 **/
public FJTaskRunnerGroup(int groupSize) {
    this.threads = new FJTaskRunner[groupSize];
    // Populate the array before recording the construction time,
    // exactly as stats() expects initTime to mark a fully built group.
    this.initializeThreads();
    this.initTime = System.currentTimeMillis();
}
173
174     /**
175      * Arrange for execution of the given task
176      * by placing it in a work queue. If the argument
177      * is not of type FJTask, it is embedded in a FJTask via
178      * <code>FJTask.Wrap</code>.
179      * @exception InterruptedException if current Thread is
180      * currently interrupted
181      **/

182
183     public void execute(Runnable JavaDoc r) throws InterruptedException JavaDoc {
184         if (r instanceof FJTask) {
185             entryQueue.put((FJTask) r);
186         } else {
187             entryQueue.put(new FJTask.Wrap(r));
188         }
189         signalNewTask();
190     }
191
192
193     /**
194      * Specialized form of execute called only from within FJTasks
195      **/

196     public void executeTask(FJTask t) {
197         try {
198             entryQueue.put(t);
199             signalNewTask();
200         } catch (InterruptedException JavaDoc ex) {
201             Thread.currentThread().interrupt();
202         }
203     }
204
205
206     /**
207      * Start a task and wait it out. Returns when the task completes.
208      * @exception InterruptedException if current Thread is
209      * interrupted before completion of the task.
210      **/

211
212     public void invoke(Runnable JavaDoc r) throws InterruptedException JavaDoc {
213         InvokableFJTask w = new InvokableFJTask(r);
214         entryQueue.put(w);
215         signalNewTask();
216         w.awaitTermination();
217     }
218
219
220     /**
221      * Try to shut down all FJTaskRunner threads in this group
222      * by interrupting them all. This method is designed
223      * to be used during cleanup when it is somehow known
224      * that all threads are idle.
225      * FJTaskRunners only
226      * check for interruption when they are not otherwise
227      * processing a task (and its generated subtasks,
228      * if any), so if any threads are active, shutdown may
229      * take a while, and may lead to unpredictable
230      * task processing.
231      **/

232
233     public void interruptAll() {
234         // paranoically interrupt current thread last if in group.
235
Thread JavaDoc current = Thread.currentThread();
236         boolean stopCurrent = false;
237
238         for (int i = 0; i < threads.length; ++i) {
239             Thread JavaDoc t = threads[i];
240             if (t == current)
241                 stopCurrent = true;
242             else
243                 t.interrupt();
244         }
245         if (stopCurrent)
246             current.interrupt();
247     }
248
249
250     /**
251      * Set the priority to use while a FJTaskRunner is
252      * polling for new tasks to perform. Default
253      * is currently Thread.MIN_PRIORITY+1. The value
254      * set may not go into effect immediately, but
255      * will be used at least the next time a thread scans for work.
256      **/

257     public synchronized void setScanPriorities(int pri) {
258         for (int i = 0; i < threads.length; ++i) {
259             FJTaskRunner t = threads[i];
260             t.setScanPriority(pri);
261             if (!t.active) t.setPriority(pri);
262         }
263     }
264
265
266     /**
267      * Set the priority to use while a FJTaskRunner is
268      * actively running tasks. Default
269      * is the priority that was in effect by the thread that
270      * constructed this FJTaskRunnerGroup. Setting this value
271      * while threads are running may momentarily result in
272      * them running at this priority even when idly waiting for work.
273      **/

274     public synchronized void setRunPriorities(int pri) {
275         for (int i = 0; i < threads.length; ++i) {
276             FJTaskRunner t = threads[i];
277             t.setRunPriority(pri);
278             if (t.active) t.setPriority(pri);
279         }
280     }
281
282
283     /** Return the number of FJTaskRunner threads in this group **/
284
285     public int size() {
286         return threads.length;
287     }
288
289
290     /**
291      * Return the number of threads that are not idly waiting for work.
292      * Beware that even active threads might not be doing any useful
293      * work, but just spinning waiting for other dependent tasks.
294      * Also, since this is just a snapshot value, some tasks
295      * may be in the process of becoming idle.
296      **/

297     public synchronized int getActiveCount() {
298         return activeCount;
299     }
300
301     /**
302      * Prints various snapshot statistics to System.out.
303      * <ul>
304      * <li> For each FJTaskRunner thread (labeled as T<em>n</em>, for
305      * <em>n</em> from zero to group size - 1):
306      * <ul>
307      * <li> A star "*" is printed if the thread is currently active;
308      * that is, not sleeping while waiting for work. Because
309      * threads gradually enter sleep modes, an active thread
310      * may in fact be about to sleep (or wake up).
311      * <li> <em>Q Cap</em> The current capacity of its task queue.
312      * <li> <em>Run</em> The total number of tasks that have been run.
313      * <li> <em>New</em> The number of these tasks that were
314      * taken from either the entry queue or from other
315      * thread queues; that is, the number of tasks run
316      * that were <em>not</em> forked by the thread itself.
317      * <li> <em>Scan</em> The number of times other task
318      * queues or the entry queue were polled for tasks.
319      * </ul>
320      * <li> <em>Execute</em> The total number of tasks entered
321      * (but not necessarily yet run) via execute or invoke.
322      * <li> <em>Time</em> Time in seconds since construction of this
323      * FJTaskRunnerGroup.
324      * <li> <em>Rate</em> The total number of tasks processed
325      * per second across all threads. This
326      * may be useful as a simple throughput indicator
327      * if all processed tasks take approximately the
328      * same time to run.
329      * </ul>
330      * <p>
331      * Cautions: Some statistics are updated and gathered
332      * without synchronization,
333      * so may not be accurate. However, reported counts may be considered
334      * as lower bounds of actual values.
335      * Some values may be zero if classes are compiled
336      * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup
337      * classes can be independently compiled with different values of
338      * COLLECT_STATS.) Also, the counts are maintained as ints so could
339      * overflow in exceptionally long-lived applications.
340      * <p>
341      * These statistics can be useful when tuning algorithms or diagnosing
342      * problems. For example:
343      * <ul>
344      * <li> High numbers of scans may mean that there is insufficient
345      * parallelism to keep threads busy. However, high scan rates
346      * are expected if the number
347      * of Executes is also high or there is a lot of global
348      * synchronization in the application, and the system is not otherwise
349      * busy. Threads may scan
350      * for work hundreds of times upon startup, shutdown, and
351      * global synch points of task sets.
352      * <li> Large imbalances in tasks run across different threads might
353      * just reflect contention with unrelated threads on a system
354      * (possibly including JVM threads such as GC), but may also
355      * indicate some systematic bias in how you generate tasks.
356      * <li> Large task queue capacities may mean that too many tasks are being
357      * generated before they can be run.
358      * Capacities are reported rather than current numbers of tasks
359      * in queues because they are better indicators of the existence
360      * of these kinds of possibly-transient problems.
361      * Queue capacities are
362      * resized on demand from their initial value of 4096 elements,
363      * which is much more than sufficient for the kinds of
364      * applications that this framework is intended to best support.
365      * </ul>
366      **/

367
368     public void stats() {
369         long time = System.currentTimeMillis() - initTime;
370         double secs = ((double) time) / 1000.0;
371         long totalRuns = 0;
372         long totalScans = 0;
373         long totalSteals = 0;
374
375         System.out.print("Thread" +
376                 "\tQ Cap" +
377                 "\tScans" +
378                 "\tNew" +
379                 "\tRuns" +
380                 "\n");
381
382         for (int i = 0; i < threads.length; ++i) {
383             FJTaskRunner t = threads[i];
384             int truns = t.runs;
385             totalRuns += truns;
386
387             int tscans = t.scans;
388             totalScans += tscans;
389
390             int tsteals = t.steals;
391             totalSteals += tsteals;
392
393             String JavaDoc star = (getActive(t)) ? "*" : " ";
394
395
396             System.out.print("T" + i + star +
397                     "\t" + t.deqSize() +
398                     "\t" + tscans +
399                     "\t" + tsteals +
400                     "\t" + truns +
401                     "\n");
402         }
403
404         System.out.print("Total" +
405                 "\t " +
406                 "\t" + totalScans +
407                 "\t" + totalSteals +
408                 "\t" + totalRuns +
409                 "\n");
410
411         System.out.print("Execute: " + entries);
412
413         System.out.print("\tTime: " + secs);
414
415         long rps = 0;
416         if (secs != 0) rps = Math.round((double) (totalRuns) / secs);
417
418         System.out.println("\tRate: " + rps);
419     }
420
421
422     /* ------------ Methods called only by FJTaskRunners ------------- */
423
424
425     /**
426      * Return the array of threads in this group.
427      * Called only by FJTaskRunner.scan().
428      **/

429
430     protected FJTaskRunner[] getArray() {
431         return threads;
432     }
433
434
435     /**
436      * Return a task from entry queue, or null if empty.
437      * Called only by FJTaskRunner.scan().
438      **/

439
440     protected FJTask pollEntryQueue() {
441         try {
442             FJTask t = (FJTask) (entryQueue.poll(0));
443             return t;
444         } catch (InterruptedException JavaDoc ex) { // ignore interrupts
445
Thread.currentThread().interrupt();
446             return null;
447         }
448     }
449
450
451     /**
452      * Return active status of t.
453      * Per-thread active status can only be accessed and
454      * modified via synchronized method here in the group class.
455      **/

456
457     protected synchronized boolean getActive(FJTaskRunner t) {
458         return t.active;
459     }
460
461
462     /**
463      * Set active status of thread t to true, and notify others
464      * that might be waiting for work.
465      **/

466
467     protected synchronized void setActive(FJTaskRunner t) {
468         if (!t.active) {
469             t.active = true;
470             ++activeCount;
471             if (nstarted < threads.length)
472                 threads[nstarted++].start();
473             else
474                 notifyAll();
475         }
476     }
477
478     /**
479      * Set active status of thread t to false.
480      **/

481
482     protected synchronized void setInactive(FJTaskRunner t) {
483         if (t.active) {
484             t.active = false;
485             --activeCount;
486         }
487     }
488
489     /**
490      * The number of times to scan other threads for tasks
491      * before transitioning to a mode where scans are
492      * interleaved with sleeps (actually timed waits).
493      * Upon transition, sleeps are for duration of
494      * scans / SCANS_PER_SLEEP milliseconds.
495      * <p>
496      * This is not treated as a user-tunable parameter because
497      * good values do not appear to vary much across JVMs or
498      * applications. Its main role is to help avoid some
499      * useless spinning and contention during task startup.
500      **/

501     static final long SCANS_PER_SLEEP = 15;
502
503     /**
504      * The maximum time (in msecs) to sleep when a thread is idle,
505      * yet others are not, so may eventually generate work that
506      * the current thread can steal. This value reflects the maximum time
507      * that a thread may sleep when it possibly should not, because there
508      * are other active threads that might generate work. In practice,
509      * designs in which some threads become stalled because others
510      * are running yet not generating tasks are not likely to work
511      * well in this framework anyway, so the exact value does not matter
512      * too much. However, keeping it in the sub-second range does
513      * help smooth out startup and shutdown effects.
514      **/

515
516     static final long MAX_SLEEP_TIME = 100;
517
518     /**
519      * Set active status of thread t to false, and
520      * then wait until: (a) there is a task in the entry
521      * queue, or (b) other threads are active, or (c) the current
522      * thread is interrupted. Upon return, it
523      * is not certain that there will be work available.
524      * The thread must itself check.
525      * <p>
526      * The main underlying reason
527      * for these mechanics is that threads do not
528      * signal each other when they add elements to their queues.
529      * (This would add to task overhead, reduce locality.
530      * and increase contention.)
531      * So we must rely on a tamed form of polling. However, tasks
532      * inserted into the entry queue do result in signals, so
533      * tasks can wait on these if all of them are otherwise idle.
534      **/

535
536     protected synchronized void checkActive(FJTaskRunner t, long scans) {
537
538         setInactive(t);
539
540         try {
541             // if nothing available, do a hard wait
542
if (activeCount == 0 && entryQueue.peek() == null) {
543                 wait();
544             } else {
545                 // If there is possibly some work,
546
// sleep for a while before rechecking
547

548                 long msecs = scans / SCANS_PER_SLEEP;
549                 if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME;
550                 int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep
551
wait(msecs, nsecs);
552             }
553         } catch (InterruptedException JavaDoc ex) {
554             notify(); // avoid lost notifies on interrupts
555
Thread.currentThread().interrupt();
556         }
557     }
558
559     /* ------------ Utility methods ------------- */
560
561     /**
562      * Start or wake up any threads waiting for work
563      **/

564
565     protected synchronized void signalNewTask() {
566         if (COLLECT_STATS) ++entries;
567         if (nstarted < threads.length)
568             threads[nstarted++].start();
569         else
570             notify();
571     }
572
573     /**
574      * Create all FJTaskRunner threads in this group.
575      **/

576
577     protected void initializeThreads() {
578         for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this);
579     }
580
581
582     /**
583      * Wrap wait/notify mechanics around a task so that
584      * invoke() can wait it out
585      **/

586     protected static final class InvokableFJTask extends FJTask {
587         protected final Runnable JavaDoc wrapped;
588         protected boolean terminated = false;
589
590         protected InvokableFJTask(Runnable JavaDoc r) {
591             wrapped = r;
592         }
593
594         public void run() {
595             try {
596                 if (wrapped instanceof FJTask)
597                     FJTask.invoke((FJTask) (wrapped));
598                 else
599                     wrapped.run();
600             } finally {
601                 setTerminated();
602             }
603         }
604
605         protected synchronized void setTerminated() {
606             terminated = true;
607             notifyAll();
608         }
609
610         protected synchronized void awaitTermination() throws InterruptedException JavaDoc {
611             while (!terminated) wait();
612         }
613     }
614
615
616 }
617
618
Popular Tags