Code - Class EDU.oswego.cs.dl.util.concurrent.Channel


1 /*
2   File: Channel.java
3
4   Originally written by Doug Lea and released into the public domain.
5   This may be used for any purposes whatsoever without acknowledgment.
6   Thanks for the assistance and support of Sun Microsystems Labs,
7   and everyone contributing, testing, and using this code.
8
9   History:
10   Date Who What
11   11Jun1998 dl Create public version
12   25aug1998 dl added peek
13 */

14
15 package EDU.oswego.cs.dl.util.concurrent;
16
17 /**
18  * Main interface for buffers, queues, pipes, conduits, etc.
19  * <p>
20  * A Channel represents anything that you can put items
21  * into and take them out of. As with the Sync
22  * interface, both
23  * blocking (put(x), take),
24  * and timeouts (offer(x, msecs), poll(msecs)) policies
25  * are provided. Using a
26  * zero timeout for offer and poll results in a pure balking policy.
27  * <p>
28  * To aid in efforts to use Channels in a more typesafe manner,
29  * this interface extends Puttable and Takable. You can restrict
30  * arguments of instance variables to this type as a way of
31  * guaranteeing that producers never try to take, or consumers put.
32  * for example:
33  * <pre>
34  * class Producer implements Runnable {
35  * final Puttable chan;
36  * Producer(Puttable channel) { chan = channel; }
37  * public void run() {
38  * try {
39  * for(;;) { chan.put(produce()); }
40  * }
41  * catch (InterruptedException ex) {}
42  * }
43  * Object produce() { ... }
44  * }
45  *
46  *
47  * class Consumer implements Runnable {
48  * final Takable chan;
49  * Consumer(Takable channel) { chan = channel; }
50  * public void run() {
51  * try {
52  * for(;;) { consume(chan.take()); }
53  * }
54  * catch (InterruptedException ex) {}
55  * }
56  * void consume(Object x) { ... }
57  * }
58  *
59  * class Setup {
60  * void main() {
61  * Channel chan = new SomeChannelImplementation();
62  * Producer p = new Producer(chan);
63  * Consumer c = new Consumer(chan);
64  * new Thread(p).start();
65  * new Thread(c).start();
66  * }
67  * }
68  * </pre>
69  * <p>
70  * A given channel implementation might or might not have bounded
71  * capacity or other insertion constraints, so in general, you cannot tell if
72  * a given put will block. However,
73  * Channels that are designed to
74  * have an element capacity (and so always block when full)
75  * should implement the
76  * BoundedChannel
77  * subinterface.
78  * <p>
79  * Channels may hold any kind of item. However,
80  * insertion of null is not in general supported. Implementations
81  * may (all currently do) throw IllegalArgumentExceptions upon attempts to
82  * insert null.
83  * <p>
84  * By design, the Channel interface does not support any methods to determine
85  * the current number of elements being held in the channel.
86  * This decision reflects the fact that in
87  * concurrent programming, such methods are so rarely useful
88  * that including them invites misuse; at best they could
89  * provide a snapshot of current
90  * state, that could change immediately after being reported.
91  * It is better practice to instead use poll and offer to try
92  * to take and put elements without blocking. For example,
93  * to empty out the current contents of a channel, you could write:
94  * <pre>
95  * try {
96  * for (;;) {
97  * Object item = channel.poll(0);
98  * if (item != null)
99  * process(item);
100  * else
101  * break;
102  * }
103  * }
104  * catch(InterruptedException ex) { ... }
105  * </pre>
106  * <p>
107  * However, it is possible to determine whether an item
108  * exists in a Channel via <code>peek</code>, which returns
109  * but does NOT remove the next item that can be taken (or null
110  * if there is no such item). The peek operation has a limited
111  * range of applicability, and must be used with care. Unless it
112  * is known that a given thread is the only possible consumer
113  * of a channel, and that no time-out-based <code>offer</code> operations
114  * are ever invoked, there is no guarantee that the item returned
115  * by peek will be available for a subsequent take.
116  * <p>
117  * When appropriate, you can define an isEmpty method to
118  * return whether <code>peek</code> returns null.
119  * <p>
120  * Also, as a compromise, even though it does not appear in interface,
121  * implementation classes that can readily compute the number
122  * of elements support a <code>size()</code> method. This allows careful
123  * use, for example in queue length monitors, appropriate to the
124  * particular implementation constraints and properties.
125  * <p>
126  * All channels allow multiple producers and/or consumers.
127  * They do not support any kind of <em>close</em> method
128  * to shut down operation or indicate completion of particular
129  * producer or consumer threads.
130  * If you need to signal completion, one way to do it is to
131  * create a class such as
132  * <pre>
133  * class EndOfStream {
134  * // Application-dependent field/methods
135  * }
136  * </pre>
137  * And to have producers put an instance of this class into
138  * the channel when they are done. The consumer side can then
139  * check this via
140  * <pre>
141  * Object x = aChannel.take();
142  * if (x instanceof EndOfStream)
143  * // special actions; perhaps terminate
144  * else
145  * // process normally
146  * </pre>
147  * <p>
148  * In time-out based methods (poll(msecs) and offer(x, msecs),
149  * time bounds are interpreted in
150  * a coarse-grained, best-effort fashion. Since there is no
151  * way in Java to escape out of a wait for a synchronized
152  * method/block, time bounds can sometimes be exceeded when
153  * there is a lot contention for the channel. Additionally,
154  * some Channel semantics entail a ``point of
155  * no return'' where, once some parts of the operation have completed,
156  * others must follow, regardless of time bound.
157  * <p>
158  * Interruptions are in general handled as early as possible
159  * in all methods. Normally, InterruptionExceptions are thrown
160  * in put/take and offer(msec)/poll(msec) if interruption
161  * is detected upon entry to the method, as well as in any
162  * later context surrounding waits.
163  * <p>
164  * If a put returns normally, an offer
165  * returns true, or a put or poll returns non-null, the operation
166  * completed successfully.
167  * In all other cases, the operation fails cleanly -- the
168  * element is not put or taken.
169  * <p>
170  * As with Sync classes, spinloops are not directly supported,
171  * are not particularly recommended for routine use, but are not hard
172  * to construct. For example, here is an exponential backoff version:
173  * <pre>
174  * Object backOffTake(Channel q) throws InterruptedException {
175  * long waitTime = 0;
176  * for (;;) {
177  * Object x = q.poll(0);
178  * if (x != null)
179  * return x;
180  * else {
181  * Thread.sleep(waitTime);
182  * waitTime = 3 * waitTime / 2 + 1;
183  * }
184  * }
185  * </pre>
186  * <p>
187  * <b>Sample Usage</b>. Here is a producer/consumer design
188  * where the channel is used to hold Runnable commands representing
189  * background tasks.
190  * <pre>
191  * class Service {
192  * private final Channel channel = ... some Channel implementation;
193  *
194  * private void backgroundTask(int taskParam) { ... }
195  *
196  * public void action(final int arg) {
197  * Runnable command =
198  * new Runnable() {
199  * public void run() { backgroundTask(arg); }
200  * };
201  * try { channel.put(command) }
202  * catch (InterruptedException ex) {
203  * Thread.currentThread().interrupt(); // ignore but propagate
204  * }
205  * }
206  *
207  * public Service() {
208  * Runnable backgroundLoop =
209  * new Runnable() {
210  * public void run() {
211  * for (;;) {
212  * try {
213  * Runnable task = (Runnable)(channel.take());
214  * task.run();
215  * }
216  * catch (InterruptedException ex) { return; }
217  * }
218  * }
219  * };
220  * new Thread(backgroundLoop).start();
221  * }
222  * }
223  *
224  * </pre>
225  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
226  * @see Sync
227  * @see BoundedChannel
228 **/

