KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > java > util > concurrent > CyclicBarrier


1 /*
2  * @(#)CyclicBarrier.java 1.12 06/01/03
3  *
4  * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
5  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6  */

7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10
11 /**
12  * A synchronization aid that allows a set of threads to all wait for
13  * each other to reach a common barrier point. CyclicBarriers are
14  * useful in programs involving a fixed sized party of threads that
15  * must occasionally wait for each other. The barrier is called
16  * <em>cyclic</em> because it can be re-used after the waiting threads
17  * are released.
18  *
19  * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
20  * that is run once per barrier point, after the last thread in the party
21  * arrives, but before any threads are released.
22  * This <em>barrier action</em> is useful
23  * for updating shared-state before any of the parties continue.
24  *
25  * <p><b>Sample usage:</b> Here is an example of
26  * using a barrier in a parallel decomposition design:
27  * <pre>
28  * class Solver {
29  * final int N;
30  * final float[][] data;
31  * final CyclicBarrier barrier;
32  *
33  * class Worker implements Runnable {
34  * int myRow;
35  * Worker(int row) { myRow = row; }
36  * public void run() {
37  * while (!done()) {
38  * processRow(myRow);
39  *
40  * try {
41  * barrier.await();
42  * } catch (InterruptedException ex) {
43  * return;
44  * } catch (BrokenBarrierException ex) {
45  * return;
46  * }
47  * }
48  * }
49  * }
50  *
51  * public Solver(float[][] matrix) {
52  * data = matrix;
53  * N = matrix.length;
54  * barrier = new CyclicBarrier(N,
55  * new Runnable() {
56  * public void run() {
57  * mergeRows(...);
58  * }
59  * });
60  * for (int i = 0; i < N; ++i)
61  * new Thread(new Worker(i)).start();
62  *
63  * waitUntilDone();
64  * }
65  * }
66  * </pre>
67  * Here, each worker thread processes a row of the matrix then waits at the
68  * barrier until all rows have been processed. When all rows are processed
69  * the supplied {@link Runnable} barrier action is executed and merges the
70  * rows. If the merger
71  * determines that a solution has been found then <tt>done()</tt> will return
72  * <tt>true</tt> and each worker will terminate.
73  *
74  * <p>If the barrier action does not rely on the parties being suspended when
75  * it is executed, then any of the threads in the party could execute that
76  * action when it is released. To facilitate this, each invocation of
77  * {@link #await} returns the arrival index of that thread at the barrier.
78  * You can then choose which thread should execute the barrier action, for
79  * example:
80  * <pre> if (barrier.await() == 0) {
81  * // log the completion of this iteration
82  * }</pre>
83  *
84  * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
85  * for failed synchronization attempts: If a thread leaves a barrier
86  * point prematurely because of interruption, failure, or timeout, all
87  * other threads waiting at that barrier point will also leave
88  * abnormally via {@link BrokenBarrierException} (or
89  * {@link InterruptedException} if they too were interrupted at about
90  * the same time).
91  *
92  * @since 1.5
93  * @see CountDownLatch
94  *
95  * @author Doug Lea
96  */

