Code - Class EDU.oswego.cs.dl.util.concurrent.CyclicBarrier


1 /*
2   File: CyclicBarrier.java
3
4   Originally written by Doug Lea and released into the public domain.
5   This may be used for any purposes whatsoever without acknowledgment.
6   Thanks for the assistance and support of Sun Microsystems Labs,
7   and everyone contributing, testing, and using this code.
8
9   History:
10   Date Who What
11   11Jul1998 dl Create public version
12   28Aug1998 dl minor code simplification
13 */

14
15 package EDU.oswego.cs.dl.util.concurrent;
16
17 /**
18  * A cyclic barrier is a reasonable choice for a barrier in contexts
19  * involving a fixed sized group of threads that
20  * must occasionally wait for each other.
21  * (A Rendezvous better handles applications in which
22  * any number of threads meet, n-at-a-time.)
23  * <p>
24  * CyclicBarriers use an all-or-none breakage model
25  * for failed synchronization attempts: If threads
26  * leave a barrier point prematurely because of timeout
27  * or interruption, others will also leave abnormally
28  * (via BrokenBarrierException), until
29  * the barrier is <code>restart</code>ed. This is usually
30  * the simplest and best strategy for sharing knowledge
31  * about failures among cooperating threads in the most
32  * common usage contexts of Barriers.
33  * This implementation has the property that interruptions
34  * among newly arriving threads can cause as-yet-unresumed
35  * threads from a previous barrier cycle to return out
36  * as broken. This transmits breakage
37  * as early as possible, but with the possible byproduct that
38  * only some threads returning out of a barrier will realize
39  * that it is newly broken. (Others will not realize this until a
40  * future cycle.) (The Rendezvous class has a more uniform, but
41  * sometimes less desirable policy.)
42  * <p>
43  * Barriers support an optional Runnable command
44  * that is run once per barrier point.
45  * <p>
46  * <b>Sample usage</b> Here is a code sketch of
47  * a barrier in a parallel decomposition design.
48  * <pre>
49  * class Solver {
50  * final int N;
51  * final float[][] data;
52  * final CyclicBarrier barrier;
53  *
54  * class Worker implements Runnable {
55  * int myRow;
56  * Worker(int row) { myRow = row; }
57  * public void run() {
58  * while (!done()) {
59  * processRow(myRow);
60  *
61  * try {
62  * barrier.barrier();
63  * }
64  * catch (InterruptedException ex) { return; }
65  * catch (BrokenBarrierException ex) { return; }
66  * }
67  * }
68  * }
69  *
70  * public Solver(float[][] matrix) {
71  * data = matrix;
72  * N = matrix.length;
73  * barrier = new CyclicBarrier(N);
74  * barrier.setBarrierCommand(new Runnable() {
75  * public void run() { mergeRows(...); }
76  * });
77  * for (int i = 0; i < N; ++i) {
78  * new Thread(new Worker(i)).start();
79  * }
80  * waitUntilDone();
81  * }
82  * </pre>
83  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
84
85  **/

