1 7 8 package java.util.concurrent; 9 import java.util.concurrent.locks.*; 10 import java.util.*; 11 12 53 public class SynchronousQueue<E> extends AbstractQueue<E> 54 implements BlockingQueue <E>, java.io.Serializable { 55 private static final long serialVersionUID = -3223113410248163686L; 56 57 81 82 83 private final ReentrantLock qlock; 84 85 private final WaitQueue waitingProducers; 86 87 private final WaitQueue waitingConsumers; 88 89 92 public SynchronousQueue() { 93 this(false); 94 } 95 96 101 public SynchronousQueue(boolean fair) { 102 if (fair) { 103 qlock = new ReentrantLock(true); 104 waitingProducers = new FifoWaitQueue(); 105 waitingConsumers = new FifoWaitQueue(); 106 } 107 else { 108 qlock = new ReentrantLock(); 109 waitingProducers = new LifoWaitQueue(); 110 waitingConsumers = new LifoWaitQueue(); 111 } 112 } 113 114 119 static abstract class WaitQueue implements java.io.Serializable { 120 121 abstract Node enq(Object x); 122 123 abstract Node deq(); 124 } 125 126 129 static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable { 130 private static final long serialVersionUID = -3623113410248163686L; 131 private transient Node head; 132 private transient Node last; 133 134 Node enq(Object x) { 135 Node p = new Node(x); 136 if (last == null) 137 last = head = p; 138 else 139 last = last.next = p; 140 return p; 141 } 142 143 Node deq() { 144 Node p = head; 145 if (p != null) { 146 if ((head = p.next) == null) 147 last = null; 148 p.next = null; 149 } 150 return p; 151 } 152 } 153 154 157 static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable { 158 private static final long serialVersionUID = -3633113410248163686L; 159 private transient Node head; 160 161 Node enq(Object x) { 162 return head = new Node(x, head); 163 } 164 165 Node deq() { 166 Node p = head; 167 if (p != null) { 168 head = p.next; 169 p.next = null; 170 } 171 return p; 172 } 173 } 174 175 181 static final class Node extends AbstractQueuedSynchronizer { 182 183 private static final int ACK = 1; 184 185 private static final int CANCEL = -1; 186 187 188 Object item; 189 190 Node next; 191 192 193 Node(Object x) { item = x; } 194 195 196 Node(Object x, Node n) { item = x; next = n; } 197 198 201 protected boolean tryAcquire(int ignore) { 202 return getState() != 0; 203 } 204 205 208 protected boolean tryRelease(int newState) { 209 return compareAndSetState(0, newState); 210 } 211 212 215 private Object extract() { 216 Object x = item; 217 item = null; 218 return x; 219 } 220 221 225 private void checkCancellationOnInterrupt(InterruptedException ie) 226 throws InterruptedException { 227 if (release(CANCEL)) 228 throw ie; 229 Thread.currentThread().interrupt(); 230 } 231 232 236 boolean setItem(Object x) { 237 item = x; return release(ACK); 239 } 240 241 245 Object getItem() { 246 return (release(ACK))? extract() : null; 247 } 248 249 252 void waitForTake() throws InterruptedException { 253 try { 254 acquireInterruptibly(0); 255 } catch (InterruptedException ie) { 256 checkCancellationOnInterrupt(ie); 257 } 258 } 259 260 263 Object waitForPut() throws InterruptedException { 264 try { 265 acquireInterruptibly(0); 266 } catch (InterruptedException ie) { 267 checkCancellationOnInterrupt(ie); 268 } 269 return extract(); 270 } 271 272 275 boolean waitForTake(long nanos) throws InterruptedException { 276 try { 277 if (!tryAcquireNanos(0, nanos) && 278 release(CANCEL)) 279 return false; 280 } catch (InterruptedException ie) { 281 checkCancellationOnInterrupt(ie); 282 } 283 return true; 284 } 285 286 289 Object waitForPut(long nanos) throws InterruptedException { 290 try { 291 if (!tryAcquireNanos(0, nanos) && 292 release(CANCEL)) 293 return null; 294 } catch (InterruptedException ie) { 295 checkCancellationOnInterrupt(ie); 296 } 297 return extract(); 298 } 299 } 300 301 308 public void put(E o) throws InterruptedException { 309 if (o == null) throw new NullPointerException (); 310 final ReentrantLock qlock = this.qlock; 311 312 for (;;) { 313 Node node; 314 boolean mustWait; 315 if (Thread.interrupted()) throw new InterruptedException (); 316 qlock.lock(); 317 try { 318 node = waitingConsumers.deq(); 319 if ( (mustWait = (node == null)) ) 320 node = waitingProducers.enq(o); 321 } finally { 322 qlock.unlock(); 323 } 324 325 if (mustWait) { 326 node.waitForTake(); 327 return; 328 } 329 330 else if (node.setItem(o)) 331 return; 332 333 } 335 } 336 337 350 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { 351 if (o == null) throw new NullPointerException (); 352 long nanos = unit.toNanos(timeout); 353 final ReentrantLock qlock = this.qlock; 354 for (;;) { 355 Node node; 356 boolean mustWait; 357 if (Thread.interrupted()) throw new InterruptedException (); 358 qlock.lock(); 359 try { 360 node = waitingConsumers.deq(); 361 if ( (mustWait = (node == null)) ) 362 node = waitingProducers.enq(o); 363 } finally { 364 qlock.unlock(); 365 } 366 367 if (mustWait) 368 return node.waitForTake(nanos); 369 370 else if (node.setItem(o)) 371 return true; 372 373 } 375 } 376 377 383 public E take() throws InterruptedException { 384 final ReentrantLock qlock = this.qlock; 385 for (;;) { 386 Node node; 387 boolean mustWait; 388 389 if (Thread.interrupted()) throw new InterruptedException (); 390 qlock.lock(); 391 try { 392 node = waitingProducers.deq(); 393 if ( (mustWait = (node == null)) ) 394 node = waitingConsumers.enq(null); 395 } finally { 396 qlock.unlock(); 397 } 398 399 if (mustWait) { 400 Object x = node.waitForPut(); 401 return (E)x; 402 } 403 else { 404 Object x = node.getItem(); 405 if (x != null) 406 return (E)x; 407 } 409 } 410 } 411 412 424 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 425 long nanos = unit.toNanos(timeout); 426 final ReentrantLock qlock = this.qlock; 427 428 for (;;) { 429 Node node; 430 boolean mustWait; 431 432 if (Thread.interrupted()) throw new InterruptedException (); 433 qlock.lock(); 434 try { 435 node = waitingProducers.deq(); 436 if ( (mustWait = (node == null)) ) 437 node = waitingConsumers.enq(null); 438 } finally { 439 qlock.unlock(); 440 } 441 442 if (mustWait) { 443 Object x = node.waitForPut(nanos); 444 return (E)x; 445 } 446 else { 447 Object x = node.getItem(); 448 if (x != null) 449 return (E)x; 450 } 452 } 453 } 454 455 457 466 public boolean offer(E o) { 467 if (o == null) throw new NullPointerException (); 468 final ReentrantLock qlock = this.qlock; 469 470 for (;;) { 471 Node node; 472 qlock.lock(); 473 try { 474 node = waitingConsumers.deq(); 475 } finally { 476 qlock.unlock(); 477 } 478 if (node == null) 479 return false; 480 481 else if (node.setItem(o)) 482 return true; 483 } 485 } 486 487 494 public E poll() { 495 final ReentrantLock qlock = this.qlock; 496 for (;;) { 497 Node node; 498 qlock.lock(); 499 try { 500 node = waitingProducers.deq(); 501 } finally { 502 qlock.unlock(); 503 } 504 if (node == null) 505 return null; 506 507 else { 508 Object x = node.getItem(); 509 if (x != null) 510 return (E)x; 511 } 513 } 514 } 515 516 521 public boolean isEmpty() { 522 return true; 523 } 524 525 530 public int size() { 531 return 0; 532 } 533 534 539 public int remainingCapacity() { 540 return 0; 541 } 542 543 547 public void clear() {} 548 549 555 public boolean contains(Object o) { 556 return false; 557 } 558 559 566 public boolean remove(Object o) { 567 return false; 568 } 569 570 576 public boolean containsAll(Collection<?> c) { 577 return c.isEmpty(); 578 } 579 580 586 public boolean removeAll(Collection<?> c) { 587 return false; 588 } 589 590 596 public boolean retainAll(Collection<?> c) { 597 return false; 598 } 599 600 606 public E peek() { 607 return null; 608 } 609 610 611 static class EmptyIterator<E> implements Iterator<E> { 612 public boolean hasNext() { 613 return false; 614 } 615 public E next() { 616 throw new NoSuchElementException(); 617 } 618 public void remove() { 619 throw new IllegalStateException (); 620 } 621 } 622 623 629 public Iterator<E> iterator() { 630 return new EmptyIterator<E>(); 631 } 632 633 634 638 public Object [] toArray() { 639 return new Object [0]; 640 } 641 642 648 public <T> T[] toArray(T[] a) { 649 if (a.length > 0) 650 a[0] = null; 651 return a; 652 } 653 654 655 public int drainTo(Collection<? super E> c) { 656 if (c == null) 657 throw new NullPointerException (); 658 if (c == this) 659 throw new IllegalArgumentException (); 660 int n = 0; 661 E e; 662 while ( (e = poll()) != null) { 663 c.add(e); 664 ++n; 665 } 666 return n; 667 } 668 669 public int drainTo(Collection<? super E> c, int maxElements) { 670 if (c == null) 671 throw new NullPointerException (); 672 if (c == this) 673 throw new IllegalArgumentException (); 674 int n = 0; 675 E e; 676 while (n < maxElements && (e = poll()) != null) { 677 c.add(e); 678 ++n; 679 } 680 return n; 681 } 682 } 683 684 685 686 687 688 | Popular Tags |