Code - Class EDU.oswego.cs.dl.util.concurrent.CondVar


1 /*
2   File: CondVar.java
3
4   Originally written by Doug Lea and released into the public domain.
5   This may be used for any purposes whatsoever without acknowledgment.
6   Thanks for the assistance and support of Sun Microsystems Labs,
7   and everyone contributing, testing, and using this code.
8
9   History:
10   Date Who What
11   11Jun1998 dl Create public version
12 */

13
14 package EDU.oswego.cs.dl.util.concurrent;
15
16 /**
17  * This class is designed for fans of POSIX pthreads programming.
18  * If you restrict yourself to Mutexes and CondVars, you can
19  * use most of your favorite constructions. Don't randomly mix them
20  * with synchronized methods or blocks though.
21  * <p>
22  * Method names and behavior are as close as is reasonable to
23  * those in POSIX.
24  * <p>
25  * <b>Sample Usage.</b> Here is a full version of a bounded buffer
26  * that implements the BoundedChannel interface, written in
27  * a style reminiscent of that in POSIX programming books.
28  * <pre>
29  * class CVBuffer implements BoundedChannel {
30  * private final Mutex mutex;
31  * private final CondVar notFull;
32  * private final CondVar notEmpty;
33  * private int count = 0;
34  * private int takePtr = 0;
35  * private int putPtr = 0;
36  * private final Object[] array;
37  *
38  * public CVBuffer(int capacity) {
39  * array = new Object[capacity];
40  * mutex = new Mutex();
41  * notFull = new CondVar(mutex);
42  * notEmpty = new CondVar(mutex);
43  * }
44  *
45  * public int capacity() { return array.length; }
46  *
47  * public void put(Object x) throws InterruptedException {
48  * mutex.acquire();
49  * try {
50  * while (count == array.length) {
51  * notFull.await();
52  * }
53  * array[putPtr] = x;
54  * putPtr = (putPtr + 1) % array.length;
55  * ++count;
56  * notEmpty.signal();
57  * }
58  * finally {
59  * mutex.release();
60  * }
61  * }
62  *
63  * public Object take() throws InterruptedException {
64  * Object x = null;
65  * mutex.acquire();
66  * try {
67  * while (count == 0) {
68  * notEmpty.await();
69  * }
70  * x = array[takePtr];
71  * array[takePtr] = null;
72  * takePtr = (takePtr + 1) % array.length;
73  * --count;
74  * notFull.signal();
75  * }
76  * finally {
77  * mutex.release();
78  * }
79  * return x;
80  * }
81  *
82  * public boolean offer(Object x, long msecs) throws InterruptedException {
83  * mutex.acquire();
84  * try {
85  * if (count == array.length) {
86  * notFull.timedwait(msecs);
87  * if (count == array.length)
88  * return false;
89  * }
90  * array[putPtr] = x;
91  * putPtr = (putPtr + 1) % array.length;
92  * ++count;
93  * notEmpty.signal();
94  * return true;
95  * }
96  * finally {
97  * mutex.release();
98  * }
99  * }
100  *
101  * public Object poll(long msecs) throws InterruptedException {
102  * Object x = null;
103  * mutex.acquire();
104  * try {
105  * if (count == 0) {
106  * notEmpty.timedwait(msecs);
107  * if (count == 0)
108  * return null;
109  * }
110  * x = array[takePtr];
111  * array[takePtr] = null;
112  * takePtr = (takePtr + 1) % array.length;
113  * --count;
114  * notFull.signal();
115  * }
116  * finally {
117  * mutex.release();
118  * }
119  * return x;
120  * }
121  * }
122  *
123  * </pre>
124  * @see Mutex
125  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
126
127  **/

128
129 public class CondVar {
130
131   /** The mutex **/
132   protected final Sync mutex_;
133
134   /**
135    * Create a new CondVar that relies on the given mutual
136    * exclusion lock.
137    * @param mutex A non-reentrant mutual exclusion lock.
138    * Standard usage is to supply an instance of <code>Mutex</code>,
139    * but, for example, a Semaphore initialized to 1 also works.
140    * On the other hand, many other Sync implementations would not
141    * work here, so some care is required to supply a sensible
142    * synchronization object.
143    * In normal use, the mutex should be one that is used for <em>all</em>
144    * synchronization of the object using the CondVar. Generally,
145    * to prevent nested monitor lockouts, this
146    * object should not use any native Java synchronized blocks.
147    **/

148  
149   public CondVar(Sync mutex) {
150     mutex_ = mutex;
151   }
152
153   /**
154    * Wait for notification. This operation at least momentarily
155    * releases the mutex. The mutex is always held upon return,
156    * even if interrupted.
157    * @exception InterruptedException if the thread was interrupted
158    * before or during the wait. However, if the thread is interrupted
159    * after the wait but during mutex re-acquisition, the interruption
160    * is ignored, while still ensuring
161    * that the currentThread's interruption state stays true, so can
162    * be probed by callers.
163    **/

164   public void await() throws InterruptedException {
165     if (Thread.interrupted()) throw new InterruptedException();
166     try {
167       synchronized(this) {
168         mutex_.release();
169         try {
170           wait();
171         }
172         catch (InterruptedException ex) {
173           notify();
174           throw ex;
175         }
176       }
177     }
178     finally {
179       // Must ignore interrupt on re-acquire
180
boolean interrupted = false;
181       for (;;) {
182         try {
183           mutex_.acquire();
184           break;
185         }
186         catch (InterruptedException ex) {
187           interrupted = true;
188         }
189       }
190       if (interrupted) {
191         Thread.currentThread().interrupt();
192       }
193     }
194   }
195
196   /**
197    * Wait for at most msecs for notification.
198    * This operation at least momentarily
199    * releases the mutex. The mutex is always held upon return,
200    * even if interrupted.
201    * @param msecs The time to wait. A value less than or equal to zero
202    * causes a momentarily release
203    * and re-acquire of the mutex, and always returns false.
204    * @return false if at least msecs have elapsed
205    * upon resumption; else true. A
206    * false return does NOT necessarily imply that the thread was
207    * not notified. For example, it might have been notified
208    * after the time elapsed but just before resuming.
209    * @exception InterruptedException if the thread was interrupted
210    * before or during the wait.
211    **/

212
213   public boolean timedwait(long msecs) throws InterruptedException {
214     if (Thread.interrupted()) throw new InterruptedException();
215     boolean success = false;
216     try {
217       synchronized(this) {
218         mutex_.release();
219         try {
220           if (msecs > 0) {
221             long start = System.currentTimeMillis();
222             wait(msecs);
223             success = System.currentTimeMillis() - start <= msecs;
224           }
225         }
226         catch (InterruptedException ex) {
227           notify();
228           throw ex;
229         }
230       }
231     }
232     finally {
233       // Must ignore interrupt on re-acquire
234
boolean interrupted = false;
235       for (;;) {
236         try {
237           mutex_.acquire();
238           break;
239         }
240         catch (InterruptedException ex) {
241           interrupted = true;
242         }
243       }
244       if (interrupted) {
245         Thread.currentThread().interrupt();
246       }
247     }
248     return success;
249   }
250   
251   /**
252    * Notify a waiting thread.
253    * If one exists, a non-interrupted thread will return
254    * normally (i.e., not via InterruptedException) from await or timedwait.
255    **/

256   public synchronized void signal() {
257     notify();
258   }
259
260   /** Notify all waiting threads **/
261   public synchronized void broadcast() {
262     notifyAll();
263   }
264 }
265

Java API By Example, From Geeks To Geeks. | Conditions of Use | About Us © 2002 - 2005, KickJava.com, or its affiliates