229
230 public interface Channel extends Puttable, Takable {
231
232   /**
233    * Place item in the channel, possibly waiting indefinitely until
234    * it can be accepted. Channels implementing the BoundedChannel
235    * subinterface are generally guaranteed to block on puts upon
236    * reaching capacity, but other implementations may or may not block.
237    * @param item the element to be inserted. Should be non-null.
238    * @exception InterruptedException if the current thread has
239    * been interrupted at a point at which interruption
240    * is detected, in which case the element is guaranteed not
241    * to be inserted. Otherwise, on normal return, the element is guaranteed
242    * to have been inserted.
243   **/

244   public void put(Object item) throws InterruptedException;
245
246   /**
247    * Place item in channel only if it can be accepted within
248    * msecs milliseconds. The time bound is interpreted in
249    * a coarse-grained, best-effort fashion.
250    * @param item the element to be inserted. Should be non-null.
251    * @param msecs the number of milliseconds to wait. If less than
252    * or equal to zero, the method does not perform any timed waits,
253    * but might still require
254    * access to a synchronization lock, which can impose unbounded
255    * delay if there is a lot of contention for the channel.
256    * @return true if accepted, else false
257    * @exception InterruptedException if the current thread has
258    * been interrupted at a point at which interruption
259    * is detected, in which case the element is guaranteed not
260    * to be inserted (i.e., is equivalent to a false return).
261   **/

262   public boolean offer(Object item, long msecs) throws InterruptedException;
263
264   /**
265    * Return and remove an item from channel,
266    * possibly waiting indefinitely until
267    * such an item exists.
268    * @return some item from the channel. Different implementations
269    * may guarantee various properties (such as FIFO) about that item
270    * @exception InterruptedException if the current thread has
271    * been interrupted at a point at which interruption
272    * is detected, in which case state of the channel is unchanged.
273    *
274   **/

275   public Object take() throws InterruptedException;
276
277
278   /**
279    * Return and remove an item from channel only if one is available within
280    * msecs milliseconds. The time bound is interpreted in a coarse
281    * grained, best-effort fashion.
282    * @param msecs the number of milliseconds to wait. If less than
283    * or equal to zero, the operation does not perform any timed waits,
284    * but might still require
285    * access to a synchronization lock, which can impose unbounded
286    * delay if there is a lot of contention for the channel.
287    * @return some item, or null if the channel is empty.
288    * @exception InterruptedException if the current thread has
289    * been interrupted at a point at which interruption
290    * is detected, in which case state of the channel is unchanged
291    * (i.e., equivalent to a null return).
292   **/

293
294   public Object poll(long msecs) throws InterruptedException;
295
296   /**
297    * Return, but do not remove object at head of Channel,
298    * or null if it is empty.
299    **/

300
301   public Object peek();
302
303 }
304
305

Java API By Example, From Geeks To Geeks. | Conditions of Use | About Us © 2002 - 2005, KickJava.com, or its affiliates