KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > logicalcobwebs > concurrent > FJTaskRunner


/*
  File: FJTaskRunner.java

  Originally written by Doug Lea and released into the public domain.
  This may be used for any purposes whatsoever without acknowledgment.
  Thanks for the assistance and support of Sun Microsystems Labs,
  and everyone contributing, testing, and using this code.

  History:
  Date       Who                What
  7Jan1999   dl                 First public release
  13Jan1999  dl                 correct a stat counter update;
                                ensure inactive status on run termination;
                                misc minor cleanup
  14Jan1999  dl                 Use random starting point in scan;
                                variable renamings.
  18Jan1999  dl                 Runloop allowed to die on task exception;
                                remove useless timed join
  22Jan1999  dl                 Rework scan to allow use of priorities.
  6Feb1999   dl                 Documentation updates.
  7Mar1999   dl                 Add array-based coInvoke
  31Mar1999  dl                 Revise scan to remove need for NullTasks
  27Apr1999  dl                 Renamed
  23oct1999  dl                 Earlier detect of interrupt in scanWhileIdling
  24nov1999  dl                 Now works on JVMs that do not properly
                                implement read-after-write of 2 volatiles.
*/


package org.logicalcobwebs.concurrent;

import java.util.Random;

33 /**
34  * Specialized Thread subclass for running FJTasks.
35  * <p>
36  * Each FJTaskRunner keeps FJTasks in a double-ended queue (DEQ).
37  * Double-ended queues support stack-based operations
38  * push and pop, as well as queue-based operations put and take.
39  * Normally, threads run their own tasks. But they
40  * may also steal tasks from each others DEQs.
41  * <p>
42  * The algorithms are minor variants of those used
43  * in <A HREF="http://supertech.lcs.mit.edu/cilk/"> Cilk</A> and
44  * <A HREF="http://www.cs.utexas.edu/users/hood/"> Hood</A>, and
45  * to a lesser extent
46  * <A HREF="http://www.cs.uga.edu/~dkl/filaments/dist.html"> Filaments</A>,
47  * but are adapted to work in Java.
48  * <p>
49  * The two most important capabilities are:
50  * <ul>
51  * <li> Fork a FJTask:
52  * <pre>
53  * Push task onto DEQ
54  * </pre>
55  * <li> Get a task to run (for example within taskYield)
56  * <pre>
57  * If DEQ is not empty,
58  * Pop a task and run it.
59  * Else if any other DEQ is not empty,
60  * Take ("steal") a task from it and run it.
61  * Else if the entry queue for our group is not empty,
62  * Take a task from it and run it.
63  * Else if current thread is otherwise idling
64  * If all threads are idling
65  * Wait for a task to be put on group entry queue
66  * Else
67  * Yield or Sleep for a while, and then retry
68  * </pre>
69  * </ul>
70  * The push, pop, and put are designed to only ever called by the
71  * current thread, and take (steal) is only ever called by
72  * other threads.
73  * All other operations are composites and variants of these,
74  * plus a few miscellaneous bookkeeping methods.
75  * <p>
76  * Implementations of the underlying representations and operations
77  * are geared for use on JVMs operating on multiple CPUs (although
78  * they should of course work fine on single CPUs as well).
79  * <p>
80  * A possible snapshot of a FJTaskRunner's DEQ is:
81  * <pre>
82  * 0 1 2 3 4 5 6 ...
83  * +-----+-----+-----+-----+-----+-----+-----+--
84  * | | t | t | t | t | | | ... deq array
85  * +-----+-----+-----+-----+-----+-----+-----+--
86  * ^ ^
87  * base top
88  * (incremented (incremented
89  * on take, on push
90  * decremented decremented
91  * on put) on pop)
92  * </pre>
93  * <p>
94  * FJTasks are held in elements of the DEQ.
95  * They are maintained in a bounded array that
96  * works similarly to a circular bounded buffer. To ensure
97  * visibility of stolen FJTasks across threads, the array elements
98  * must be <code>volatile</code>.
99  * Using volatile rather than synchronizing suffices here since
100  * each task accessed by a thread is either one that it
101  * created or one that has never seen before. Thus we cannot
102  * encounter any staleness problems executing run methods,
103  * although FJTask programmers must be still sure to either synch or use
104  * volatile for shared data within their run methods.
105  * <p>
106  * However, since there is no way
107  * to declare an array of volatiles in Java, the DEQ elements actually
108  * hold VolatileTaskRef objects, each of which in turn holds a
109  * volatile reference to a FJTask.
110  * Even with the double-indirection overhead of
111  * volatile refs, using an array for the DEQ works out
112  * better than linking them since fewer shared
113  * memory locations need to be
114  * touched or modified by the threads while using the DEQ.
115  * Further, the double indirection may alleviate cache-line
116  * sharing effects (which cannot otherwise be directly dealt with in Java).
117  * <p>
118  * The indices for the <code>base</code> and <code>top</code> of the DEQ
119  * are declared as volatile. The main contention point with
120  * multiple FJTaskRunner threads occurs when one thread is trying
121  * to pop its own stack while another is trying to steal from it.
122  * This is handled via a specialization of Dekker's algorithm,
123  * in which the popping thread pre-decrements <code>top</code>,
124  * and then checks it against <code>base</code>.
125  * To be conservative in the face of JVMs that only partially
126  * honor the specification for volatile, the pop proceeds
127  * without synchronization only if there are apparently enough
128  * items for both a simultaneous pop and take to succeed.
129  * It otherwise enters a
130  * synchronized lock to check if the DEQ is actually empty,
131  * if so failing. The stealing thread
132  * does almost the opposite, but is set up to be less likely
133  * to win in cases of contention: Steals always run under synchronized
134  * locks in order to avoid conflicts with other ongoing steals.
135  * They pre-increment <code>base</code>, and then check against
136  * <code>top</code>. They back out (resetting the base index
137  * and failing to steal) if the
138  * DEQ is empty or is about to become empty by an ongoing pop.
139  * <p>
140  * A push operation can normally run concurrently with a steal.
141  * A push enters a synch lock only if the DEQ appears full so must
142  * either be resized or have indices adjusted due to wrap-around
143  * of the bounded DEQ. The put operation always requires synchronization.
144  * <p>
145  * When a FJTaskRunner thread has no tasks of its own to run,
146  * it tries to be a good citizen.
147  * Threads run at lower priority while scanning for work.
148  * <p>
149  * If the task is currently waiting
150  * via yield, the thread alternates scans (starting at a randomly
151  * chosen victim) with Thread.yields. This is
152  * well-behaved so long as the JVM handles Thread.yield in a
153  * sensible fashion. (It need not. Thread.yield is so underspecified
154  * that it is legal for a JVM to treat it as a no-op.) This also
155  * keeps things well-behaved even if we are running on a uniprocessor
156  * JVM using a simple cooperative threading model.
157  * <p>
158  * If a thread needing work is
159  * is otherwise idle (which occurs only in the main runloop), and
160  * there are no available tasks to steal or poll, it
161  * instead enters into a sleep-based (actually timed wait(msec))
162  * phase in which it progressively sleeps for longer durations
163  * (up to a maximum of FJTaskRunnerGroup.MAX_SLEEP_TIME,
164  * currently 100ms) between scans.
165  * If all threads in the group
166  * are idling, they further progress to a hard wait phase, suspending
167  * until a new task is entered into the FJTaskRunnerGroup entry queue.
168  * A sleeping FJTaskRunner thread may be awakened by a new
169  * task being put into the group entry queue or by another FJTaskRunner
170  * becoming active, but not merely by some DEQ becoming non-empty.
171  * Thus the MAX_SLEEP_TIME provides a bound for sleep durations
172  * in cases where all but one worker thread start sleeping
173  * even though there will eventually be work produced
174  * by a thread that is taking a long time to place tasks in DEQ.
175  * These sleep mechanics are handled in the FJTaskRunnerGroup class.
176  * <p>
177  * Composite operations such as taskJoin include heavy
178  * manual inlining of the most time-critical operations
179  * (mainly FJTask.invoke).
180  * This opens up a few opportunities for further hand-optimizations.
181  * Until Java compilers get a lot smarter, these tweaks
182  * improve performance significantly enough for task-intensive
183  * programs to be worth the poorer maintainability and code duplication.
184  * <p>
185  * Because they are so fragile and performance-sensitive, nearly
186  * all methods are declared as final. However, nearly all fields
187  * and methods are also declared as protected, so it is possible,
188  * with much care, to extend functionality in subclasses. (Normally
189  * you would also need to subclass FJTaskRunnerGroup.)
190  * <p>
191  * None of the normal java.lang.Thread class methods should ever be called
192  * on FJTaskRunners. For this reason, it might have been nicer to
193  * declare FJTaskRunner as a Runnable to run within a Thread. However,
194  * this would have complicated many minor logistics. And since
195  * no FJTaskRunner methods should normally be called from outside the
196  * FJTask and FJTaskRunnerGroup classes either, this decision doesn't impact
197  * usage.
198  * <p>
199  * You might think that layering this kind of framework on top of
200  * Java threads, which are already several levels removed from raw CPU
201  * scheduling on most systems, would lead to very poor performance.
202  * But on the platforms
203  * tested, the performance is quite good.
204  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
205  * @see FJTask
206  * @see FJTaskRunnerGroup
207  **/

