1 7 8 package java.util.concurrent; 9 import java.util.concurrent.atomic.*; 10 import java.util.concurrent.locks.*; 11 import java.util.*; 12 13 45 public class LinkedBlockingQueue<E> extends AbstractQueue<E> 46 implements BlockingQueue <E>, java.io.Serializable { 47 private static final long serialVersionUID = -6903933977591709194L; 48 49 62 63 66 static class Node<E> { 67 68 volatile E item; 69 Node<E> next; 70 Node(E x) { item = x; } 71 } 72 73 74 private final int capacity; 75 76 77 private final AtomicInteger count = new AtomicInteger(0); 78 79 80 private transient Node<E> head; 81 82 83 private transient Node<E> last; 84 85 86 private final ReentrantLock takeLock = new ReentrantLock(); 87 88 89 private final Condition notEmpty = takeLock.newCondition(); 90 91 92 private final ReentrantLock putLock = new ReentrantLock(); 93 94 95 private final Condition notFull = putLock.newCondition(); 96 97 101 private void signalNotEmpty() { 102 final ReentrantLock takeLock = this.takeLock; 103 takeLock.lock(); 104 try { 105 notEmpty.signal(); 106 } finally { 107 takeLock.unlock(); 108 } 109 } 110 111 114 private void signalNotFull() { 115 final ReentrantLock putLock = this.putLock; 116 putLock.lock(); 117 try { 118 notFull.signal(); 119 } finally { 120 putLock.unlock(); 121 } 122 } 123 124 128 private void insert(E x) { 129 last = last.next = new Node<E>(x); 130 } 131 132 136 private E extract() { 137 Node<E> first = head.next; 138 head = first; 139 E x = first.item; 140 first.item = null; 141 return x; 142 } 143 144 147 private void fullyLock() { 148 putLock.lock(); 149 takeLock.lock(); 150 } 151 152 155 private void fullyUnlock() { 156 takeLock.unlock(); 157 putLock.unlock(); 158 } 159 160 161 165 public LinkedBlockingQueue() { 166 this(Integer.MAX_VALUE); 167 } 168 169 176 public LinkedBlockingQueue(int capacity) { 177 if (capacity <= 0) throw new IllegalArgumentException (); 178 this.capacity = capacity; 179 last = head = new Node<E>(null); 180 } 181 182 191 public LinkedBlockingQueue(Collection<? extends E> c) { 192 this(Integer.MAX_VALUE); 193 for (E e : c) 194 add(e); 195 } 196 197 198 205 public int size() { 206 return count.get(); 207 } 208 209 222 public int remainingCapacity() { 223 return capacity - count.get(); 224 } 225 226 233 public void put(E o) throws InterruptedException { 234 if (o == null) throw new NullPointerException (); 235 int c = -1; 238 final ReentrantLock putLock = this.putLock; 239 final AtomicInteger count = this.count; 240 putLock.lockInterruptibly(); 241 try { 242 251 try { 252 while (count.get() == capacity) 253 notFull.await(); 254 } catch (InterruptedException ie) { 255 notFull.signal(); throw ie; 257 } 258 insert(o); 259 c = count.getAndIncrement(); 260 if (c + 1 < capacity) 261 notFull.signal(); 262 } finally { 263 putLock.unlock(); 264 } 265 if (c == 0) 266 signalNotEmpty(); 267 } 268 269 282 public boolean offer(E o, long timeout, TimeUnit unit) 283 throws InterruptedException { 284 285 if (o == null) throw new NullPointerException (); 286 long nanos = unit.toNanos(timeout); 287 int c = -1; 288 final ReentrantLock putLock = this.putLock; 289 final AtomicInteger count = this.count; 290 putLock.lockInterruptibly(); 291 try { 292 for (;;) { 293 if (count.get() < capacity) { 294 insert(o); 295 c = count.getAndIncrement(); 296 if (c + 1 < capacity) 297 notFull.signal(); 298 break; 299 } 300 if (nanos <= 0) 301 return false; 302 try { 303 nanos = notFull.awaitNanos(nanos); 304 } catch (InterruptedException ie) { 305 notFull.signal(); throw ie; 307 } 308 } 309 } finally { 310 putLock.unlock(); 311 } 312 if (c == 0) 313 signalNotEmpty(); 314 return true; 315 } 316 317 326 public boolean offer(E o) { 327 if (o == null) throw new NullPointerException (); 328 final AtomicInteger count = this.count; 329 if (count.get() == capacity) 330 return false; 331 int c = -1; 332 final ReentrantLock putLock = this.putLock; 333 putLock.lock(); 334 try { 335 if (count.get() < capacity) { 336 insert(o); 337 c = count.getAndIncrement(); 338 if (c + 1 < capacity) 339 notFull.signal(); 340 } 341 } finally { 342 putLock.unlock(); 343 } 344 if (c == 0) 345 signalNotEmpty(); 346 return c >= 0; 347 } 348 349 350 public E take() throws InterruptedException { 351 E x; 352 int c = -1; 353 final AtomicInteger count = this.count; 354 final ReentrantLock takeLock = this.takeLock; 355 takeLock.lockInterruptibly(); 356 try { 357 try { 358 while (count.get() == 0) 359 notEmpty.await(); 360 } catch (InterruptedException ie) { 361 notEmpty.signal(); throw ie; 363 } 364 365 x = extract(); 366 c = count.getAndDecrement(); 367 if (c > 1) 368 notEmpty.signal(); 369 } finally { 370 takeLock.unlock(); 371 } 372 if (c == capacity) 373 signalNotFull(); 374 return x; 375 } 376 377 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 378 E x = null; 379 int c = -1; 380 long nanos = unit.toNanos(timeout); 381 final AtomicInteger count = this.count; 382 final ReentrantLock takeLock = this.takeLock; 383 takeLock.lockInterruptibly(); 384 try { 385 for (;;) { 386 if (count.get() > 0) { 387 x = extract(); 388 c = count.getAndDecrement(); 389 if (c > 1) 390 notEmpty.signal(); 391 break; 392 } 393 if (nanos <= 0) 394 return null; 395 try { 396 nanos = notEmpty.awaitNanos(nanos); 397 } catch (InterruptedException ie) { 398 notEmpty.signal(); throw ie; 400 } 401 } 402 } finally { 403 takeLock.unlock(); 404 } 405 if (c == capacity) 406 signalNotFull(); 407 return x; 408 } 409 410 public E poll() { 411 final AtomicInteger count = this.count; 412 if (count.get() == 0) 413 return null; 414 E x = null; 415 int c = -1; 416 final ReentrantLock takeLock = this.takeLock; 417 takeLock.lock(); 418 try { 419 if (count.get() > 0) { 420 x = extract(); 421 c = count.getAndDecrement(); 422 if (c > 1) 423 notEmpty.signal(); 424 } 425 } finally { 426 takeLock.unlock(); 427 } 428 if (c == capacity) 429 signalNotFull(); 430 return x; 431 } 432 433 434 public E peek() { 435 if (count.get() == 0) 436 return null; 437 final ReentrantLock takeLock = this.takeLock; 438 takeLock.lock(); 439 try { 440 Node<E> first = head.next; 441 if (first == null) 442 return null; 443 else 444 return first.item; 445 } finally { 446 takeLock.unlock(); 447 } 448 } 449 450 454 public boolean remove(Object o) { 455 if (o == null) return false; 456 boolean removed = false; 457 fullyLock(); 458 try { 459 Node<E> trail = head; 460 Node<E> p = head.next; 461 while (p != null) { 462 if (o.equals(p.item)) { 463 removed = true; 464 break; 465 } 466 trail = p; 467 p = p.next; 468 } 469 if (removed) { 470 p.item = null; 471 trail.next = p.next; 472 if (last == p) 473 last = trail; 474 if (count.getAndDecrement() == capacity) 475 notFull.signalAll(); 476 } 477 } finally { 478 fullyUnlock(); 479 } 480 return removed; 481 } 482 483 public Object [] toArray() { 484 fullyLock(); 485 try { 486 int size = count.get(); 487 Object [] a = new Object [size]; 488 int k = 0; 489 for (Node<E> p = head.next; p != null; p = p.next) 490 a[k++] = p.item; 491 return a; 492 } finally { 493 fullyUnlock(); 494 } 495 } 496 497 public <T> T[] toArray(T[] a) { 498 fullyLock(); 499 try { 500 int size = count.get(); 501 if (a.length < size) 502 a = (T[])java.lang.reflect.Array.newInstance 503 (a.getClass().getComponentType(), size); 504 505 int k = 0; 506 for (Node p = head.next; p != null; p = p.next) 507 a[k++] = (T)p.item; 508 if (a.length > k) 509 a[k] = null; 510 return a; 511 } finally { 512 fullyUnlock(); 513 } 514 } 515 516 public String toString() { 517 fullyLock(); 518 try { 519 return super.toString(); 520 } finally { 521 fullyUnlock(); 522 } 523 } 524 525 529 public void clear() { 530 fullyLock(); 531 try { 532 head.next = null; 533 assert head.item == null; 534 last = head; 535 if (count.getAndSet(0) == capacity) 536 notFull.signalAll(); 537 } finally { 538 fullyUnlock(); 539 } 540 } 541 542 public int drainTo(Collection<? super E> c) { 543 if (c == null) 544 throw new NullPointerException (); 545 if (c == this) 546 throw new IllegalArgumentException (); 547 Node first; 548 fullyLock(); 549 try { 550 first = head.next; 551 head.next = null; 552 assert head.item == null; 553 last = head; 554 if (count.getAndSet(0) == capacity) 555 notFull.signalAll(); 556 } finally { 557 fullyUnlock(); 558 } 559 int n = 0; 561 for (Node<E> p = first; p != null; p = p.next) { 562 c.add(p.item); 563 p.item = null; 564 ++n; 565 } 566 return n; 567 } 568 569 public int drainTo(Collection<? super E> c, int maxElements) { 570 if (c == null) 571 throw new NullPointerException (); 572 if (c == this) 573 throw new IllegalArgumentException (); 574 fullyLock(); 575 try { 576 int n = 0; 577 Node<E> p = head.next; 578 while (p != null && n < maxElements) { 579 c.add(p.item); 580 p.item = null; 581 p = p.next; 582 ++n; 583 } 584 if (n != 0) { 585 head.next = p; 586 assert head.item == null; 587 if (p == null) 588 last = head; 589 if (count.getAndAdd(-n) == capacity) 590 notFull.signalAll(); 591 } 592 return n; 593 } finally { 594 fullyUnlock(); 595 } 596 } 597 598 608 public Iterator<E> iterator() { 609 return new Itr(); 610 } 611 612 private class Itr implements Iterator<E> { 613 618 private Node<E> current; 619 private Node<E> lastRet; 620 private E currentElement; 621 622 Itr() { 623 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; 624 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; 625 putLock.lock(); 626 takeLock.lock(); 627 try { 628 current = head.next; 629 if (current != null) 630 currentElement = current.item; 631 } finally { 632 takeLock.unlock(); 633 putLock.unlock(); 634 } 635 } 636 637 public boolean hasNext() { 638 return current != null; 639 } 640 641 public E next() { 642 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; 643 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; 644 putLock.lock(); 645 takeLock.lock(); 646 try { 647 if (current == null) 648 throw new NoSuchElementException(); 649 E x = currentElement; 650 lastRet = current; 651 current = current.next; 652 if (current != null) 653 currentElement = current.item; 654 return x; 655 } finally { 656 takeLock.unlock(); 657 putLock.unlock(); 658 } 659 } 660 661 public void remove() { 662 if (lastRet == null) 663 throw new IllegalStateException (); 664 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; 665 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; 666 putLock.lock(); 667 takeLock.lock(); 668 try { 669 Node<E> node = lastRet; 670 lastRet = null; 671 Node<E> trail = head; 672 Node<E> p = head.next; 673 while (p != null && p != node) { 674 trail = p; 675 p = p.next; 676 } 677 if (p == node) { 678 p.item = null; 679 trail.next = p.next; 680 if (last == p) 681 last = trail; 682 int c = count.getAndDecrement(); 683 if (c == capacity) 684 notFull.signalAll(); 685 } 686 } finally { 687 takeLock.unlock(); 688 putLock.unlock(); 689 } 690 } 691 } 692 693 701 private void writeObject(java.io.ObjectOutputStream s) 702 throws java.io.IOException { 703 704 fullyLock(); 705 try { 706 s.defaultWriteObject(); 708 709 for (Node<E> p = head.next; p != null; p = p.next) 711 s.writeObject(p.item); 712 713 s.writeObject(null); 715 } finally { 716 fullyUnlock(); 717 } 718 } 719 720 725 private void readObject(java.io.ObjectInputStream s) 726 throws java.io.IOException , ClassNotFoundException { 727 s.defaultReadObject(); 729 730 count.set(0); 731 last = head = new Node<E>(null); 732 733 for (;;) { 735 E item = (E)s.readObject(); 736 if (item == null) 737 break; 738 add(item); 739 } 740 } 741 } 742 | Popular Tags |