KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > java > util > concurrent > DelayQueue


1 /*
2  * @(#)DelayQueue.java 1.8 05/09/24
3  *
4  * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
5  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6  */

7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * <tt>Delayed</tt> elements, in which an element can only be taken
 * when its delay has expired. The <em>head</em> of the queue is that
 * <tt>Delayed</tt> element whose delay expired furthest in the
 * past. If no delay has expired there is no head and <tt>poll</tt>
 * will return <tt>null</tt>. Expiration occurs when an element's
 * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
 * than or equal to zero. This queue does not permit <tt>null</tt>
 * elements.
 *
 * <p>Every operation on the backing priority queue is performed while
 * holding a single internal {@link ReentrantLock}.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a HREF="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock guarding every access to {@link #q}. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /**
     * Condition signalled when a new head is inserted, and when
     * elements remain (or were drained) after a removal, so that
     * waiting takers re-examine the head.
     */
    private final transient Condition available = lock.newCondition();

    /** Backing queue; ordered by the elements' natural ordering. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue. Waiting
     * takers are woken when the new element becomes the head, i.e. the
     * queue was empty or the element orders before the previous head.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(o);
            if (first == null || o.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o);
    }

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean add(E o) {
        return offer(o);
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary until an element with an expired delay is available
     * on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        // Remaining time is irrelevant here: the loop
                        // re-reads the head and its delay after waking.
                        available.awaitNanos(delay);
                    } else {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // wake up other takers
                        return x;
                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary up to the specified wait time until an element with
     * an expired delay is available on this queue.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element with
     * an expired delay becomes available.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // Convert before locking: a null unit must fail here, not after
        // the lock has been acquired outside the try/finally.
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        if (nanos <= 0)
                            return null;
                        // Never wait longer than the caller's budget.
                        if (delay > nanos)
                            delay = nanos;
                        long timeLeft = available.awaitNanos(delay);
                        // Charge only the time actually spent waiting.
                        nanos -= delay - timeLeft;
                    } else {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll();
                        return x;
                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or <tt>null</tt> if this
     * queue has no elements with an expired delay.
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            else {
                E x = q.poll();
                assert x != null;
                if (q.size() != 0)
                    available.signalAll();
                return x;
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the first element of the backing
     * queue, or returns <tt>null</tt> if this queue is empty. Unlike
     * <tt>poll</tt>, the element is returned whether or not its delay
     * has expired.
     *
     * @return the first element of this queue, or <tt>null</tt> if this
     * queue is empty.
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, expired or not.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements whose delay has expired and adds them to
     * the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (;;) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                // Add before removing so the element is not lost if
                // c.add() throws.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes at most the given number of elements whose delay has
     * expired and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                // Add before removing so the element is not lost if
                // c.add() throws.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * as produced by the backing queue while the lock is held.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * delegating to the backing queue's <tt>toArray(T[])</tt> while
     * the lock is held.
     *
     * @param array the array offered as the destination
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] array) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(array);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.
     *
     * @param o the element to remove
     * @return the result of removing <tt>o</tt> from the backing queue
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order. The
     * returned iterator is a thread-safe "fast-fail" iterator that will
     * throw {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the backing queue's iterator so that each call runs while
     * holding the enclosing queue's lock. Deliberately not generic: it
     * uses the enclosing class's <tt>E</tt> rather than shadowing it.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.hasNext();
            } finally {
                lock.unlock();
            }
        }

        public E next() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

}
413
Popular Tags