Code - Class EDU.oswego.cs.dl.util.concurrent.LinkedQueue


1 /*
2   File: LinkedQueue.java
3
4   Originally written by Doug Lea and released into the public domain.
5   This may be used for any purposes whatsoever without acknowledgment.
6   Thanks for the assistance and support of Sun Microsystems Labs,
7   and everyone contributing, testing, and using this code.
8
9   History:
10   Date Who What
11   11Jun1998 dl Create public version
12   25aug1998 dl added peek
13   10dec1998 dl added isEmpty
14   10oct1999 dl lock on node object to ensure visibility
15 */

16
17 package EDU.oswego.cs.dl.util.concurrent;
18
19 /**
20  * A linked list based channel implementation.
21  * The algorithm avoids contention between puts
22  * and takes when the queue is not empty.
23  * Normally a put and a take can proceed simultaneously.
24  * (Although it does not allow multiple concurrent puts or takes.)
25  * This class tends to perform more efficently than
26  * other Channel implementations in producer/consumer
27  * applications.
28  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
29  **/

30
31 public class LinkedQueue implements Channel {
32
33
34   /**
35    * Dummy header node of list. The first actual node, if it exists, is always
36    * at head_.next. After each take, the old first node becomes the head.
37    **/

38   protected LinkedNode head_;
39
40   /**
41    * Helper monitor for managing access to last node.
42    **/

43   protected final Object putLock_ = new Object();
44
45   /**
46    * The last node of list. Put() appends to list, so modifies last_
47    **/

48   protected LinkedNode last_;
49
50   /**
51    * The number of threads waiting for a take.
52    * Notifications are provided in put only if greater than zero.
53    * The bookkeeping is worth it here since in reasonably balanced
54    * usages, the notifications will hardly ever be necessary, so
55    * the call overhead to notify can be eliminated.
56    **/

57   protected int waitingForTake_ = 0;
58
59   public LinkedQueue() {
60     head_ = new LinkedNode(null);
61     last_ = head_;
62   }
63
64   /** Main mechanics for put/offer **/
65   protected void insert(Object x) {
66     synchronized(putLock_) {
67       LinkedNode p = new LinkedNode(x);
68       synchronized(last_) {
69         last_.next = p;
70         last_ = p;
71       }
72       if (waitingForTake_ > 0)
73         putLock_.notify();
74     }
75   }
76
77   /** Main mechanics for take/poll **/
78   protected synchronized Object extract() {
79     synchronized(head_) {
80       Object x = null;
81       LinkedNode first = head_.next;
82       if (first != null) {
83         x = first.value;
84         first.value = null;
85         head_ = first;
86       }
87       return x;
88     }
89   }
90
91
92   public void put(Object x) throws InterruptedException {
93     if (x == null) throw new IllegalArgumentException();
94     if (Thread.interrupted()) throw new InterruptedException();
95     insert(x);
96   }
97
98   public boolean offer(Object x, long msecs) throws InterruptedException {
99     if (x == null) throw new IllegalArgumentException();
100     if (Thread.interrupted()) throw new InterruptedException();
101     insert(x);
102     return true;
103   }
104
105   public Object take() throws InterruptedException {
106     if (Thread.interrupted()) throw new InterruptedException();
107     // try to extract. If fail, then enter wait-based retry loop
108
Object x = extract();
109     if (x != null)
110       return x;
111     else {
112       synchronized(putLock_) {
113         try {
114           ++waitingForTake_;
115           for (;;) {
116             x = extract();
117             if (x != null) {
118               --waitingForTake_;
119               return x;
120             }
121             else {
122               putLock_.wait();
123             }
124           }
125         }
126         catch(InterruptedException ex) {
127           --waitingForTake_;
128           putLock_.notify();
129           throw ex;
130         }
131       }
132     }
133   }
134
135   public Object peek() {
136     synchronized(head_) {
137       LinkedNode first = head_.next;
138       if (first != null)
139         return first.value;
140       else
141         return null;
142     }
143   }
144
145
146   public boolean isEmpty() {
147     synchronized(head_) {
148       return head_.next == null;
149     }
150   }
151
152   public Object poll(long msecs) throws InterruptedException {
153     if (Thread.interrupted()) throw new InterruptedException();
154     Object x = extract();
155     if (x != null)
156       return x;
157     else {
158       synchronized(putLock_) {
159         try {
160           long waitTime = msecs;
161           long start = (msecs <= 0)? 0 : System.currentTimeMillis();
162           ++waitingForTake_;
163           for (;;) {
164             x = extract();
165             if (x != null || waitTime <= 0) {
166               --waitingForTake_;
167               return x;
168             }
169             else {
170               putLock_.wait(waitTime);
171               waitTime = msecs - (System.currentTimeMillis() - start);
172             }
173           }
174         }
175         catch(InterruptedException ex) {
176           --waitingForTake_;
177           putLock_.notify();
178           throw ex;
179         }
180       }
181     }
182   }
183 }
184
185
186

Java API By Example, From Geeks To Geeks. | Conditions of Use | About Us © 2002 - 2005, KickJava.com, or its affiliates