86 public class CyclicBarrier implements Barrier {
87
88   protected final int parties_;
89   protected boolean broken_ = false;
90   protected Runnable barrierCommand_ = null;
91   protected int count_; // number of parties still waiting
92
protected int resets_ = 0; // incremented on each release
93

94   /**
95    * Create a CyclicBarrier for the indicated number of parties,
96    * and no command to run at each barrier.
97    * @exception IllegalArgumentException if parties less than or equal to zero.
98    **/

99
100   public CyclicBarrier(int parties) { this(parties, null); }
101
102   /**
103    * Create a CyclicBarrier for the indicated number of parties.
104    * and the given command to run at each barrier point.
105    * @exception IllegalArgumentException if parties less than or equal to zero.
106    **/

107
108   public CyclicBarrier(int parties, Runnable command) {
109     if (parties <= 0) throw new IllegalArgumentException();
110     parties_ = parties;
111     count_ = parties;
112     barrierCommand_ = command;
113   }
114
115   /**
116    * Set the command to run at the point at which all threads reach the
117    * barrier. This command is run exactly once, by the thread
118    * that trips the barrier. The command is not run if the barrier is
119    * broken.
120    * @param command the command to run. If null, no command is run.
121    * @return the previous command
122    **/

123
124   public synchronized Runnable setBarrierCommand(Runnable command) {
125     Runnable old = barrierCommand_;
126     barrierCommand_ = command;
127     return old;
128   }
129
130   public synchronized boolean broken() { return broken_; }
131
132   /**
133    * Reset to initial state. Clears both the broken status
134    * and any record of waiting threads, and releases all
135    * currently waiting threads with indeterminate return status.
136    * This method is intended only for use in recovery actions
137    * in which it is somehow known
138    * that no thread could possibly be relying on the
139    * the synchronization properties of this barrier.
140    **/

141
142   public synchronized void restart() {
143     broken_ = false;
144     ++resets_;
145     count_ = parties_;
146     notifyAll();
147   }
148   
149  
150   public int parties() { return parties_; }
151
152   /**
153    * Enter barrier and wait for the other parties()-1 threads.
154    * @return the arrival index: the number of other parties
155    * that were still waiting
156    * upon entry. This is a unique value from zero to parties()-1.
157    * If it is zero, then the current
158    * thread was the last party to hit barrier point
159    * and so was responsible for releasing the others.
160    * @exception BrokenBarrierException if any other thread
161    * in any previous or current barrier
162    * since either creation or the last <code>restart</code>
163    * operation left the barrier
164    * prematurely due to interruption or time-out. (If so,
165    * the <code>broken</code> status is also set.)
166    * Threads that are noticied to have been
167    * interrupted <em>after</em> being released are not considered
168    * to have broken the barrier.
169    * In all cases, the interruption
170    * status of the current thread is preserved, so can be tested
171    * by checking <code>Thread.interrupted</code>.
172    * @exception InterruptedException if this thread was interrupted
173    * during the barrier, and was the one causing breakage.
174    * If so, <code>broken</code> status is also set.
175    **/

176
177   public int barrier() throws InterruptedException, BrokenBarrierException {
178     return doBarrier(false, 0);
179   }
180
181   /**
182    * Enter barrier and wait at most msecs for the other parties()-1 threads.
183    * @return if not timed out, the arrival index: the number of other parties
184    * that were still waiting
185    * upon entry. This is a unique value from zero to parties()-1.
186    * If it is zero, then the current
187    * thread was the last party to hit barrier point
188    * and so was responsible for releasing the others.
189    * @exception BrokenBarrierException
190    * if any other thread
191    * in any previous or current barrier
192    * since either creation or the last <code>restart</code>
193    * operation left the barrier
194    * prematurely due to interruption or time-out. (If so,
195    * the <code>broken</code> status is also set.)
196    * Threads that are noticed to have been
197    * interrupted <em>after</em> being released are not considered
198    * to have broken the barrier.
199    * In all cases, the interruption
200    * status of the current thread is preserved, so can be tested
201    * by checking <code>Thread.interrupted</code>.
202    * @exception InterruptedException if this thread was interrupted
203    * during the barrier. If so, <code>broken</code> status is also set.
204    * @exception TimeoutException if this thread timed out waiting for
205    * the barrier. If the timeout occured while already in the
206    * barrier, <code>broken</code> status is also set.
207    **/

208
209   public int attemptBarrier(long msecs)
210     throws InterruptedException, TimeoutException, BrokenBarrierException {
211     return doBarrier(true, msecs);
212   }
213
214   protected synchronized int doBarrier(boolean timed, long msecs)
215     throws InterruptedException, TimeoutException, BrokenBarrierException {
216     
217     int index = --count_;
218
219     if (broken_) {
220       throw new BrokenBarrierException(index);
221     }
222     else if (Thread.interrupted()) {
223       broken_ = true;
224       notifyAll();
225       throw new InterruptedException();
226     }
227     else if (index == 0) { // tripped
228
count_ = parties_;
229       ++resets_;
230       notifyAll();
231       try {
232         if (barrierCommand_ != null)
233           barrierCommand_.run();
234         return 0;
235       }
236       catch (RuntimeException ex) {
237         broken_ = true;
238         return 0;
239       }
240     }
241     else if (timed && msecs <= 0) {
242       broken_ = true;
243       notifyAll();
244       throw new TimeoutException(msecs);
245     }
246     else { // wait until next reset
247
int r = resets_;
248       long startTime = (timed)? System.currentTimeMillis() : 0;
249       long waitTime = msecs;
250       for (;;) {
251         try {
252           wait(waitTime);
253         }
254         catch (InterruptedException ex) {
255           // Only claim that broken if interrupted before reset
256
if (resets_ == r) {
257             broken_ = true;
258             notifyAll();
259             throw ex;
260           }
261           else {
262             Thread.currentThread().interrupt(); // propagate
263
}
264         }
265
266         if (broken_)
267           throw new BrokenBarrierException(index);
268
269         else if (r != resets_)
270           return index;
271
272         else if (timed) {
273           waitTime = msecs - (System.currentTimeMillis() - startTime);
274           if (waitTime <= 0) {
275             broken_ = true;
276             notifyAll();
277             throw new TimeoutException(msecs);
278           }
279         }
280       }
281     }
282   }
283
284 }
285

Java API By Example, From Geeks To Geeks. | Conditions of Use | About Us © 2002 - 2005, KickJava.com, or its affiliates