Code - Class EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue


1 /*
2   File: BoundedLinkedQueue.java
3
4   Originally written by Doug Lea and released into the public domain.
5   This may be used for any purposes whatsoever without acknowledgment.
6   Thanks for the assistance and support of Sun Microsystems Labs,
7   and everyone contributing, testing, and using this code.
8
9   History:
10   Date Who What
11   11Jun1998 dl Create public version
12   17Jul1998 dl Simplified by eliminating wait counts
13   25aug1998 dl added peek
14   10oct1999 dl lock on node object to ensure visibility
15   27jan2000 dl setCapacity forces immediate permit reconcile
16 */

17
18 package EDU.oswego.cs.dl.util.concurrent;
19
20 /**
21  * A bounded variant of
22  * LinkedQueue
23  * class. This class may be
24  * preferable to
25  * BoundedBuffer
26  * because it allows a bit more
27  * concurency among puts and takes, because it does not
28  * pre-allocate fixed storage for elements, and allows
29  * capacity to be dynamically reset.
30  * On the other hand, since it allocates a node object
31  * on each put, it can be slow on systems with slow
32  * allocation and GC.
33  * Also, it may be
34  * preferable to
35  * LinkedQueue
36  * when you need to limit
37  * the capacity to prevent resource exhaustion. This protection
38  * normally does not hurt much performance-wise: When the
39  * queue is not empty or full, most puts and
40  * takes are still usually able to execute concurrently.
41  * @see LinkedQueue
42  * @see BoundedBuffer
43  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
44  **/

