1 7 8 package java.util.concurrent; 9 import java.util.concurrent.atomic.*; 10 import java.util.concurrent.locks.LockSupport ; 11 12 67 public class Exchanger<V> { 68 173 174 175 private static final int NCPU = Runtime.getRuntime().availableProcessors(); 176 177 188 private static final int CAPACITY = 32; 189 190 195 private static final int FULL = 196 Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1); 197 198 211 private static final int SPINS = (NCPU == 1) ? 0 : 2000; 212 213 220 private static final int TIMED_SPINS = SPINS / 20; 221 222 228 private static final Object CANCEL = new Object (); 229 230 235 private static final Object NULL_ITEM = new Object (); 236 237 244 private static final class Node extends AtomicReference<Object > { 245 246 public final Object item; 247 248 249 public volatile Thread waiter; 250 251 255 public Node(Object item) { 256 this.item = item; 257 } 258 } 259 260 268 private static final class Slot extends AtomicReference<Object > { 269 long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe; 271 } 272 273 277 private volatile Slot[] arena = new Slot[CAPACITY]; 278 279 286 private final AtomicInteger max = new AtomicInteger(); 287 288 299 private Object doExchange(Object item, boolean timed, long nanos) { 300 Node me = new Node(item); int index = hashIndex(); int fails = 0; 304 for (;;) { 305 Object y; Slot slot = arena[index]; 307 if (slot == null) createSlot(index); else if ((y = slot.get()) != null && slot.compareAndSet(y, null)) { 311 Node you = (Node)y; if (you.compareAndSet(null, item)) { 313 LockSupport.unpark(you.waiter); 314 return you.item; 315 } } 317 else if (y == null && slot.compareAndSet(null, me)) { 319 if (index == 0) return timed? awaitNanos(me, slot, nanos): await(me, slot); 321 Object v = spinWait(me, slot); if (v != CANCEL) 323 return v; 324 me = new Node(item); int m = max.get(); 326 if (m > (index >>>= 1)) max.compareAndSet(m, m - 1); } 329 else if (++fails > 1) { int m = max.get(); 331 if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) 332 index = m + 1; else if (--index < 0) 334 index = m; } 336 } 337 } 338 339 361 private final int hashIndex() { 362 long id = Thread.currentThread().getId(); 363 int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193; 364 365 int m = max.get(); 366 int nbits = (((0xfffffc00 >> m) & 4) | ((0x000001f8 >>> m) & 2) | ((0xffff00f2 >>> m) & 1)); int index; 370 while ((index = hash & ((1 << nbits) - 1)) > m) hash = (hash >>> nbits) | (hash << (33 - nbits)); return index; 373 } 374 375 383 private void createSlot(int index) { 384 Slot newSlot = new Slot(); 386 Slot[] a = arena; 387 synchronized (a) { 388 if (a[index] == null) 389 a[index] = newSlot; 390 } 391 } 392 393 402 private static boolean tryCancel(Node node, Slot slot) { 403 if (!node.compareAndSet(null, CANCEL)) 404 return false; 405 if (slot.get() == node) slot.compareAndSet(node, null); 407 return true; 408 } 409 410 413 421 private static Object spinWait(Node node, Slot slot) { 422 int spins = SPINS; 423 for (;;) { 424 Object v = node.get(); 425 if (v != null) 426 return v; 427 else if (spins > 0) 428 --spins; 429 else 430 tryCancel(node, slot); 431 } 432 } 433 434 451 private static Object await(Node node, Slot slot) { 452 Thread w = Thread.currentThread(); 453 int spins = SPINS; 454 for (;;) { 455 Object v = node.get(); 456 if (v != null) 457 return v; 458 else if (spins > 0) --spins; 460 else if (node.waiter == null) node.waiter = w; 462 else if (w.isInterrupted()) tryCancel(node, slot); 464 else LockSupport.park(); 466 } 467 } 468 469 478 private Object awaitNanos(Node node, Slot slot, long nanos) { 479 int spins = TIMED_SPINS; 480 long lastTime = 0; 481 Thread w = null; 482 for (;;) { 483 Object v = node.get(); 484 if (v != null) 485 return v; 486 long now = System.nanoTime(); 487 if (w == null) 488 w = Thread.currentThread(); 489 else 490 nanos -= now - lastTime; 491 lastTime = now; 492 if (nanos > 0) { 493 if (spins > 0) 494 --spins; 495 else if (node.waiter == null) 496 node.waiter = w; 497 else if (w.isInterrupted()) 498 tryCancel(node, slot); 499 else 500 LockSupport.parkNanos(nanos); 501 } 502 else if (tryCancel(node, slot) && !w.isInterrupted()) 503 return scanOnTimeout(node); 504 } 505 } 506 507 521 private Object scanOnTimeout(Node node) { 522 Object y; 523 for (int j = arena.length - 1; j >= 0; --j) { 524 Slot slot = arena[j]; 525 if (slot != null) { 526 while ((y = slot.get()) != null) { 527 if (slot.compareAndSet(y, null)) { 528 Node you = (Node)y; 529 if (you.compareAndSet(null, node.item)) { 530 LockSupport.unpark(you.waiter); 531 return you.item; 532 } 533 } 534 } 535 } 536 } 537 return CANCEL; 538 } 539 540 543 public Exchanger() { 544 } 545 546 577 public V exchange(V x) throws InterruptedException { 578 if (!Thread.interrupted()) { 579 Object v = doExchange(x == null? NULL_ITEM : x, false, 0); 580 if (v == NULL_ITEM) 581 return null; 582 if (v != CANCEL) 583 return (V)v; 584 Thread.interrupted(); } 586 throw new InterruptedException (); 587 } 588 589 633 public V exchange(V x, long timeout, TimeUnit unit) 634 throws InterruptedException , TimeoutException { 635 if (!Thread.interrupted()) { 636 Object v = doExchange(x == null? NULL_ITEM : x, 637 true, unit.toNanos(timeout)); 638 if (v == NULL_ITEM) 639 return null; 640 if (v != CANCEL) 641 return (V)v; 642 if (!Thread.interrupted()) 643 throw new TimeoutException (); 644 } 645 throw new InterruptedException (); 646 } 647 } 648 | Popular Tags |