KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > logicalcobwebs > concurrent > LinkedQueue


/*
  File: LinkedQueue.java

  Originally written by Doug Lea and released into the public domain.
  This may be used for any purposes whatsoever without acknowledgment.
  Thanks for the assistance and support of Sun Microsystems Labs,
  and everyone contributing, testing, and using this code.

  History:
  Date Who What
  11Jun1998 dl Create public version
  25aug1998 dl added peek
  10dec1998 dl added isEmpty
  10oct1999 dl lock on node object to ensure visibility
*/

16
17 package org.logicalcobwebs.concurrent;
18
19 /**
20  * A linked list based channel implementation.
21  * The algorithm avoids contention between puts
22  * and takes when the queue is not empty.
23  * Normally a put and a take can proceed simultaneously.
24  * (Although it does not allow multiple concurrent puts or takes.)
25  * This class tends to perform more efficently than
26  * other Channel implementations in producer/consumer
27  * applications.
28  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
29  **/

30
31 public class LinkedQueue implements Channel {
32
33
34     /**
35      * Dummy header node of list. The first actual node, if it exists, is always
36      * at head_.next. After each take, the old first node becomes the head.
37      **/

38     protected LinkedNode head_;
39
40     /**
41      * Helper monitor for managing access to last node.
42      **/

43     protected final Object JavaDoc putLock_ = new Object JavaDoc();
44
45     /**
46      * The last node of list. Put() appends to list, so modifies last_
47      **/

48     protected LinkedNode last_;
49
50     /**
51      * The number of threads waiting for a take.
52      * Notifications are provided in put only if greater than zero.
53      * The bookkeeping is worth it here since in reasonably balanced
54      * usages, the notifications will hardly ever be necessary, so
55      * the call overhead to notify can be eliminated.
56      **/

57     protected int waitingForTake_ = 0;
58
59     public LinkedQueue() {
60         head_ = new LinkedNode(null);
61         last_ = head_;
62     }
63
64     /** Main mechanics for put/offer **/
65     protected void insert(Object JavaDoc x) {
66         synchronized (putLock_) {
67             LinkedNode p = new LinkedNode(x);
68             synchronized (last_) {
69                 last_.next = p;
70                 last_ = p;
71             }
72             if (waitingForTake_ > 0)
73                 putLock_.notify();
74         }
75     }
76
77     /** Main mechanics for take/poll **/
78     protected synchronized Object JavaDoc extract() {
79         synchronized (head_) {
80             Object JavaDoc x = null;
81             LinkedNode first = head_.next;
82             if (first != null) {
83                 x = first.value;
84                 first.value = null;
85                 head_ = first;
86             }
87             return x;
88         }
89     }
90
91
92     public void put(Object JavaDoc x) throws InterruptedException JavaDoc {
93         if (x == null) throw new IllegalArgumentException JavaDoc();
94         if (Thread.interrupted()) throw new InterruptedException JavaDoc();
95         insert(x);
96     }
97
98     public boolean offer(Object JavaDoc x, long msecs) throws InterruptedException JavaDoc {
99         if (x == null) throw new IllegalArgumentException JavaDoc();
100         if (Thread.interrupted()) throw new InterruptedException JavaDoc();
101         insert(x);
102         return true;
103     }
104
105     public Object JavaDoc take() throws InterruptedException JavaDoc {
106         if (Thread.interrupted()) throw new InterruptedException JavaDoc();
107         // try to extract. If fail, then enter wait-based retry loop
108
Object JavaDoc x = extract();
109         if (x != null)
110             return x;
111         else {
112             synchronized (putLock_) {
113                 try {
114                     ++waitingForTake_;
115                     for (; ;) {
116                         x = extract();
117                         if (x != null) {
118                             --waitingForTake_;
119                             return x;
120                         } else {
121                             putLock_.wait();
122                         }
123                     }
124                 } catch (InterruptedException JavaDoc ex) {
125                     --waitingForTake_;
126                     putLock_.notify();
127                     throw ex;
128                 }
129             }
130         }
131     }
132
133     public Object JavaDoc peek() {
134         synchronized (head_) {
135             LinkedNode first = head_.next;
136             if (first != null)
137                 return first.value;
138             else
139                 return null;
140         }
141     }
142
143
144     public boolean isEmpty() {
145         synchronized (head_) {
146             return head_.next == null;
147         }
148     }
149
150     public Object JavaDoc poll(long msecs) throws InterruptedException JavaDoc {
151         if (Thread.interrupted()) throw new InterruptedException JavaDoc();
152         Object JavaDoc x = extract();
153         if (x != null)
154             return x;
155         else {
156             synchronized (putLock_) {
157                 try {
158                     long waitTime = msecs;
159                     long start = (msecs <= 0) ? 0 : System.currentTimeMillis();
160                     ++waitingForTake_;
161                     for (; ;) {
162                         x = extract();
163                         if (x != null || waitTime <= 0) {
164                             --waitingForTake_;
165                             return x;
166                         } else {
167                             putLock_.wait(waitTime);
168                             waitTime = msecs - (System.currentTimeMillis() - start);
169                         }
170                     }
171                 } catch (InterruptedException JavaDoc ex) {
172                     --waitingForTake_;
173                     putLock_.notify();
174                     throw ex;
175                 }
176             }
177         }
178     }
179 }
180
181
182
Popular Tags