Code - Class EDU.oswego.cs.dl.util.concurrent.SynchronousChannel


1 /*
2   File: SynchronousChannel.java
3
4   Originally written by Doug Lea and released into the public domain.
5   This may be used for any purposes whatsoever without acknowledgment.
6   Thanks for the assistance and support of Sun Microsystems Labs,
7   and everyone contributing, testing, and using this code.
8
9   History:
10   Date Who What
11   11Jun1998 dl Create public version
12   17Jul1998 dl Disabled direct semaphore permit check
13   31Jul1998 dl Replaced main algorithm with one with
14                               better scaling and fairness properties.
15   25aug1998 dl added peek
16   24Nov2001 dl Replaced main algorithm with faster one.
17 */

18
19 package EDU.oswego.cs.dl.util.concurrent;
20
21 /**
22  * A rendezvous channel, similar to those used in CSP and Ada. Each
23  * put must wait for a take, and vice versa. Synchronous channels
24  * are well suited for handoff designs, in which an object running in
25  * one thread must synch up with an object running in another thread
26  * in order to hand it some information, event, or task.
27  * <p> If you only need threads to synch up without
28  * exchanging information, consider using a Barrier. If you need
29  * bidirectional exchanges, consider using a Rendezvous. <p>
30  *
31  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
32  * @see CyclicBarrier
33  * @see Rendezvous
34 **/

