| ||||
|
Code - Class EDU.oswego.cs.dl.util.concurrent.CondVar1 /* 2 File: ConditionVariable.java 3 4 Originally written by Doug Lea and released into the public domain. 5 This may be used for any purposes whatsoever without acknowledgment. 6 Thanks for the assistance and support of Sun Microsystems Labs, 7 and everyone contributing, testing, and using this code. 8 9 History: 10 Date Who What 11 11Jun1998 dl Create public version 12 */ 13 14 package EDU.oswego.cs.dl.util.concurrent; 15 16 /** 17 * This class is designed for fans of POSIX pthreads programming. 18 * If you restrict yourself to Mutexes and CondVars, you can 19 * use most of your favorite constructions. Don't randomly mix them 20 * with synchronized methods or blocks though. 21 * <p> 22 * Method names and behavior are as close as is reasonable to 23 * those in POSIX. 24 * <p> 25 * <b>Sample Usage.</b> Here is a full version of a bounded buffer 26 * that implements the BoundedChannel interface, written in 27 * a style reminscent of that in POSIX programming books. 28 * <pre> 29 * class CVBuffer implements BoundedChannel { 30 * private final Mutex mutex; 31 * private final CondVar notFull; 32 * private final CondVar notEmpty; 33 * private int count = 0; 34 * private int takePtr = 0; 35 * private int putPtr = 0; 36 * private final Object[] array; 37 * 38 * public CVBuffer(int capacity) { 39 * array = new Object[capacity]; 40 * mutex = new Mutex(); 41 * notFull = new CondVar(mutex); 42 * notEmpty = new CondVar(mutex); 43 * } 44 * 45 * public int capacity() { return array.length; } 46 * 47 * public void put(Object x) throws InterruptedException { 48 * mutex.acquire(); 49 * try { 50 * while (count == array.length) { 51 * notFull.await(); 52 * } 53 * array[putPtr] = x; 54 * putPtr = (putPtr + 1) % array.length; 55 * ++count; 56 * notEmpty.signal(); 57 * } 58 * finally { 59 * mutex.release(); 60 * } 61 * } 62 * 63 * public Object take() throws InterruptedException { 64 * Object x = null; 65 * mutex.acquire(); 66 * try { 67 * while (count == 0) { 68 * notEmpty.await(); 69 * } 70 * x = array[takePtr]; 71 * array[takePtr] = null; 72 * takePtr = (takePtr + 1) % array.length; 73 * --count; 74 * notFull.signal(); 75 * } 76 * finally { 77 * mutex.release(); 78 * } 79 * return x; 80 * } 81 * 82 * public boolean offer(Object x, long msecs) throws InterruptedException { 83 * mutex.acquire(); 84 * try { 85 * if (count == array.length) { 86 * notFull.timedwait(msecs); 87 * if (count == array.length) 88 * return false; 89 * } 90 * array[putPtr] = x; 91 * putPtr = (putPtr + 1) % array.length; 92 * ++count; 93 * notEmpty.signal(); 94 * return true; 95 * } 96 * finally { 97 * mutex.release(); 98 * } 99 * } 100 * 101 * public Object poll(long msecs) throws InterruptedException { 102 * Object x = null; 103 * mutex.acquire(); 104 * try { 105 * if (count == 0) { 106 * notEmpty.timedwait(msecs); 107 * if (count == 0) 108 * return null; 109 * } 110 * x = array[takePtr]; 111 * array[takePtr] = null; 112 * takePtr = (takePtr + 1) % array.length; 113 * --count; 114 * notFull.signal(); 115 * } 116 * finally { 117 * mutex.release(); 118 * } 119 * return x; 120 * } 121 * } 122 * 123 * </pre> 124 * @see Mutex 125 * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] 126 127 **/ 128 129 public class CondVar { 130 131 /** The mutex **/ 132 protected final Sync mutex_; 133 134 /** 135 * Create a new CondVar that relies on the given mutual 136 * exclusion lock. 137 * @param mutex A non-reentrant mutual exclusion lock. 138 * Standard usage is to supply an instance of <code>Mutex</code>, 139 * but, for example, a Semaphore initialized to 1 also works. 140 * On the other hand, many other Sync implementations would not 141 * work here, so some care is required to supply a sensible 142 * synchronization object. 143 * In normal use, the mutex should be one that is used for <em>all</em> 144 * synchronization of the object using the CondVar. Generally, 145 * to prevent nested monitor lockouts, this 146 * object should not use any native Java synchronized blocks. 147 **/ 148 149 public CondVar(Sync mutex) { 150 mutex_ = mutex; 151 } 152 153 /** 154 * Wait for notification. This operation at least momentarily 155 * releases the mutex. The mutex is always held upon return, 156 * even if interrupted. 157 * @exception InterruptedException if the thread was interrupted 158 * before or during the wait. However, if the thread is interrupted 159 * after the wait but during mutex re-acquisition, the interruption 160 * is ignored, while still ensuring 161 * that the currentThread's interruption state stays true, so can 162 * be probed by callers. 163 **/ 164 public void await() throws InterruptedException { 165 if (Thread.interrupted()) throw new InterruptedException(); 166 try { 167 synchronized(this) { 168 mutex_.release(); 169 try { 170 wait(); 171 } 172 catch (InterruptedException ex) { 173 notify(); 174 throw ex; 175 } 176 } 177 } 178 finally { 179 // Must ignore interrupt on re-acquire 180 boolean interrupted = false; 181 for (;;) { 182 try { 183 mutex_.acquire(); 184 break; 185 } 186 catch (InterruptedException ex) { 187 interrupted = true; 188 } 189 } 190 if (interrupted) { 191 Thread.currentThread().interrupt(); 192 } 193 } 194 } 195 196 /** 197 * Wait for at most msecs for notification. 198 * This operation at least momentarily 199 * releases the mutex. The mutex is always held upon return, 200 * even if interrupted. 201 * @param msecs The time to wait. A value less than or equal to zero 202 * causes a momentarily release 203 * and re-acquire of the mutex, and always returns false. 204 * @return false if at least msecs have elapsed 205 * upon resumption; else true. A 206 * false return does NOT necessarily imply that the thread was 207 * not notified. For example, it might have been notified 208 * after the time elapsed but just before resuming. 209 * @exception InterruptedException if the thread was interrupted 210 * before or during the wait. 211 **/ 212 213 public boolean timedwait(long msecs) throws InterruptedException { 214 if (Thread.interrupted()) throw new InterruptedException(); 215 boolean success = false; 216 try { 217 synchronized(this) { 218 mutex_.release(); 219 try { 220 if (msecs > 0) { 221 long start = System.currentTimeMillis(); 222 wait(msecs); 223 success = System.currentTimeMillis() - start <= msecs; 224 } 225 } 226 catch (InterruptedException ex) { 227 notify(); 228 throw ex; 229 } 230 } 231 } 232 finally { 233 // Must ignore interrupt on re-acquire 234 boolean interrupted = false; 235 for (;;) { 236 try { 237 mutex_.acquire(); 238 break; 239 } 240 catch (InterruptedException ex) { 241 interrupted = true; 242 } 243 } 244 if (interrupted) { 245 Thread.currentThread().interrupt(); 246 } 247 } 248 return success; 249 } 250 251 /** 252 * Notify a waiting thread. 253 * If one exists, a non-interrupted thread will return 254 * normally (i.e., not via InterruptedException) from await or timedwait. 255 **/ 256 public synchronized void signal() { 257 notify(); 258 } 259 260 /** Notify all waiting threads **/ 261 public synchronized void broadcast() { 262 notifyAll(); 263 } 264 } 265 |
|||
Java API By Example, From Geeks To Geeks. |
Conditions of Use |
About Us
© 2002 - 2005, KickJava.com, or its affiliates
|