| ||||
|
Code - Class EDU.oswego.cs.dl.util.concurrent.Rendezvous1 /* 2 File: Rendezvous.java 3 4 Originally written by Doug Lea and released into the public domain. 5 This may be used for any purposes whatsoever without acknowledgment. 6 Thanks for the assistance and support of Sun Microsystems Labs, 7 and everyone contributing, testing, and using this code. 8 9 History: 10 Date Who What 11 11Jun1998 dl Create public version 12 30Jul1998 dl Minor code simplifications 13 */ 14 15 package EDU.oswego.cs.dl.util.concurrent; 16 17 /** 18 * A rendezvous is a barrier that: 19 * <ul> 20 * <li> Unlike a CyclicBarrier, is not restricted to use 21 * with fixed-sized groups of threads. 22 * Any number of threads can attempt to enter a rendezvous, 23 * but only the predetermined number of parties enter 24 * and later become released from the rendezvous at any give time. 25 * <li> Enables each participating thread to exchange information 26 * with others at the rendezvous point. Each entering thread 27 * presents some object on entry to the rendezvous, and 28 * returns some object on release. The object returned is 29 * the result of a RendezvousFunction that is run once per 30 * rendezvous, (it is run by the last-entering thread). By 31 * default, the function applied is a rotation, so each 32 * thread returns the object given by the next (modulo parties) 33 * entering thread. This default function faciliates simple 34 * application of a common use of rendezvous, as exchangers. 35 * </ul> 36 * <p> 37 * Rendezvous use an all-or-none breakage model 38 * for failed synchronization attempts: If threads 39 * leave a rendezvous point prematurely because of timeout 40 * or interruption, others will also leave abnormally 41 * (via BrokenBarrierException), until 42 * the rendezvous is <code>restart</code>ed. This is usually 43 * the simplest and best strategy for sharing knowledge 44 * about failures among cooperating threads in the most 45 * common usages contexts of Rendezvous. 46 * <p> 47 * While any positive number (including 1) of parties can 48 * be handled, the most common case is to have two parties. 49 * <p> 50 * <b>Sample Usage</b><p> 51 * Here are the highlights of a class that uses a Rendezvous to 52 * swap buffers between threads so that the thread filling the 53 * buffer gets a freshly 54 * emptied one when it needs it, handing off the filled one to 55 * the thread emptying the buffer. 56 * <pre> 57 * class FillAndEmpty { 58 * Rendezvous exchanger = new Rendezvous(2); 59 * Buffer initialEmptyBuffer = ... a made-up type 60 * Buffer initialFullBuffer = ... 61 * 62 * class FillingLoop implements Runnable { 63 * public void run() { 64 * Buffer currentBuffer = initialEmptyBuffer; 65 * try { 66 * while (currentBuffer != null) { 67 * addToBuffer(currentBuffer); 68 * if (currentBuffer.full()) 69 * currentBuffer = (Buffer)(exchanger.rendezvous(currentBuffer)); 70 * } 71 * } 72 * catch (BrokenBarrierException ex) { 73 * return; 74 * } 75 * catch (InterruptedException ex) { 76 * Thread.currentThread().interrupt(); 77 * } 78 * } 79 * } 80 * 81 * class EmptyingLoop implements Runnable { 82 * public void run() { 83 * Buffer currentBuffer = initialFullBuffer; 84 * try { 85 * while (currentBuffer != null) { 86 * takeFromBuffer(currentBuffer); 87 * if (currentBuffer.empty()) 88 * currentBuffer = (Buffer)(exchanger.rendezvous(currentBuffer)); 89 * } 90 * } 91 * catch (BrokenBarrierException ex) { 92 * return; 93 * } 94 * catch (InterruptedException ex) { 95 * Thread.currentThread().interrupt(); 96 * } 97 * } 98 * } 99 * 100 * void start() { 101 * new Thread(new FillingLoop()).start(); 102 * new Thread(new EmptyingLoop()).start(); 103 * } 104 * } 105 * </pre> 106 * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] 107 108 **/ 109 110 public class Rendezvous implements Barrier { 111 112 /** 113 * Interface for functions run at rendezvous points 114 **/ 115 public interface RendezvousFunction { 116 /** 117 * Perform some function on the objects presented at 118 * a rendezvous. The objects array holds all presented 119 * items; one per thread. Its length is the number of parties. 120 * The array is ordered by arrival into the rendezvous. 121 * So, the last element (at objects[objects.length-1]) 122 * is guaranteed to have been presented by the thread performing 123 * this function. No identifying information is 124 * otherwise kept about which thread presented which item. 125 * If you need to 126 * trace origins, you will need to use an item type for rendezvous 127 * that includes identifying information. After return of this 128 * function, other threads are released, and each returns with 129 * the item with the same index as the one it presented. 130 **/ 131 public void rendezvousFunction(Object[] objects); 132 } 133 134 /** 135 * The default rendezvous function. Rotates the array 136 * so that each thread returns an item presented by some 137 * other thread (or itself, if parties is 1). 138 **/ 139 public static class Rotator implements RendezvousFunction { 140 /** Rotate the array **/ 141 public void rendezvousFunction(Object[] objects) { 142 int lastIdx = objects.length - 1; 143 Object first = objects[0]; 144 for (int i = 0; i < lastIdx; ++i) objects[i] = objects[i+1]; 145 objects[lastIdx] = first; 146 } 147 } 148 149 150 protected final int parties_; 151 152 153 protected boolean broken_ = false; 154 155 /** 156 * Number of threads that have entered rendezvous 157 **/ 158 protected int entries_ = 0; 159 160 /** 161 * Number of threads that are permitted to depart rendezvous 162 **/ 163 protected long departures_ = 0; 164 165 /** 166 * Incoming threads pile up on entry until last set done. 167 **/ 168 protected final Semaphore entryGate_; 169 170 /** 171 * Temporary holder for items in exchange 172 **/ 173 protected final Object[] slots_; 174 175 /** 176 * The function to run at rendezvous point 177 **/ 178 179 protected RendezvousFunction rendezvousFunction_; 180 181 /** 182 * Create a Barrier for the indicated number of parties, 183 * and the default Rotator function to run at each barrier point. 184 * @exception IllegalArgumentException if parties less than or equal to zero. 185 **/ 186 187 public Rendezvous(int parties) { 188 this(parties, new Rotator()); 189 } 190 191 /** 192 * Create a Barrier for the indicated number of parties. 193 * and the given function to run at each barrier point. 194 * @exception IllegalArgumentException if parties less than or equal to zero. 195 **/ 196 197 public Rendezvous(int parties, RendezvousFunction function) { 198 if (parties <= 0) throw new IllegalArgumentException(); 199 parties_ = parties; 200 rendezvousFunction_ = function; 201 entryGate_ = new WaiterPreferenceSemaphore(parties); 202 slots_ = new Object[parties]; 203 } 204 205 /** 206 * Set the function to call at the point at which all threads reach the 207 * rendezvous. This function is run exactly once, by the thread 208 * that trips the barrier. The function is not run if the barrier is 209 * broken. 210 * @param function the function to run. If null, no function is run. 211 * @return the previous function 212 **/ 213 214 215 public synchronized RendezvousFunction setRendezvousFunction(RendezvousFunction function) { 216 RendezvousFunction old = rendezvousFunction_; 217 rendezvousFunction_ = function; 218 return old; 219 } 220 221 public int parties() { return parties_; } 222 223 public synchronized boolean broken() { return broken_; } 224 225 /** 226 * Reset to initial state. Clears both the broken status 227 * and any record of waiting threads, and releases all 228 * currently waiting threads with indeterminate return status. 229 * This method is intended only for use in recovery actions 230 * in which it is somehow known 231 * that no thread could possibly be relying on the 232 * the synchronization properties of this barrier. 233 **/ 234 235 public void restart() { 236 // This is not very good, but probably the best that can be done 237 for (;;) { 238 synchronized(this) { 239 if (entries_ != 0) { 240 notifyAll(); 241 } 242 else { 243 broken_ = false; 244 return; 245 } 246 } 247 Thread.yield(); 248 } 249 } 250 251 252 /** 253 * Enter a rendezvous; returning after all other parties arrive. 254 * @param x the item to present at rendezvous point. 255 * By default, this item is exchanged with another. 256 * @return an item x given by some thread, and/or processed 257 * by the rendezvousFunction. 258 * @exception BrokenBarrierException 259 * if any other thread 260 * in any previous or current barrier 261 * since either creation or the last <code>restart</code> 262 * operation left the barrier 263 * prematurely due to interruption or time-out. (If so, 264 * the <code>broken</code> status is also set.) 265 * Also returns as 266 * broken if the RendezvousFunction encountered a run-time exception. 267 * Threads that are noticed to have been 268 * interrupted <em>after</em> being released are not considered 269 * to have broken the barrier. 270 * In all cases, the interruption 271 * status of the current thread is preserved, so can be tested 272 * by checking <code>Thread.interrupted</code>. 273 * @exception InterruptedException if this thread was interrupted 274 * during the exchange. If so, <code>broken</code> status is also set. 275 **/ 276 277 278 public Object rendezvous(Object x) throws InterruptedException, BrokenBarrierException { 279 return doRendezvous(x, false, 0); 280 } 281 282 /** 283 * Wait msecs to complete a rendezvous. 284 * @param x the item to present at rendezvous point. 285 * By default, this item is exchanged with another. 286 * @param msecs The maximum time to wait. 287 * @return an item x given by some thread, and/or processed 288 * by the rendezvousFunction. 289 * @exception BrokenBarrierException 290 * if any other thread 291 * in any previous or current barrier 292 * since either creation or the last <code>restart</code> 293 * operation left the barrier 294 * prematurely due to interruption or time-out. (If so, 295 * the <code>broken</code> status is also set.) 296 * Also returns as 297 * broken if the RendezvousFunction encountered a run-time exception. 298 * Threads that are noticed to have been 299 * interrupted <em>after</em> being released are not considered 300 * to have broken the barrier. 301 * In all cases, the interruption 302 * status of the current thread is preserved, so can be tested 303 * by checking <code>Thread.interrupted</code>. 304 * @exception InterruptedException if this thread was interrupted 305 * during the exchange. If so, <code>broken</code> status is also set. 306 * @exception TimeoutException if this thread timed out waiting for 307 * the exchange. If the timeout occured while already in the 308 * exchange, <code>broken</code> status is also set. 309 **/ 310 311 312 public Object attemptRendezvous(Object x, long msecs) 313 throws InterruptedException, TimeoutException, BrokenBarrierException { 314 return doRendezvous(x, true, msecs); 315 } 316 317 protected Object doRendezvous(Object x, boolean timed, long msecs) 318 throws InterruptedException, TimeoutException, BrokenBarrierException { 319 320 // rely on semaphore to throw interrupt on entry 321 322 long startTime; 323 324 if (timed) { 325 startTime = System.currentTimeMillis(); 326 if (!entryGate_.attempt(msecs)) { 327 throw new TimeoutException(msecs); 328 } 329 } 330 else { 331 startTime = 0; 332 entryGate_.acquire(); 333 } 334 335 synchronized(this) { 336 337 Object y = null; 338 339 int index = entries_++; 340 slots_[index] = x; 341 342 try { 343 // last one in runs function and releases 344 if (entries_ == parties_) { 345 346 departures_ = entries_; 347 notifyAll(); 348 349 try { 350 if (!broken_ && rendezvousFunction_ != null) 351 rendezvousFunction_.rendezvousFunction(slots_); 352 } 353 catch (RuntimeException ex) { 354 broken_ = true; 355 } 356 357 } 358 359 else { 360 361 while (!broken_ && departures_ < 1) { 362 long timeLeft = 0; 363 if (timed) { 364 timeLeft = msecs - (System.currentTimeMillis() - startTime); 365 if (timeLeft <= 0) { 366 broken_ = true; 367 departures_ = entries_; 368 notifyAll(); 369 throw new TimeoutException(msecs); 370 } 371 } 372 373 try { 374 wait(timeLeft); 375 } 376 catch (InterruptedException ex) { 377 if (broken_ || departures_ > 0) { // interrupted after release 378 Thread.currentThread().interrupt(); 379 break; 380 } 381 else { 382 broken_ = true; 383 departures_ = entries_; 384 notifyAll(); 385 throw ex; 386 } 387 } 388 } 389 } 390 391 } 392 393 finally { 394 395 y = slots_[index]; 396 397 // Last one out cleans up and allows next set of threads in 398 if (--departures_ <= 0) { 399 for (int i = 0; i < slots_.length; ++i) slots_[i] = null; 400 entryGate_.release(entries_); 401 entries_ = 0; 402 } 403 } 404 405 // continue if no IE/TO throw 406 if (broken_) 407 throw new BrokenBarrierException(index); 408 else 409 return y; 410 } 411 } 412 413 } 414 415 416 |
|||
Java API By Example, From Geeks To Geeks. |
Conditions of Use |
About Us
© 2002 - 2005, KickJava.com, or its affiliates
|