1 14 15 package EDU.oswego.cs.dl.util.concurrent; 16 17 86 public class CyclicBarrier implements Barrier { 87 88 protected final int parties_; 89 protected boolean broken_ = false; 90 protected Runnable barrierCommand_ = null; 91 protected int count_; protected int resets_ = 0; 94 99 100 public CyclicBarrier(int parties) { this(parties, null); } 101 102 107 108 public CyclicBarrier(int parties, Runnable command) { 109 if (parties <= 0) throw new IllegalArgumentException (); 110 parties_ = parties; 111 count_ = parties; 112 barrierCommand_ = command; 113 } 114 115 123 124 public synchronized Runnable setBarrierCommand(Runnable command) { 125 Runnable old = barrierCommand_; 126 barrierCommand_ = command; 127 return old; 128 } 129 130 public synchronized boolean broken() { return broken_; } 131 132 141 142 public synchronized void restart() { 143 broken_ = false; 144 ++resets_; 145 count_ = parties_; 146 notifyAll(); 147 } 148 149 150 public int parties() { return parties_; } 151 152 176 177 public int barrier() throws InterruptedException , BrokenBarrierException { 178 return doBarrier(false, 0); 179 } 180 181 208 209 public int attemptBarrier(long msecs) 210 throws InterruptedException , TimeoutException, BrokenBarrierException { 211 return doBarrier(true, msecs); 212 } 213 214 protected synchronized int doBarrier(boolean timed, long msecs) 215 throws InterruptedException , TimeoutException, BrokenBarrierException { 216 217 int index = --count_; 218 219 if (broken_) { 220 throw new BrokenBarrierException(index); 221 } 222 else if (Thread.interrupted()) { 223 broken_ = true; 224 notifyAll(); 225 throw new InterruptedException (); 226 } 227 else if (index == 0) { count_ = parties_; 229 ++resets_; 230 notifyAll(); 231 try { 232 if (barrierCommand_ != null) 233 barrierCommand_.run(); 234 return 0; 235 } 236 catch (RuntimeException ex) { 237 broken_ = true; 238 return 0; 239 } 240 } 241 else if (timed && msecs <= 0) { 242 broken_ = true; 243 notifyAll(); 244 throw new TimeoutException(msecs); 245 } 246 else { int r = resets_; 248 long startTime = (timed)? System.currentTimeMillis() : 0; 249 long waitTime = msecs; 250 for (;;) { 251 try { 252 wait(waitTime); 253 } 254 catch (InterruptedException ex) { 255 if (resets_ == r) { 257 broken_ = true; 258 notifyAll(); 259 throw ex; 260 } 261 else { 262 Thread.currentThread().interrupt(); } 264 } 265 266 if (broken_) 267 throw new BrokenBarrierException(index); 268 269 else if (r != resets_) 270 return index; 271 272 else if (timed) { 273 waitTime = msecs - (System.currentTimeMillis() - startTime); 274 if (waitTime <= 0) { 275 broken_ = true; 276 notifyAll(); 277 throw new TimeoutException(msecs); 278 } 279 } 280 } 281 } 282 } 283 284 } 285 | Popular Tags |