| ||||
|
Code - Class EDU.oswego.cs.dl.util.concurrent.Channel1 /* 2 File: Channel.java 3 4 Originally written by Doug Lea and released into the public domain. 5 This may be used for any purposes whatsoever without acknowledgment. 6 Thanks for the assistance and support of Sun Microsystems Labs, 7 and everyone contributing, testing, and using this code. 8 9 History: 10 Date Who What 11 11Jun1998 dl Create public version 12 25aug1998 dl added peek 13 */ 14 15 package EDU.oswego.cs.dl.util.concurrent; 16 17 /** 18 * Main interface for buffers, queues, pipes, conduits, etc. 19 * <p> 20 * A Channel represents anything that you can put items 21 * into and take them out of. As with the Sync 22 * interface, both 23 * blocking (put(x), take), 24 * and timeouts (offer(x, msecs), poll(msecs)) policies 25 * are provided. Using a 26 * zero timeout for offer and poll results in a pure balking policy. 27 * <p> 28 * To aid in efforts to use Channels in a more typesafe manner, 29 * this interface extends Puttable and Takable. You can restrict 30 * arguments of instance variables to this type as a way of 31 * guaranteeing that producers never try to take, or consumers put. 32 * for example: 33 * <pre> 34 * class Producer implements Runnable { 35 * final Puttable chan; 36 * Producer(Puttable channel) { chan = channel; } 37 * public void run() { 38 * try { 39 * for(;;) { chan.put(produce()); } 40 * } 41 * catch (InterruptedException ex) {} 42 * } 43 * Object produce() { ... } 44 * } 45 * 46 * 47 * class Consumer implements Runnable { 48 * final Takable chan; 49 * Consumer(Takable channel) { chan = channel; } 50 * public void run() { 51 * try { 52 * for(;;) { consume(chan.take()); } 53 * } 54 * catch (InterruptedException ex) {} 55 * } 56 * void consume(Object x) { ... } 57 * } 58 * 59 * class Setup { 60 * void main() { 61 * Channel chan = new SomeChannelImplementation(); 62 * Producer p = new Producer(chan); 63 * Consumer c = new Consumer(chan); 64 * new Thread(p).start(); 65 * new Thread(c).start(); 66 * } 67 * } 68 * </pre> 69 * <p> 70 * A given channel implementation might or might not have bounded 71 * capacity or other insertion constraints, so in general, you cannot tell if 72 * a given put will block. However, 73 * Channels that are designed to 74 * have an element capacity (and so always block when full) 75 * should implement the 76 * BoundedChannel 77 * subinterface. 78 * <p> 79 * Channels may hold any kind of item. However, 80 * insertion of null is not in general supported. Implementations 81 * may (all currently do) throw IllegalArgumentExceptions upon attempts to 82 * insert null. 83 * <p> 84 * By design, the Channel interface does not support any methods to determine 85 * the current number of elements being held in the channel. 86 * This decision reflects the fact that in 87 * concurrent programming, such methods are so rarely useful 88 * that including them invites misuse; at best they could 89 * provide a snapshot of current 90 * state, that could change immediately after being reported. 91 * It is better practice to instead use poll and offer to try 92 * to take and put elements without blocking. For example, 93 * to empty out the current contents of a channel, you could write: 94 * <pre> 95 * try { 96 * for (;;) { 97 * Object item = channel.poll(0); 98 * if (item != null) 99 * process(item); 100 * else 101 * break; 102 * } 103 * } 104 * catch(InterruptedException ex) { ... } 105 * </pre> 106 * <p> 107 * However, it is possible to determine whether an item 108 * exists in a Channel via <code>peek</code>, which returns 109 * but does NOT remove the next item that can be taken (or null 110 * if there is no such item). The peek operation has a limited 111 * range of applicability, and must be used with care. Unless it 112 * is known that a given thread is the only possible consumer 113 * of a channel, and that no time-out-based <code>offer</code> operations 114 * are ever invoked, there is no guarantee that the item returned 115 * by peek will be available for a subsequent take. 116 * <p> 117 * When appropriate, you can define an isEmpty method to 118 * return whether <code>peek</code> returns null. 119 * <p> 120 * Also, as a compromise, even though it does not appear in interface, 121 * implementation classes that can readily compute the number 122 * of elements support a <code>size()</code> method. This allows careful 123 * use, for example in queue length monitors, appropriate to the 124 * particular implementation constraints and properties. 125 * <p> 126 * All channels allow multiple producers and/or consumers. 127 * They do not support any kind of <em>close</em> method 128 * to shut down operation or indicate completion of particular 129 * producer or consumer threads. 130 * If you need to signal completion, one way to do it is to 131 * create a class such as 132 * <pre> 133 * class EndOfStream { 134 * // Application-dependent field/methods 135 * } 136 * </pre> 137 * And to have producers put an instance of this class into 138 * the channel when they are done. The consumer side can then 139 * check this via 140 * <pre> 141 * Object x = aChannel.take(); 142 * if (x instanceof EndOfStream) 143 * // special actions; perhaps terminate 144 * else 145 * // process normally 146 * </pre> 147 * <p> 148 * In time-out based methods (poll(msecs) and offer(x, msecs), 149 * time bounds are interpreted in 150 * a coarse-grained, best-effort fashion. Since there is no 151 * way in Java to escape out of a wait for a synchronized 152 * method/block, time bounds can sometimes be exceeded when 153 * there is a lot contention for the channel. Additionally, 154 * some Channel semantics entail a ``point of 155 * no return'' where, once some parts of the operation have completed, 156 * others must follow, regardless of time bound. 157 * <p> 158 * Interruptions are in general handled as early as possible 159 * in all methods. Normally, InterruptionExceptions are thrown 160 * in put/take and offer(msec)/poll(msec) if interruption 161 * is detected upon entry to the method, as well as in any 162 * later context surrounding waits. 163 * <p> 164 * If a put returns normally, an offer 165 * returns true, or a put or poll returns non-null, the operation 166 * completed successfully. 167 * In all other cases, the operation fails cleanly -- the 168 * element is not put or taken. 169 * <p> 170 * As with Sync classes, spinloops are not directly supported, 171 * are not particularly recommended for routine use, but are not hard 172 * to construct. For example, here is an exponential backoff version: 173 * <pre> 174 * Object backOffTake(Channel q) throws InterruptedException { 175 * long waitTime = 0; 176 * for (;;) { 177 * Object x = q.poll(0); 178 * if (x != null) 179 * return x; 180 * else { 181 * Thread.sleep(waitTime); 182 * waitTime = 3 * waitTime / 2 + 1; 183 * } 184 * } 185 * </pre> 186 * <p> 187 * <b>Sample Usage</b>. Here is a producer/consumer design 188 * where the channel is used to hold Runnable commands representing 189 * background tasks. 190 * <pre> 191 * class Service { 192 * private final Channel channel = ... some Channel implementation; 193 * 194 * private void backgroundTask(int taskParam) { ... } 195 * 196 * public void action(final int arg) { 197 * Runnable command = 198 * new Runnable() { 199 * public void run() { backgroundTask(arg); } 200 * }; 201 * try { channel.put(command) } 202 * catch (InterruptedException ex) { 203 * Thread.currentThread().interrupt(); // ignore but propagate 204 * } 205 * } 206 * 207 * public Service() { 208 * Runnable backgroundLoop = 209 * new Runnable() { 210 * public void run() { 211 * for (;;) { 212 * try { 213 * Runnable task = (Runnable)(channel.take()); 214 * task.run(); 215 * } 216 * catch (InterruptedException ex) { return; } 217 * } 218 * } 219 * }; 220 * new Thread(backgroundLoop).start(); 221 * } 222 * } 223 * 224 * </pre> 225 * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] 226 * @see Sync 227 * @see BoundedChannel 228 **/ 229 230 public interface Channel extends Puttable, Takable { 231 232 /** 233 * Place item in the channel, possibly waiting indefinitely until 234 * it can be accepted. Channels implementing the BoundedChannel 235 * subinterface are generally guaranteed to block on puts upon 236 * reaching capacity, but other implementations may or may not block. 237 * @param item the element to be inserted. Should be non-null. 238 * @exception InterruptedException if the current thread has 239 * been interrupted at a point at which interruption 240 * is detected, in which case the element is guaranteed not 241 * to be inserted. Otherwise, on normal return, the element is guaranteed 242 * to have been inserted. 243 **/ 244 public void put(Object item) throws InterruptedException; 245 246 /** 247 * Place item in channel only if it can be accepted within 248 * msecs milliseconds. The time bound is interpreted in 249 * a coarse-grained, best-effort fashion. 250 * @param item the element to be inserted. Should be non-null. 251 * @param msecs the number of milliseconds to wait. If less than 252 * or equal to zero, the method does not perform any timed waits, 253 * but might still require 254 * access to a synchronization lock, which can impose unbounded 255 * delay if there is a lot of contention for the channel. 256 * @return true if accepted, else false 257 * @exception InterruptedException if the current thread has 258 * been interrupted at a point at which interruption 259 * is detected, in which case the element is guaranteed not 260 * to be inserted (i.e., is equivalent to a false return). 261 **/ 262 public boolean offer(Object item, long msecs) throws InterruptedException; 263 264 /** 265 * Return and remove an item from channel, 266 * possibly waiting indefinitely until 267 * such an item exists. 268 * @return some item from the channel. Different implementations 269 * may guarantee various properties (such as FIFO) about that item 270 * @exception InterruptedException if the current thread has 271 * been interrupted at a point at which interruption 272 * is detected, in which case state of the channel is unchanged. 273 * 274 **/ 275 public Object take() throws InterruptedException; 276 277 278 /** 279 * Return and remove an item from channel only if one is available within 280 * msecs milliseconds. The time bound is interpreted in a coarse 281 * grained, best-effort fashion. 282 * @param msecs the number of milliseconds to wait. If less than 283 * or equal to zero, the operation does not perform any timed waits, 284 * but might still require 285 * access to a synchronization lock, which can impose unbounded 286 * delay if there is a lot of contention for the channel. 287 * @return some item, or null if the channel is empty. 288 * @exception InterruptedException if the current thread has 289 * been interrupted at a point at which interruption 290 * is detected, in which case state of the channel is unchanged 291 * (i.e., equivalent to a null return). 292 **/ 293 294 public Object poll(long msecs) throws InterruptedException; 295 296 /** 297 * Return, but do not remove object at head of Channel, 298 * or null if it is empty. 299 **/ 300 301 public Object peek(); 302 303 } 304 305 |
|||
Java API By Example, From Geeks To Geeks. |
Conditions of Use |
About Us
© 2002 - 2005, KickJava.com, or its affiliates
|