Code - Class EDU.oswego.cs.dl.util.concurrent.Rendezvous


1 /*
2   File: Rendezvous.java
3
4   Originally written by Doug Lea and released into the public domain.
5   This may be used for any purposes whatsoever without acknowledgment.
6   Thanks for the assistance and support of Sun Microsystems Labs,
7   and everyone contributing, testing, and using this code.
8
9   History:
10   Date Who What
11   11Jun1998 dl Create public version
12   30Jul1998 dl Minor code simplifications
13 */

14
15 package EDU.oswego.cs.dl.util.concurrent;
16
17 /**
18  * A rendezvous is a barrier that:
19  * <ul>
20  * <li> Unlike a CyclicBarrier, is not restricted to use
21  * with fixed-sized groups of threads.
22  * Any number of threads can attempt to enter a rendezvous,
23  * but only the predetermined number of parties enter
24  * and later become released from the rendezvous at any given time.
25  * <li> Enables each participating thread to exchange information
26  * with others at the rendezvous point. Each entering thread
27  * presents some object on entry to the rendezvous, and
28  * returns some object on release. The object returned is
29  * the result of a RendezvousFunction that is run once per
30  * rendezvous, (it is run by the last-entering thread). By
31  * default, the function applied is a rotation, so each
32  * thread returns the object given by the next (modulo parties)
33  * entering thread. This default function facilitates simple
34  * application of a common use of rendezvous, as exchangers.
35  * </ul>
36  * <p>
37  * Rendezvous use an all-or-none breakage model
38  * for failed synchronization attempts: If threads
39  * leave a rendezvous point prematurely because of timeout
40  * or interruption, others will also leave abnormally
41  * (via BrokenBarrierException), until
42  * the rendezvous is <code>restart</code>ed. This is usually
43  * the simplest and best strategy for sharing knowledge
44  * about failures among cooperating threads in the most
45  * common usage contexts of Rendezvous.
46  * <p>
47  * While any positive number (including 1) of parties can
48  * be handled, the most common case is to have two parties.
49  * <p>
50  * <b>Sample Usage</b><p>
51  * Here are the highlights of a class that uses a Rendezvous to
52  * swap buffers between threads so that the thread filling the
53  * buffer gets a freshly
54  * emptied one when it needs it, handing off the filled one to
55  * the thread emptying the buffer.
56  * <pre>
57  * class FillAndEmpty {
58  * Rendezvous exchanger = new Rendezvous(2);
59  * Buffer initialEmptyBuffer = ... a made-up type
60  * Buffer initialFullBuffer = ...
61  *
62  * class FillingLoop implements Runnable {
63  * public void run() {
64  * Buffer currentBuffer = initialEmptyBuffer;
65  * try {
66  * while (currentBuffer != null) {
67  * addToBuffer(currentBuffer);
68  * if (currentBuffer.full())
69  * currentBuffer = (Buffer)(exchanger.rendezvous(currentBuffer));
70  * }
71  * }
72  * catch (BrokenBarrierException ex) {
73  * return;
74  * }
75  * catch (InterruptedException ex) {
76  * Thread.currentThread().interrupt();
77  * }
78  * }
79  * }
80  *
81  * class EmptyingLoop implements Runnable {
82  * public void run() {
83  * Buffer currentBuffer = initialFullBuffer;
84  * try {
85  * while (currentBuffer != null) {
86  * takeFromBuffer(currentBuffer);
87  * if (currentBuffer.empty())
88  * currentBuffer = (Buffer)(exchanger.rendezvous(currentBuffer));
89  * }
90  * }
91  * catch (BrokenBarrierException ex) {
92  * return;
93  * }
94  * catch (InterruptedException ex) {
95  * Thread.currentThread().interrupt();
96  * }
97  * }
98  * }
99  *
100  * void start() {
101  * new Thread(new FillingLoop()).start();
102  * new Thread(new EmptyingLoop()).start();
103  * }
104  * }
105  * </pre>
106  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
107
108  **/

