KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > java > util > concurrent > LinkedBlockingQueue


1 /*
2  * @(#)LinkedBlockingQueue.java 1.11 05/09/02
3  *
4  * Copyright 2005 Sun Microsystems, Inc. All rights reserved.
5  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6  */

7
8 package java.util.concurrent;
9 import java.util.concurrent.atomic.*;
10 import java.util.concurrent.locks.*;
11 import java.util.*;
12
13 /**
14  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
15  * linked nodes.
16  * This queue orders elements FIFO (first-in-first-out).
17  * The <em>head</em> of the queue is that element that has been on the
18  * queue the longest time.
19  * The <em>tail</em> of the queue is that element that has been on the
20  * queue the shortest time. New elements
21  * are inserted at the tail of the queue, and the queue retrieval
22  * operations obtain elements at the head of the queue.
23  * Linked queues typically have higher throughput than array-based queues but
24  * less predictable performance in most concurrent applications.
25  *
26  * <p> The optional capacity bound constructor argument serves as a
27  * way to prevent excessive queue expansion. The capacity, if unspecified,
28  * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
29  * dynamically created upon each insertion unless this would bring the
30  * queue above capacity.
31  *
32  * <p>This class and its iterator implement all of the
33  * <em>optional</em> methods of the {@link Collection} and {@link
34  * Iterator} interfaces.
35  *
36  * <p>This class is a member of the
37  * <a HREF="{@docRoot}/../guide/collections/index.html">
38  * Java Collections Framework</a>.
39  *
40  * @since 1.5
41  * @author Doug Lea
42  * @param <E> the type of elements held in this collection
43  *
44  **/

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases.  Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used.  When a put notices that it has enabled at least one take,
     * it signals taker.  That taker in turn signals others if more
     * items have been entered since the signal.  And symmetrically for
     * takes signalling puts.  Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * The list always starts with a dummy node ("head") whose item is
     * null; the first real element, if any, is head.next.  A node that
     * has been dequeued or removed has its item set to null but keeps
     * its next pointer, so that iterators positioned on it can still
     * advance into the live part of the list.
     */

    /**
     * Linked list node class.
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; head.item is always null) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Creates a node and links it at end of queue.
     * Caller must hold putLock (or both locks).
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Removes a node from head of queue.  The removed node becomes the
     * new dummy head, so its item is cleared.
     * Caller must hold takeLock (or both locks) and the queue must be
     * non-empty.
     * @return the item that was held by the removed node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Locks to prevent both puts and takes.  The putLock is always
     * acquired before the takeLock.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if <tt>c</tt> or any element within it
     *         is <tt>null</tt>
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        // Link the nodes directly instead of calling the overridable
        // add(): a subclass override must not run against a partially
        // constructed queue.
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // never contended here, but gives visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                insert(e);
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }


    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     *
     * @return the remaining capacity
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     *         the specified waiting time elapses before space is available.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to
     *         this queue, else <tt>false</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // Cheap unlocked pre-check; re-checked under the lock below.
        if (count.get() == capacity)
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is available.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        final AtomicInteger count = this.count;
        // Cheap unlocked pre-check; re-checked under the lock below.
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                // p.next is deliberately left intact so that an iterator
                // positioned on p can still reach its successors.
                p.item = null;
                trail.next = p.next;
                if (last == p)
                    last = trail;
                if (count.getAndDecrement() == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.  The snapshot is taken while holding both locks.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein (with the slot after the last element set to
     * <tt>null</tt> if there is room); otherwise a new array of the same
     * runtime type and the size of this queue is allocated.
     *
     * @param a the array into which the elements are to be stored, if it
     *        is big enough
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of <tt>a</tt> is not
     *         a supertype of the runtime type of every element
     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
     */
    @SuppressWarnings("unchecked") // element casts are checked by the array store
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string representation of this queue, computed while
     * holding both locks.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        fullyLock();
        try {
            head.next = null;
            assert head.item == null;
            last = head;
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to
     * the given collection.  Equivalent to
     * <tt>drainTo(c, Integer.MAX_VALUE)</tt>: if <tt>c.add</tt> throws,
     * elements not yet transferred remain in this queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection.  If <tt>c.add</tt>
     * throws, the elements transferred so far are removed from this
     * queue, the remaining ones stay in it, and the exception is
     * propagated.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        fullyLock();
        try {
            int n = 0;
            Node<E> p = head.next;
            try {
                while (p != null && n < maxElements) {
                    c.add(p.item);
                    p.item = null;
                    p = p.next;
                    ++n;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw: unlink exactly
                // the n nodes whose items were already handed over.
                if (n != 0) {
                    head.next = p;
                    assert head.item == null;
                    if (p == null)
                        last = head;
                    if (count.getAndAdd(-n) == capacity)
                        notFull.signalAll();
                }
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                advance(head);
            } finally {
                fullyUnlock();
            }
        }

        /**
         * Positions the iterator on the first node after <tt>from</tt>
         * that still holds an item, caching that item.  Nodes whose item
         * is null were dequeued or removed after this iterator reached
         * them; skipping them keeps next() from ever returning null.
         * Each item is read exactly once so the cached value and the
         * emptiness test cannot disagree.
         */
        private void advance(Node<E> from) {
            Node<E> p = from.next;
            E item = null;
            while (p != null && (item = p.item) == null)
                p = p.next;
            current = p;
            currentElement = item; // null exactly when p == null
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                advance(current);
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Search from head: if the node has already left the
                // queue it will not be found and this is a no-op.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p == node) {
                    p.item = null;
                    trail.next = p.next;
                    if (last == p)
                        last = trail;
                    int c = count.getAndDecrement();
                    if (c == capacity)
                        notFull.signalAll();
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an <tt>Object</tt>) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    @SuppressWarnings("unchecked") // stream elements were written as E by writeObject
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}
742
Popular Tags