| ||||
|
Code - Class EDU.oswego.cs.dl.util.concurrent.CyclicBarrier1 /* 2 File: CyclicBarrier.java 3 4 Originally written by Doug Lea and released into the public domain. 5 This may be used for any purposes whatsoever without acknowledgment. 6 Thanks for the assistance and support of Sun Microsystems Labs, 7 and everyone contributing, testing, and using this code. 8 9 History: 10 Date Who What 11 11Jul1998 dl Create public version 12 28Aug1998 dl minor code simplification 13 */ 14 15 package EDU.oswego.cs.dl.util.concurrent; 16 17 /** 18 * A cyclic barrier is a reasonable choice for a barrier in contexts 19 * involving a fixed sized group of threads that 20 * must occasionally wait for each other. 21 * (A Rendezvous better handles applications in which 22 * any number of threads meet, n-at-a-time.) 23 * <p> 24 * CyclicBarriers use an all-or-none breakage model 25 * for failed synchronization attempts: If threads 26 * leave a barrier point prematurely because of timeout 27 * or interruption, others will also leave abnormally 28 * (via BrokenBarrierException), until 29 * the barrier is <code>restart</code>ed. This is usually 30 * the simplest and best strategy for sharing knowledge 31 * about failures among cooperating threads in the most 32 * common usages contexts of Barriers. 33 * This implementation has the property that interruptions 34 * among newly arriving threads can cause as-yet-unresumed 35 * threads from a previous barrier cycle to return out 36 * as broken. This transmits breakage 37 * as early as possible, but with the possible byproduct that 38 * only some threads returning out of a barrier will realize 39 * that it is newly broken. (Others will not realize this until a 40 * future cycle.) (The Rendezvous class has a more uniform, but 41 * sometimes less desirable policy.) 42 * <p> 43 * Barriers support an optional Runnable command 44 * that is run once per barrier point. 45 * <p> 46 * <b>Sample usage</b> Here is a code sketch of 47 * a barrier in a parallel decomposition design. 48 * <pre> 49 * class Solver { 50 * final int N; 51 * final float[][] data; 52 * final CyclicBarrier barrier; 53 * 54 * class Worker implements Runnable { 55 * int myRow; 56 * Worker(int row) { myRow = row; } 57 * public void run() { 58 * while (!done()) { 59 * processRow(myRow); 60 * 61 * try { 62 * barrier.barrier(); 63 * } 64 * catch (InterruptedException ex) { return; } 65 * catch (BrokenBarrierException ex) { return; } 66 * } 67 * } 68 * } 69 * 70 * public Solver(float[][] matrix) { 71 * data = matrix; 72 * N = matrix.length; 73 * barrier = new CyclicBarrier(N); 74 * barrier.setBarrierCommand(new Runnable() { 75 * public void run() { mergeRows(...); } 76 * }); 77 * for (int i = 0; i < N; ++i) { 78 * new Thread(new Worker(i)).start(); 79 * waitUntilDone(); 80 * } 81 * } 82 * </pre> 83 * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] 84 85 **/ 86 public class CyclicBarrier implements Barrier { 87 88 protected final int parties_; 89 protected boolean broken_ = false; 90 protected Runnable barrierCommand_ = null; 91 protected int count_; // number of parties still waiting 92 protected int resets_ = 0; // incremented on each release 93 94 /** 95 * Create a CyclicBarrier for the indicated number of parties, 96 * and no command to run at each barrier. 97 * @exception IllegalArgumentException if parties less than or equal to zero. 98 **/ 99 100 public CyclicBarrier(int parties) { this(parties, null); } 101 102 /** 103 * Create a CyclicBarrier for the indicated number of parties. 104 * and the given command to run at each barrier point. 105 * @exception IllegalArgumentException if parties less than or equal to zero. 106 **/ 107 108 public CyclicBarrier(int parties, Runnable command) { 109 if (parties <= 0) throw new IllegalArgumentException(); 110 parties_ = parties; 111 count_ = parties; 112 barrierCommand_ = command; 113 } 114 115 /** 116 * Set the command to run at the point at which all threads reach the 117 * barrier. This command is run exactly once, by the thread 118 * that trips the barrier. The command is not run if the barrier is 119 * broken. 120 * @param command the command to run. If null, no command is run. 121 * @return the previous command 122 **/ 123 124 public synchronized Runnable setBarrierCommand(Runnable command) { 125 Runnable old = barrierCommand_; 126 barrierCommand_ = command; 127 return old; 128 } 129 130 public synchronized boolean broken() { return broken_; } 131 132 /** 133 * Reset to initial state. Clears both the broken status 134 * and any record of waiting threads, and releases all 135 * currently waiting threads with indeterminate return status. 136 * This method is intended only for use in recovery actions 137 * in which it is somehow known 138 * that no thread could possibly be relying on the 139 * the synchronization properties of this barrier. 140 **/ 141 142 public synchronized void restart() { 143 broken_ = false; 144 ++resets_; 145 count_ = parties_; 146 notifyAll(); 147 } 148 149 150 public int parties() { return parties_; } 151 152 /** 153 * Enter barrier and wait for the other parties()-1 threads. 154 * @return the arrival index: the number of other parties 155 * that were still waiting 156 * upon entry. This is a unique value from zero to parties()-1. 157 * If it is zero, then the current 158 * thread was the last party to hit barrier point 159 * and so was responsible for releasing the others. 160 * @exception BrokenBarrierException if any other thread 161 * in any previous or current barrier 162 * since either creation or the last <code>restart</code> 163 * operation left the barrier 164 * prematurely due to interruption or time-out. (If so, 165 * the <code>broken</code> status is also set.) 166 * Threads that are noticied to have been 167 * interrupted <em>after</em> being released are not considered 168 * to have broken the barrier. 169 * In all cases, the interruption 170 * status of the current thread is preserved, so can be tested 171 * by checking <code>Thread.interrupted</code>. 172 * @exception InterruptedException if this thread was interrupted 173 * during the barrier, and was the one causing breakage. 174 * If so, <code>broken</code> status is also set. 175 **/ 176 177 public int barrier() throws InterruptedException, BrokenBarrierException { 178 return doBarrier(false, 0); 179 } 180 181 /** 182 * Enter barrier and wait at most msecs for the other parties()-1 threads. 183 * @return if not timed out, the arrival index: the number of other parties 184 * that were still waiting 185 * upon entry. This is a unique value from zero to parties()-1. 186 * If it is zero, then the current 187 * thread was the last party to hit barrier point 188 * and so was responsible for releasing the others. 189 * @exception BrokenBarrierException 190 * if any other thread 191 * in any previous or current barrier 192 * since either creation or the last <code>restart</code> 193 * operation left the barrier 194 * prematurely due to interruption or time-out. (If so, 195 * the <code>broken</code> status is also set.) 196 * Threads that are noticed to have been 197 * interrupted <em>after</em> being released are not considered 198 * to have broken the barrier. 199 * In all cases, the interruption 200 * status of the current thread is preserved, so can be tested 201 * by checking <code>Thread.interrupted</code>. 202 * @exception InterruptedException if this thread was interrupted 203 * during the barrier. If so, <code>broken</code> status is also set. 204 * @exception TimeoutException if this thread timed out waiting for 205 * the barrier. If the timeout occured while already in the 206 * barrier, <code>broken</code> status is also set. 207 **/ 208 209 public int attemptBarrier(long msecs) 210 throws InterruptedException, TimeoutException, BrokenBarrierException { 211 return doBarrier(true, msecs); 212 } 213 214 protected synchronized int doBarrier(boolean timed, long msecs) 215 throws InterruptedException, TimeoutException, BrokenBarrierException { 216 217 int index = --count_; 218 219 if (broken_) { 220 throw new BrokenBarrierException(index); 221 } 222 else if (Thread.interrupted()) { 223 broken_ = true; 224 notifyAll(); 225 throw new InterruptedException(); 226 } 227 else if (index == 0) { // tripped 228 count_ = parties_; 229 ++resets_; 230 notifyAll(); 231 try { 232 if (barrierCommand_ != null) 233 barrierCommand_.run(); 234 return 0; 235 } 236 catch (RuntimeException ex) { 237 broken_ = true; 238 return 0; 239 } 240 } 241 else if (timed && msecs <= 0) { 242 broken_ = true; 243 notifyAll(); 244 throw new TimeoutException(msecs); 245 } 246 else { // wait until next reset 247 int r = resets_; 248 long startTime = (timed)? System.currentTimeMillis() : 0; 249 long waitTime = msecs; 250 for (;;) { 251 try { 252 wait(waitTime); 253 } 254 catch (InterruptedException ex) { 255 // Only claim that broken if interrupted before reset 256 if (resets_ == r) { 257 broken_ = true; 258 notifyAll(); 259 throw ex; 260 } 261 else { 262 Thread.currentThread().interrupt(); // propagate 263 } 264 } 265 266 if (broken_) 267 throw new BrokenBarrierException(index); 268 269 else if (r != resets_) 270 return index; 271 272 else if (timed) { 273 waitTime = msecs - (System.currentTimeMillis() - startTime); 274 if (waitTime <= 0) { 275 broken_ = true; 276 notifyAll(); 277 throw new TimeoutException(msecs); 278 } 279 } 280 } 281 } 282 } 283 284 } 285 |
|||
Java API By Example, From Geeks To Geeks. |
Conditions of Use |
About Us
© 2002 - 2005, KickJava.com, or its affiliates
|