KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > java > util > concurrent > ArrayBlockingQueue


1 /*
2  * @(#)ArrayBlockingQueue.java 1.9 04/06/14
3  *
4  * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
5  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6  */

7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
14  * array. This queue orders elements FIFO (first-in-first-out). The
15  * <em>head</em> of the queue is that element that has been on the
16  * queue the longest time. The <em>tail</em> of the queue is that
17  * element that has been on the queue the shortest time. New elements
18  * are inserted at the tail of the queue, and the queue retrieval
19  * operations obtain elements at the head of the queue.
20  *
21  * <p>This is a classic &quot;bounded buffer&quot;, in which a
22  * fixed-sized array holds elements inserted by producers and
23  * extracted by consumers. Once created, the capacity cannot be
24  * increased. Attempts to put an element to a full queue will
25  * result in the put operation blocking; attempts to retrieve an
26  * element from an empty queue will similarly block.
27  *
28  * <p> This class supports an optional fairness policy for ordering
29  * waiting producer and consumer threads. By default, this ordering
30  * is not guaranteed. However, a queue constructed with fairness set
31  * to <tt>true</tt> grants threads access in FIFO order. Fairness
32  * generally decreases throughput but reduces variability and avoids
33  * starvation.
34  *
35  * <p>This class and its iterator implement all of the
36  * <em>optional</em> methods of the {@link Collection} and {@link
37  * Iterator} interfaces.
38  *
39  * <p>This class is a member of the
40  * <a HREF="{@docRoot}/../guide/collections/index.html">
41  * Java Collections Framework</a>.
42  *
43  * @since 1.5
44  * @author Doug Lea
45  * @param <E> the type of elements held in this collection
46  */

