KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > java > util > concurrent > ConcurrentLinkedQueue


1 /*
2  * @(#)ConcurrentLinkedQueue.java 1.7 04/06/11
3  *
4  * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
5  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6  */

7
8 package java.util.concurrent;
9 import java.util.*;
10 import java.util.concurrent.atomic.*;
11
12
13 /**
14  * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
15  * This queue orders elements FIFO (first-in-first-out).
16  * The <em>head</em> of the queue is that element that has been on the
17  * queue the longest time.
18  * The <em>tail</em> of the queue is that element that has been on the
19  * queue the shortest time. New elements
20  * are inserted at the tail of the queue, and the queue retrieval
21  * operations obtain elements at the head of the queue.
22  * A <tt>ConcurrentLinkedQueue</tt> is an appropriate choice when
23  * many threads will share access to a common collection.
24  * This queue does not permit <tt>null</tt> elements.
25  *
26  * <p>This implementation employs an efficient &quot;wait-free&quot;
27  * algorithm based on one described in <a
28  * HREF="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
29  * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
30  * Algorithms</a> by Maged M. Michael and Michael L. Scott.
31  *
32  * <p>Beware that, unlike in most collections, the <tt>size</tt> method
33  * is <em>NOT</em> a constant-time operation. Because of the
34  * asynchronous nature of these queues, determining the current number
35  * of elements requires a traversal of the elements.
36  *
37  * <p>This class and its iterator implement all of the
38  * <em>optional</em> methods of the {@link Collection} and {@link
39  * Iterator} interfaces.
40  *
41  * <p>This class is a member of the
42  * <a HREF="{@docRoot}/../guide/collections/index.html">
43  * Java Collections Framework</a>.
44  *
45  * @since 1.5
46  * @author Doug Lea
47  * @param <E> the type of elements held in this collection
48  *
49  */

