Code - Class EDU.oswego.cs.dl.util.concurrent.WaitFreeQueue


/*
  File: WaitFreeQueue.java

  Originally written by Doug Lea and released into the public domain.
  This may be used for any purposes whatsoever without acknowledgment.
  Thanks for the assistance and support of Sun Microsystems Labs,
  and everyone contributing, testing, and using this code.

  History:
  Date       Who  What
  16Jun1998  dl   Create public version
   5Aug1998  dl   replaced int counters with longs
  17Nov2001  dl   Simplify given Bill Pugh's observation
                  that counted pointers are unnecessary.
*/

16
17 package EDU.oswego.cs.dl.util.concurrent;
18
19 /**
20  * A wait-free linked list based queue implementation.
21  * <p>
22  *
23  * While this class conforms to the full Channel interface, only the
24  * <code>put</code> and <code>poll</code> methods are useful in most
25  * applications. Because the queue does not support blocking
26  * operations, <code>take</code> relies on spin-loops, which can be
27  * extremely wasteful. <p>
28  *
29  * This class is adapted from the algorithm described in <a
30  * HREF="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
31  * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
32  * Algorithms</a> by Maged M. Michael and Michael L. Scott. This
33  * implementation is not strictly wait-free since it relies on locking
34  * for basic atomicity and visibility requirements. Locks can impose
35  * unbounded waits, although this should not be a major practical
36  * concern here since each lock is held for the duration of only a few
37  * statements. (However, the overhead of using so many locks can make
38  * it less attractive than other Channel implementations on JVMs where
39  * locking operations are very slow.) <p>
40  *
41  * @see BoundedLinkedQueue
42  * @see LinkedQueue
43  *
44  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
45
46  **/

47
48 public class WaitFreeQueue implements Channel {
49
50   /*
51     This is a straightforward adaptation of Michael & Scott
52     algorithm, with CAS's simulated via per-field locks,
53     and without version numbers for pointers since, under
54     Java Garbage Collection, you can never see the "wrong"
55     node with the same address as the one you think you have.
56   */

57
58   /** List nodes for Queue **/
59   protected final static class Node {
60     protected final Object value;
61     protected volatile Node next;
62
63     /** Make a new node with indicated item, and null link **/
64     protected Node(Object x) { value = x; }
65
66     /** Simulate a CAS operation for 'next' field **/
67     protected synchronized boolean CASNext(Node oldNext, Node newNext) {
68       if (next == oldNext) {
69         next = newNext;
70         return true;
71       }
72       else
73         return false;
74     }
75   }
76
77   /** Head of list is always a dummy node **/
78   protected volatile Node head = new Node(null);
79   /** Pointer to last node on list **/
80   protected volatile Node tail = head;
81
82   /** Lock for simulating CAS for tail field **/
83   protected final Object tailLock = new Object();
84
85   /** Simulate CAS for head field, using 'this' lock **/
86   protected synchronized boolean CASHead(Node oldHead, Node newHead) {
87     if (head == oldHead) {
88       head = newHead;
89       return true;
90     }
91     else
92       return false;
93   }
94
95   /** Simulate CAS for tail field **/
96   protected boolean CASTail(Node oldTail, Node newTail) {
97     synchronized(tailLock) {
98       if (tail == oldTail) {
99         tail = newTail;
100         return true;
101       }
102       else
103         return false;
104     }
105   }
106
107   public void put(Object x) throws InterruptedException {
108     if (x == null) throw new IllegalArgumentException();
109     if (Thread.interrupted()) throw new InterruptedException();
110     Node n = new Node(x);
111
112     for(;;) {
113       Node t = tail;
114       // Try to link new node to end of list.
115
if (t.CASNext(null, n)) {
116         // Must now change tail field.
117
// This CAS might fail, but if so, it will be fixed by others.
118
CASTail(t, n);
119         return;
120       }
121
122       // If cannot link, help out a previous failed attempt to move tail
123
CASTail(t, t.next);
124     }
125   }
126
127   public boolean offer(Object x, long msecs) throws InterruptedException {
128     put(x);
129     return true;
130   }
131
132   /** Main dequeue algorithm, called by poll, take. **/
133   protected Object extract() throws InterruptedException {
134     for (;;) {
135       Node h = head;
136       Node first = h.next;
137
138       if (first == null)
139         return null;
140
141       Object result = first.value;
142       if (CASHead(h, first))
143         return result;
144     }
145   }
146
147   public Object peek() {
148     Node first = head.next;
149
150     if (first == null)
151       return null;
152
153     // Note: This synch unnecessary after JSR-133.
154
// It exists only to guarantee visibility of returned object,
155
// No other synch is needed, but "old" memory model requires one.
156
synchronized(this) {
157       return first.value;
158     }
159   }
160
161   /**
162    * Spin until poll returns a non-null value.
163    * You probably don't want to call this method.
164    * A Thread.sleep(0) is performed on each iteration
165    * as a heuristic to reduce contention. If you would
166    * rather use, for example, an exponential backoff,
167    * you could manually set this up using poll.
168    **/

169   public Object take() throws InterruptedException {
170     if (Thread.interrupted()) throw new InterruptedException();
171     for(;;) {
172       Object x = extract();
173       if (x != null)
174         return x;
175       else
176         Thread.sleep(0);
177     }
178   }
179
180   /**
181    * Spin until poll returns a non-null value or time elapses.
182    * if msecs is positive, a Thread.sleep(0) is performed on each iteration
183    * as a heuristic to reduce contention.
184    **/

185   public Object poll(long msecs) throws InterruptedException {
186     if (Thread.interrupted()) throw new InterruptedException();
187     if (msecs <= 0)
188       return extract();
189
190     long startTime = System.currentTimeMillis();
191     for(;;) {
192       Object x = extract();
193       if (x != null)
194         return x;
195       else if (System.currentTimeMillis() - startTime >= msecs)
196         return null;
197       else
198         Thread.sleep(0);
199     }
200
201   }
202 }
203

Java API By Example, From Geeks To Geeks. | Conditions of Use | About Us © 2002 - 2005, KickJava.com, or its affiliates