1 18 19 package EDU.oswego.cs.dl.util.concurrent; 20 21 35 36 public class SynchronousChannel implements BoundedChannel { 37 38 60 61 66 protected static final Object CANCELLED = new Object (); 67 68 71 protected static class Queue { 72 protected LinkedNode head; 73 protected LinkedNode last; 74 75 protected void enq(LinkedNode p) { 76 if (last == null) 77 last = head = p; 78 else 79 last = last.next = p; 80 } 81 82 protected LinkedNode deq() { 83 LinkedNode p = head; 84 if (p != null && (head = p.next) == null) 85 last = null; 86 return p; 87 } 88 } 89 90 protected final Queue waitingPuts = new Queue(); 91 protected final Queue waitingTakes = new Queue(); 92 93 97 public int capacity() { return 0; } 98 99 103 public Object peek() { return null; } 104 105 106 public void put(Object x) throws InterruptedException { 107 if (x == null) throw new IllegalArgumentException (); 108 109 113 for (;;) { 115 116 if (Thread.interrupted()) throw new InterruptedException (); 118 119 LinkedNode slot; 123 LinkedNode item = null; 124 125 synchronized(this) { 126 slot = waitingTakes.deq(); 128 129 if (slot == null) 131 waitingPuts.enq(item = new LinkedNode(x)); 132 } 133 134 if (slot != null) { synchronized(slot) { 138 if (slot.value != CANCELLED) { 139 slot.value = x; 140 slot.notify(); 141 return; 142 } 143 } 145 } 146 147 else { 148 synchronized(item) { 150 try { 151 while (item.value != null) 152 item.wait(); 153 return; 154 } 155 catch (InterruptedException ie) { 156 if (item.value == null) { 158 Thread.currentThread().interrupt(); 159 return; 160 } 161 else { 162 item.value = CANCELLED; 163 throw ie; 164 } 165 } 166 } 167 } 168 } 169 } 170 171 public Object take() throws InterruptedException { 172 174 for (;;) { 175 if (Thread.interrupted()) throw new InterruptedException (); 176 177 LinkedNode item; 178 LinkedNode slot = null; 179 180 synchronized(this) { 181 item = waitingPuts.deq(); 182 if (item == null) 183 waitingTakes.enq(slot = new LinkedNode()); 184 } 185 186 if (item != null) { 187 synchronized(item) { 188 Object x = item.value; 189 if (x != CANCELLED) { 190 item.value = null; 191 item.next = null; 192 item.notify(); 193 return x; 194 } 195 } 196 } 197 198 else { 199 synchronized(slot) { 200 try { 201 for (;;) { 202 Object x = slot.value; 203 if (x != null) { 204 slot.value = null; 205 slot.next = null; 206 return x; 207 } 208 else 209 slot.wait(); 210 } 211 } 212 catch(InterruptedException ie) { 213 Object x = slot.value; 214 if (x != null) { 215 slot.value = null; 216 slot.next = null; 217 Thread.currentThread().interrupt(); 218 return x; 219 } 220 else { 221 slot.value = CANCELLED; 222 throw ie; 223 } 224 } 225 } 226 } 227 } 228 } 229 230 233 234 235 public boolean offer(Object x, long msecs) throws InterruptedException { 236 if (x == null) throw new IllegalArgumentException (); 237 long waitTime = msecs; 238 long startTime = 0; 240 for (;;) { 241 if (Thread.interrupted()) throw new InterruptedException (); 242 243 LinkedNode slot; 244 LinkedNode item = null; 245 246 synchronized(this) { 247 slot = waitingTakes.deq(); 248 if (slot == null) { 249 if (waitTime <= 0) 250 return false; 251 else 252 waitingPuts.enq(item = new LinkedNode(x)); 253 } 254 } 255 256 if (slot != null) { 257 synchronized(slot) { 258 if (slot.value != CANCELLED) { 259 slot.value = x; 260 slot.notify(); 261 return true; 262 } 263 } 264 } 265 266 long now = System.currentTimeMillis(); 267 if (startTime == 0) 268 startTime = now; 269 else 270 waitTime = msecs - (now - startTime); 271 272 if (item != null) { 273 synchronized(item) { 274 try { 275 for (;;) { 276 if (item.value == null) 277 return true; 278 if (waitTime <= 0) { 279 item.value = CANCELLED; 280 return false; 281 } 282 item.wait(waitTime); 283 waitTime = msecs - (System.currentTimeMillis() - startTime); 284 } 285 } 286 catch (InterruptedException ie) { 287 if (item.value == null) { 288 Thread.currentThread().interrupt(); 289 return true; 290 } 291 else { 292 item.value = CANCELLED; 293 throw ie; 294 } 295 } 296 } 297 } 298 } 299 } 300 301 public Object poll(long msecs) throws InterruptedException { 302 long waitTime = msecs; 303 long startTime = 0; 304 305 for (;;) { 306 if (Thread.interrupted()) throw new InterruptedException (); 307 308 LinkedNode item; 309 LinkedNode slot = null; 310 311 synchronized(this) { 312 item = waitingPuts.deq(); 313 if (item == null) { 314 if (waitTime <= 0) 315 return null; 316 else 317 waitingTakes.enq(slot = new LinkedNode()); 318 } 319 } 320 321 if (item != null) { 322 synchronized(item) { 323 Object x = item.value; 324 if (x != CANCELLED) { 325 item.value = null; 326 item.next = null; 327 item.notify(); 328 return x; 329 } 330 } 331 } 332 333 long now = System.currentTimeMillis(); 334 if (startTime == 0) 335 startTime = now; 336 else 337 waitTime = msecs - (now - startTime); 338 339 if (slot != null) { 340 synchronized(slot) { 341 try { 342 for (;;) { 343 Object x = slot.value; 344 if (x != null) { 345 slot.value = null; 346 slot.next = null; 347 return x; 348 } 349 if (waitTime <= 0) { 350 slot.value = CANCELLED; 351 return null; 352 } 353 slot.wait(waitTime); 354 waitTime = msecs - (System.currentTimeMillis() - startTime); 355 } 356 } 357 catch(InterruptedException ie) { 358 Object x = slot.value; 359 if (x != null) { 360 slot.value = null; 361 slot.next = null; 362 Thread.currentThread().interrupt(); 363 return x; 364 } 365 else { 366 slot.value = CANCELLED; 367 throw ie; 368 } 369 } 370 } 371 } 372 } 373 } 374 375 } 376 | Popular Tags |