47 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
48         implements BlockingQueue JavaDoc<E>, java.io.Serializable JavaDoc {
49
50     /**
51      * Serialization ID. This class relies on default serialization
52      * even for the items array, which is default-serialized, even if
53      * it is empty. Otherwise it could not be declared final, which is
54      * necessary here.
55      */

56     private static final long serialVersionUID = -817911632652898426L;
57
58     /** The queued items */
59     private final E[] items;
60     /** items index for next take, poll or remove */
61     private transient int takeIndex;
62     /** items index for next put, offer, or add. */
63     private transient int putIndex;
64     /** Number of items in the queue */
65     private int count;
66
67     /*
68      * Concurrency control uses the classic two-condition algorithm
69      * found in any textbook.
70      */

71
72     /** Main lock guarding all access */
73     private final ReentrantLock lock;
74     /** Condition for waiting takes */
75     private final Condition notEmpty;
76     /** Condition for waiting puts */
77     private final Condition notFull;
78
79     // Internal helper methods
80

81     /**
82      * Circularly increment i.
83      */

84     final int inc(int i) {
85         return (++i == items.length)? 0 : i;
86     }
87
88     /**
89      * Insert element at current put position, advance, and signal.
90      * Call only when holding lock.
91      */

92     private void insert(E x) {
93         items[putIndex] = x;
94         putIndex = inc(putIndex);
95         ++count;
96         notEmpty.signal();
97     }
98
99     /**
100      * Extract element at current take position, advance, and signal.
101      * Call only when holding lock.
102      */

103     private E extract() {
104         final E[] items = this.items;
105         E x = items[takeIndex];
106         items[takeIndex] = null;
107         takeIndex = inc(takeIndex);
108         --count;
109         notFull.signal();
110         return x;
111     }
112
113     /**
114      * Utility for remove and iterator.remove: Delete item at position i.
115      * Call only when holding lock.
116      */

117     void removeAt(int i) {
118         final E[] items = this.items;
119         // if removing front item, just advance
120
if (i == takeIndex) {
121             items[takeIndex] = null;
122             takeIndex = inc(takeIndex);
123         } else {
124             // slide over all others up through putIndex.
125
for (;;) {
126                 int nexti = inc(i);
127                 if (nexti != putIndex) {
128                     items[i] = items[nexti];
129                     i = nexti;
130                 } else {
131                     items[i] = null;
132                     putIndex = i;
133                     break;
134                 }
135             }
136         }
137         --count;
138         notFull.signal();
139     }
140
141     /**
142      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
143      * capacity and default access policy.
144      * @param capacity the capacity of this queue
145      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
146      */

147     public ArrayBlockingQueue(int capacity) {
148         this(capacity, false);
149     }
150
151     /**
152      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
153      * capacity and the specified access policy.
154      * @param capacity the capacity of this queue
155      * @param fair if <tt>true</tt> then queue accesses for threads blocked
156      * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
157      * the access order is unspecified.
158      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
159      */

160     public ArrayBlockingQueue(int capacity, boolean fair) {
161         if (capacity <= 0)
162             throw new IllegalArgumentException JavaDoc();
163         this.items = (E[]) new Object JavaDoc[capacity];
164         lock = new ReentrantLock(fair);
165         notEmpty = lock.newCondition();
166         notFull = lock.newCondition();
167     }
168
169     /**
170      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
171      * capacity, the specified access policy and initially containing the
172      * elements of the given collection,
173      * added in traversal order of the collection's iterator.
174      * @param capacity the capacity of this queue
175      * @param fair if <tt>true</tt> then queue accesses for threads blocked
176      * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
177      * the access order is unspecified.
178      * @param c the collection of elements to initially contain
179      * @throws IllegalArgumentException if <tt>capacity</tt> is less than
180      * <tt>c.size()</tt>, or less than 1.
181      * @throws NullPointerException if <tt>c</tt> or any element within it
182      * is <tt>null</tt>
183      */

184     public ArrayBlockingQueue(int capacity, boolean fair,
185                               Collection<? extends E> c) {
186         this(capacity, fair);
187         if (capacity < c.size())
188             throw new IllegalArgumentException JavaDoc();
189
190         for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
191             add(it.next());
192     }
193
194     /**
195      * Inserts the specified element at the tail of this queue if possible,
196      * returning immediately if this queue is full.
197      *
198      * @param o the element to add.
199      * @return <tt>true</tt> if it was possible to add the element to
200      * this queue, else <tt>false</tt>
201      * @throws NullPointerException if the specified element is <tt>null</tt>
202      */

203     public boolean offer(E o) {
204         if (o == null) throw new NullPointerException JavaDoc();
205         final ReentrantLock lock = this.lock;
206         lock.lock();
207         try {
208             if (count == items.length)
209                 return false;
210             else {
211                 insert(o);
212                 return true;
213             }
214         } finally {
215             lock.unlock();
216         }
217     }
218
219     /**
220      * Inserts the specified element at the tail of this queue, waiting if
221      * necessary up to the specified wait time for space to become available.
222      * @param o the element to add
223      * @param timeout how long to wait before giving up, in units of
224      * <tt>unit</tt>
225      * @param unit a <tt>TimeUnit</tt> determining how to interpret the
226      * <tt>timeout</tt> parameter
227      * @return <tt>true</tt> if successful, or <tt>false</tt> if
228      * the specified waiting time elapses before space is available.
229      * @throws InterruptedException if interrupted while waiting.
230      * @throws NullPointerException if the specified element is <tt>null</tt>.
231      */

232     public boolean offer(E o, long timeout, TimeUnit JavaDoc unit)
233         throws InterruptedException JavaDoc {
234
235         if (o == null) throw new NullPointerException JavaDoc();
236         final ReentrantLock lock = this.lock;
237         lock.lockInterruptibly();
238         try {
239             long nanos = unit.toNanos(timeout);
240             for (;;) {
241                 if (count != items.length) {
242                     insert(o);
243                     return true;
244                 }
245                 if (nanos <= 0)
246                     return false;
247                 try {
248                     nanos = notFull.awaitNanos(nanos);
249                 } catch (InterruptedException JavaDoc ie) {
250                     notFull.signal(); // propagate to non-interrupted thread
251
throw ie;
252                 }
253             }
254         } finally {
255             lock.unlock();
256         }
257     }
258
259
260     public E poll() {
261         final ReentrantLock lock = this.lock;
262         lock.lock();
263         try {
264             if (count == 0)
265                 return null;
266             E x = extract();
267             return x;
268         } finally {
269             lock.unlock();
270         }
271     }
272
273     public E poll(long timeout, TimeUnit JavaDoc unit) throws InterruptedException JavaDoc {
274         final ReentrantLock lock = this.lock;
275         lock.lockInterruptibly();
276         try {
277             long nanos = unit.toNanos(timeout);
278             for (;;) {
279                 if (count != 0) {
280                     E x = extract();
281                     return x;
282                 }
283                 if (nanos <= 0)
284                     return null;
285                 try {
286                     nanos = notEmpty.awaitNanos(nanos);
287                 } catch (InterruptedException JavaDoc ie) {
288                     notEmpty.signal(); // propagate to non-interrupted thread
289
throw ie;
290                 }
291
292             }
293         } finally {
294             lock.unlock();
295         }
296     }
297
298     /**
299      * Removes a single instance of the specified element from this
300      * queue, if it is present.
301      */

302     public boolean remove(Object JavaDoc o) {
303         if (o == null) return false;
304         final E[] items = this.items;
305         final ReentrantLock lock = this.lock;
306         lock.lock();
307         try {
308             int i = takeIndex;
309             int k = 0;
310             for (;;) {
311                 if (k++ >= count)
312                     return false;
313                 if (o.equals(items[i])) {
314                     removeAt(i);
315                     return true;
316                 }
317                 i = inc(i);
318             }
319
320         } finally {
321             lock.unlock();
322         }
323     }
324
325     public E peek() {
326         final ReentrantLock lock = this.lock;
327         lock.lock();
328         try {
329             return (count == 0) ? null : items[takeIndex];
330         } finally {
331             lock.unlock();
332         }
333     }
334
335     public E take() throws InterruptedException JavaDoc {
336         final ReentrantLock lock = this.lock;
337         lock.lockInterruptibly();
338         try {
339             try {
340                 while (count == 0)
341                     notEmpty.await();
342             } catch (InterruptedException JavaDoc ie) {
343                 notEmpty.signal(); // propagate to non-interrupted thread
344
throw ie;
345             }
346             E x = extract();
347             return x;
348         } finally {
349             lock.unlock();
350         }
351     }
352
353     /**
354      * Adds the specified element to the tail of this queue, waiting if
355      * necessary for space to become available.
356      * @param o the element to add
357      * @throws InterruptedException if interrupted while waiting.
358      * @throws NullPointerException if the specified element is <tt>null</tt>.
359      */

360     public void put(E o) throws InterruptedException JavaDoc {
361         if (o == null) throw new NullPointerException JavaDoc();
362         final E[] items = this.items;
363         final ReentrantLock lock = this.lock;
364         lock.lockInterruptibly();
365         try {
366             try {
367                 while (count == items.length)
368                     notFull.await();
369             } catch (InterruptedException JavaDoc ie) {
370                 notFull.signal(); // propagate to non-interrupted thread
371
throw ie;
372             }
373             insert(o);
374         } finally {
375             lock.unlock();
376         }
377     }
378
379     // this doc comment is overridden to remove the reference to collections
380
// greater in size than Integer.MAX_VALUE
381
/**
382      * Returns the number of elements in this queue.
383      *
384      * @return the number of elements in this queue.
385      */

386     public int size() {
387         final ReentrantLock lock = this.lock;
388         lock.lock();
389         try {
390             return count;
391         } finally {
392             lock.unlock();
393         }
394     }
395
396     // this doc comment is a modified copy of the inherited doc comment,
397
// without the reference to unlimited queues.
398
/**
399      * Returns the number of elements that this queue can ideally (in
400      * the absence of memory or resource constraints) accept without
401      * blocking. This is always equal to the initial capacity of this queue
402      * less the current <tt>size</tt> of this queue.
403      * <p>Note that you <em>cannot</em> always tell if
404      * an attempt to <tt>add</tt> an element will succeed by
405      * inspecting <tt>remainingCapacity</tt> because it may be the
406      * case that a waiting consumer is ready to <tt>take</tt> an
407      * element out of an otherwise full queue.
408      */

409     public int remainingCapacity() {
410         final ReentrantLock lock = this.lock;
411         lock.lock();
412         try {
413             return items.length - count;
414         } finally {
415             lock.unlock();
416         }
417     }
418
419
420     public boolean contains(Object JavaDoc o) {
421         if (o == null) return false;
422         final E[] items = this.items;
423         final ReentrantLock lock = this.lock;
424         lock.lock();
425         try {
426             int i = takeIndex;
427             int k = 0;
428             while (k++ < count) {
429                 if (o.equals(items[i]))
430                     return true;
431                 i = inc(i);
432             }
433             return false;
434         } finally {
435             lock.unlock();
436         }
437     }
438
439     public Object JavaDoc[] toArray() {
440         final E[] items = this.items;
441         final ReentrantLock lock = this.lock;
442         lock.lock();
443         try {
444             Object JavaDoc[] a = new Object JavaDoc[count];
445             int k = 0;
446             int i = takeIndex;
447             while (k < count) {
448                 a[k++] = items[i];
449                 i = inc(i);
450             }
451             return a;
452         } finally {
453             lock.unlock();
454         }
455     }
456
457     public <T> T[] toArray(T[] a) {
458         final E[] items = this.items;
459         final ReentrantLock lock = this.lock;
460         lock.lock();
461         try {
462             if (a.length < count)
463                 a = (T[])java.lang.reflect.Array.newInstance(
464                     a.getClass().getComponentType(),
465                     count
466                     );
467
468             int k = 0;
469             int i = takeIndex;
470             while (k < count) {
471                 a[k++] = (T)items[i];
472                 i = inc(i);
473             }
474             if (a.length > count)
475                 a[count] = null;
476             return a;
477         } finally {
478             lock.unlock();
479         }
480     }
481
482     public String JavaDoc toString() {
483         final ReentrantLock lock = this.lock;
484         lock.lock();
485         try {
486             return super.toString();
487         } finally {
488             lock.unlock();
489         }
490     }
491
492
493     /**
494      * Atomically removes all of the elements from this queue.
495      * The queue will be empty after this call returns.
496      */

497     public void clear() {
498         final E[] items = this.items;
499         final ReentrantLock lock = this.lock;
500         lock.lock();
501         try {
502             int i = takeIndex;
503             int k = count;
504             while (k-- > 0) {
505                 items[i] = null;
506                 i = inc(i);
507             }
508             count = 0;
509             putIndex = 0;
510             takeIndex = 0;
511             notFull.signalAll();
512         } finally {
513             lock.unlock();
514         }
515     }
516
517     public int drainTo(Collection<? super E> c) {
518         if (c == null)
519             throw new NullPointerException JavaDoc();
520         if (c == this)
521             throw new IllegalArgumentException JavaDoc();
522         final E[] items = this.items;
523         final ReentrantLock lock = this.lock;
524         lock.lock();
525         try {
526             int i = takeIndex;
527             int n = 0;
528             int max = count;
529             while (n < max) {
530                 c.add(items[i]);
531                 items[i] = null;
532                 i = inc(i);
533                 ++n;
534             }
535             if (n > 0) {
536                 count = 0;
537                 putIndex = 0;
538                 takeIndex = 0;
539                 notFull.signalAll();
540             }
541             return n;
542         } finally {
543             lock.unlock();
544         }
545     }
546
547
548     public int drainTo(Collection<? super E> c, int maxElements) {
549         if (c == null)
550             throw new NullPointerException JavaDoc();
551         if (c == this)
552             throw new IllegalArgumentException JavaDoc();
553         if (maxElements <= 0)
554             return 0;
555         final E[] items = this.items;
556         final ReentrantLock lock = this.lock;
557         lock.lock();
558         try {
559             int i = takeIndex;
560             int n = 0;
561             int sz = count;
562             int max = (maxElements < count)? maxElements : count;
563             while (n < max) {
564                 c.add(items[i]);
565                 items[i] = null;
566                 i = inc(i);
567                 ++n;
568             }
569             if (n > 0) {
570                 count -= n;
571                 takeIndex = i;
572                 notFull.signalAll();
573             }
574             return n;
575         } finally {
576             lock.unlock();
577         }
578     }
579
580
581     /**
582      * Returns an iterator over the elements in this queue in proper sequence.
583      * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
584      * will never throw {@link java.util.ConcurrentModificationException},
585      * and guarantees to traverse elements as they existed upon
586      * construction of the iterator, and may (but is not guaranteed to)
587      * reflect any modifications subsequent to construction.
588      *
589      * @return an iterator over the elements in this queue in proper sequence.
590      */

591     public Iterator<E> iterator() {
592         final ReentrantLock lock = this.lock;
593         lock.lock();
594         try {
595             return new Itr();
596         } finally {
597             lock.unlock();
598         }
599     }
600
601     /**
602      * Iterator for ArrayBlockingQueue
603      */

604     private class Itr implements Iterator<E> {
605         /**
606          * Index of element to be returned by next,
607          * or a negative number if no such.
608          */

609         private int nextIndex;
610
611         /**
612          * nextItem holds on to item fields because once we claim
613          * that an element exists in hasNext(), we must return it in
614          * the following next() call even if it was in the process of
615          * being removed when hasNext() was called.
616          **/

617         private E nextItem;
618
619         /**
620          * Index of element returned by most recent call to next.
621          * Reset to -1 if this element is deleted by a call to remove.
622          */

623         private int lastRet;
624
625         Itr() {
626             lastRet = -1;
627             if (count == 0)
628                 nextIndex = -1;
629             else {
630                 nextIndex = takeIndex;
631                 nextItem = items[takeIndex];
632             }
633         }
634
635         public boolean hasNext() {
636             /*
637              * No sync. We can return true by mistake here
638              * only if this iterator passed across threads,
639              * which we don't support anyway.
640              */

641             return nextIndex >= 0;
642         }
643
644         /**
645          * Check whether nextIndex is valid; if so setting nextItem.
646          * Stops iterator when either hits putIndex or sees null item.
647          */

648         private void checkNext() {
649             if (nextIndex == putIndex) {
650                 nextIndex = -1;
651                 nextItem = null;
652             } else {
653                 nextItem = items[nextIndex];
654                 if (nextItem == null)
655                     nextIndex = -1;
656             }
657         }
658
659         public E next() {
660             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
661             lock.lock();
662             try {
663                 if (nextIndex < 0)
664                     throw new NoSuchElementException();
665                 lastRet = nextIndex;
666                 E x = nextItem;
667                 nextIndex = inc(nextIndex);
668                 checkNext();
669                 return x;
670             } finally {
671                 lock.unlock();
672             }
673         }
674
675         public void remove() {
676             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
677             lock.lock();
678             try {
679                 int i = lastRet;
680                 if (i == -1)
681                     throw new IllegalStateException JavaDoc();
682                 lastRet = -1;
683
684                 int ti = takeIndex;
685                 removeAt(i);
686                 // back up cursor (reset to front if was first element)
687
nextIndex = (i == ti) ? takeIndex : i;
688                 checkNext();
689             } finally {
690                 lock.unlock();
691             }
692         }
693     }
694 }
695
Popular Tags