KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > java > util > concurrent > SynchronousQueue


1 /*
2  * @(#)SynchronousQueue.java 1.8 04/06/11
3  *
4  * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
5  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6  */

7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13  * A {@linkplain BlockingQueue blocking queue} in which each
14  * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
15  * synchronous queue does not have any internal capacity, not even a
16  * capacity of one. You cannot <tt>peek</tt> at a synchronous queue
17  * because an element is only present when you try to take it; you
18  * cannot add an element (using any method) unless another thread is
19  * trying to remove it; you cannot iterate as there is nothing to
20  * iterate. The <em>head</em> of the queue is the element that the
21  * first queued thread is trying to add to the queue; if there are no
22  * queued threads then no element is being added and the head is
23  * <tt>null</tt>. For purposes of other <tt>Collection</tt> methods
24  * (for example <tt>contains</tt>), a <tt>SynchronousQueue</tt> acts
25  * as an empty collection. This queue does not permit <tt>null</tt>
26  * elements.
27  *
28  * <p>Synchronous queues are similar to rendezvous channels used in
29  * CSP and Ada. They are well suited for handoff designs, in which an
30  * object running in one thread must sync up with an object running
31  * in another thread in order to hand it some information, event, or
32  * task.
33  *
34  * <p> This class supports an optional fairness policy for ordering
35  * waiting producer and consumer threads. By default, this ordering
36  * is not guaranteed. However, a queue constructed with fairness set
37  * to <tt>true</tt> grants threads access in FIFO order. Fairness
38  * generally decreases throughput but reduces variability and avoids
39  * starvation.
40  *
41  * <p>This class and its iterator implement all of the
42  * <em>optional</em> methods of the {@link Collection} and {@link
43  * Iterator} interfaces.
44  *
45  * <p>This class is a member of the
46  * <a HREF="{@docRoot}/../guide/collections/index.html">
47  * Java Collections Framework</a>.
48  *
49  * @since 1.5
50  * @author Doug Lea
51  * @param <E> the type of elements held in this collection
52  */