35
36 public class SynchronousChannel implements BoundedChannel {
37
38   /*
39     This implementation divides actions into two cases for puts:
40
41     * An arriving putter that does not already have a waiting taker
42       creates a node holding item, and then waits for a taker to take it.
43     * An arriving putter that does already have a waiting taker fills
44       the slot node created by the taker, and notifies it to continue.
45
46    And symmetrically, two for takes:
47
48     * An arriving taker that does not already have a waiting putter
49       creates an empty slot node, and then waits for a putter to fill it.
50     * An arriving taker that does already have a waiting putter takes
51       item from the node created by the putter, and notifies it to continue.
52
53    This requires keeping two simple queues: waitingPuts and waitingTakes.
54    
55    When a put or take waiting for the actions of its counterpart
56    aborts due to interruption or timeout, it marks the node
57    it created as "CANCELLED", which causes its counterpart to retry
58    the entire put or take sequence.
59   */

60
61   /**
62    * Special marker used in queue nodes to indicate that
63    * the thread waiting for a change in the node has timed out
64    * or been interrupted.
65    **/

66   protected static final Object CANCELLED = new Object();
67   
68   /**
69    * Simple FIFO queue class to hold waiting puts/takes.
70    **/

71   protected static class Queue {
72     protected LinkedNode head;
73     protected LinkedNode last;
74
75     protected void enq(LinkedNode p) {
76       if (last == null)
77         last = head = p;
78       else
79         last = last.next = p;
80     }
81
82     protected LinkedNode deq() {
83       LinkedNode p = head;
84       if (p != null && (head = p.next) == null)
85         last = null;
86       return p;
87     }
88   }
89
90   protected final Queue waitingPuts = new Queue();
91   protected final Queue waitingTakes = new Queue();
92
93   /**
94    * @return zero --
95    * Synchronous channels have no internal capacity.
96    **/

97   public int capacity() { return 0; }
98
99   /**
100    * @return null --
101    * Synchronous channels do not hold contents unless actively taken
102    **/

103   public Object peek() { return null; }
104
105
106   public void put(Object x) throws InterruptedException {
107     if (x == null) throw new IllegalArgumentException();
108
109     // This code is conceptually straightforward, but messy
110
// because we need to intertwine handling of put-arrives first
111
// vs take-arrives first cases.
112

113     // Outer loop is to handle retry due to cancelled waiting taker
114
for (;;) {
115
116       // Get out now if we are interrupted
117
if (Thread.interrupted()) throw new InterruptedException();
118
119       // Exactly one of item or slot will be nonnull at end of
120
// synchronized block, depending on whether a put or a take
121
// arrived first.
122
LinkedNode slot;
123       LinkedNode item = null;
124
125       synchronized(this) {
126         // Try to match up with a waiting taker; fill and signal it below
127
slot = waitingTakes.deq();
128
129         // If no takers yet, create a node and wait below
130
if (slot == null)
131           waitingPuts.enq(item = new LinkedNode(x));
132       }
133
134       if (slot != null) { // There is a waiting taker.
135
// Fill in the slot created by the taker and signal taker to
136
// continue.
137
synchronized(slot) {
138           if (slot.value != CANCELLED) {
139             slot.value = x;
140             slot.notify();
141             return;
142           }
143           // else the taker has cancelled, so retry outer loop
144
}
145       }
146
147       else {
148         // Wait for a taker to arrive and take the item.
149
synchronized(item) {
150           try {
151             while (item.value != null)
152               item.wait();
153             return;
154           }
155           catch (InterruptedException ie) {
156             // If item was taken, return normally but set interrupt status
157
if (item.value == null) {
158               Thread.currentThread().interrupt();
159               return;
160             }
161             else {
162               item.value = CANCELLED;
163               throw ie;
164             }
165           }
166         }
167       }
168     }
169   }
170
171   public Object take() throws InterruptedException {
172     // Entirely symmetric to put()
173

174     for (;;) {
175       if (Thread.interrupted()) throw new InterruptedException();
176
177       LinkedNode item;
178       LinkedNode slot = null;
179
180       synchronized(this) {
181         item = waitingPuts.deq();
182         if (item == null)
183           waitingTakes.enq(slot = new LinkedNode());
184       }
185
186       if (item != null) {
187         synchronized(item) {
188           Object x = item.value;
189           if (x != CANCELLED) {
190             item.value = null;
191             item.next = null;
192             item.notify();
193             return x;
194           }
195         }
196       }
197
198       else {
199         synchronized(slot) {
200           try {
201             for (;;) {
202               Object x = slot.value;
203               if (x != null) {
204                 slot.value = null;
205                 slot.next = null;
206                 return x;
207               }
208               else
209                 slot.wait();
210             }
211           }
212           catch(InterruptedException ie) {
213             Object x = slot.value;
214             if (x != null) {
215               slot.value = null;
216               slot.next = null;
217               Thread.currentThread().interrupt();
218               return x;
219             }
220             else {
221               slot.value = CANCELLED;
222               throw ie;
223             }
224           }
225         }
226       }
227     }
228   }
229
230   /*
231     Offer and poll are just like put and take, except even messier.
232    */

233
234
235   public boolean offer(Object x, long msecs) throws InterruptedException {
236     if (x == null) throw new IllegalArgumentException();
237     long waitTime = msecs;
238     long startTime = 0; // lazily initialize below if needed
239

240     for (;;) {
241       if (Thread.interrupted()) throw new InterruptedException();
242
243       LinkedNode slot;
244       LinkedNode item = null;
245
246       synchronized(this) {
247         slot = waitingTakes.deq();
248         if (slot == null) {
249           if (waitTime <= 0)
250             return false;
251           else
252             waitingPuts.enq(item = new LinkedNode(x));
253         }
254       }
255
256       if (slot != null) {
257         synchronized(slot) {
258           if (slot.value != CANCELLED) {
259             slot.value = x;
260             slot.notify();
261             return true;
262           }
263         }
264       }
265
266       long now = System.currentTimeMillis();
267       if (startTime == 0)
268         startTime = now;
269       else
270         waitTime = msecs - (now - startTime);
271
272       if (item != null) {
273         synchronized(item) {
274           try {
275             for (;;) {
276               if (item.value == null)
277                 return true;
278               if (waitTime <= 0) {
279                 item.value = CANCELLED;
280                 return false;
281               }
282               item.wait(waitTime);
283               waitTime = msecs - (System.currentTimeMillis() - startTime);
284             }
285           }
286           catch (InterruptedException ie) {
287             if (item.value == null) {
288               Thread.currentThread().interrupt();
289               return true;
290             }
291             else {
292               item.value = CANCELLED;
293               throw ie;
294             }
295           }
296         }
297       }
298     }
299   }
300
301   public Object poll(long msecs) throws InterruptedException {
302     long waitTime = msecs;
303     long startTime = 0;
304
305     for (;;) {
306       if (Thread.interrupted()) throw new InterruptedException();
307
308       LinkedNode item;
309       LinkedNode slot = null;
310
311       synchronized(this) {
312         item = waitingPuts.deq();
313         if (item == null) {
314           if (waitTime <= 0)
315             return null;
316           else
317             waitingTakes.enq(slot = new LinkedNode());
318         }
319       }
320
321       if (item != null) {
322         synchronized(item) {
323           Object x = item.value;
324           if (x != CANCELLED) {
325             item.value = null;
326             item.next = null;
327             item.notify();
328             return x;
329           }
330         }
331       }
332
333       long now = System.currentTimeMillis();
334       if (startTime == 0)
335         startTime = now;
336       else
337         waitTime = msecs - (now - startTime);
338
339       if (slot != null) {
340         synchronized(slot) {
341           try {
342             for (;;) {
343               Object x = slot.value;
344               if (x != null) {
345                 slot.value = null;
346                 slot.next = null;
347                 return x;
348               }
349               if (waitTime <= 0) {
350                 slot.value = CANCELLED;
351                 return null;
352               }
353               slot.wait(waitTime);
354               waitTime = msecs - (System.currentTimeMillis() - startTime);
355             }
356           }
357           catch(InterruptedException ie) {
358             Object x = slot.value;
359             if (x != null) {
360               slot.value = null;
361               slot.next = null;
362               Thread.currentThread().interrupt();
363               return x;
364             }
365             else {
366               slot.value = CANCELLED;
367               throw ie;
368             }
369           }
370         }
371       }
372     }
373   }
374
375 }
376

Java API By Example, From Geeks To Geeks. | Conditions of Use | About Us © 2002 - 2005, KickJava.com, or its affiliates