1 7 8 package java.util.concurrent; 9 import java.util.concurrent.locks.*; 10 import java.util.*; 11 12 47 public class ArrayBlockingQueue<E> extends AbstractQueue<E> 48 implements BlockingQueue <E>, java.io.Serializable { 49 50 56 private static final long serialVersionUID = -817911632652898426L; 57 58 59 private final E[] items; 60 61 private transient int takeIndex; 62 63 private transient int putIndex; 64 65 private int count; 66 67 71 72 73 private final ReentrantLock lock; 74 75 private final Condition notEmpty; 76 77 private final Condition notFull; 78 79 81 84 final int inc(int i) { 85 return (++i == items.length)? 0 : i; 86 } 87 88 92 private void insert(E x) { 93 items[putIndex] = x; 94 putIndex = inc(putIndex); 95 ++count; 96 notEmpty.signal(); 97 } 98 99 103 private E extract() { 104 final E[] items = this.items; 105 E x = items[takeIndex]; 106 items[takeIndex] = null; 107 takeIndex = inc(takeIndex); 108 --count; 109 notFull.signal(); 110 return x; 111 } 112 113 117 void removeAt(int i) { 118 final E[] items = this.items; 119 if (i == takeIndex) { 121 items[takeIndex] = null; 122 takeIndex = inc(takeIndex); 123 } else { 124 for (;;) { 126 int nexti = inc(i); 127 if (nexti != putIndex) { 128 items[i] = items[nexti]; 129 i = nexti; 130 } else { 131 items[i] = null; 132 putIndex = i; 133 break; 134 } 135 } 136 } 137 --count; 138 notFull.signal(); 139 } 140 141 147 public ArrayBlockingQueue(int capacity) { 148 this(capacity, false); 149 } 150 151 160 public ArrayBlockingQueue(int capacity, boolean fair) { 161 if (capacity <= 0) 162 throw new IllegalArgumentException (); 163 this.items = (E[]) new Object [capacity]; 164 lock = new ReentrantLock(fair); 165 notEmpty = lock.newCondition(); 166 notFull = lock.newCondition(); 167 } 168 169 184 public ArrayBlockingQueue(int capacity, boolean fair, 185 Collection<? extends E> c) { 186 this(capacity, fair); 187 if (capacity < c.size()) 188 throw new IllegalArgumentException (); 189 190 for (Iterator<? extends E> it = c.iterator(); it.hasNext();) 191 add(it.next()); 192 } 193 194 203 public boolean offer(E o) { 204 if (o == null) throw new NullPointerException (); 205 final ReentrantLock lock = this.lock; 206 lock.lock(); 207 try { 208 if (count == items.length) 209 return false; 210 else { 211 insert(o); 212 return true; 213 } 214 } finally { 215 lock.unlock(); 216 } 217 } 218 219 232 public boolean offer(E o, long timeout, TimeUnit unit) 233 throws InterruptedException { 234 235 if (o == null) throw new NullPointerException (); 236 final ReentrantLock lock = this.lock; 237 lock.lockInterruptibly(); 238 try { 239 long nanos = unit.toNanos(timeout); 240 for (;;) { 241 if (count != items.length) { 242 insert(o); 243 return true; 244 } 245 if (nanos <= 0) 246 return false; 247 try { 248 nanos = notFull.awaitNanos(nanos); 249 } catch (InterruptedException ie) { 250 notFull.signal(); throw ie; 252 } 253 } 254 } finally { 255 lock.unlock(); 256 } 257 } 258 259 260 public E poll() { 261 final ReentrantLock lock = this.lock; 262 lock.lock(); 263 try { 264 if (count == 0) 265 return null; 266 E x = extract(); 267 return x; 268 } finally { 269 lock.unlock(); 270 } 271 } 272 273 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 274 final ReentrantLock lock = this.lock; 275 lock.lockInterruptibly(); 276 try { 277 long nanos = unit.toNanos(timeout); 278 for (;;) { 279 if (count != 0) { 280 E x = extract(); 281 return x; 282 } 283 if (nanos <= 0) 284 return null; 285 try { 286 nanos = notEmpty.awaitNanos(nanos); 287 } catch (InterruptedException ie) { 288 notEmpty.signal(); throw ie; 290 } 291 292 } 293 } finally { 294 lock.unlock(); 295 } 296 } 297 298 302 public boolean remove(Object o) { 303 if (o == null) return false; 304 final E[] items = this.items; 305 final ReentrantLock lock = this.lock; 306 lock.lock(); 307 try { 308 int i = takeIndex; 309 int k = 0; 310 for (;;) { 311 if (k++ >= count) 312 return false; 313 if (o.equals(items[i])) { 314 removeAt(i); 315 return true; 316 } 317 i = inc(i); 318 } 319 320 } finally { 321 lock.unlock(); 322 } 323 } 324 325 public E peek() { 326 final ReentrantLock lock = this.lock; 327 lock.lock(); 328 try { 329 return (count == 0) ? null : items[takeIndex]; 330 } finally { 331 lock.unlock(); 332 } 333 } 334 335 public E take() throws InterruptedException { 336 final ReentrantLock lock = this.lock; 337 lock.lockInterruptibly(); 338 try { 339 try { 340 while (count == 0) 341 notEmpty.await(); 342 } catch (InterruptedException ie) { 343 notEmpty.signal(); throw ie; 345 } 346 E x = extract(); 347 return x; 348 } finally { 349 lock.unlock(); 350 } 351 } 352 353 360 public void put(E o) throws InterruptedException { 361 if (o == null) throw new NullPointerException (); 362 final E[] items = this.items; 363 final ReentrantLock lock = this.lock; 364 lock.lockInterruptibly(); 365 try { 366 try { 367 while (count == items.length) 368 notFull.await(); 369 } catch (InterruptedException ie) { 370 notFull.signal(); throw ie; 372 } 373 insert(o); 374 } finally { 375 lock.unlock(); 376 } 377 } 378 379 386 public int size() { 387 final ReentrantLock lock = this.lock; 388 lock.lock(); 389 try { 390 return count; 391 } finally { 392 lock.unlock(); 393 } 394 } 395 396 409 public int remainingCapacity() { 410 final ReentrantLock lock = this.lock; 411 lock.lock(); 412 try { 413 return items.length - count; 414 } finally { 415 lock.unlock(); 416 } 417 } 418 419 420 public boolean contains(Object o) { 421 if (o == null) return false; 422 final E[] items = this.items; 423 final ReentrantLock lock = this.lock; 424 lock.lock(); 425 try { 426 int i = takeIndex; 427 int k = 0; 428 while (k++ < count) { 429 if (o.equals(items[i])) 430 return true; 431 i = inc(i); 432 } 433 return false; 434 } finally { 435 lock.unlock(); 436 } 437 } 438 439 public Object [] toArray() { 440 final E[] items = this.items; 441 final ReentrantLock lock = this.lock; 442 lock.lock(); 443 try { 444 Object [] a = new Object [count]; 445 int k = 0; 446 int i = takeIndex; 447 while (k < count) { 448 a[k++] = items[i]; 449 i = inc(i); 450 } 451 return a; 452 } finally { 453 lock.unlock(); 454 } 455 } 456 457 public <T> T[] toArray(T[] a) { 458 final E[] items = this.items; 459 final ReentrantLock lock = this.lock; 460 lock.lock(); 461 try { 462 if (a.length < count) 463 a = (T[])java.lang.reflect.Array.newInstance( 464 a.getClass().getComponentType(), 465 count 466 ); 467 468 int k = 0; 469 int i = takeIndex; 470 while (k < count) { 471 a[k++] = (T)items[i]; 472 i = inc(i); 473 } 474 if (a.length > count) 475 a[count] = null; 476 return a; 477 } finally { 478 lock.unlock(); 479 } 480 } 481 482 public String toString() { 483 final ReentrantLock lock = this.lock; 484 lock.lock(); 485 try { 486 return super.toString(); 487 } finally { 488 lock.unlock(); 489 } 490 } 491 492 493 497 public void clear() { 498 final E[] items = this.items; 499 final ReentrantLock lock = this.lock; 500 lock.lock(); 501 try { 502 int i = takeIndex; 503 int k = count; 504 while (k-- > 0) { 505 items[i] = null; 506 i = inc(i); 507 } 508 count = 0; 509 putIndex = 0; 510 takeIndex = 0; 511 notFull.signalAll(); 512 } finally { 513 lock.unlock(); 514 } 515 } 516 517 public int drainTo(Collection<? super E> c) { 518 if (c == null) 519 throw new NullPointerException (); 520 if (c == this) 521 throw new IllegalArgumentException (); 522 final E[] items = this.items; 523 final ReentrantLock lock = this.lock; 524 lock.lock(); 525 try { 526 int i = takeIndex; 527 int n = 0; 528 int max = count; 529 while (n < max) { 530 c.add(items[i]); 531 items[i] = null; 532 i = inc(i); 533 ++n; 534 } 535 if (n > 0) { 536 count = 0; 537 putIndex = 0; 538 takeIndex = 0; 539 notFull.signalAll(); 540 } 541 return n; 542 } finally { 543 lock.unlock(); 544 } 545 } 546 547 548 public int drainTo(Collection<? super E> c, int maxElements) { 549 if (c == null) 550 throw new NullPointerException (); 551 if (c == this) 552 throw new IllegalArgumentException (); 553 if (maxElements <= 0) 554 return 0; 555 final E[] items = this.items; 556 final ReentrantLock lock = this.lock; 557 lock.lock(); 558 try { 559 int i = takeIndex; 560 int n = 0; 561 int sz = count; 562 int max = (maxElements < count)? maxElements : count; 563 while (n < max) { 564 c.add(items[i]); 565 items[i] = null; 566 i = inc(i); 567 ++n; 568 } 569 if (n > 0) { 570 count -= n; 571 takeIndex = i; 572 notFull.signalAll(); 573 } 574 return n; 575 } finally { 576 lock.unlock(); 577 } 578 } 579 580 581 591 public Iterator<E> iterator() { 592 final ReentrantLock lock = this.lock; 593 lock.lock(); 594 try { 595 return new Itr(); 596 } finally { 597 lock.unlock(); 598 } 599 } 600 601 604 private class Itr implements Iterator<E> { 605 609 private int nextIndex; 610 611 617 private E nextItem; 618 619 623 private int lastRet; 624 625 Itr() { 626 lastRet = -1; 627 if (count == 0) 628 nextIndex = -1; 629 else { 630 nextIndex = takeIndex; 631 nextItem = items[takeIndex]; 632 } 633 } 634 635 public boolean hasNext() { 636 641 return nextIndex >= 0; 642 } 643 644 648 private void checkNext() { 649 if (nextIndex == putIndex) { 650 nextIndex = -1; 651 nextItem = null; 652 } else { 653 nextItem = items[nextIndex]; 654 if (nextItem == null) 655 nextIndex = -1; 656 } 657 } 658 659 public E next() { 660 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 661 lock.lock(); 662 try { 663 if (nextIndex < 0) 664 throw new NoSuchElementException(); 665 lastRet = nextIndex; 666 E x = nextItem; 667 nextIndex = inc(nextIndex); 668 checkNext(); 669 return x; 670 } finally { 671 lock.unlock(); 672 } 673 } 674 675 public void remove() { 676 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 677 lock.lock(); 678 try { 679 int i = lastRet; 680 if (i == -1) 681 throw new IllegalStateException (); 682 lastRet = -1; 683 684 int ti = takeIndex; 685 removeAt(i); 686 nextIndex = (i == ti) ? takeIndex : i; 688 checkNext(); 689 } finally { 690 lock.unlock(); 691 } 692 } 693 } 694 } 695 | Popular Tags |