208
209 public class FJTaskRunner extends Thread JavaDoc {
210
211     /** The group of which this FJTaskRunner is a member **/
212     protected final FJTaskRunnerGroup group;
213
214     /**
215      * Constructor called only during FJTaskRunnerGroup initialization
216      **/

217
218     protected FJTaskRunner(FJTaskRunnerGroup g) {
219         group = g;
220         victimRNG = new Random JavaDoc(System.identityHashCode(this));
221         runPriority = getPriority();
222         setDaemon(true);
223     }
224
225     /**
226      * Return the FJTaskRunnerGroup of which this thread is a member
227      **/

228
229     protected final FJTaskRunnerGroup getGroup() {
230         return group;
231     }
232
233
234     /* ------------ DEQ Representation ------------------- */
235
236
237     /**
238      * FJTasks are held in an array-based DEQ with INITIAL_CAPACITY
239      * elements. The DEQ is grown if necessary, but default value is
240      * normally much more than sufficient unless there are
241      * user programming errors or questionable operations generating
242      * large numbers of Tasks without running them.
243      * Capacities must be a power of two.
244      **/

245
246     protected static final int INITIAL_CAPACITY = 4096;
247
248     /**
249      * The maximum supported DEQ capacity.
250      * When exceeded, FJTaskRunner operations throw Errors
251      **/

252
253     protected static final int MAX_CAPACITY = 1 << 30;
254
255     /**
256      * An object holding a single volatile reference to a FJTask.
257      **/

258
259     protected final static class VolatileTaskRef {
260         /** The reference **/
261         protected volatile FJTask ref;
262
263         /** Set the reference **/
264         protected final void put(FJTask r) {
265             ref = r;
266         }
267
268         /** Return the reference **/
269         protected final FJTask get() {
270             return ref;
271         }
272
273         /** Return the reference and clear it **/
274         protected final FJTask take() {
275             FJTask r = ref;
276             ref = null;
277             return r;
278         }
279
280         /**
281          * Initialization utility for constructing arrays.
282          * Make an array of given capacity and fill it with
283          * VolatileTaskRefs.
284          **/

285         protected static VolatileTaskRef[] newArray(int cap) {
286             VolatileTaskRef[] a = new VolatileTaskRef[cap];
287             for (int k = 0; k < cap; k++) a[k] = new VolatileTaskRef();
288             return a;
289         }
290
291     }
292
293     /**
294      * The DEQ array.
295      **/

296
297     protected VolatileTaskRef[] deq = VolatileTaskRef.newArray(INITIAL_CAPACITY);
298
299     /** Current size of the task DEQ **/
300     protected int deqSize() {
301         return deq.length;
302     }
303
304     /**
305      * Current top of DEQ. Generally acts just like a stack pointer in an
306      * array-based stack, except that it circularly wraps around the
307      * array, as in an array-based queue. The value is NOT
308      * always kept within <code>0 ... deq.length</code> though.
309      * The current top element is always at <code>top & (deq.length-1)</code>.
310      * To avoid integer overflow, top is reset down
311      * within bounds whenever it is noticed to be out out bounds;
312      * at worst when it is at <code>2 * deq.length</code>.
313      **/

314     protected volatile int top = 0;
315
316
317     /**
318      * Current base of DEQ. Acts like a take-pointer in an
319      * array-based bounded queue. Same bounds and usage as top.
320      **/

321
322     protected volatile int base = 0;
323
324
325     /**
326      * An extra object to synchronize on in order to
327      * achieve a memory barrier.
328      **/

329
330     protected final Object JavaDoc barrier = new Object JavaDoc();
331
332     /* ------------ Other BookKeeping ------------------- */
333
334     /**
335      * Record whether current thread may be processing a task
336      * (i.e., has been started and is not in an idle wait).
337      * Accessed, under synch, ONLY by FJTaskRunnerGroup, but the field is
338      * stored here for simplicity.
339      **/

340
341     protected boolean active = false;
342
343     /** Random starting point generator for scan() **/
344     protected final Random JavaDoc victimRNG;
345
346
347     /** Priority to use while scanning for work **/
348     protected int scanPriority = FJTaskRunnerGroup.DEFAULT_SCAN_PRIORITY;
349
350     /** Priority to use while running tasks **/
351     protected int runPriority;
352
353     /**
354      * Set the priority to use while scanning.
355      * We do not bother synchronizing access, since
356      * by the time the value is needed, both this FJTaskRunner
357      * and its FJTaskRunnerGroup will
358      * necessarily have performed enough synchronization
359      * to avoid staleness problems of any consequence.
360      **/

361     protected void setScanPriority(int pri) {
362         scanPriority = pri;
363     }
364
365
366     /**
367      * Set the priority to use while running tasks.
368      * Same usage and rationale as setScanPriority.
369      **/

370     protected void setRunPriority(int pri) {
371         runPriority = pri;
372     }
373
374     /**
375      * Compile-time constant for statistics gathering.
376      * Even when set, reported values may not be accurate
377      * since all are read and written without synchronization.
378      **/

379
380
381     static final boolean COLLECT_STATS = true;
382     // static final boolean COLLECT_STATS = false;
383

384
385     // for stat collection
386

387     /** Total number of tasks run **/
388     protected int runs = 0;
389
390     /** Total number of queues scanned for work **/
391     protected int scans = 0;
392
393     /** Total number of tasks obtained via scan **/
394     protected int steals = 0;
395
396
397
398
399     /* ------------ DEQ operations ------------------- */
400
401
402     /**
403      * Push a task onto DEQ.
404      * Called ONLY by current thread.
405      **/

406
407     protected final void push(final FJTask r) {
408         int t = top;
409
410         /*
411           This test catches both overflows and index wraps. It doesn't
412           really matter if base value is in the midst of changing in take.
413           As long as deq length is < 2^30, we are guaranteed to catch wrap in
414           time since base can only be incremented at most length times
415           between pushes (or puts).
416         */

417
418         if (t < (base & (deq.length - 1)) + deq.length) {
419
420             deq[t & (deq.length - 1)].put(r);
421             top = t + 1;
422         } else // isolate slow case to increase chances push is inlined
423
slowPush(r); // check overflow and retry
424
}
425
426
427     /**
428      * Handle slow case for push
429      **/

430
431     protected synchronized void slowPush(final FJTask r) {
432         checkOverflow();
433         push(r); // just recurse -- this one is sure to succeed.
434
}
435
436
437     /**
438      * Enqueue task at base of DEQ.
439      * Called ONLY by current thread.
440      * This method is currently not called from class FJTask. It could be used
441      * as a faster way to do FJTask.start, but most users would
442      * find the semantics too confusing and unpredictable.
443      **/

444
445     protected final synchronized void put(final FJTask r) {
446         for (; ;) {
447             int b = base - 1;
448             if (top < b + deq.length) {
449
450                 int newBase = b & (deq.length - 1);
451                 deq[newBase].put(r);
452                 base = newBase;
453
454                 if (b != newBase) { // Adjust for index underflow
455
int newTop = top & (deq.length - 1);
456                     if (newTop < newBase) newTop += deq.length;
457                     top = newTop;
458                 }
459                 return;
460             } else {
461                 checkOverflow();
462                 // ... and retry
463
}
464         }
465     }
466
467     /**
468      * Return a popped task, or null if DEQ is empty.
469      * Called ONLY by current thread.
470      * <p>
471      * This is not usually called directly but is
472      * instead inlined in callers. This version differs from the
473      * cilk algorithm in that pop does not fully back down and
474      * retry in the case of potential conflict with take. It simply
475      * rechecks under synch lock. This gives a preference
476      * for threads to run their own tasks, which seems to
477      * reduce flailing a bit when there are few tasks to run.
478      **/

479
480     protected final FJTask pop() {
481         /*
482            Decrement top, to force a contending take to back down.
483         */

484
485         int t = --top;
486
487         /*
488           To avoid problems with JVMs that do not properly implement
489           read-after-write of a pair of volatiles, we conservatively
490           grab without lock only if the DEQ appears to have at least two
491           elements, thus guaranteeing that both a pop and take will succeed,
492           even if the pre-increment in take is not seen by current thread.
493           Otherwise we recheck under synch.
494         */

495
496         if (base + 1 < t)
497             return deq[t & (deq.length - 1)].take();
498         else
499             return confirmPop(t);
500
501     }
502
503
504     /**
505      * Check under synch lock if DEQ is really empty when doing pop.
506      * Return task if not empty, else null.
507      **/

508
509     protected final synchronized FJTask confirmPop(int provisionalTop) {
510         if (base <= provisionalTop)
511             return deq[provisionalTop & (deq.length - 1)].take();
512         else { // was empty
513
/*
514               Reset DEQ indices to zero whenever it is empty.
515               This both avoids unnecessary calls to checkOverflow
516               in push, and helps keep the DEQ from accumulating garbage
517             */

518
519             top = base = 0;
520             return null;
521         }
522     }
523
524
525     /**
526      * Take a task from the base of the DEQ.
527      * Always called by other threads via scan()
528      **/

529
530
531     protected final synchronized FJTask take() {
532
533         /*
534           Increment base in order to suppress a contending pop
535         */

536
537         int b = base++;
538
539         if (b < top)
540             return confirmTake(b);
541         else {
542             // back out
543
base = b;
544             return null;
545         }
546     }
547
548
549     /**
550      * double-check a potential take
551      **/

552
553     protected FJTask confirmTake(int oldBase) {
554
555         /*
556           Use a second (guaranteed uncontended) synch
557           to serve as a barrier in case JVM does not
558           properly process read-after-write of 2 volatiles
559         */

560
561         synchronized (barrier) {
562             if (oldBase < top) {
563                 /*
564                   We cannot call deq[oldBase].take here because of possible races when
565                   nulling out versus concurrent push operations. Resulting
566                   accumulated garbage is swept out periodically in
567                   checkOverflow, or more typically, just by keeping indices
568                   zero-based when found to be empty in pop, which keeps active
569                   region small and constantly overwritten.
570                 */

571
572                 return deq[oldBase & (deq.length - 1)].get();
573             } else {
574                 base = oldBase;
575                 return null;
576             }
577         }
578     }
579
580
581     /**
582      * Adjust top and base, and grow DEQ if necessary.
583      * Called only while DEQ synch lock being held.
584      * We don't expect this to be called very often. In most
585      * programs using FJTasks, it is never called.
586      **/

587
588     protected void checkOverflow() {
589         int t = top;
590         int b = base;
591
592         if (t - b < deq.length - 1) { // check if just need an index reset
593

594             int newBase = b & (deq.length - 1);
595             int newTop = top & (deq.length - 1);
596             if (newTop < newBase) newTop += deq.length;
597             top = newTop;
598             base = newBase;
599
600             /*
601                Null out refs to stolen tasks.
602                This is the only time we can safely do it.
603             */

604
605             int i = newBase;
606             while (i != newTop && deq[i].ref != null) {
607                 deq[i].ref = null;
608                 i = (i - 1) & (deq.length - 1);
609             }
610
611         } else { // grow by doubling array
612

613             int newTop = t - b;
614             int oldcap = deq.length;
615             int newcap = oldcap * 2;
616
617             if (newcap >= MAX_CAPACITY)
618                 throw new Error JavaDoc("FJTask queue maximum capacity exceeded");
619
620             VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap];
621
622             // copy in bottom half of new deq with refs from old deq
623
for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[b++ & (oldcap - 1)];
624
625             // fill top half of new deq with new refs
626
for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef();
627
628             deq = newdeq;
629             base = 0;
630             top = newTop;
631         }
632     }
633
634
635     /* ------------ Scheduling ------------------- */
636
637
638     /**
639      * Do all but the pop() part of yield or join, by
640      * traversing all DEQs in our group looking for a task to
641      * steal. If none, it checks the entry queue.
642      * <p>
643      * Since there are no good, portable alternatives,
644      * we rely here on a mixture of Thread.yield and priorities
645      * to reduce wasted spinning, even though these are
646      * not well defined. We are hoping here that the JVM
647      * does something sensible.
648      * @param waitingFor if non-null, the current task being joined
649      **/