50 public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
51         implements Queue<E>, java.io.Serializable JavaDoc {
52     private static final long serialVersionUID = 196745693267521676L;
53
54     /*
55      * This is a straight adaptation of Michael & Scott algorithm.
56      * For explanation, read the paper. The only (minor) algorithmic
57      * difference is that this version supports lazy deletion of
58      * internal nodes (method remove(Object)) -- remove CAS'es item
59      * fields to null. The normal queue operations unlink but then
60      * pass over nodes with null item fields. Similarly, iteration
61      * methods ignore those with nulls.
62      */

63
64     private static class Node<E> {
65         private volatile E item;
66         private volatile Node<E> next;
67         
68         private static final
69             AtomicReferenceFieldUpdater<Node, Node>
70             nextUpdater =
71             AtomicReferenceFieldUpdater.newUpdater
72             (Node.class, Node.class, "next");
73         private static final
74             AtomicReferenceFieldUpdater<Node, Object JavaDoc>
75             itemUpdater =
76             AtomicReferenceFieldUpdater.newUpdater
77             (Node.class, Object JavaDoc.class, "item");
78         
79         Node(E x) { item = x; }
80         
81         Node(E x, Node<E> n) { item = x; next = n; }
82         
83         E getItem() {
84             return item;
85         }
86         
87         boolean casItem(E cmp, E val) {
88             return itemUpdater.compareAndSet(this, cmp, val);
89         }
90         
91         void setItem(E val) {
92             itemUpdater.set(this, val);
93         }
94         
95         Node<E> getNext() {
96             return next;
97         }
98         
99         boolean casNext(Node<E> cmp, Node<E> val) {
100             return nextUpdater.compareAndSet(this, cmp, val);
101         }
102         
103         void setNext(Node<E> val) {
104             nextUpdater.set(this, val);
105         }
106         
107     }
108
109     private static final
110         AtomicReferenceFieldUpdater<ConcurrentLinkedQueue JavaDoc, Node>
111         tailUpdater =
112         AtomicReferenceFieldUpdater.newUpdater
113         (ConcurrentLinkedQueue JavaDoc.class, Node.class, "tail");
114     private static final
115         AtomicReferenceFieldUpdater<ConcurrentLinkedQueue JavaDoc, Node>
116         headUpdater =
117         AtomicReferenceFieldUpdater.newUpdater
118         (ConcurrentLinkedQueue JavaDoc.class, Node.class, "head");
119
120     private boolean casTail(Node<E> cmp, Node<E> val) {
121         return tailUpdater.compareAndSet(this, cmp, val);
122     }
123
124     private boolean casHead(Node<E> cmp, Node<E> val) {
125         return headUpdater.compareAndSet(this, cmp, val);
126     }
127
128
129     /**
130      * Pointer to header node, initialized to a dummy node. The first
131      * actual node is at head.getNext().
132      */

133     private transient volatile Node<E> head = new Node<E>(null, null);
134
135     /** Pointer to last node on list **/
136     private transient volatile Node<E> tail = head;
137
138
139     /**
140      * Creates a <tt>ConcurrentLinkedQueue</tt> that is initially empty.
141      */

142     public ConcurrentLinkedQueue() {}
143
144     /**
145      * Creates a <tt>ConcurrentLinkedQueue</tt>
146      * initially containing the elements of the given collection,
147      * added in traversal order of the collection's iterator.
148      * @param c the collection of elements to initially contain
149      * @throws NullPointerException if <tt>c</tt> or any element within it
150      * is <tt>null</tt>
151      */

152     public ConcurrentLinkedQueue(Collection<? extends E> c) {
153         for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
154             add(it.next());
155     }
156
157     // Have to override just to update the javadoc
158

159     /**
160      * Adds the specified element to the tail of this queue.
161      * @param o the element to add.
162      * @return <tt>true</tt> (as per the general contract of
163      * <tt>Collection.add</tt>).
164      *
165      * @throws NullPointerException if the specified element is <tt>null</tt>
166      */

167     public boolean add(E o) {
168         return offer(o);
169     }
170
171     /**
172      * Inserts the specified element to the tail of this queue.
173      *
174      * @param o the element to add.
175      * @return <tt>true</tt> (as per the general contract of
176      * <tt>Queue.offer</tt>).
177      * @throws NullPointerException if the specified element is <tt>null</tt>
178      */

179     public boolean offer(E o) {
180         if (o == null) throw new NullPointerException JavaDoc();
181         Node<E> n = new Node<E>(o, null);
182         for(;;) {
183             Node<E> t = tail;
184             Node<E> s = t.getNext();
185             if (t == tail) {
186                 if (s == null) {
187                     if (t.casNext(s, n)) {
188                         casTail(t, n);
189                         return true;
190                     }
191                 } else {
192                     casTail(t, s);
193                 }
194             }
195         }
196     }
197
198     public E poll() {
199         for (;;) {
200             Node<E> h = head;
201             Node<E> t = tail;
202             Node<E> first = h.getNext();
203             if (h == head) {
204                 if (h == t) {
205                     if (first == null)
206                         return null;
207                     else
208                         casTail(t, first);
209                 } else if (casHead(h, first)) {
210                     E item = first.getItem();
211                     if (item != null) {
212                         first.setItem(null);
213                         return item;
214                     }
215                     // else skip over deleted item, continue loop,
216
}
217             }
218         }
219     }
220
221     public E peek() { // same as poll except don't remove item
222
for (;;) {
223             Node<E> h = head;
224             Node<E> t = tail;
225             Node<E> first = h.getNext();
226             if (h == head) {
227                 if (h == t) {
228                     if (first == null)
229                         return null;
230                     else
231                         casTail(t, first);
232                 } else {
233                     E item = first.getItem();
234                     if (item != null)
235                         return item;
236                     else // remove deleted node and continue
237
casHead(h, first);
238                 }
239             }
240         }
241     }
242
243     /**
244      * Returns the first actual (non-header) node on list. This is yet
245      * another variant of poll/peek; here returning out the first
246      * node, not element (so we cannot collapse with peek() without
247      * introducing race.)
248      */

249     Node<E> first() {
250         for (;;) {
251             Node<E> h = head;
252             Node<E> t = tail;
253             Node<E> first = h.getNext();
254             if (h == head) {
255                 if (h == t) {
256                     if (first == null)
257                         return null;
258                     else
259                         casTail(t, first);
260                 } else {
261                     if (first.getItem() != null)
262                         return first;
263                     else // remove deleted node and continue
264
casHead(h, first);
265                 }
266             }
267         }
268     }
269
270
271     public boolean isEmpty() {
272         return first() == null;
273     }
274
275     /**
276      * Returns the number of elements in this queue. If this queue
277      * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
278      * <tt>Integer.MAX_VALUE</tt>.
279      *
280      * <p>Beware that, unlike in most collections, this method is
281      * <em>NOT</em> a constant-time operation. Because of the
282      * asynchronous nature of these queues, determining the current
283      * number of elements requires an O(n) traversal.
284      *
285      * @return the number of elements in this queue.
286      */

287     public int size() {
288         int count = 0;
289         for (Node<E> p = first(); p != null; p = p.getNext()) {
290             if (p.getItem() != null) {
291                 // Collections.size() spec says to max out
292
if (++count == Integer.MAX_VALUE)
293                     break;
294             }
295         }
296         return count;
297     }
298
299     public boolean contains(Object JavaDoc o) {
300         if (o == null) return false;
301         for (Node<E> p = first(); p != null; p = p.getNext()) {
302             E item = p.getItem();
303             if (item != null &&
304                 o.equals(item))
305                 return true;
306         }
307         return false;
308     }
309
310     public boolean remove(Object JavaDoc o) {
311         if (o == null) return false;
312         for (Node<E> p = first(); p != null; p = p.getNext()) {
313             E item = p.getItem();
314             if (item != null &&
315                 o.equals(item) &&
316                 p.casItem(item, null))
317                 return true;
318         }
319         return false;
320     }
321
322     public Object JavaDoc[] toArray() {
323         // Use ArrayList to deal with resizing.
324
ArrayList<E> al = new ArrayList<E>();
325         for (Node<E> p = first(); p != null; p = p.getNext()) {
326             E item = p.getItem();
327             if (item != null)
328                 al.add(item);
329         }
330         return al.toArray();
331     }
332
333     public <T> T[] toArray(T[] a) {
334         // try to use sent-in array
335
int k = 0;
336         Node<E> p;
337         for (p = first(); p != null && k < a.length; p = p.getNext()) {
338             E item = p.getItem();
339             if (item != null)
340                 a[k++] = (T)item;
341         }
342         if (p == null) {
343             if (k < a.length)
344                 a[k] = null;
345             return a;
346         }
347
348         // If won't fit, use ArrayList version
349
ArrayList<E> al = new ArrayList<E>();
350         for (Node<E> q = first(); q != null; q = q.getNext()) {
351             E item = q.getItem();
352             if (item != null)
353                 al.add(item);
354         }
355         return (T[])al.toArray(a);
356     }
357
358     /**
359      * Returns an iterator over the elements in this queue in proper sequence.
360      * The returned iterator is a "weakly consistent" iterator that
361      * will never throw {@link java.util.ConcurrentModificationException},
362      * and guarantees to traverse elements as they existed upon
363      * construction of the iterator, and may (but is not guaranteed to)
364      * reflect any modifications subsequent to construction.
365      *
366      * @return an iterator over the elements in this queue in proper sequence.
367      */

368     public Iterator<E> iterator() {
369         return new Itr();
370     }
371
372     private class Itr implements Iterator<E> {
373         /**
374          * Next node to return item for.
375          */

376         private Node<E> nextNode;
377
378         /**
379          * nextItem holds on to item fields because once we claim
380          * that an element exists in hasNext(), we must return it in
381          * the following next() call even if it was in the process of
382          * being removed when hasNext() was called.
383          **/

384         private E nextItem;
385
386         /**
387          * Node of the last returned item, to support remove.
388          */

389         private Node<E> lastRet;
390
391         Itr() {
392             advance();
393         }
394
395         /**
396          * Moves to next valid node and returns item to return for
397          * next(), or null if no such.
398          */

399         private E advance() {
400             lastRet = nextNode;
401             E x = nextItem;
402
403             Node<E> p = (nextNode == null)? first() : nextNode.getNext();
404             for (;;) {
405                 if (p == null) {
406                     nextNode = null;
407                     nextItem = null;
408                     return x;
409                 }
410                 E item = p.getItem();
411                 if (item != null) {
412                     nextNode = p;
413                     nextItem = item;
414                     return x;
415                 } else // skip over nulls
416
p = p.getNext();
417             }
418         }
419
420         public boolean hasNext() {
421             return nextNode != null;
422         }
423
424         public E next() {
425             if (nextNode == null) throw new NoSuchElementException();
426             return advance();
427         }
428
429         public void remove() {
430             Node<E> l = lastRet;
431             if (l == null) throw new IllegalStateException JavaDoc();
432             // rely on a future traversal to relink.
433
l.setItem(null);
434             lastRet = null;
435         }
436     }
437
438     /**
439      * Save the state to a stream (that is, serialize it).
440      *
441      * @serialData All of the elements (each an <tt>E</tt>) in
442      * the proper order, followed by a null
443      * @param s the stream
444      */

445     private void writeObject(java.io.ObjectOutputStream JavaDoc s)
446         throws java.io.IOException JavaDoc {
447
448         // Write out any hidden stuff
449
s.defaultWriteObject();
450
451         // Write out all elements in the proper order.
452
for (Node<E> p = first(); p != null; p = p.getNext()) {
453             Object JavaDoc item = p.getItem();
454             if (item != null)
455                 s.writeObject(item);
456         }
457
458         // Use trailing null as sentinel
459
s.writeObject(null);
460     }
461
462     /**
463      * Reconstitute the Queue instance from a stream (that is,
464      * deserialize it).
465      * @param s the stream
466      */

467     private void readObject(java.io.ObjectInputStream JavaDoc s)
468         throws java.io.IOException JavaDoc, ClassNotFoundException JavaDoc {
469         // Read in capacity, and any hidden stuff
470
s.defaultReadObject();
471         head = new Node<E>(null, null);
472         tail = head;
473         // Read in all elements and place in queue
474
for (;;) {
475             E item = (E)s.readObject();
476             if (item == null)
477                 break;
478             else
479                 offer(item);
480         }
481     }
482
483 }
484
Popular Tags