109
110 public class Rendezvous implements Barrier {
111
112   /**
113    * Interface for functions run at rendezvous points
114    **/

115   public interface RendezvousFunction {
116     /**
117      * Perform some function on the objects presented at
118      * a rendezvous. The objects array holds all presented
119      * items; one per thread. Its length is the number of parties.
120      * The array is ordered by arrival into the rendezvous.
121      * So, the last element (at objects[objects.length-1])
122      * is guaranteed to have been presented by the thread performing
123      * this function. No identifying information is
124      * otherwise kept about which thread presented which item.
125      * If you need to
126      * trace origins, you will need to use an item type for rendezvous
127      * that includes identifying information. After return of this
128      * function, other threads are released, and each returns with
129      * the item with the same index as the one it presented.
130      **/

131     public void rendezvousFunction(Object[] objects);
132   }
133
134   /**
135    * The default rendezvous function. Rotates the array
136    * so that each thread returns an item presented by some
137    * other thread (or itself, if parties is 1).
138    **/

139   public static class Rotator implements RendezvousFunction {
140     /** Rotate the array **/
141     public void rendezvousFunction(Object[] objects) {
142       int lastIdx = objects.length - 1;
143       Object first = objects[0];
144       for (int i = 0; i < lastIdx; ++i) objects[i] = objects[i+1];
145       objects[lastIdx] = first;
146     }
147   }
148
149
150   protected final int parties_;
151
152
153   protected boolean broken_ = false;
154
155   /**
156    * Number of threads that have entered rendezvous
157    **/

158   protected int entries_ = 0;
159
160   /**
161    * Number of threads that are permitted to depart rendezvous
162    **/

163   protected long departures_ = 0;
164
165   /**
166    * Incoming threads pile up on entry until last set done.
167    **/

168   protected final Semaphore entryGate_;
169
170   /**
171    * Temporary holder for items in exchange
172    **/

173   protected final Object[] slots_;
174
175   /**
176    * The function to run at rendezvous point
177    **/

178
179   protected RendezvousFunction rendezvousFunction_;
180
181   /**
182    * Create a Barrier for the indicated number of parties,
183    * and the default Rotator function to run at each barrier point.
184    * @exception IllegalArgumentException if parties less than or equal to zero.
185    **/

186
187   public Rendezvous(int parties) {
188     this(parties, new Rotator());
189   }
190
191   /**
192    * Create a Barrier for the indicated number of parties.
193    * and the given function to run at each barrier point.
194    * @exception IllegalArgumentException if parties less than or equal to zero.
195    **/

196
197   public Rendezvous(int parties, RendezvousFunction function) {
198     if (parties <= 0) throw new IllegalArgumentException();
199     parties_ = parties;
200     rendezvousFunction_ = function;
201     entryGate_ = new WaiterPreferenceSemaphore(parties);
202     slots_ = new Object[parties];
203   }
204
205   /**
206    * Set the function to call at the point at which all threads reach the
207    * rendezvous. This function is run exactly once, by the thread
208    * that trips the barrier. The function is not run if the barrier is
209    * broken.
210    * @param function the function to run. If null, no function is run.
211    * @return the previous function
212    **/

213
214
215   public synchronized RendezvousFunction setRendezvousFunction(RendezvousFunction function) {
216     RendezvousFunction old = rendezvousFunction_;
217     rendezvousFunction_ = function;
218     return old;
219   }
220
221   public int parties() { return parties_; }
222
223   public synchronized boolean broken() { return broken_; }
224
225   /**
226    * Reset to initial state. Clears both the broken status
227    * and any record of waiting threads, and releases all
228    * currently waiting threads with indeterminate return status.
229    * This method is intended only for use in recovery actions
230    * in which it is somehow known
231    * that no thread could possibly be relying on the
232    * the synchronization properties of this barrier.
233    **/

234
235   public void restart() {
236     // This is not very good, but probably the best that can be done
237
for (;;) {
238       synchronized(this) {
239         if (entries_ != 0) {
240           notifyAll();
241         }
242         else {
243           broken_ = false;
244           return;
245         }
246       }
247       Thread.yield();
248     }
249   }
250
251
252   /**
253    * Enter a rendezvous; returning after all other parties arrive.
254    * @param x the item to present at rendezvous point.
255    * By default, this item is exchanged with another.
256    * @return an item x given by some thread, and/or processed
257    * by the rendezvousFunction.
258    * @exception BrokenBarrierException
259    * if any other thread
260    * in any previous or current barrier
261    * since either creation or the last <code>restart</code>
262    * operation left the barrier
263    * prematurely due to interruption or time-out. (If so,
264    * the <code>broken</code> status is also set.)
265    * Also returns as
266    * broken if the RendezvousFunction encountered a run-time exception.
267    * Threads that are noticed to have been
268    * interrupted <em>after</em> being released are not considered
269    * to have broken the barrier.
270    * In all cases, the interruption
271    * status of the current thread is preserved, so can be tested
272    * by checking <code>Thread.interrupted</code>.
273    * @exception InterruptedException if this thread was interrupted
274    * during the exchange. If so, <code>broken</code> status is also set.
275    **/

276
277
278   public Object rendezvous(Object x) throws InterruptedException, BrokenBarrierException {
279     return doRendezvous(x, false, 0);
280   }
281
282   /**
283    * Wait msecs to complete a rendezvous.
284    * @param x the item to present at rendezvous point.
285    * By default, this item is exchanged with another.
286    * @param msecs The maximum time to wait.
287    * @return an item x given by some thread, and/or processed
288    * by the rendezvousFunction.
289    * @exception BrokenBarrierException
290    * if any other thread
291    * in any previous or current barrier
292    * since either creation or the last <code>restart</code>
293    * operation left the barrier
294    * prematurely due to interruption or time-out. (If so,
295    * the <code>broken</code> status is also set.)
296    * Also returns as
297    * broken if the RendezvousFunction encountered a run-time exception.
298    * Threads that are noticed to have been
299    * interrupted <em>after</em> being released are not considered
300    * to have broken the barrier.
301    * In all cases, the interruption
302    * status of the current thread is preserved, so can be tested
303    * by checking <code>Thread.interrupted</code>.
304    * @exception InterruptedException if this thread was interrupted
305    * during the exchange. If so, <code>broken</code> status is also set.
306    * @exception TimeoutException if this thread timed out waiting for
307    * the exchange. If the timeout occured while already in the
308    * exchange, <code>broken</code> status is also set.
309    **/

310
311
312   public Object attemptRendezvous(Object x, long msecs)
313     throws InterruptedException, TimeoutException, BrokenBarrierException {
314     return doRendezvous(x, true, msecs);
315   }
316
317   protected Object doRendezvous(Object x, boolean timed, long msecs)
318     throws InterruptedException, TimeoutException, BrokenBarrierException {
319
320     // rely on semaphore to throw interrupt on entry
321

322     long startTime;
323
324     if (timed) {
325       startTime = System.currentTimeMillis();
326       if (!entryGate_.attempt(msecs)) {
327         throw new TimeoutException(msecs);
328       }
329     }
330     else {
331       startTime = 0;
332       entryGate_.acquire();
333     }
334
335     synchronized(this) {
336
337       Object y = null;
338
339       int index = entries_++;
340       slots_[index] = x;
341
342       try {
343         // last one in runs function and releases
344
if (entries_ == parties_) {
345
346           departures_ = entries_;
347           notifyAll();
348
349           try {
350             if (!broken_ && rendezvousFunction_ != null)
351             rendezvousFunction_.rendezvousFunction(slots_);
352           }
353           catch (RuntimeException ex) {
354             broken_ = true;
355           }
356
357         }
358
359         else {
360
361           while (!broken_ && departures_ < 1) {
362             long timeLeft = 0;
363             if (timed) {
364               timeLeft = msecs - (System.currentTimeMillis() - startTime);
365               if (timeLeft <= 0) {
366                 broken_ = true;
367                 departures_ = entries_;
368                 notifyAll();
369                 throw new TimeoutException(msecs);
370               }
371             }
372             
373             try {
374               wait(timeLeft);
375             }
376             catch (InterruptedException ex) {
377               if (broken_ || departures_ > 0) { // interrupted after release
378
Thread.currentThread().interrupt();
379                 break;
380               }
381               else {
382                 broken_ = true;
383                 departures_ = entries_;
384                 notifyAll();
385                 throw ex;
386               }
387             }
388           }
389         }
390
391       }
392
393       finally {
394
395         y = slots_[index];
396         
397         // Last one out cleans up and allows next set of threads in
398
if (--departures_ <= 0) {
399           for (int i = 0; i < slots_.length; ++i) slots_[i] = null;
400           entryGate_.release(entries_);
401           entries_ = 0;
402         }
403       }
404
405       // continue if no IE/TO throw
406
if (broken_)
407         throw new BrokenBarrierException(index);
408       else
409         return y;
410     }
411   }
412
413 }
414
415
416

Java API By Example, From Geeks To Geeks. | Conditions of Use | About Us © 2002 - 2005, KickJava.com, or its affiliates