53 public class SynchronousQueue<E> extends AbstractQueue<E>
54         implements BlockingQueue JavaDoc<E>, java.io.Serializable JavaDoc {
55     private static final long serialVersionUID = -3223113410248163686L;
56
57     /*
58       This implementation divides actions into two cases for puts:
59
60       * An arriving producer that does not already have a waiting consumer
61       creates a node holding item, and then waits for a consumer to take it.
62       * An arriving producer that does already have a waiting consumer fills
63       the slot node created by the consumer, and notifies it to continue.
64
65       And symmetrically, two for takes:
66
67       * An arriving consumer that does not already have a waiting producer
68       creates an empty slot node, and then waits for a producer to fill it.
69       * An arriving consumer that does already have a waiting producer takes
70       item from the node created by the producer, and notifies it to continue.
71
72       When a put or take waiting for the actions of its counterpart
73       aborts due to interruption or timeout, it marks the node
74       it created as "CANCELLED", which causes its counterpart to retry
75       the entire put or take sequence.
76
77       This requires keeping two simple queues, waitingProducers and
78       waitingConsumers. Each of these can be FIFO (preserves fairness)
79       or LIFO (improves throughput).
80     */

81
82     /** Lock protecting both wait queues */
83     private final ReentrantLock qlock;
84     /** Queue holding waiting puts */
85     private final WaitQueue waitingProducers;
86     /** Queue holding waiting takes */
87     private final WaitQueue waitingConsumers;
88
89     /**
90      * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
91      */

92     public SynchronousQueue() {
93         this(false);
94     }
95
96     /**
97      * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
98      * @param fair if true, threads contend in FIFO order for access;
99      * otherwise the order is unspecified.
100      */

101     public SynchronousQueue(boolean fair) {
102         if (fair) {
103             qlock = new ReentrantLock(true);
104             waitingProducers = new FifoWaitQueue();
105             waitingConsumers = new FifoWaitQueue();
106         }
107         else {
108             qlock = new ReentrantLock();
109             waitingProducers = new LifoWaitQueue();
110             waitingConsumers = new LifoWaitQueue();
111         }
112     }
113
114     /**
115      * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
116      * These queues have all transient fields, but are serializable
117      * in order to recover fairness settings when deserialized.
118      */

119     static abstract class WaitQueue implements java.io.Serializable JavaDoc {
120         /** Create, add, and return node for x */
121         abstract Node enq(Object JavaDoc x);
122         /** Remove and return node, or null if empty */
123         abstract Node deq();
124     }
125
126     /**
127      * FIFO queue to hold waiting puts/takes.
128      */

129     static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable JavaDoc {
130         private static final long serialVersionUID = -3623113410248163686L;
131         private transient Node head;
132         private transient Node last;
133
134         Node enq(Object JavaDoc x) {
135             Node p = new Node(x);
136             if (last == null)
137                 last = head = p;
138             else
139                 last = last.next = p;
140             return p;
141         }
142
143         Node deq() {
144             Node p = head;
145             if (p != null) {
146                 if ((head = p.next) == null)
147                     last = null;
148                 p.next = null;
149             }
150             return p;
151         }
152     }
153
154     /**
155      * LIFO queue to hold waiting puts/takes.
156      */

157     static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable JavaDoc {
158         private static final long serialVersionUID = -3633113410248163686L;
159         private transient Node head;
160
161         Node enq(Object JavaDoc x) {
162             return head = new Node(x, head);
163         }
164
165         Node deq() {
166             Node p = head;
167             if (p != null) {
168                 head = p.next;
169                 p.next = null;
170             }
171             return p;
172         }
173     }
174
175     /**
176      * Nodes each maintain an item and handle waits and signals for
177      * getting and setting it. The class extends
178      * AbstractQueuedSynchronizer to manage blocking, using AQS state
179      * 0 for waiting, 1 for ack, -1 for cancelled.
180      */

181     static final class Node extends AbstractQueuedSynchronizer {
182         /** Synchronization state value representing that node acked */
183         private static final int ACK = 1;
184         /** Synchronization state value representing that node cancelled */
185         private static final int CANCEL = -1;
186
187         /** The item being transferred */
188         Object JavaDoc item;
189         /** Next node in wait queue */
190         Node next;
191
192         /** Creates a node with initial item */
193         Node(Object JavaDoc x) { item = x; }
194
195         /** Creates a node with initial item and next */
196         Node(Object JavaDoc x, Node n) { item = x; next = n; }
197
198         /**
199          * Implements AQS base acquire to succeed if not in WAITING state
200          */

201         protected boolean tryAcquire(int ignore) {
202             return getState() != 0;
203         }
204
205         /**
206          * Implements AQS base release to signal if state changed
207          */

208         protected boolean tryRelease(int newState) {
209             return compareAndSetState(0, newState);
210         }
211
212         /**
213          * Takes item and nulls out field (for sake of GC)
214          */

215         private Object JavaDoc extract() {
216             Object JavaDoc x = item;
217             item = null;
218             return x;
219         }
220
221         /**
222          * Tries to cancel on interrupt; if so rethrowing,
223          * else setting interrupt state
224          */

225         private void checkCancellationOnInterrupt(InterruptedException JavaDoc ie)
226             throws InterruptedException JavaDoc {
227             if (release(CANCEL))
228                 throw ie;
229             Thread.currentThread().interrupt();
230         }
231
232         /**
233          * Fills in the slot created by the consumer and signal consumer to
234          * continue.
235          */

236         boolean setItem(Object JavaDoc x) {
237             item = x; // can place in slot even if cancelled
238
return release(ACK);
239         }
240
241         /**
242          * Removes item from slot created by producer and signal producer
243          * to continue.
244          */

245         Object JavaDoc getItem() {
246             return (release(ACK))? extract() : null;
247         }
248
249         /**
250          * Waits for a consumer to take item placed by producer.
251          */

252         void waitForTake() throws InterruptedException JavaDoc {
253             try {
254                 acquireInterruptibly(0);
255             } catch (InterruptedException JavaDoc ie) {
256                 checkCancellationOnInterrupt(ie);
257             }
258         }
259
260         /**
261          * Waits for a producer to put item placed by consumer.
262          */

263         Object JavaDoc waitForPut() throws InterruptedException JavaDoc {
264             try {
265                 acquireInterruptibly(0);
266             } catch (InterruptedException JavaDoc ie) {
267                 checkCancellationOnInterrupt(ie);
268             }
269             return extract();
270         }
271
272         /**
273          * Waits for a consumer to take item placed by producer or time out.
274          */

275         boolean waitForTake(long nanos) throws InterruptedException JavaDoc {
276             try {
277                 if (!tryAcquireNanos(0, nanos) &&
278                     release(CANCEL))
279                     return false;
280             } catch (InterruptedException JavaDoc ie) {
281                 checkCancellationOnInterrupt(ie);
282             }
283             return true;
284         }
285
286         /**
287          * Waits for a producer to put item placed by consumer, or time out.
288          */

289         Object JavaDoc waitForPut(long nanos) throws InterruptedException JavaDoc {
290             try {
291                 if (!tryAcquireNanos(0, nanos) &&
292                     release(CANCEL))
293                     return null;
294             } catch (InterruptedException JavaDoc ie) {
295                 checkCancellationOnInterrupt(ie);
296             }
297             return extract();
298         }
299     }
300
301     /**
302      * Adds the specified element to this queue, waiting if necessary for
303      * another thread to receive it.
304      * @param o the element to add
305      * @throws InterruptedException if interrupted while waiting.
306      * @throws NullPointerException if the specified element is <tt>null</tt>.
307      */

308     public void put(E o) throws InterruptedException JavaDoc {
309         if (o == null) throw new NullPointerException JavaDoc();
310         final ReentrantLock qlock = this.qlock;
311
312         for (;;) {
313             Node node;
314             boolean mustWait;
315             if (Thread.interrupted()) throw new InterruptedException JavaDoc();
316             qlock.lock();
317             try {
318                 node = waitingConsumers.deq();
319                 if ( (mustWait = (node == null)) )
320                     node = waitingProducers.enq(o);
321             } finally {
322                 qlock.unlock();
323             }
324
325             if (mustWait) {
326                 node.waitForTake();
327                 return;
328             }
329
330             else if (node.setItem(o))
331                 return;
332
333             // else consumer cancelled, so retry
334
}
335     }
336
337     /**
338      * Inserts the specified element into this queue, waiting if necessary
339      * up to the specified wait time for another thread to receive it.
340      * @param o the element to add
341      * @param timeout how long to wait before giving up, in units of
342      * <tt>unit</tt>
343      * @param unit a <tt>TimeUnit</tt> determining how to interpret the
344      * <tt>timeout</tt> parameter
345      * @return <tt>true</tt> if successful, or <tt>false</tt> if
346      * the specified waiting time elapses before a consumer appears.
347      * @throws InterruptedException if interrupted while waiting.
348      * @throws NullPointerException if the specified element is <tt>null</tt>.
349      */

350     public boolean offer(E o, long timeout, TimeUnit JavaDoc unit) throws InterruptedException JavaDoc {
351         if (o == null) throw new NullPointerException JavaDoc();
352         long nanos = unit.toNanos(timeout);
353         final ReentrantLock qlock = this.qlock;
354         for (;;) {
355             Node node;
356             boolean mustWait;
357             if (Thread.interrupted()) throw new InterruptedException JavaDoc();
358             qlock.lock();
359             try {
360                 node = waitingConsumers.deq();
361                 if ( (mustWait = (node == null)) )
362                     node = waitingProducers.enq(o);
363             } finally {
364                 qlock.unlock();
365             }
366
367             if (mustWait)
368                 return node.waitForTake(nanos);
369
370             else if (node.setItem(o))
371                 return true;
372
373             // else consumer cancelled, so retry
374
}
375     }
376
377     /**
378      * Retrieves and removes the head of this queue, waiting if necessary
379      * for another thread to insert it.
380      * @throws InterruptedException if interrupted while waiting.
381      * @return the head of this queue
382      */

383     public E take() throws InterruptedException JavaDoc {
384         final ReentrantLock qlock = this.qlock;
385         for (;;) {
386             Node node;
387             boolean mustWait;
388
389             if (Thread.interrupted()) throw new InterruptedException JavaDoc();
390             qlock.lock();
391             try {
392                 node = waitingProducers.deq();
393                 if ( (mustWait = (node == null)) )
394                     node = waitingConsumers.enq(null);
395             } finally {
396                 qlock.unlock();
397             }
398
399             if (mustWait) {
400                 Object JavaDoc x = node.waitForPut();
401                 return (E)x;
402             }
403             else {
404                 Object JavaDoc x = node.getItem();
405                 if (x != null)
406                     return (E)x;
407                 // else cancelled, so retry
408
}
409         }
410     }
411
412     /**
413      * Retrieves and removes the head of this queue, waiting
414      * if necessary up to the specified wait time, for another thread
415      * to insert it.
416      * @param timeout how long to wait before giving up, in units of
417      * <tt>unit</tt>
418      * @param unit a <tt>TimeUnit</tt> determining how to interpret the
419      * <tt>timeout</tt> parameter
420      * @return the head of this queue, or <tt>null</tt> if the
421      * specified waiting time elapses before an element is present.
422      * @throws InterruptedException if interrupted while waiting.
423      */

424     public E poll(long timeout, TimeUnit JavaDoc unit) throws InterruptedException JavaDoc {
425         long nanos = unit.toNanos(timeout);
426         final ReentrantLock qlock = this.qlock;
427
428         for (;;) {
429             Node node;
430             boolean mustWait;
431
432             if (Thread.interrupted()) throw new InterruptedException JavaDoc();
433             qlock.lock();
434             try {
435                 node = waitingProducers.deq();
436                 if ( (mustWait = (node == null)) )
437                     node = waitingConsumers.enq(null);
438             } finally {
439                 qlock.unlock();
440             }
441
442             if (mustWait) {
443                 Object JavaDoc x = node.waitForPut(nanos);
444                 return (E)x;
445             }
446             else {
447                 Object JavaDoc x = node.getItem();
448                 if (x != null)
449                     return (E)x;
450                 // else cancelled, so retry
451
}
452         }
453     }
454
455     // Untimed nonblocking versions
456

457    /**
458     * Inserts the specified element into this queue, if another thread is
459     * waiting to receive it.
460     *
461     * @param o the element to add.
462     * @return <tt>true</tt> if it was possible to add the element to
463     * this queue, else <tt>false</tt>
464     * @throws NullPointerException if the specified element is <tt>null</tt>
465     */

466     public boolean offer(E o) {
467         if (o == null) throw new NullPointerException JavaDoc();
468         final ReentrantLock qlock = this.qlock;
469
470         for (;;) {
471             Node node;
472             qlock.lock();
473             try {
474                 node = waitingConsumers.deq();
475             } finally {
476                 qlock.unlock();
477             }
478             if (node == null)
479                 return false;
480
481             else if (node.setItem(o))
482                 return true;
483             // else retry
484
}
485     }
486
487     /**
488      * Retrieves and removes the head of this queue, if another thread
489      * is currently making an element available.
490      *
491      * @return the head of this queue, or <tt>null</tt> if no
492      * element is available.
493      */

494     public E poll() {
495         final ReentrantLock qlock = this.qlock;
496         for (;;) {
497             Node node;
498             qlock.lock();
499             try {
500                 node = waitingProducers.deq();
501             } finally {
502                 qlock.unlock();
503             }
504             if (node == null)
505                 return null;
506
507             else {
508                 Object JavaDoc x = node.getItem();
509                 if (x != null)
510                     return (E)x;
511                 // else retry
512
}
513         }
514     }
515
516     /**
517      * Always returns <tt>true</tt>.
518      * A <tt>SynchronousQueue</tt> has no internal capacity.
519      * @return <tt>true</tt>
520      */

521     public boolean isEmpty() {
522         return true;
523     }
524
525     /**
526      * Always returns zero.
527      * A <tt>SynchronousQueue</tt> has no internal capacity.
528      * @return zero.
529      */

530     public int size() {
531         return 0;
532     }
533
534     /**
535      * Always returns zero.
536      * A <tt>SynchronousQueue</tt> has no internal capacity.
537      * @return zero.
538      */

539     public int remainingCapacity() {
540         return 0;
541     }
542
543     /**
544      * Does nothing.
545      * A <tt>SynchronousQueue</tt> has no internal capacity.
546      */

547     public void clear() {}
548
549     /**
550      * Always returns <tt>false</tt>.
551      * A <tt>SynchronousQueue</tt> has no internal capacity.
552      * @param o the element
553      * @return <tt>false</tt>
554      */

555     public boolean contains(Object JavaDoc o) {
556         return false;
557     }
558
559     /**
560      * Always returns <tt>false</tt>.
561      * A <tt>SynchronousQueue</tt> has no internal capacity.
562      *
563      * @param o the element to remove
564      * @return <tt>false</tt>
565      */

566     public boolean remove(Object JavaDoc o) {
567         return false;
568     }
569
570     /**
571      * Returns <tt>false</tt> unless given collection is empty.
572      * A <tt>SynchronousQueue</tt> has no internal capacity.
573      * @param c the collection
574      * @return <tt>false</tt> unless given collection is empty
575      */

576     public boolean containsAll(Collection<?> c) {
577         return c.isEmpty();
578     }
579
580     /**
581      * Always returns <tt>false</tt>.
582      * A <tt>SynchronousQueue</tt> has no internal capacity.
583      * @param c the collection
584      * @return <tt>false</tt>
585      */

586     public boolean removeAll(Collection<?> c) {
587         return false;
588     }
589
590     /**
591      * Always returns <tt>false</tt>.
592      * A <tt>SynchronousQueue</tt> has no internal capacity.
593      * @param c the collection
594      * @return <tt>false</tt>
595      */

596     public boolean retainAll(Collection<?> c) {
597         return false;
598     }
599
600     /**
601      * Always returns <tt>null</tt>.
602      * A <tt>SynchronousQueue</tt> does not return elements
603      * unless actively waited on.
604      * @return <tt>null</tt>
605      */

606     public E peek() {
607         return null;
608     }
609
610
611     static class EmptyIterator<E> implements Iterator<E> {
612         public boolean hasNext() {
613             return false;
614         }
615         public E next() {
616             throw new NoSuchElementException();
617         }
618         public void remove() {
619             throw new IllegalStateException JavaDoc();
620         }
621     }
622
623     /**
624      * Returns an empty iterator in which <tt>hasNext</tt> always returns
625      * <tt>false</tt>.
626      *
627      * @return an empty iterator
628      */

629     public Iterator<E> iterator() {
630         return new EmptyIterator<E>();
631     }
632
633
634     /**
635      * Returns a zero-length array.
636      * @return a zero-length array
637      */

638     public Object JavaDoc[] toArray() {
639         return new Object JavaDoc[0];
640     }
641
642     /**
643      * Sets the zeroeth element of the specified array to <tt>null</tt>
644      * (if the array has non-zero length) and returns it.
645      * @param a the array
646      * @return the specified array
647      */

648     public <T> T[] toArray(T[] a) {
649         if (a.length > 0)
650             a[0] = null;
651         return a;
652     }
653
654
655     public int drainTo(Collection<? super E> c) {
656         if (c == null)
657             throw new NullPointerException JavaDoc();
658         if (c == this)
659             throw new IllegalArgumentException JavaDoc();
660         int n = 0;
661         E e;
662         while ( (e = poll()) != null) {
663             c.add(e);
664             ++n;
665         }
666         return n;
667     }
668
669     public int drainTo(Collection<? super E> c, int maxElements) {
670         if (c == null)
671             throw new NullPointerException JavaDoc();
672         if (c == this)
673             throw new IllegalArgumentException JavaDoc();
674         int n = 0;
675         E e;
676         while (n < maxElements && (e = poll()) != null) {
677             c.add(e);
678             ++n;
679         }
680         return n;
681     }
682 }
683
684
685
686
687
688
Popular Tags