KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > sun > corba > se > impl > orbutil > concurrent > CondVar


1 /*
2   File: ConditionVariable.java
3
4   Originally written by Doug Lea and released into the public domain.
5   This may be used for any purposes whatsoever without acknowledgment.
6   Thanks for the assistance and support of Sun Microsystems Labs,
7   and everyone contributing, testing, and using this code.
8
9   History:
10   Date Who What
11   11Jun1998 dl Create public version
12   08dec2001 kmc Added support for Reentrant Mutexes
13 */

14
15 package com.sun.corba.se.impl.orbutil.concurrent;
16
17 import com.sun.corba.se.impl.orbutil.ORBUtility ;
18
19 /**
20  * This class is designed for fans of POSIX pthreads programming.
21  * If you restrict yourself to Mutexes and CondVars, you can
22  * use most of your favorite constructions. Don't randomly mix them
23  * with synchronized methods or blocks though.
24  * <p>
25  * Method names and behavior are as close as is reasonable to
26  * those in POSIX.
27  * <p>
28  * <b>Sample Usage.</b> Here is a full version of a bounded buffer
29  * that implements the BoundedChannel interface, written in
30  * a style reminscent of that in POSIX programming books.
31  * <pre>
32  * class CVBuffer implements BoundedChannel {
33  * private final Mutex mutex;
34  * private final CondVar notFull;
35  * private final CondVar notEmpty;
36  * private int count = 0;
37  * private int takePtr = 0;
38  * private int putPtr = 0;
39  * private final Object[] array;
40  *
41  * public CVBuffer(int capacity) {
42  * array = new Object[capacity];
43  * mutex = new Mutex();
44  * notFull = new CondVar(mutex);
45  * notEmpty = new CondVar(mutex);
46  * }
47  *
48  * public int capacity() { return array.length; }
49  *
50  * public void put(Object x) throws InterruptedException {
51  * mutex.acquire();
52  * try {
53  * while (count == array.length) {
54  * notFull.await();
55  * }
56  * array[putPtr] = x;
57  * putPtr = (putPtr + 1) % array.length;
58  * ++count;
59  * notEmpty.signal();
60  * }
61  * finally {
62  * mutex.release();
63  * }
64  * }
65  *
66  * public Object take() throws InterruptedException {
67  * Object x = null;
68  * mutex.acquire();
69  * try {
70  * while (count == 0) {
71  * notEmpty.await();
72  * }
73  * x = array[takePtr];
74  * array[takePtr] = null;
75  * takePtr = (takePtr + 1) % array.length;
76  * --count;
77  * notFull.signal();
78  * }
79  * finally {
80  * mutex.release();
81  * }
82  * return x;
83  * }
84  *
85  * public boolean offer(Object x, long msecs) throws InterruptedException {
86  * mutex.acquire();
87  * try {
88  * if (count == array.length) {
89  * notFull.timedwait(msecs);
90  * if (count == array.length)
91  * return false;
92  * }
93  * array[putPtr] = x;
94  * putPtr = (putPtr + 1) % array.length;
95  * ++count;
96  * notEmpty.signal();
97  * return true;
98  * }
99  * finally {
100  * mutex.release();
101  * }
102  * }
103  *
104  * public Object poll(long msecs) throws InterruptedException {
105  * Object x = null;
106  * mutex.acquire();
107  * try {
108  * if (count == 0) {
109  * notEmpty.timedwait(msecs);
110  * if (count == 0)
111  * return null;
112  * }
113  * x = array[takePtr];
114  * array[takePtr] = null;
115  * takePtr = (takePtr + 1) % array.length;
116  * --count;
117  * notFull.signal();
118  * }
119  * finally {
120  * mutex.release();
121  * }
122  * return x;
123  * }
124  * }
125  *
126  * </pre>
127  * @see Mutex
128  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
129
130  **/

