1 14 15 package EDU.oswego.cs.dl.util.concurrent; 16 17 109 110 public class Rendezvous implements Barrier { 111 112 115 public interface RendezvousFunction { 116 131 public void rendezvousFunction(Object [] objects); 132 } 133 134 139 public static class Rotator implements RendezvousFunction { 140 141 public void rendezvousFunction(Object [] objects) { 142 int lastIdx = objects.length - 1; 143 Object first = objects[0]; 144 for (int i = 0; i < lastIdx; ++i) objects[i] = objects[i+1]; 145 objects[lastIdx] = first; 146 } 147 } 148 149 150 protected final int parties_; 151 152 153 protected boolean broken_ = false; 154 155 158 protected int entries_ = 0; 159 160 163 protected long departures_ = 0; 164 165 168 protected final Semaphore entryGate_; 169 170 173 protected final Object [] slots_; 174 175 178 179 protected RendezvousFunction rendezvousFunction_; 180 181 186 187 public Rendezvous(int parties) { 188 this(parties, new Rotator()); 189 } 190 191 196 197 public Rendezvous(int parties, RendezvousFunction function) { 198 if (parties <= 0) throw new IllegalArgumentException (); 199 parties_ = parties; 200 rendezvousFunction_ = function; 201 entryGate_ = new WaiterPreferenceSemaphore(parties); 202 slots_ = new Object [parties]; 203 } 204 205 213 214 215 public synchronized RendezvousFunction setRendezvousFunction(RendezvousFunction function) { 216 RendezvousFunction old = rendezvousFunction_; 217 rendezvousFunction_ = function; 218 return old; 219 } 220 221 public int parties() { return parties_; } 222 223 public synchronized boolean broken() { return broken_; } 224 225 234 235 public void restart() { 236 for (;;) { 238 synchronized(this) { 239 if (entries_ != 0) { 240 notifyAll(); 241 } 242 else { 243 broken_ = false; 244 return; 245 } 246 } 247 Thread.yield(); 248 } 249 } 250 251 252 276 277 278 public Object rendezvous(Object x) throws InterruptedException , BrokenBarrierException { 279 return doRendezvous(x, false, 0); 280 } 281 282 310 311 312 public Object attemptRendezvous(Object x, long msecs) 313 throws InterruptedException , TimeoutException, BrokenBarrierException { 314 return doRendezvous(x, true, msecs); 315 } 316 317 protected Object doRendezvous(Object x, boolean timed, long msecs) 318 throws InterruptedException , TimeoutException, BrokenBarrierException { 319 320 322 long startTime; 323 324 if (timed) { 325 startTime = System.currentTimeMillis(); 326 if (!entryGate_.attempt(msecs)) { 327 throw new TimeoutException(msecs); 328 } 329 } 330 else { 331 startTime = 0; 332 entryGate_.acquire(); 333 } 334 335 synchronized(this) { 336 337 Object y = null; 338 339 int index = entries_++; 340 slots_[index] = x; 341 342 try { 343 if (entries_ == parties_) { 345 346 departures_ = entries_; 347 notifyAll(); 348 349 try { 350 if (!broken_ && rendezvousFunction_ != null) 351 rendezvousFunction_.rendezvousFunction(slots_); 352 } 353 catch (RuntimeException ex) { 354 broken_ = true; 355 } 356 357 } 358 359 else { 360 361 while (!broken_ && departures_ < 1) { 362 long timeLeft = 0; 363 if (timed) { 364 timeLeft = msecs - (System.currentTimeMillis() - startTime); 365 if (timeLeft <= 0) { 366 broken_ = true; 367 departures_ = entries_; 368 notifyAll(); 369 throw new TimeoutException(msecs); 370 } 371 } 372 373 try { 374 wait(timeLeft); 375 } 376 catch (InterruptedException ex) { 377 if (broken_ || departures_ > 0) { Thread.currentThread().interrupt(); 379 break; 380 } 381 else { 382 broken_ = true; 383 departures_ = entries_; 384 notifyAll(); 385 throw ex; 386 } 387 } 388 } 389 } 390 391 } 392 393 finally { 394 395 y = slots_[index]; 396 397 if (--departures_ <= 0) { 399 for (int i = 0; i < slots_.length; ++i) slots_[i] = null; 400 entryGate_.release(entries_); 401 entries_ = 0; 402 } 403 } 404 405 if (broken_) 407 throw new BrokenBarrierException(index); 408 else 409 return y; 410 } 411 } 412 413 } 414 415 416 | Popular Tags |