650
651     protected void scan(final FJTask waitingFor) {
652
653         FJTask task = null;
654
655         // to delay lowering priority until first failure to steal
656
boolean lowered = false;
657
658         /*
659           Circularly traverse from a random start index.
660
661           This differs slightly from cilk version that uses a random index
662           for each attempted steal.
663           Exhaustive scanning might impede analytic tractablity of
664           the scheduling policy, but makes it much easier to deal with
665           startup and shutdown.
666         */

667
668         FJTaskRunner[] ts = group.getArray();
669         int idx = victimRNG.nextInt(ts.length);
670
671         for (int i = 0; i < ts.length; ++i) {
672
673             FJTaskRunner t = ts[idx];
674             if (++idx >= ts.length) idx = 0; // circularly traverse
675

676             if (t != null && t != this) {
677
678                 if (waitingFor != null && waitingFor.isDone()) {
679                     break;
680                 } else {
681                     if (COLLECT_STATS) ++scans;
682                     task = t.take();
683                     if (task != null) {
684                         if (COLLECT_STATS) ++steals;
685                         break;
686                     } else if (isInterrupted()) {
687                         break;
688                     } else if (!lowered) { // if this is first fail, lower priority
689
lowered = true;
690                         setPriority(scanPriority);
691                     } else { // otherwise we are at low priority; just yield
692
yield();
693                     }
694                 }
695             }
696
697         }
698
699         if (task == null) {
700             if (COLLECT_STATS) ++scans;
701             task = group.pollEntryQueue();
702             if (COLLECT_STATS) if (task != null) ++steals;
703         }
704
705         if (lowered) setPriority(runPriority);
706
707         if (task != null && !task.isDone()) {
708             if (COLLECT_STATS) ++runs;
709             task.run();
710             task.setDone();
711         }
712
713     }
714
715     /**
716      * Same as scan, but called when current thread is idling.
717      * It repeatedly scans other threads for tasks,
718      * sleeping while none are available.
719      * <p>
720      * This differs from scan mainly in that
721      * since there is no reason to return to recheck any
722      * condition, we iterate until a task is found, backing
723      * off via sleeps if necessary.
724      **/