45
46 public class BoundedLinkedQueue implements BoundedChannel {
47
48   /*
49    * It might be a bit nicer if this were declared as
50    * a subclass of LinkedQueue, or a sibling class of
51    * a common abstract class. It shares much of the
52    * basic design and bookkeeping fields. But too
53    * many details differ to make this worth doing.
54    */

55
56
57
58   /**
59    * Dummy header node of list. The first actual node, if it exists, is always
60    * at head_.next. After each take, the old first node becomes the head.
61    **/

62   protected LinkedNode head_;
63
64   /**
65    * The last node of list. Put() appends to list, so modifies last_
66    **/

67   protected LinkedNode last_;
68
69
70   /**
71    * Helper monitor. Ensures that only one put at a time executes.
72    **/

73
74   protected final Object putGuard_ = new Object();
75
76   /**
77    * Helper monitor. Protects and provides wait queue for takes
78    **/

79
80   protected final Object takeGuard_ = new Object();
81
82
83   /** Number of elements allowed **/
84   protected int capacity_;
85
86   
87   /**
88    * One side of a split permit count.
89    * The counts represent permits to do a put. (The queue is full when zero).
90    * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length.
91    * (The length is never separately recorded, so this cannot be
92    * checked explicitly.)
93    * To minimize contention between puts and takes, the
94    * put side uses up all of its permits before transfering them from
95    * the take side. The take side just increments the count upon each take.
96    * Thus, most puts and take can run independently of each other unless
97    * the queue is empty or full.
98    * Initial value is queue capacity.
99    **/

100
101   protected int putSidePutPermits_;
102
103   /** Number of takes since last reconcile **/
104   protected int takeSidePutPermits_ = 0;
105
106
107   /**
108    * Create a queue with the given capacity
109    * @exception IllegalArgumentException if capacity less or equal to zero
110    **/

111   public BoundedLinkedQueue(int capacity) {
112     if (capacity <= 0) throw new IllegalArgumentException();
113     capacity_ = capacity;
114     putSidePutPermits_ = capacity;
115     head_ = new LinkedNode(null);
116     last_ = head_;
117   }
118
119   /**
120    * Create a queue with the current default capacity
121    **/

122
123   public BoundedLinkedQueue() {
124     this(DefaultChannelCapacity.get());
125   }
126
127   /**
128    * Move put permits from take side to put side;
129    * return the number of put side permits that are available.
130    * Call only under synch on puGuard_ AND this.
131    **/

132   protected final int reconcilePutPermits() {
133     putSidePutPermits_ += takeSidePutPermits_;
134     takeSidePutPermits_ = 0;
135     return putSidePutPermits_;
136   }
137
138
139   /** Return the current capacity of this queue **/
140   public synchronized int capacity() { return capacity_; }
141
142
143   /**
144    * Return the number of elements in the queue.
145    * This is only a snapshot value, that may be in the midst
146    * of changing. The returned value will be unreliable in the presence of
147    * active puts and takes, and should only be used as a heuristic
148    * estimate, for example for resource monitoring purposes.
149    **/

150   public synchronized int size() {
151     /*
152       This should ideally synch on putGuard_, but
153       doing so would cause it to block waiting for an in-progress
154       put, which might be stuck. So we instead use whatever
155       value of putSidePutPermits_ that we happen to read.
156     */

157     return capacity_ - (takeSidePutPermits_ + putSidePutPermits_);
158   }
159
160
161   /**
162    * Reset the capacity of this queue.
163    * If the new capacity is less than the old capacity,
164    * existing elements are NOT removed, but
165    * incoming puts will not proceed until the number of elements
166    * is less than the new capacity.
167    * @exception IllegalArgumentException if capacity less or equal to zero
168    **/

169
170   public void setCapacity(int newCapacity) {
171     if (newCapacity <= 0) throw new IllegalArgumentException();
172     synchronized (putGuard_) {
173       synchronized(this) {
174         takeSidePutPermits_ += (newCapacity - capacity_);
175         capacity_ = newCapacity;
176         
177         // Force immediate reconcilation.
178
reconcilePutPermits();
179         notifyAll();
180       }
181     }
182   }
183
184
185   /** Main mechanics for take/poll **/
186   protected synchronized Object extract() {
187     synchronized(head_) {
188       Object x = null;
189       LinkedNode first = head_.next;
190       if (first != null) {
191         x = first.value;
192         first.value = null;
193         head_ = first;
194         ++takeSidePutPermits_;
195         notify();
196       }
197       return x;
198     }
199   }
200
201   public Object peek() {
202     synchronized(head_) {
203       LinkedNode first = head_.next;
204       if (first != null)
205         return first.value;
206       else
207         return null;
208     }
209   }
210
211   public Object take() throws InterruptedException {
212     if (Thread.interrupted()) throw new InterruptedException();
213     Object x = extract();
214     if (x != null)
215       return x;
216     else {
217       synchronized(takeGuard_) {
218         try {
219           for (;;) {
220             x = extract();
221             if (x != null) {
222               return x;
223             }
224             else {
225               takeGuard_.wait();
226             }
227           }
228         }
229         catch(InterruptedException ex) {
230           takeGuard_.notify();
231           throw ex;
232         }
233       }
234     }
235   }
236
237   public Object poll(long msecs) throws InterruptedException {
238     if (Thread.interrupted()) throw new InterruptedException();
239     Object x = extract();
240     if (x != null)
241       return x;
242     else {
243       synchronized(takeGuard_) {
244         try {
245           long waitTime = msecs;
246           long start = (msecs <= 0)? 0: System.currentTimeMillis();
247           for (;;) {
248             x = extract();
249             if (x != null || waitTime <= 0) {
250               return x;
251             }
252             else {
253               takeGuard_.wait(waitTime);
254               waitTime = msecs - (System.currentTimeMillis() - start);
255             }
256           }
257         }
258         catch(InterruptedException ex) {
259           takeGuard_.notify();
260           throw ex;
261         }
262       }
263     }
264   }
265
266   /** Notify a waiting take if needed **/
267   protected final void allowTake() {
268     synchronized(takeGuard_) {
269       takeGuard_.notify();
270     }
271   }
272
273
274   /**
275    * Create and insert a node.
276    * Call only under synch on putGuard_
277    **/

278   protected void insert(Object x) {
279     --putSidePutPermits_;
280     LinkedNode p = new LinkedNode(x);
281     synchronized(last_) {
282       last_.next = p;
283       last_ = p;
284     }
285   }
286
287
288   /*
289      put and offer(ms) differ only in policy before insert/allowTake
290   */

291
292   public void put(Object x) throws InterruptedException {
293     if (x == null) throw new IllegalArgumentException();
294     if (Thread.interrupted()) throw new InterruptedException();
295
296     synchronized(putGuard_) {
297
298       if (putSidePutPermits_ <= 0) { // wait for permit.
299
synchronized(this) {
300           if (reconcilePutPermits() <= 0) {
301             try {
302               for(;;) {
303                 wait();
304                 if (reconcilePutPermits() > 0) {
305                   break;
306                 }
307               }
308             }
309             catch (InterruptedException ex) {
310               notify();
311               throw ex;
312             }
313           }
314         }
315       }
316       insert(x);
317     }
318     // call outside of lock to loosen put/take coupling
319
allowTake();
320   }
321
322   public boolean offer(Object x, long msecs) throws InterruptedException {
323     if (x == null) throw new IllegalArgumentException();
324     if (Thread.interrupted()) throw new InterruptedException();
325
326     synchronized(putGuard_) {
327
328       if (putSidePutPermits_ <= 0) {
329         synchronized(this) {
330           if (reconcilePutPermits() <= 0) {
331             if (msecs <= 0)
332               return false;
333             else {
334               try {
335                 long waitTime = msecs;
336                 long start = System.currentTimeMillis();
337                 
338                 for(;;) {
339                   wait(waitTime);
340                   if (reconcilePutPermits() > 0) {
341                     break;
342                   }
343                   else {
344                     waitTime = msecs - (System.currentTimeMillis() - start);
345                     if (waitTime <= 0) {
346                       return false;
347                     }
348                   }
349                 }
350               }
351               catch (InterruptedException ex) {
352                 notify();
353                 throw ex;
354               }
355             }
356           }
357         }
358       }
359
360       insert(x);
361     }
362
363     allowTake();
364     return true;
365   }
366
367   public boolean isEmpty() {
368     synchronized(head_) {
369       return head_.next == null;
370     }
371   }
372     
373 }
374

Java API By Example, From Geeks To Geeks. | Conditions of Use | About Us © 2002 - 2005, KickJava.com, or its affiliates