| ||||
|
Code - Class EDU.oswego.cs.dl.util.concurrent.SynchronousChannel1 /* 2 File: SynchronousChannel.java 3 4 Originally written by Doug Lea and released into the public domain. 5 This may be used for any purposes whatsoever without acknowledgment. 6 Thanks for the assistance and support of Sun Microsystems Labs, 7 and everyone contributing, testing, and using this code. 8 9 History: 10 Date Who What 11 11Jun1998 dl Create public version 12 17Jul1998 dl Disabled direct semaphore permit check 13 31Jul1998 dl Replaced main algorithm with one with 14 better scaling and fairness properties. 15 25aug1998 dl added peek 16 24Nov2001 dl Replaced main algorithm with faster one. 17 */ 18 19 package EDU.oswego.cs.dl.util.concurrent; 20 21 /** 22 * A rendezvous channel, similar to those used in CSP and Ada. Each 23 * put must wait for a take, and vice versa. Synchronous channels 24 * are well suited for handoff designs, in which an object running in 25 * one thread must synch up with an object running in another thread 26 * in order to hand it some information, event, or task. 27 * <p> If you only need threads to synch up without 28 * exchanging information, consider using a Barrier. If you need 29 * bidirectional exchanges, consider using a Rendezvous. <p> 30 * 31 * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] 32 * @see CyclicBarrier 33 * @see Rendezvous 34 **/ 35 36 public class SynchronousChannel implements BoundedChannel { 37 38 /* 39 This implementation divides actions into two cases for puts: 40 41 * An arriving putter that does not already have a waiting taker 42 creates a node holding item, and then waits for a taker to take it. 43 * An arriving putter that does already have a waiting taker fills 44 the slot node created by the taker, and notifies it to continue. 45 46 And symmetrically, two for takes: 47 48 * An arriving taker that does not already have a waiting putter 49 creates an empty slot node, and then waits for a putter to fill it. 50 * An arriving taker that does already have a waiting putter takes 51 item from the node created by the putter, and notifies it to continue. 52 53 This requires keeping two simple queues: waitingPuts and waitingTakes. 54 55 When a put or take waiting for the actions of its counterpart 56 aborts due to interruption or timeout, it marks the node 57 it created as "CANCELLED", which causes its counterpart to retry 58 the entire put or take sequence. 59 */ 60 61 /** 62 * Special marker used in queue nodes to indicate that 63 * the thread waiting for a change in the node has timed out 64 * or been interrupted. 65 **/ 66 protected static final Object CANCELLED = new Object(); 67 68 /** 69 * Simple FIFO queue class to hold waiting puts/takes. 70 **/ 71 protected static class Queue { 72 protected LinkedNode head; 73 protected LinkedNode last; 74 75 protected void enq(LinkedNode p) { 76 if (last == null) 77 last = head = p; 78 else 79 last = last.next = p; 80 } 81 82 protected LinkedNode deq() { 83 LinkedNode p = head; 84 if (p != null && (head = p.next) == null) 85 last = null; 86 return p; 87 } 88 } 89 90 protected final Queue waitingPuts = new Queue(); 91 protected final Queue waitingTakes = new Queue(); 92 93 /** 94 * @return zero -- 95 * Synchronous channels have no internal capacity. 96 **/ 97 public int capacity() { return 0; } 98 99 /** 100 * @return null -- 101 * Synchronous channels do not hold contents unless actively taken 102 **/ 103 public Object peek() { return null; } 104 105 106 public void put(Object x) throws InterruptedException { 107 if (x == null) throw new IllegalArgumentException(); 108 109 // This code is conceptually straightforward, but messy 110 // because we need to intertwine handling of put-arrives first 111 // vs take-arrives first cases. 112 113 // Outer loop is to handle retry due to cancelled waiting taker 114 for (;;) { 115 116 // Get out now if we are interrupted 117 if (Thread.interrupted()) throw new InterruptedException(); 118 119 // Exactly one of item or slot will be nonnull at end of 120 // synchronized block, depending on whether a put or a take 121 // arrived first. 122 LinkedNode slot; 123 LinkedNode item = null; 124 125 synchronized(this) { 126 // Try to match up with a waiting taker; fill and signal it below 127 slot = waitingTakes.deq(); 128 129 // If no takers yet, create a node and wait below 130 if (slot == null) 131 waitingPuts.enq(item = new LinkedNode(x)); 132 } 133 134 if (slot != null) { // There is a waiting taker. 135 // Fill in the slot created by the taker and signal taker to 136 // continue. 137 synchronized(slot) { 138 if (slot.value != CANCELLED) { 139 slot.value = x; 140 slot.notify(); 141 return; 142 } 143 // else the taker has cancelled, so retry outer loop 144 } 145 } 146 147 else { 148 // Wait for a taker to arrive and take the item. 149 synchronized(item) { 150 try { 151 while (item.value != null) 152 item.wait(); 153 return; 154 } 155 catch (InterruptedException ie) { 156 // If item was taken, return normally but set interrupt status 157 if (item.value == null) { 158 Thread.currentThread().interrupt(); 159 return; 160 } 161 else { 162 item.value = CANCELLED; 163 throw ie; 164 } 165 } 166 } 167 } 168 } 169 } 170 171 public Object take() throws InterruptedException { 172 // Entirely symmetric to put() 173 174 for (;;) { 175 if (Thread.interrupted()) throw new InterruptedException(); 176 177 LinkedNode item; 178 LinkedNode slot = null; 179 180 synchronized(this) { 181 item = waitingPuts.deq(); 182 if (item == null) 183 waitingTakes.enq(slot = new LinkedNode()); 184 } 185 186 if (item != null) { 187 synchronized(item) { 188 Object x = item.value; 189 if (x != CANCELLED) { 190 item.value = null; 191 item.next = null; 192 item.notify(); 193 return x; 194 } 195 } 196 } 197 198 else { 199 synchronized(slot) { 200 try { 201 for (;;) { 202 Object x = slot.value; 203 if (x != null) { 204 slot.value = null; 205 slot.next = null; 206 return x; 207 } 208 else 209 slot.wait(); 210 } 211 } 212 catch(InterruptedException ie) { 213 Object x = slot.value; 214 if (x != null) { 215 slot.value = null; 216 slot.next = null; 217 Thread.currentThread().interrupt(); 218 return x; 219 } 220 else { 221 slot.value = CANCELLED; 222 throw ie; 223 } 224 } 225 } 226 } 227 } 228 } 229 230 /* 231 Offer and poll are just like put and take, except even messier. 232 */ 233 234 235 public boolean offer(Object x, long msecs) throws InterruptedException { 236 if (x == null) throw new IllegalArgumentException(); 237 long waitTime = msecs; 238 long startTime = 0; // lazily initialize below if needed 239 240 for (;;) { 241 if (Thread.interrupted()) throw new InterruptedException(); 242 243 LinkedNode slot; 244 LinkedNode item = null; 245 246 synchronized(this) { 247 slot = waitingTakes.deq(); 248 if (slot == null) { 249 if (waitTime <= 0) 250 return false; 251 else 252 waitingPuts.enq(item = new LinkedNode(x)); 253 } 254 } 255 256 if (slot != null) { 257 synchronized(slot) { 258 if (slot.value != CANCELLED) { 259 slot.value = x; 260 slot.notify(); 261 return true; 262 } 263 } 264 } 265 266 long now = System.currentTimeMillis(); 267 if (startTime == 0) 268 startTime = now; 269 else 270 waitTime = msecs - (now - startTime); 271 272 if (item != null) { 273 synchronized(item) { 274 try { 275 for (;;) { 276 if (item.value == null) 277 return true; 278 if (waitTime <= 0) { 279 item.value = CANCELLED; 280 return false; 281 } 282 item.wait(waitTime); 283 waitTime = msecs - (System.currentTimeMillis() - startTime); 284 } 285 } 286 catch (InterruptedException ie) { 287 if (item.value == null) { 288 Thread.currentThread().interrupt(); 289 return true; 290 } 291 else { 292 item.value = CANCELLED; 293 throw ie; 294 } 295 } 296 } 297 } 298 } 299 } 300 301 public Object poll(long msecs) throws InterruptedException { 302 long waitTime = msecs; 303 long startTime = 0; 304 305 for (;;) { 306 if (Thread.interrupted()) throw new InterruptedException(); 307 308 LinkedNode item; 309 LinkedNode slot = null; 310 311 synchronized(this) { 312 item = waitingPuts.deq(); 313 if (item == null) { 314 if (waitTime <= 0) 315 return null; 316 else 317 waitingTakes.enq(slot = new LinkedNode()); 318 } 319 } 320 321 if (item != null) { 322 synchronized(item) { 323 Object x = item.value; 324 if (x != CANCELLED) { 325 item.value = null; 326 item.next = null; 327 item.notify(); 328 return x; 329 } 330 } 331 } 332 333 long now = System.currentTimeMillis(); 334 if (startTime == 0) 335 startTime = now; 336 else 337 waitTime = msecs - (now - startTime); 338 339 if (slot != null) { 340 synchronized(slot) { 341 try { 342 for (;;) { 343 Object x = slot.value; 344 if (x != null) { 345 slot.value = null; 346 slot.next = null; 347 return x; 348 } 349 if (waitTime <= 0) { 350 slot.value = CANCELLED; 351 return null; 352 } 353 slot.wait(waitTime); 354 waitTime = msecs - (System.currentTimeMillis() - startTime); 355 } 356 } 357 catch(InterruptedException ie) { 358 Object x = slot.value; 359 if (x != null) { 360 slot.value = null; 361 slot.next = null; 362 Thread.currentThread().interrupt(); 363 return x; 364 } 365 else { 366 slot.value = CANCELLED; 367 throw ie; 368 } 369 } 370 } 371 } 372 } 373 } 374 375 } 376 |
|||
Java API By Example, From Geeks To Geeks. |
Conditions of Use |
About Us
© 2002 - 2005, KickJava.com, or its affiliates
|