725
726     protected void scanWhileIdling() {
727         FJTask task = null;
728
729         boolean lowered = false;
730         long iters = 0;
731
732         FJTaskRunner[] ts = group.getArray();
733         int idx = victimRNG.nextInt(ts.length);
734
735         do {
736             for (int i = 0; i < ts.length; ++i) {
737
738                 FJTaskRunner t = ts[idx];
739                 if (++idx >= ts.length) idx = 0; // circularly traverse
740

741                 if (t != null && t != this) {
742                     if (COLLECT_STATS) ++scans;
743
744                     task = t.take();
745                     if (task != null) {
746                         if (COLLECT_STATS) ++steals;
747                         if (lowered) setPriority(runPriority);
748                         group.setActive(this);
749                         break;
750                     }
751                 }
752             }
753
754             if (task == null) {
755                 if (isInterrupted())
756                     return;
757
758                 if (COLLECT_STATS) ++scans;
759                 task = group.pollEntryQueue();
760
761                 if (task != null) {
762                     if (COLLECT_STATS) ++steals;
763                     if (lowered) setPriority(runPriority);
764                     group.setActive(this);
765                 } else {
766                     ++iters;
767                     // Check here for yield vs sleep to avoid entering group synch lock
768
if (iters >= group.SCANS_PER_SLEEP) {
769                         group.checkActive(this, iters);
770                         if (isInterrupted())
771                             return;
772                     } else if (!lowered) {
773                         lowered = true;
774                         setPriority(scanPriority);
775                     } else {
776                         yield();
777                     }
778                 }
779             }
780         } while (task == null);
781
782
783         if (!task.isDone()) {
784             if (COLLECT_STATS) ++runs;
785             task.run();
786             task.setDone();
787         }
788
789     }
790
791     /* ------------ composite operations ------------------- */
792
793
794     /**
795      * Main runloop
796      **/

