/*
 * @(#)Exchanger.java 1.6 06/09/20
 *
 * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
 * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */

package java.util.concurrent;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.LockSupport;

/**
 * A synchronization point at which two threads can exchange objects.
 * Each thread presents some object on entry to the {@link #exchange
 * exchange} method, and receives the object presented by the other
 * thread on return.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
 * swap buffers between threads so that the thread filling the
 * buffer gets a freshly emptied one when it needs it, handing off the
 * filled one to the thread emptying the buffer.
 * <pre>
 * class FillAndEmpty {
 *   Exchanger&lt;DataBuffer&gt; exchanger = new Exchanger&lt;DataBuffer&gt;();
 *   DataBuffer initialEmptyBuffer = ... a made-up type
 *   DataBuffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.full())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.empty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * </pre>
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The type of objects that may be exchanged
 */
public class Exchanger<V> {
    /*
     * Algorithm Description:
     *
     * The basic idea is to maintain a "slot", which is a reference to
     * a Node containing both an Item to offer and a "hole" waiting to
     * get filled in. If an incoming "occupying" thread sees that the
     * slot is null, it CAS'es (compareAndSets) a Node there and waits
     * for another to invoke exchange. That second "fulfilling" thread
     * sees that the slot is non-null, and so CASes it back to null,
     * also exchanging items by CASing the hole, plus waking up the
     * occupying thread if it is blocked. In each case CAS'es may
     * fail because a slot at first appears non-null but is null upon
     * CAS, or vice-versa. So threads may need to retry these
     * actions.
     *
     * This simple approach works great when there are only a few
     * threads using an Exchanger, but performance rapidly
     * deteriorates due to CAS contention on the single slot when
     * there are lots of threads using an exchanger. So instead we use
     * an "arena"; basically a kind of hash table with a dynamically
     * varying number of slots, any one of which can be used by
     * threads performing an exchange. Incoming threads pick slots
     * based on a hash of their Thread ids. If an incoming thread
     * fails to CAS in its chosen slot, it picks an alternative slot
     * instead. And similarly from there. If a thread successfully
     * CASes into a slot but no other thread arrives, it tries
     * another, heading toward the zero slot, which always exists even
     * if the table shrinks. The particular mechanics controlling this
     * are as follows:
     *
     * Waiting: Slot zero is special in that it is the only slot that
     * exists when there is no contention. A thread occupying slot
     * zero will block if no thread fulfills it after a short spin.
     * In other cases, occupying threads eventually give up and try
     * another slot. Waiting threads spin for a while (a period that
     * should be a little less than a typical context-switch time)
     * before either blocking (if slot zero) or giving up (if other
     * slots) and restarting. There is no reason for threads to block
     * unless there are unlikely to be any other threads present.
     * Occupants are mainly avoiding memory contention so sit there
     * quietly polling for a shorter period than it would take to
     * block and then unblock them. Non-slot-zero waits that elapse
     * because of lack of other threads waste around one extra
     * context-switch time per try, which is still on average much
     * faster than alternative approaches.
     *
     * Sizing: Usually, using only a few slots suffices to reduce
     * contention. Especially with small numbers of threads, using
     * too many slots can lead to just as poor performance as using
     * too few of them, and there's not much room for error. The
     * variable "max" maintains the number of slots actually in
     * use. It is increased when a thread sees too many CAS
     * failures. (This is analogous to resizing a regular hash table
     * based on a target load factor, except here, growth steps are
     * just one-by-one rather than proportional.) Growth requires
     * contention failures in each of three tried slots. Requiring
     * multiple failures for expansion copes with the fact that some
     * failed CASes are not due to contention but instead to simple
     * races between two threads or thread pre-emptions occurring
     * between reading and CASing. Also, very transient peak
     * contention can be much higher than the average sustainable
     * levels. The max limit is decreased on average 50% of the times
     * that a non-slot-zero wait elapses without being fulfilled.
     * Threads experiencing elapsed waits move closer to zero, so
     * eventually find existing (or future) threads even if the table
     * has been shrunk due to inactivity. The chosen mechanics and
     * thresholds for growing and shrinking are intrinsically
     * entangled with indexing and hashing inside the exchange code,
     * and can't be nicely abstracted out.
     *
     * Hashing: Each thread picks its initial slot to use in accord
     * with a simple hashcode. The sequence is the same on each
     * encounter by any given thread, but effectively random across
     * threads. Using arenas encounters the classic cost vs quality
     * tradeoffs of all hash tables. Here, we use a one-step FNV-1a
     * hash code based on the current thread's Thread.getId(), along
     * with a cheap approximation to a mod operation to select an
     * index. The downside of optimizing index selection in this way
     * is that the code is hardwired to use a maximum table size of
     * 32. But this value more than suffices for known platforms and
     * applications.
     *
     * Probing: On sensed contention of a selected slot, we probe
     * sequentially through the table, analogously to linear probing
     * after collision in a hash table. (We move circularly, in
     * reverse order, to mesh best with table growth and shrinkage
     * rules.) Except that to minimize the effects of false-alarms
     * and cache thrashing, we try the first selected slot twice
     * before moving.
     *
     * Padding: Even with contention management, slots are heavily
     * contended, so use cache-padding to avoid poor memory
     * performance. Because of this, slots are lazily constructed
     * only when used, to avoid wasting this space unnecessarily.
     * While isolation of locations is not much of an issue at first
     * in an application, as time goes on and garbage-collectors
     * perform compaction, slots are very likely to be moved adjacent
     * to each other, which can cause much thrashing of cache lines on
     * MPs unless padding is employed.
     *
     * This is an improvement of the algorithm described in the paper
     * "A Scalable Elimination-based Exchange Channel" by William
     * Scherer, Doug Lea, and Michael Scott in Proceedings of SCOOL05
     * workshop. Available at: http://hdl.handle.net/1802/2104
     */

    /** The number of CPUs, for sizing and spin control */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * The capacity of the arena. Set to a value that provides more
     * than enough space to handle contention. On small machines
     * most slots won't be used, but it is still not wasted because
     * the extra space provides some machine-level address padding
     * to minimize interference with heavily CAS'ed Slot locations.
     * And on very large machines, performance eventually becomes
     * bounded by memory bandwidth, not numbers of threads/CPUs.
     * This constant cannot be changed without also modifying
     * indexing and hashing algorithms.
     */
    private static final int CAPACITY = 32;

    /**
     * The value of "max" that will hold all threads without
     * contention. When this value is less than CAPACITY, some
     * otherwise wasted expansion can be avoided.
     */
    private static final int FULL =
        Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);

    /**
     * The number of times to spin (doing nothing except polling a
     * memory location) before blocking or giving up while waiting to
     * be fulfilled. Should be zero on uniprocessors. On
     * multiprocessors, this value should be large enough so that two
     * threads exchanging items as fast as possible block only when
     * one of them is stalled (due to GC or preemption), but not much
     * longer, to avoid wasting CPU resources. Seen differently, this
     * value is a little over half the number of cycles of an average
     * context switch time on most systems. The value here is
     * approximately the average of those across a range of tested
     * systems.
     */
    private static final int SPINS = (NCPU == 1) ? 0 : 2000;

    /**
     * The number of times to spin before blocking in timed waits.
     * Timed waits spin more slowly because checking the time takes
     * time. The best value relies mainly on the relative rate of
     * System.nanoTime vs memory accesses. The value is empirically
     * derived to work well across a variety of systems.
     */
    private static final int TIMED_SPINS = SPINS / 20;

    /**
     * Sentinel item representing cancellation of a wait due to
     * interruption, timeout, or elapsed spin-waits. This value is
     * placed in holes on cancellation, and used as a return value
     * from waiting methods to indicate failure to set or get hole.
     */
    private static final Object CANCEL = new Object();

    /**
     * Value representing null arguments/returns from public
     * methods. This disambiguates from internal requirement that
     * holes start out as null to mean they are not yet set.
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * Nodes hold partially exchanged data. This class
     * opportunistically subclasses AtomicReference to represent the
     * hole. So get() returns hole, and compareAndSet CAS'es value
     * into hole. This class cannot be parameterized as "V" because
     * of the use of non-V CANCEL sentinels.
     */
    private static final class Node extends AtomicReference<Object> {
        /** The element offered by the Thread creating this node. */
        public final Object item;

        /** The Thread waiting to be signalled; null until waiting. */
        public volatile Thread waiter;

        /**
         * Creates node with given item and empty hole.
         * @param item the item
         */
        public Node(Object item) {
            this.item = item;
        }
    }

    /**
     * A Slot is an AtomicReference with heuristic padding to lessen
     * cache effects of this heavily CAS'ed location. While the
     * padding adds noticeable space, all slots are created only on
     * demand, and there will be more than one of them only when it
     * would improve throughput more than enough to outweigh using
     * extra space.
     */
    private static final class Slot extends AtomicReference<Object> {
        // Improve likelihood of isolation on <= 64 byte cache lines
        long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
    }

    /**
     * Slot array. Elements are lazily initialized when needed.
     * Declared volatile to enable double-checked lazy construction.
     */
    private volatile Slot[] arena = new Slot[CAPACITY];

    /**
     * The maximum slot index being used. The value sometimes
     * increases when a thread experiences too many CAS contentions,
     * and sometimes decreases when a spin-wait elapses. Changes
     * are performed only via compareAndSet, to avoid stale values
     * when a thread happens to stall right before setting.
     */
    private final AtomicInteger max = new AtomicInteger();

    /**
     * Main exchange function, handling the different policy variants.
     * Uses Object, not "V" as argument and return value to simplify
     * handling of sentinel values. Callers from public methods decode
     * and cast accordingly.
     *
     * @param item the (non-null) item to exchange
     * @param timed true if the wait is timed
     * @param nanos if timed, the maximum wait time
     * @return the other thread's item, or CANCEL if interrupted or timed out
     */
    private Object doExchange(Object item, boolean timed, long nanos) {
        Node me = new Node(item);                 // Create in case occupying
        int index = hashIndex();                  // Index of current slot
        int fails = 0;                            // Number of CAS failures

        for (;;) {
            Object y;                             // Contents of current slot
            Slot slot = arena[index];
            if (slot == null)                     // Lazily initialize slots
                createSlot(index);                // Continue loop to reread
            else if ((y = slot.get()) != null &&  // Try to fulfill
                     slot.compareAndSet(y, null)) {
                Node you = (Node)y;               // Transfer item
                if (you.compareAndSet(null, item)) {
                    LockSupport.unpark(you.waiter);
                    return you.item;
                }                                 // Else cancelled; continue
            }
            else if (y == null &&                 // Try to occupy
                     slot.compareAndSet(null, me)) {
                if (index == 0)                   // Blocking wait for slot 0
                    return timed ? awaitNanos(me, slot, nanos) : await(me, slot);
                Object v = spinWait(me, slot);    // Spin wait for non-0
                if (v != CANCEL)
                    return v;
                me = new Node(item);              // Throw away cancelled node
                int m = max.get();
                if (m > (index >>>= 1))           // Decrease index
                    max.compareAndSet(m, m - 1);  // Maybe shrink table
            }
            else if (++fails > 1) {               // Allow 2 fails on 1st slot
                int m = max.get();
                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
                    index = m + 1;                // Grow on 3rd failed slot
                else if (--index < 0)
                    index = m;                    // Circularly traverse
            }
        }
    }

    /**
     * Returns a hash index for the current thread. Uses a one-step
     * FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/)
     * based on the current thread's Thread.getId(). These hash codes
     * have more uniform distribution properties with respect to small
     * moduli (here 1-31) than do other simple hashing functions.
     *
     * <p>To return an index between 0 and max, we use a cheap
     * approximation to a mod operation, that also corrects for bias
     * due to non-power-of-2 remaindering (see {@link
     * java.util.Random#nextInt}). Bits of the hashcode are masked
     * with "nbits", the ceiling power of two of table size (looked up
     * in a table packed into three ints). If too large, this is
     * retried after rotating the hash by nbits bits, while forcing new
     * top bit to 0, which guarantees eventual termination (although
     * with a non-random-bias). This requires an average of less than
     * 2 tries for all table sizes, and has a maximum 2% difference
     * from perfectly uniform slot probabilities when applied to all
     * possible hash codes for sizes less than 32.
     *
     * @return a per-thread-random index, 0 <= index <= max
     */
    private final int hashIndex() {
        long id = Thread.currentThread().getId();
        int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;

        int m = max.get();
        int nbits = (((0xfffffc00  >> m) & 4) | // Compute ceil(log2(m+1))
                     ((0x000001f8 >>> m) & 2) | // The constants hold
                     ((0xffff00f2 >>> m) & 1)); // a lookup table
        int index;
        while ((index = hash & ((1 << nbits) - 1)) > m)       // May retry on
            hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
        return index;
    }

    /**
     * Creates a new slot at given index. Called only when the slot
     * appears to be null. Relies on double-check using builtin
     * locks, since they rarely contend. This in turn relies on the
     * arena array being declared volatile.
     *
     * @param index the index to add slot at
     */
    private void createSlot(int index) {
        // Create slot outside of lock to narrow sync region
        Slot newSlot = new Slot();
        Slot[] a = arena;
        synchronized (a) {
            if (a[index] == null)
                a[index] = newSlot;
        }
    }

    /**
     * Tries to cancel a wait for the given node waiting in the given
     * slot and, if successful, helps clear the node from its slot to
     * avoid garbage retention.
     *
     * @param node the waiting node
     * @param slot the slot it is waiting in
     * @return true if successfully cancelled
     */
    private static boolean tryCancel(Node node, Slot slot) {
        if (!node.compareAndSet(null, CANCEL))
            return false;
        if (slot.get() == node) // pre-check to minimize contention
            slot.compareAndSet(node, null);
        return true;
    }

    // Three forms of waiting. Each just different enough not to merge
    // code with others.

    /**
     * Spin-waits for hole for a non-0 slot. Fails if spin elapses
     * before hole filled. Does not check interrupt, relying on check
     * in public exchange method to abort if interrupted on entry.
     *
     * @param node the waiting node
     * @param slot the slot it is waiting in
     * @return on success, the hole; on failure, CANCEL
     */
    private static Object spinWait(Node node, Slot slot) {
        int spins = SPINS;
        for (;;) {
            Object v = node.get();
            if (v != null)
                return v;
            else if (spins > 0)
                --spins;
            else
                tryCancel(node, slot);
        }
    }

    /**
     * Waits for (by spinning and/or blocking) and gets the hole
     * filled in by another thread. Fails if interrupted before
     * hole filled.
     *
     * When a node/thread is about to block, it sets its waiter field
     * and then rechecks state at least one more time before actually
     * parking, thus covering race vs fulfiller noticing that waiter
     * is non-null so should be woken.
     *
     * Thread interruption status is checked only surrounding calls to
     * park. The caller is assumed to have checked interrupt status
     * on entry.
     *
     * @param node the waiting node
     * @param slot the slot it is waiting in
     * @return on success, the hole; on failure, CANCEL
     */
    private static Object await(Node node, Slot slot) {
        Thread w = Thread.currentThread();
        int spins = SPINS;
        for (;;) {
            Object v = node.get();
            if (v != null)
                return v;
            else if (spins > 0)                 // Spin-wait phase
                --spins;
            else if (node.waiter == null)       // Set up to block next
                node.waiter = w;
            else if (w.isInterrupted())         // Abort on interrupt
                tryCancel(node, slot);
            else                                // Block
                LockSupport.park();
        }
    }

    /**
     * Waits for (at index 0) and gets the hole filled in by another
     * thread. Fails if timed out or interrupted before hole filled.
     * Same basic logic as untimed version, but a bit messier.
     *
     * @param node the waiting node
     * @param slot the slot it is waiting in
     * @param nanos the wait time
     * @return on success, the hole; on failure, CANCEL
     */
    private Object awaitNanos(Node node, Slot slot, long nanos) {
        int spins = TIMED_SPINS;
        long lastTime = 0;
        Thread w = null;
        for (;;) {
            Object v = node.get();
            if (v != null)
                return v;
            long now = System.nanoTime();
            if (w == null)
                w = Thread.currentThread();
            else
                nanos -= now - lastTime;
            lastTime = now;
            if (nanos > 0) {
                if (spins > 0)
                    --spins;
                else if (node.waiter == null)
                    node.waiter = w;
                else if (w.isInterrupted())
                    tryCancel(node, slot);
                else
                    LockSupport.parkNanos(nanos);
            }
            else if (tryCancel(node, slot) && !w.isInterrupted())
                return scanOnTimeout(node);
        }
    }

    /**
     * Sweeps through arena checking for any waiting threads. Called
     * only upon return from timeout while waiting in slot 0. When a
     * thread gives up on a timed wait, it is possible that a
     * previously-entered thread is still waiting in some other
     * slot. So we scan to check for any. This is almost always
     * overkill, but decreases the likelihood of timeouts when there
     * are other threads present to far less than that in lock-based
     * exchangers in which earlier-arriving threads may still be
     * waiting on entry locks.
     *
     * @param node the waiting node
     * @return another thread's item, or CANCEL
     */
    private Object scanOnTimeout(Node node) {
        Object y;
        for (int j = arena.length - 1; j >= 0; --j) {
            Slot slot = arena[j];
            if (slot != null) {
                while ((y = slot.get()) != null) {
                    if (slot.compareAndSet(y, null)) {
                        Node you = (Node)y;
                        if (you.compareAndSet(null, node.item)) {
                            LockSupport.unpark(you.waiter);
                            return you.item;
                        }
                    }
                }
            }
        }
        return CANCEL;
    }

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     */
    public V exchange(V x) throws InterruptedException {
        if (!Thread.interrupted()) {
            Object v = doExchange(x == null ? NULL_ITEM : x, false, 0);
            if (v == NULL_ITEM)
                return null;
            if (v != CANCEL)
                return (V)v;
            Thread.interrupted(); // Clear interrupt status on IE throw
        }
        throw new InterruptedException();
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * it is {@link Thread#interrupt interrupted}, or the specified waiting
     * time elapses),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown. If the time is less than or equal to zero, the method
     * will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return the object provided by the other thread.
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the specified waiting time elapses before
     * another thread enters the exchange.
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        if (!Thread.interrupted()) {
            Object v = doExchange(x == null ? NULL_ITEM : x,
                                  true, unit.toNanos(timeout));
            if (v == NULL_ITEM)
                return null;
            if (v != CANCEL)
                return (V)v;
            if (!Thread.interrupted())
                throw new TimeoutException();
        }
        throw new InterruptedException();
    }
}
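
The two public `exchange` methods documented above can be exercised from client code roughly as follows. This is only a minimal sketch: the class name `ExchangerDemo` and the helper methods `swap` and `timesOutAlone` are made up for illustration and are not part of `java.util.concurrent`. It shows one untimed exchange between two threads, and one timed exchange that has no partner and is therefore expected to end in a `TimeoutException`.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; only Exchanger, TimeUnit and TimeoutException are JDK types.
class ExchangerDemo {

    /**
     * Swaps two strings between the calling thread and a helper thread.
     * Returns { what the caller received, what the helper received }.
     */
    static String[] swap(final String mine, final String theirs)
            throws InterruptedException {
        final Exchanger<String> exchanger = new Exchanger<String>();
        final String[] receivedByHelper = new String[1];

        Thread helper = new Thread(new Runnable() {
            public void run() {
                try {
                    receivedByHelper[0] = exchanger.exchange(theirs);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                }
            }
        });
        helper.start();

        String receivedByCaller = exchanger.exchange(mine);
        helper.join(); // join() makes the helper's array write visible here
        return new String[] { receivedByCaller, receivedByHelper[0] };
    }

    /** Returns true if a timed exchange with no partner thread times out. */
    static boolean timesOutAlone() throws InterruptedException {
        Exchanger<String> lonely = new Exchanger<String>();
        try {
            lonely.exchange("nobody home", 50, TimeUnit.MILLISECONDS);
            return false; // would mean some other thread showed up
        } catch (TimeoutException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String[] result = swap("from-main", "from-helper");
        System.out.println("main received:   " + result[0]);
        System.out.println("helper received: " + result[1]);
        System.out.println("timed out alone: " + timesOutAlone());
    }
}
```

The helper thread restores its interrupt status instead of swallowing the exception, which is one common way to fill in the `... handle ...` placeholder of the sample usage; other policies are equally valid.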
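
The index selection in `hashIndex()` packs a small lookup table into three integer constants. The sketch below copies that arithmetic into a standalone class so it can be checked in isolation. The class name `HashIndexSketch`, the reference method `nbitsReference`, and passing the thread id and `max` as parameters are assumptions made for this illustration; the listing itself reads them from the current thread and the `max` field. It compares the packed table against `32 - Integer.numberOfLeadingZeros(m)` for every legal `m`, and confirms that the retry loop always lands on an index in the range 0 to `m` inclusive.

```java
// Hypothetical standalone copy of the index arithmetic, for experimentation only.
class HashIndexSketch {

    /** The packed lookup table from the listing: ceil(log2(m + 1)) for 0 <= m <= 31. */
    static int nbitsFromTable(int m) {
        return ((0xfffffc00  >> m) & 4) |
               ((0x000001f8 >>> m) & 2) |
               ((0xffff00f2 >>> m) & 1);
    }

    /** Reference computation: the number of bits needed to represent m. */
    static int nbitsReference(int m) {
        return 32 - Integer.numberOfLeadingZeros(m);
    }

    /** Same steps as hashIndex(), with the thread id and max passed in explicitly. */
    static int hashIndex(long id, int m) {
        int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;
        int nbits = nbitsFromTable(m);
        int index;
        while ((index = hash & ((1 << nbits) - 1)) > m)       // retry if too large
            hash = (hash >>> nbits) | (hash << (33 - nbits)); // rotate, new top bit 0
        return index;
    }

    /** Returns true if the table matches the reference and every index is in [0, m]. */
    static boolean check() {
        for (int m = 0; m < 32; ++m) {
            if (nbitsFromTable(m) != nbitsReference(m))
                return false;
            for (long id = 1; id <= 10000; ++id) {
                int index = hashIndex(id, m);
                if (index < 0 || index > m)
                    return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("table and index range check passed: " + check());
    }
}
```

Because the retry condition is `index > m`, the returned index can equal `m`, which matches `max` being described as the maximum slot index in use.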