131
132 public class CondVar {
133
134     protected boolean debug_ ;
135
136     /** The mutex **/
137     protected final Sync mutex_;
138     protected final ReentrantMutex remutex_;
139
140     private int releaseMutex()
141     {
142     int count = 1 ;
143
144     if (remutex_!=null)
145         count = remutex_.releaseAll() ;
146     else
147         mutex_.release() ;
148
149     return count ;
150     }
151
152     private void acquireMutex( int count ) throws InterruptedException JavaDoc
153     {
154     if (remutex_!=null)
155         remutex_.acquireAll( count ) ;
156     else
157         mutex_.acquire() ;
158     }
159   
160   /**
161    * Create a new CondVar that relies on the given mutual
162    * exclusion lock.
163    * @param mutex A mutual exclusion lock which must either be non-reentrant,
164    * or else be ReentrantMutex.
165    * Standard usage is to supply an instance of <code>Mutex</code>,
166    * but, for example, a Semaphore initialized to 1 also works.
167    * On the other hand, many other Sync implementations would not
168    * work here, so some care is required to supply a sensible
169    * synchronization object.
170    * In normal use, the mutex should be one that is used for <em>all</em>
171    * synchronization of the object using the CondVar. Generally,
172    * to prevent nested monitor lockouts, this
173    * object should not use any native Java synchronized blocks.
174    **/

175  
176   public CondVar(Sync mutex, boolean debug) {
177     debug_ = debug ;
178     mutex_ = mutex;
179     if (mutex instanceof ReentrantMutex)
180     remutex_ = (ReentrantMutex)mutex;
181     else
182     remutex_ = null;
183   }
184
185   public CondVar( Sync mutex ) {
186       this( mutex, false ) ;
187   }
188
189   /**
190    * Wait for notification. This operation at least momentarily
191    * releases the mutex. The mutex is always held upon return,
192    * even if interrupted.
193    * @exception InterruptedException if the thread was interrupted
194    * before or during the wait. However, if the thread is interrupted
195    * after the wait but during mutex re-acquisition, the interruption
196    * is ignored, while still ensuring
197    * that the currentThread's interruption state stays true, so can
198    * be probed by callers.
199    **/

200     public void await() throws InterruptedException JavaDoc {
201     int count = 0 ;
202     if (Thread.interrupted())
203         throw new InterruptedException JavaDoc();
204
205     try {
206         if (debug_)
207         ORBUtility.dprintTrace( this, "await enter" ) ;
208
209         synchronized(this) {
210         count = releaseMutex() ;
211         try {
212             wait();
213         } catch (InterruptedException JavaDoc ex) {
214             notify();
215             throw ex;
216         }
217         }
218     } finally {
219         // Must ignore interrupt on re-acquire
220
boolean interrupted = false;
221         for (;;) {
222         try {
223             acquireMutex( count );
224             break;
225         } catch (InterruptedException JavaDoc ex) {
226             interrupted = true;
227         }
228         }
229
230         if (interrupted) {
231         Thread.currentThread().interrupt();
232         }
233
234         if (debug_)
235         ORBUtility.dprintTrace( this, "await exit" ) ;
236     }
237     }
238
239     /**
240     * Wait for at most msecs for notification.
241     * This operation at least momentarily
242     * releases the mutex. The mutex is always held upon return,
243     * even if interrupted.
244     * @param msecs The time to wait. A value less than or equal to zero
245     * causes a momentarily release
246     * and re-acquire of the mutex, and always returns false.
247     * @return false if at least msecs have elapsed
248     * upon resumption; else true. A
249     * false return does NOT necessarily imply that the thread was
250     * not notified. For example, it might have been notified
251     * after the time elapsed but just before resuming.
252     * @exception InterruptedException if the thread was interrupted
253     * before or during the wait.
254     **/

255
256     public boolean timedwait(long msecs) throws InterruptedException JavaDoc {
257
258     if (Thread.interrupted())
259         throw new InterruptedException JavaDoc();
260
261     boolean success = false;
262     int count = 0;
263
264     try {
265         if (debug_)
266         ORBUtility.dprintTrace( this, "timedwait enter" ) ;
267
268         synchronized(this) {
269         count = releaseMutex() ;
270         try {
271             if (msecs > 0) {
272             long start = System.currentTimeMillis();
273             wait(msecs);
274             success = System.currentTimeMillis() - start <= msecs;
275             }
276         } catch (InterruptedException JavaDoc ex) {
277             notify();
278             throw ex;
279         }
280         }
281     } finally {
282         // Must ignore interrupt on re-acquire
283
boolean interrupted = false;
284         for (;;) {
285         try {
286             acquireMutex( count ) ;
287             break;
288         } catch (InterruptedException JavaDoc ex) {
289             interrupted = true;
290         }
291         }
292
293         if (interrupted) {
294         Thread.currentThread().interrupt();
295         }
296
297         if (debug_)
298         ORBUtility.dprintTrace( this, "timedwait exit" ) ;
299     }
300     return success;
301     }
302   
303     /**
304     * Notify a waiting thread.
305     * If one exists, a non-interrupted thread will return
306     * normally (i.e., not via InterruptedException) from await or timedwait.
307     **/

308     public synchronized void signal() {
309     notify();
310     }
311
312     /** Notify all waiting threads **/
313     public synchronized void broadcast() {
314     notifyAll();
315     }
316 }
317
Popular Tags