797
798     public void run() {
799         try {
800             while (!interrupted()) {
801
802                 FJTask task = pop();
803                 if (task != null) {
804                     if (!task.isDone()) {
805                         // inline FJTask.invoke
806
if (COLLECT_STATS) ++runs;
807                         task.run();
808                         task.setDone();
809                     }
810                 } else
811                     scanWhileIdling();
812             }
813         } finally {
814             group.setInactive(this);
815         }
816     }
817
818     /**
819      * Execute a task in this thread. Generally called when current task
820      * cannot otherwise continue.
821      **/

822
823
824     protected final void taskYield() {
825         FJTask task = pop();
826         if (task != null) {
827             if (!task.isDone()) {
828                 if (COLLECT_STATS) ++runs;
829                 task.run();
830                 task.setDone();
831             }
832         } else
833             scan(null);
834     }
835
836
837     /**
838      * Process tasks until w is done.
839      * Equivalent to <code>while(!w.isDone()) taskYield(); </code>
840      **/

841
842     protected final void taskJoin(final FJTask w) {
843
844         while (!w.isDone()) {
845
846             FJTask task = pop();
847             if (task != null) {
848                 if (!task.isDone()) {
849                     if (COLLECT_STATS) ++runs;
850                     task.run();
851                     task.setDone();
852                     if (task == w) return; // fast exit if we just ran w
853
}
854             } else
855                 scan(w);
856         }
857     }
858
859     /**
860      * A specialized expansion of
861      * <code> w.fork(); invoke(v); w.join(); </code>
862      **/

