KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > java > util > concurrent > PriorityBlockingQueue


1 /*
2  * @(#)PriorityBlockingQueue.java 1.9 04/06/11
3  *
4  * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
5  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6  */

7
8 package java.util.concurrent;
9
10 import java.util.concurrent.locks.*;
11 import java.util.*;
12
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
 * the same ordering rules as class {@link PriorityQueue} and supplies
 * blocking retrieval operations. While this queue is logically
 * unbounded, attempted additions may fail due to resource exhaustion
 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
 * <tt>null</tt> elements. A priority queue relying on natural
 * ordering also does not permit insertion of non-comparable objects
 * (doing so results in <tt>ClassCastException</tt>).
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 * The Iterator provided in method {@link #iterator()} is
 * <em>not</em> guaranteed to traverse the elements of the
 * PriorityBlockingQueue in any particular order. If you need ordered
 * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
 *
 * <p>Implementation note: every operation delegates to a backing
 * {@link PriorityQueue} while holding a single {@link ReentrantLock};
 * blocked consumers wait on a condition that is signalled by each
 * successful insertion.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;

    /** Backing heap; only accessed while holding {@link #lock}. */
    private final PriorityQueue<E> q;
    /** Single (fair) lock guarding every access to {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Condition on which consumers wait while the queue is empty. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
     * capacity
     * (11) that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>).
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity
     * that orders its elements according to their natural ordering
     * (using <tt>Comparable</tt>).
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        // a null comparator selects natural ordering
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity
     * that orders its elements according to the specified comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @param comparator the comparator used to order this priority queue.
     * If <tt>null</tt> then the order depends on the elements' natural
     * ordering.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection. The priority queue has an initial
     * capacity of 110% of the size of the specified collection. If
     * the specified collection is a {@link SortedSet} or a {@link
     * PriorityQueue}, this priority queue will be sorted according to
     * the same comparator, or according to its elements' natural
     * order if the collection is sorted according to its elements'
     * natural order. Otherwise, this priority queue is ordered
     * according to its elements' natural order.
     *
     * @param c the collection whose elements are to be placed
     * into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     * cannot be compared to one another according to the priority
     * queue's ordering.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }


    // these first few override just to update doc comments

    /**
     * Adds the specified element to this queue.
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     *
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     */
    @Override
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Returns the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements natural ordering
     * (using <tt>Comparable</tt>).
     *
     * @return the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements natural ordering.
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    @Override
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean ok = q.offer(o);
            assert ok;
            // wake one consumer that may be waiting for an element
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this priority queue. As the queue is
     * unbounded this method will never block.
     * @param o the element to add
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    @Override
    public void put(E o) {
        offer(o); // never need to block
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is
     * unbounded this method will never block.
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    @Override
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue, waiting for an
     * element to be inserted while the queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     * lock or while waiting for an element
     */
    @Override
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }


    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty
     */
    @Override
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element to be inserted while the queue is empty.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed before an element became available
     * @throws InterruptedException if interrupted while acquiring the
     * lock or while waiting for an element
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos returns the remaining wait time
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty
     */
    @Override
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if the backing queue reported a removal
     */
    @Override
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     *
     * @param o the element whose presence is to be tested
     * @return <tt>true</tt> if the backing queue contains <tt>o</tt>
     */
    @Override
    public boolean contains(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * copied while holding the queue lock.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }


    /**
     * Returns the string representation of the backing queue, computed
     * while holding the queue lock.
     *
     * @return a string representation of this queue
     */
    @Override
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to
     * the given collection, in priority order.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        // argument checks are performed by the bounded overload
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection, in priority order.
     * An element is removed from this queue only after it has been
     * added to <tt>c</tt>, so if <tt>c.add</tt> throws, the element
     * that could not be added remains in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while (n < maxElements && (e = q.peek()) != null) {
                c.add(e);  // may throw; the head is still in the queue
                q.poll();  // only now drop the element just transferred
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    @Override
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * copied while holding the queue lock, using the given array (or a
     * new array of the same runtime type) as decided by the backing
     * queue.
     *
     * @param a the array into which the elements are to be stored, if
     * it is big enough
     * @return an array containing the elements of this queue
     */
    @Override
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The
     * iterator does not return the elements in any particular order.
     * The returned iterator is a thread-safe "fail-fast" iterator
     * that will throw {@link
     * java.util.ConcurrentModificationException} upon detected
     * interference.
     *
     * @return an iterator over the elements in this queue.
     */
    @Override
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator wrapper that runs <tt>next</tt> and <tt>remove</tt> of the
     * backing queue's iterator under the queue lock. Deliberately not
     * generic: it iterates the enclosing queue's element type <tt>E</tt>.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        @Override
        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        @Override
        public E next() {
            ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        @Override
        public void remove() {
            ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it). This
     * merely wraps default serialization within lock. The
     * serialization strategy for items is left to underlying
     * Queue. Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     *
     * @param s the stream to write this queue to
     * @throws java.io.IOException if default serialization fails
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}
462
Popular Tags