97 public class CyclicBarrier {
98     /**
99      * Each use of the barrier is represented as a generation instance.
100      * The generation changes whenever the barrier is tripped, or
101      * is reset. There can be many generations associated with threads
102      * using the barrier - due to the non-deterministic way the lock
103      * may be allocated to waiting threads - but only one of these
104      * can be active at a time (the one to which <tt>count</tt> applies)
105      * and all the rest are either broken or tripped.
106      * There need not be an active generation if there has been a break
107      * but no subsequent reset.
108      */

109     private static class Generation {
110         boolean broken = false;
111     }
112
113     /** The lock for guarding barrier entry */
114     private final ReentrantLock lock = new ReentrantLock();
115     /** Condition to wait on until tripped */
116     private final Condition trip = lock.newCondition();
117     /** The number of parties */
118     private final int parties;
119     /* The command to run when tripped */
120     private final Runnable JavaDoc barrierCommand;
121     /** The current generation */
122     private Generation generation = new Generation();
123
124     /**
125      * Number of parties still waiting. Counts down from parties to 0
126      * on each generation. It is reset to parties on each new
127      * generation or when broken.
128      */

129     private int count;
130
131     /**
132      * Updates state on barrier trip and wakes up everyone.
133      * Called only while holding lock.
134      */

135     private void nextGeneration() {
136         // signal completion of last generation
137
trip.signalAll();
138         // set up next generation
139
count = parties;
140         generation = new Generation();
141     }
142
143     /**
144      * Sets current barrier generation as broken and wakes up everyone.
145      * Called only while holding lock.
146      */

147     private void breakBarrier() {
148         generation.broken = true;
149     count = parties;
150         trip.signalAll();
151     }
152
153     /**
154      * Main barrier code, covering the various policies.
155      */

156     private int dowait(boolean timed, long nanos)
157         throws InterruptedException JavaDoc, BrokenBarrierException JavaDoc,
158                TimeoutException JavaDoc {
159         final ReentrantLock lock = this.lock;
160         lock.lock();
161         try {
162             final Generation g = generation;
163
164             if (g.broken)
165                 throw new BrokenBarrierException JavaDoc();
166
167             if (Thread.interrupted()) {
168                 breakBarrier();
169                 throw new InterruptedException JavaDoc();
170             }
171
172            int index = --count;
173            if (index == 0) { // tripped
174
boolean ranAction = false;
175                try {
176            final Runnable JavaDoc command = barrierCommand;
177                    if (command != null)
178                        command.run();
179                    ranAction = true;
180                    nextGeneration();
181                    return 0;
182                } finally {
183                    if (!ranAction)
184                        breakBarrier();
185                }
186            }
187
188             // loop until tripped, broken, interrupted, or timed out
189
for (;;) {
190                 try {
191                     if (!timed)
192                         trip.await();
193                     else if (nanos > 0L)
194                         nanos = trip.awaitNanos(nanos);
195                 } catch (InterruptedException JavaDoc ie) {
196                     if (g == generation && ! g.broken) {
197                         breakBarrier();
198             throw ie;
199             } else {
200             // We're about to finish waiting even if we had not
201
// been interrupted, so this interrupt is deemed to
202
// "belong" to subsequent execution.
203
Thread.currentThread().interrupt();
204             }
205                 }
206
207                 if (g.broken)
208                     throw new BrokenBarrierException JavaDoc();
209
210                 if (g != generation)
211                     return index;
212
213                 if (timed && nanos <= 0L) {
214                     breakBarrier();
215                     throw new TimeoutException JavaDoc();
216                 }
217             }
218         } finally {
219             lock.unlock();
220         }
221     }
222
223     /**
224      * Creates a new <tt>CyclicBarrier</tt> that will trip when the
225      * given number of parties (threads) are waiting upon it, and which
226      * will execute the given barrier action when the barrier is tripped,
227      * performed by the last thread entering the barrier.
228      *
229      * @param parties the number of threads that must invoke {@link #await}
230      * before the barrier is tripped.
231      * @param barrierAction the command to execute when the barrier is
232      * tripped, or <tt>null</tt> if there is no action.
233      *
234      * @throws IllegalArgumentException if <tt>parties</tt> is less than 1.
235      */

236     public CyclicBarrier(int parties, Runnable JavaDoc barrierAction) {
237         if (parties <= 0) throw new IllegalArgumentException JavaDoc();
238         this.parties = parties;
239         this.count = parties;
240         this.barrierCommand = barrierAction;
241     }
242
243     /**
244      * Creates a new <tt>CyclicBarrier</tt> that will trip when the
245      * given number of parties (threads) are waiting upon it, and
246      * does not perform a predefined action when the barrier is tripped.
247      *
248      * @param parties the number of threads that must invoke {@link #await}
249      * before the barrier is tripped.
250      *
251      * @throws IllegalArgumentException if <tt>parties</tt> is less than 1.
252      */

253     public CyclicBarrier(int parties) {
254         this(parties, null);
255     }
256
257     /**
258      * Returns the number of parties required to trip this barrier.
259      * @return the number of parties required to trip this barrier.
260      */

261     public int getParties() {
262         return parties;
263     }
264
265     /**
266      * Waits until all {@link #getParties parties} have invoked <tt>await</tt>
267      * on this barrier.
268      *
269      * <p>If the current thread is not the last to arrive then it is
270      * disabled for thread scheduling purposes and lies dormant until
271      * one of the following things happens:
272      * <ul>
273      * <li>The last thread arrives; or
274      * <li>Some other thread {@link Thread#interrupt interrupts} the current
275      * thread; or
276      * <li>Some other thread {@link Thread#interrupt interrupts} one of the
277      * other waiting threads; or
278      * <li>Some other thread times out while waiting for barrier; or
279      * <li>Some other thread invokes {@link #reset} on this barrier.
280      * </ul>
281      * <p>If the current thread:
282      * <ul>
283      * <li>has its interrupted status set on entry to this method; or
284      * <li>is {@link Thread#interrupt interrupted} while waiting
285      * </ul>
286      * then {@link InterruptedException} is thrown and the current thread's
287      * interrupted status is cleared.
288      *
289      * <p>If the barrier is {@link #reset} while any thread is waiting, or if
290      * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked,
291      * or while any thread is waiting,
292      * then {@link BrokenBarrierException} is thrown.
293      *
294      * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
295      * then all other waiting threads will throw
296      * {@link BrokenBarrierException} and the barrier is placed in the broken
297      * state.
298      *
299      * <p>If the current thread is the last thread to arrive, and a
300      * non-null barrier action was supplied in the constructor, then the
301      * current thread runs the action before allowing the other threads to
302      * continue.
303      * If an exception occurs during the barrier action then that exception
304      * will be propagated in the current thread and the barrier is placed in
305      * the broken state.
306      *
307      * @return the arrival index of the current thread, where index
308      * <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
309      * zero indicates the last to arrive.
310      *
311      * @throws InterruptedException if the current thread was interrupted
312      * while waiting.
313      * @throws BrokenBarrierException if <em>another</em> thread was
314      * interrupted or timed out while the current thread was waiting,
315      * or the barrier was reset, or the barrier was broken when
316      * <tt>await</tt> was called, or the barrier action (if present)
317      * failed due an exception.
318      */

319     public int await() throws InterruptedException JavaDoc, BrokenBarrierException JavaDoc {
320         try {
321             return dowait(false, 0L);
322         } catch (TimeoutException JavaDoc toe) {
323             throw new Error JavaDoc(toe); // cannot happen;
324
}
325     }
326
327     /**
328      * Waits until all {@link #getParties parties} have invoked <tt>await</tt>
329      * on this barrier.
330      *
331      * <p>If the current thread is not the last to arrive then it is
332      * disabled for thread scheduling purposes and lies dormant until
333      * one of the following things happens:
334      * <ul>
335      * <li>The last thread arrives; or
336      * <li>The specified timeout elapses; or
337      * <li>Some other thread {@link Thread#interrupt interrupts} the current
338      * thread; or
339      * <li>Some other thread {@link Thread#interrupt interrupts} one of the
340      * other waiting threads; or
341      * <li>Some other thread times out while waiting for barrier; or
342      * <li>Some other thread invokes {@link #reset} on this barrier.
343      * </ul>
344      * <p>If the current thread:
345      * <ul>
346      * <li>has its interrupted status set on entry to this method; or
347      * <li>is {@link Thread#interrupt interrupted} while waiting
348      * </ul>
349      * then {@link InterruptedException} is thrown and the current thread's
350      * interrupted status is cleared.
351      *
352      * <p>If the specified waiting time elapses then {@link TimeoutException}
353      * is thrown. If the time is less than or equal to zero, the
354      * method will not wait at all.
355      *
356      * <p>If the barrier is {@link #reset} while any thread is waiting, or if
357      * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked,
358      * or while any thread is waiting,
359      * then {@link BrokenBarrierException} is thrown.
360      *
361      * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
362      * then all other waiting threads will throw
363      * {@link BrokenBarrierException} and the barrier is placed in the broken
364      * state.
365      *
366      * <p>If the current thread is the last thread to arrive, and a
367      * non-null barrier action was supplied in the constructor, then the
368      * current thread runs the action before allowing the other threads to
369      * continue.
370      * If an exception occurs during the barrier action then that exception
371      * will be propagated in the current thread and the barrier is placed in
372      * the broken state.
373      *
374      * @param timeout the time to wait for the barrier
375      * @param unit the time unit of the timeout parameter
376      * @return the arrival index of the current thread, where index
377      * <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
378      * zero indicates the last to arrive.
379      *
380      * @throws InterruptedException if the current thread was interrupted
381      * while waiting.
382      * @throws TimeoutException if the specified timeout elapses.
383      * @throws BrokenBarrierException if <em>another</em> thread was
384      * interrupted or timed out while the current thread was waiting,
385      * or the barrier was reset, or the barrier was broken when
386      * <tt>await</tt> was called, or the barrier action (if present)
387      * failed due an exception.
388      */

389     public int await(long timeout, TimeUnit JavaDoc unit)
390         throws InterruptedException JavaDoc,
391                BrokenBarrierException JavaDoc,
392                TimeoutException JavaDoc {
393         return dowait(true, unit.toNanos(timeout));
394     }
395
396     /**
397      * Queries if this barrier is in a broken state.
398      * @return <tt>true</tt> if one or more parties broke out of this
399      * barrier due to interruption or timeout since construction or
400      * the last reset, or a barrier action failed due to an exception;
401      * <tt>false</tt> otherwise.
402      */

403     public boolean isBroken() {
404         final ReentrantLock lock = this.lock;
405         lock.lock();
406         try {
407             return generation.broken;
408         } finally {
409             lock.unlock();
410         }
411     }
412
413     /**
414      * Resets the barrier to its initial state. If any parties are
415      * currently waiting at the barrier, they will return with a
416      * {@link BrokenBarrierException}. Note that resets <em>after</em>
417      * a breakage has occurred for other reasons can be complicated to
418      * carry out; threads need to re-synchronize in some other way,
419      * and choose one to perform the reset. It may be preferable to
420      * instead create a new barrier for subsequent use.
421      */

422     public void reset() {
423         final ReentrantLock lock = this.lock;
424         lock.lock();
425         try {
426             breakBarrier(); // break the current generation
427
nextGeneration(); // start a new generation
428
} finally {
429             lock.unlock();
430         }
431     }
432
433     /**
434      * Returns the number of parties currently waiting at the barrier.
435      * This method is primarily useful for debugging and assertions.
436      *
437      * @return the number of parties currently blocked in {@link #await}.
438      */

439     public int getNumberWaiting() {
440         final ReentrantLock lock = this.lock;
441         lock.lock();
442         try {
443             return parties - count;
444         } finally {
445             lock.unlock();
446         }
447     }
448 }
449
Free Books   Free Magazines  
Popular Tags