863
864
865     protected final void coInvoke(final FJTask w, final FJTask v) {
866
867         // inline push
868

869         int t = top;
870         if (t < (base & (deq.length - 1)) + deq.length) {
871
872             deq[t & (deq.length - 1)].put(w);
873             top = t + 1;
874
875             // inline invoke
876

877             if (!v.isDone()) {
878                 if (COLLECT_STATS) ++runs;
879                 v.run();
880                 v.setDone();
881             }
882
883             // inline taskJoin
884

885             while (!w.isDone()) {
886                 FJTask task = pop();
887                 if (task != null) {
888                     if (!task.isDone()) {
889                         if (COLLECT_STATS) ++runs;
890                         task.run();
891                         task.setDone();
892                         if (task == w) return; // fast exit if we just ran w
893
}
894                 } else
895                     scan(w);
896             }
897         } else // handle non-inlinable cases
898
slowCoInvoke(w, v);
899     }
900
901
902     /**
903      * Backup to handle noninlinable cases of coInvoke
904      **/

905
906     protected void slowCoInvoke(final FJTask w, final FJTask v) {
907         push(w); // let push deal with overflow
908
FJTask.invoke(v);
909         taskJoin(w);
910     }
911
912
913     /**
914      * Array-based version of coInvoke
915      **/

916
917     protected final void coInvoke(FJTask[] tasks) {
918         int nforks = tasks.length - 1;
919
920         // inline bulk push of all but one task
921

922         int t = top;
923
924         if (nforks >= 0 && t + nforks < (base & (deq.length - 1)) + deq.length) {
925             for (int i = 0; i < nforks; ++i) {
926                 deq[t++ & (deq.length - 1)].put(tasks[i]);
927                 top = t;
928             }
929
930             // inline invoke of one task
931
FJTask v = tasks[nforks];
932             if (!v.isDone()) {
933                 if (COLLECT_STATS) ++runs;
934                 v.run();
935                 v.setDone();
936             }
937
938             // inline taskJoins
939

940             for (int i = 0; i < nforks; ++i) {
941                 FJTask w = tasks[i];
942                 while (!w.isDone()) {
943
944                     FJTask task = pop();
945                     if (task != null) {
946                         if (!task.isDone()) {
947                             if (COLLECT_STATS) ++runs;
948                             task.run();
949                             task.setDone();
950                         }
951                     } else
952                         scan(w);
953                 }
954             }
955         } else // handle non-inlinable cases
956
slowCoInvoke(tasks);
957     }
958
959     /**
960      * Backup to handle atypical or noninlinable cases of coInvoke
961      **/

962
963     protected void slowCoInvoke(FJTask[] tasks) {
964         for (int i = 0; i < tasks.length; ++i) push(tasks[i]);
965         for (int i = 0; i < tasks.length; ++i) taskJoin(tasks[i]);
966     }
967
968 }
969
970
Popular Tags