KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > java > util > concurrent > locks > AbstractQueuedLongSynchronizer


1 /*
2  * @(#)AbstractQueuedLongSynchronizer.java 1.6 07/01/08
3  *
4  * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
5  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6  */

7
8 package java.util.concurrent.locks;
9 import java.util.*;
10 import java.util.concurrent.*;
11 import java.util.concurrent.atomic.*;
12 import sun.misc.Unsafe;
13
14 /**
15  * A version of {@link AbstractQueuedSynchronizer} in
16  * which synchronization state is maintained as a <tt>long</tt>.
17  * This class has exactly the same structure, properties, and methods
18  * as <tt>AbstractQueuedSynchronizer</tt> with the exception
19  * that all state-related parameters and results are defined
20  * as <tt>long</tt> rather than <tt>int</tt>. This class
21  * may be useful when creating synchronizers such as
22  * multilevel locks and barriers that require
23  * 64 bits of state.
24  *
25  * <p>See {@link AbstractQueuedSynchronizer} for usage
26  * notes and examples.
27  *
28  * @since 1.6
29  * @author Doug Lea
30  */

31 public abstract class AbstractQueuedLongSynchronizer
32     extends AbstractOwnableSynchronizer JavaDoc
33     implements java.io.Serializable JavaDoc {
34
35     private static final long serialVersionUID = 7373984972572414692L;
36
37     /*
38       To keep sources in sync, the remainder of this source file is
39       exactly cloned from AbstractQueuedSynchronizer, replacing class
40       name and changing ints related with sync state to longs. Please
41       keep it that way.
42     */

43
44     /**
45      * Creates a new <tt>AbstractQueuedLongSynchronizer</tt> instance
46      * with initial synchronization state of zero.
47      */

48     protected AbstractQueuedLongSynchronizer() { }
49
50     /**
51      * Wait queue node class.
52      *
53      * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
54      * Hagersten) lock queue. CLH locks are normally used for
55      * spinlocks. We instead use them for blocking synchronizers, but
56      * use the same basic tactic of holding some of the control
57      * information about a thread in the predecessor of its node. A
58      * "status" field in each node keeps track of whether a thread
59      * should block. A node is signalled when its predecessor
60      * releases. Each node of the queue otherwise serves as a
61      * specific-notification-style monitor holding a single waiting
62      * thread. The status field does NOT control whether threads are
63      * granted locks etc though. A thread may try to acquire if it is
64      * first in the queue. But being first does not guarantee success;
65      * it only gives the right to contend. So the currently released
66      * contender thread may need to rewait.
67      *
68      * <p>To enqueue into a CLH lock, you atomically splice it in as new
69      * tail. To dequeue, you just set the head field.
70      * <pre>
71      * +------+ prev +-----+ +-----+
72      * head | | <---- | | <---- | | tail
73      * +------+ +-----+ +-----+
74      * </pre>
75      *
76      * <p>Insertion into a CLH queue requires only a single atomic
77      * operation on "tail", so there is a simple atomic point of
78      * demarcation from unqueued to queued. Similarly, dequeing
79      * involves only updating the "head". However, it takes a bit
80      * more work for nodes to determine who their successors are,
81      * in part to deal with possible cancellation due to timeouts
82      * and interrupts.
83      *
84      * <p>The "prev" links (not used in original CLH locks), are mainly
85      * needed to handle cancellation. If a node is cancelled, its
86      * successor is (normally) relinked to a non-cancelled
87      * predecessor. For explanation of similar mechanics in the case
88      * of spin locks, see the papers by Scott and Scherer at
89      * http://www.cs.rochester.edu/u/scott/synchronization/
90      *
91      * <p>We also use "next" links to implement blocking mechanics.
92      * The thread id for each node is kept in its own node, so a
93      * predecessor signals the next node to wake up by traversing
94      * next link to determine which thread it is. Determination of
95      * successor must avoid races with newly queued nodes to set
96      * the "next" fields of their predecessors. This is solved
97      * when necessary by checking backwards from the atomically
98      * updated "tail" when a node's successor appears to be null.
99      * (Or, said differently, the next-links are an optimization
100      * so that we don't usually need a backward scan.)
101      *
102      * <p>Cancellation introduces some conservatism to the basic
103      * algorithms. Since we must poll for cancellation of other
104      * nodes, we can miss noticing whether a cancelled node is
105      * ahead or behind us. This is dealt with by always unparking
106      * successors upon cancellation, allowing them to stabilize on
107      * a new predecessor.
108      *
109      * <p>CLH queues need a dummy header node to get started. But
110      * we don't create them on construction, because it would be wasted
111      * effort if there is never contention. Instead, the node
112      * is constructed and head and tail pointers are set upon first
113      * contention.
114      *
115      * <p>Threads waiting on Conditions use the same nodes, but
116      * use an additional link. Conditions only need to link nodes
117      * in simple (non-concurrent) linked queues because they are
118      * only accessed when exclusively held. Upon await, a node is
119      * inserted into a condition queue. Upon signal, the node is
120      * transferred to the main queue. A special value of status
121      * field is used to mark which queue a node is on.
122      *
123      * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
124      * Scherer and Michael Scott, along with members of JSR-166
125      * expert group, for helpful ideas, discussions, and critiques
126      * on the design of this class.
127      */

128     static final class Node {
129         /** waitStatus value to indicate thread has cancelled */
130         static final int CANCELLED = 1;
131         /** waitStatus value to indicate successor's thread needs unparking */
132         static final int SIGNAL = -1;
133         /** waitStatus value to indicate thread is waiting on condition */
134         static final int CONDITION = -2;
135         /** Marker to indicate a node is waiting in shared mode */
136         static final Node SHARED = new Node();
137         /** Marker to indicate a node is waiting in exclusive mode */
138         static final Node EXCLUSIVE = null;
139
140         /**
141          * Status field, taking on only the values:
142          * SIGNAL: The successor of this node is (or will soon be)
143          * blocked (via park), so the current node must
144          * unpark its successor when it releases or
145          * cancels. To avoid races, acquire methods must
146          * first indicate they need a signal,
147          * then retry the atomic acquire, and then,
148          * on failure, block.
149          * CANCELLED: This node is cancelled due to timeout or interrupt.
150          * Nodes never leave this state. In particular,
151          * a thread with cancelled node never again blocks.
152          * CONDITION: This node is currently on a condition queue.
153          * It will not be used as a sync queue node until
154          * transferred. (Use of this value here
155          * has nothing to do with the other uses
156          * of the field, but simplifies mechanics.)
157          * 0: None of the above
158          *
159          * The values are arranged numerically to simplify use.
160          * Non-negative values mean that a node doesn't need to
161          * signal. So, most code doesn't need to check for particular
162          * values, just for sign.
163          *
164          * The field is initialized to 0 for normal sync nodes, and
165          * CONDITION for condition nodes. It is modified only using
166          * CAS.
167          */

168         volatile int waitStatus;
169
170         /**
171          * Link to predecessor node that current node/thread relies on
172          * for checking waitStatus. Assigned during enqueing, and nulled
173          * out (for sake of GC) only upon dequeuing. Also, upon
174          * cancellation of a predecessor, we short-circuit while
175          * finding a non-cancelled one, which will always exist
176          * because the head node is never cancelled: A node becomes
177          * head only as a result of successful acquire. A
178          * cancelled thread never succeeds in acquiring, and a thread only
179          * cancels itself, not any other node.
180          */

181         volatile Node prev;
182
183         /**
184          * Link to the successor node that the current node/thread
185          * unparks upon release. Assigned once during enqueuing, and
186          * nulled out (for sake of GC) when no longer needed. Upon
187          * cancellation, we cannot adjust this field, but can notice
188          * status and bypass the node if cancelled. The enq operation
189          * does not assign next field of a predecessor until after
190          * attachment, so seeing a null next field does not
191          * necessarily mean that node is at end of queue. However, if
192          * a next field appears to be null, we can scan prev's from
193          * the tail to double-check.
194          */

195         volatile Node next;
196
197         /**
198          * The thread that enqueued this node. Initialized on
199          * construction and nulled out after use.
200          */

201         volatile Thread JavaDoc thread;
202
203         /**
204          * Link to next node waiting on condition, or the special
205          * value SHARED. Because condition queues are accessed only
206          * when holding in exclusive mode, we just need a simple
207          * linked queue to hold nodes while they are waiting on
208          * conditions. They are then transferred to the queue to
209          * re-acquire. And because conditions can only be exclusive,
210          * we save a field by using special value to indicate shared
211          * mode.
212          */

213         Node nextWaiter;
214
215         /**
216          * Returns true if node is waiting in shared mode
217          */

218         final boolean isShared() {
219             return nextWaiter == SHARED;
220         }
221
222         /**
223          * Returns previous node, or throws NullPointerException if
224          * null. Use when predecessor cannot be null.
225          * @return the predecessor of this node
226          */

227         final Node predecessor() throws NullPointerException JavaDoc {
228             Node p = prev;
229             if (p == null)
230                 throw new NullPointerException JavaDoc();
231             else
232                 return p;
233         }
234
235         Node() { // Used to establish initial head or SHARED marker
236
}
237
238         Node(Thread JavaDoc thread, Node mode) { // Used by addWaiter
239
this.nextWaiter = mode;
240             this.thread = thread;
241         }
242
243         Node(Thread JavaDoc thread, int waitStatus) { // Used by Condition
244
this.waitStatus = waitStatus;
245             this.thread = thread;
246         }
247     }
248
249     /**
250      * Head of the wait queue, lazily initialized. Except for
251      * initialization, it is modified only via method setHead. Note:
252      * If head exists, its waitStatus is guaranteed not to be
253      * CANCELLED.
254      */

255     private transient volatile Node head;
256
257     /**
258      * Tail of the wait queue, lazily initialized. Modified only via
259      * method enq to add new wait node.
260      */

261     private transient volatile Node tail;
262
263     /**
264      * The synchronization state.
265      */

266     private volatile long state;
267
268     /**
269      * Returns the current value of synchronization state.
270      * This operation has memory semantics of a <tt>volatile</tt> read.
271      * @return current state value
272      */

273     protected final long getState() {
274         return state;
275     }
276
277     /**
278      * Sets the value of synchronization state.
279      * This operation has memory semantics of a <tt>volatile</tt> write.
280      * @param newState the new state value
281      */

282     protected final void setState(long newState) {
283         state = newState;
284     }
285
286     /**
287      * Atomically sets synchronization state to the given updated
288      * value if the current state value equals the expected value.
289      * This operation has memory semantics of a <tt>volatile</tt> read
290      * and write.
291      *
292      * @param expect the expected value
293      * @param update the new value
294      * @return true if successful. False return indicates that the actual
295      * value was not equal to the expected value.
296      */

297     protected final boolean compareAndSetState(long expect, long update) {
298         // See below for intrinsics setup to support this
299
return unsafe.compareAndSwapLong(this, stateOffset, expect, update);
300     }
301
302     // Queuing utilities
303

304     /**
305      * The number of nanoseconds for which it is faster to spin
306      * rather than to use timed park. A rough estimate suffices
307      * to improve responsiveness with very short timeouts.
308      */

309     static final long spinForTimeoutThreshold = 1000L;
310
311     /**
312      * Inserts node into queue, initializing if necessary. See picture above.
313      * @param node the node to insert
314      * @return node's predecessor
315      */

316     private Node enq(final Node node) {
317         for (;;) {
318             Node t = tail;
319             if (t == null) { // Must initialize
320
Node h = new Node(); // Dummy header
321
h.next = node;
322                 node.prev = h;
323                 if (compareAndSetHead(h)) {
324                     tail = node;
325                     return h;
326                 }
327             }
328             else {
329                 node.prev = t;
330                 if (compareAndSetTail(t, node)) {
331                     t.next = node;
332                     return t;
333                 }
334             }
335         }
336     }
337
338     /**
339      * Creates and enqueues node for given thread and mode.
340      *
341      * @param current the thread
342      * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
343      * @return the new node
344      */

345     private Node addWaiter(Node mode) {
346         Node node = new Node(Thread.currentThread(), mode);
347         // Try the fast path of enq; backup to full enq on failure
348
Node pred = tail;
349         if (pred != null) {
350             node.prev = pred;
351             if (compareAndSetTail(pred, node)) {
352                 pred.next = node;
353                 return node;
354             }
355         }
356         enq(node);
357         return node;
358     }
359
360     /**
361      * Sets head of queue to be node, thus dequeuing. Called only by
362      * acquire methods. Also nulls out unused fields for sake of GC
363      * and to suppress unnecessary signals and traversals.
364      *
365      * @param node the node
366      */

367     private void setHead(Node node) {
368         head = node;
369         node.thread = null;
370         node.prev = null;
371     }
372
373     /**
374      * Wakes up node's successor, if one exists.
375      *
376      * @param node the node
377      */

378     private void unparkSuccessor(Node node) {
379         /*
380          * Try to clear status in anticipation of signalling. It is
381          * OK if this fails or if status is changed by waiting thread.
382          */

383         compareAndSetWaitStatus(node, Node.SIGNAL, 0);
384
385         /*
386          * Thread to unpark is held in successor, which is normally
387          * just the next node. But if cancelled or apparently null,
388          * traverse backwards from tail to find the actual
389          * non-cancelled successor.
390          */

391         Node s = node.next;
392         if (s == null || s.waitStatus > 0) {
393             s = null;
394             for (Node t = tail; t != null && t != node; t = t.prev)
395                 if (t.waitStatus <= 0)
396                     s = t;
397         }
398         if (s != null)
399             LockSupport.unpark(s.thread);
400     }
401
402     /**
403      * Sets head of queue, and checks if successor may be waiting
404      * in shared mode, if so propagating if propagate > 0.
405      *
406      * @param pred the node holding waitStatus for node
407      * @param node the node
408      * @param propagate the return value from a tryAcquireShared
409      */

410     private void setHeadAndPropagate(Node node, long propagate) {
411         setHead(node);
412         if (propagate > 0 && node.waitStatus != 0) {
413             /*
414              * Don't bother fully figuring out successor. If it
415              * looks null, call unparkSuccessor anyway to be safe.
416              */

417             Node s = node.next;
418             if (s == null || s.isShared())
419                 unparkSuccessor(node);
420         }
421     }
422
423     // Utilities for various versions of acquire
424

425     /**
426      * Cancels an ongoing attempt to acquire.
427      *
428      * @param node the node
429      */

430     private void cancelAcquire(Node node) {
431     // Ignore if node doesn't exist
432
if (node == null)
433         return;
434
435     node.thread = null;
436
437     // Skip cancelled predecessors
438
Node pred = node.prev;
439     while (pred.waitStatus > 0)
440         node.prev = pred = pred.prev;
441
442     // Getting this before setting waitStatus ensures staleness
443
Node predNext = pred.next;
444
445     // Can use unconditional write instead of CAS here
446
node.waitStatus = Node.CANCELLED;
447
448     // If we are the tail, remove ourselves
449
if (node == tail && compareAndSetTail(node, pred)) {
450         compareAndSetNext(pred, predNext, null);
451     } else {
452         // If "active" predecessor found...
453
if (pred != head
454         && (pred.waitStatus == Node.SIGNAL
455             || compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
456         && pred.thread != null) {
457
458         // If successor is active, set predecessor's next link
459
Node next = node.next;
460         if (next != null && next.waitStatus <= 0)
461             compareAndSetNext(pred, predNext, next);
462         } else {
463         unparkSuccessor(node);
464         }
465
466         node.next = node; // help GC
467
}
468     }
469
470     /**
471      * Checks and updates status for a node that failed to acquire.
472      * Returns true if thread should block. This is the main signal
473      * control in all acquire loops. Requires that pred == node.prev
474      *
475      * @param pred node's predecessor holding status
476      * @param node the node
477      * @return {@code true} if thread should block
478      */

479     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
480         int s = pred.waitStatus;
481         if (s < 0)
482             /*
483              * This node has already set status asking a release
484              * to signal it, so it can safely park
485              */

486             return true;
487         if (s > 0) {
488             /*
489              * Predecessor was cancelled. Skip over predecessors and
490              * indicate retry.
491              */

492         do {
493         node.prev = pred = pred.prev;
494         } while (pred.waitStatus > 0);
495         pred.next = node;
496     }
497         else
498             /*
499              * Indicate that we need a signal, but don't park yet. Caller
500              * will need to retry to make sure it cannot acquire before
501              * parking.
502              */

503             compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
504         return false;
505     }
506
507     /**
508      * Convenience method to interrupt current thread.
509      */

510     private static void selfInterrupt() {
511         Thread.currentThread().interrupt();
512     }
513
514     /**
515      * Convenience method to park and then check if interrupted
516      *
517      * @return {@code true} if interrupted
518      */

519     private final boolean parkAndCheckInterrupt() {
520         LockSupport.park(this);
521         return Thread.interrupted();
522     }
523
524     /*
525      * Various flavors of acquire, varying in exclusive/shared and
526      * control modes. Each is mostly the same, but annoyingly
527      * different. Only a little bit of factoring is possible due to
528      * interactions of exception mechanics (including ensuring that we
529      * cancel if tryAcquire throws exception) and other control, at
530      * least not without hurting performance too much.
531      */

532
533     /**
534      * Acquires in exclusive uninterruptible mode for thread already in
535      * queue. Used by condition wait methods as well as acquire.
536      *
537      * @param node the node
538      * @param arg the acquire argument
539      * @return {@code true} if interrupted while waiting
540      */

541     final boolean acquireQueued(final Node node, long arg) {
542         try {
543             boolean interrupted = false;
544             for (;;) {
545                 final Node p = node.predecessor();
546                 if (p == head && tryAcquire(arg)) {
547                     setHead(node);
548                     p.next = null; // help GC
549
return interrupted;
550                 }
551                 if (shouldParkAfterFailedAcquire(p, node) &&
552                     parkAndCheckInterrupt())
553                     interrupted = true;
554             }
555         } catch (RuntimeException JavaDoc ex) {
556             cancelAcquire(node);
557             throw ex;
558         }
559     }
560
561     /**
562      * Acquires in exclusive interruptible mode.
563      * @param arg the acquire argument
564      */

565     private void doAcquireInterruptibly(long arg)
566         throws InterruptedException JavaDoc {
567         final Node node = addWaiter(Node.EXCLUSIVE);
568         try {
569             for (;;) {
570                 final Node p = node.predecessor();
571                 if (p == head && tryAcquire(arg)) {
572                     setHead(node);
573                     p.next = null; // help GC
574
return;
575                 }
576                 if (shouldParkAfterFailedAcquire(p, node) &&
577                     parkAndCheckInterrupt())
578                     break;
579             }
580         } catch (RuntimeException JavaDoc ex) {
581             cancelAcquire(node);
582             throw ex;
583         }
584         // Arrive here only if interrupted
585
cancelAcquire(node);
586         throw new InterruptedException JavaDoc();
587     }
588
589     /**
590      * Acquires in exclusive timed mode.
591      *
592      * @param arg the acquire argument
593      * @param nanosTimeout max wait time
594      * @return {@code true} if acquired
595      */

596     private boolean doAcquireNanos(long arg, long nanosTimeout)
597         throws InterruptedException JavaDoc {
598         long lastTime = System.nanoTime();
599         final Node node = addWaiter(Node.EXCLUSIVE);
600         try {
601             for (;;) {
602                 final Node p = node.predecessor();
603                 if (p == head && tryAcquire(arg)) {
604                     setHead(node);
605                     p.next = null; // help GC
606
return true;
607                 }
608                 if (nanosTimeout <= 0) {
609                     cancelAcquire(node);
610                     return false;
611                 }
612                 if (nanosTimeout > spinForTimeoutThreshold &&
613                     shouldParkAfterFailedAcquire(p, node))
614                     LockSupport.parkNanos(this, nanosTimeout);
615                 long now = System.nanoTime();
616                 nanosTimeout -= now - lastTime;
617                 lastTime = now;
618                 if (Thread.interrupted())
619                     break;
620             }
621         } catch (RuntimeException JavaDoc ex) {
622             cancelAcquire(node);
623             throw ex;
624         }
625         // Arrive here only if interrupted
626
cancelAcquire(node);
627         throw new InterruptedException JavaDoc();
628     }
629
630     /**
631      * Acquires in shared uninterruptible mode.
632      * @param arg the acquire argument
633      */

634     private void doAcquireShared(long arg) {
635         final Node node = addWaiter(Node.SHARED);
636         try {
637             boolean interrupted = false;
638             for (;;) {
639                 final Node p = node.predecessor();
640                 if (p == head) {
641                     long r = tryAcquireShared(arg);
642                     if (r >= 0) {
643                         setHeadAndPropagate(node, r);
644                         p.next = null; // help GC
645
if (interrupted)
646                             selfInterrupt();
647                         return;
648                     }
649                 }
650                 if (shouldParkAfterFailedAcquire(p, node) &&
651                     parkAndCheckInterrupt())
652                     interrupted = true;
653             }
654         } catch (RuntimeException JavaDoc ex) {
655             cancelAcquire(node);
656             throw ex;
657         }
658     }
659
660     /**
661      * Acquires in shared interruptible mode.
662      * @param arg the acquire argument
663      */

664     private void doAcquireSharedInterruptibly(long arg)
665         throws InterruptedException JavaDoc {
666         final Node node = addWaiter(Node.SHARED);
667         try {
668             for (;;) {
669                 final Node p = node.predecessor();
670                 if (p == head) {
671                     long r = tryAcquireShared(arg);
672                     if (r >= 0) {
673                         setHeadAndPropagate(node, r);
674                         p.next = null; // help GC
675
return;
676                     }
677                 }
678                 if (shouldParkAfterFailedAcquire(p, node) &&
679                     parkAndCheckInterrupt())
680                     break;
681             }
682         } catch (RuntimeException JavaDoc ex) {
683             cancelAcquire(node);
684             throw ex;
685         }
686         // Arrive here only if interrupted
687
cancelAcquire(node);
688         throw new InterruptedException JavaDoc();
689     }
690
691     /**
692      * Acquires in shared timed mode.
693      *
694      * @param arg the acquire argument
695      * @param nanosTimeout max wait time
696      * @return {@code true} if acquired
697      */

698     private boolean doAcquireSharedNanos(long arg, long nanosTimeout)
699         throws InterruptedException JavaDoc {
700
701         long lastTime = System.nanoTime();
702         final Node node = addWaiter(Node.SHARED);
703         try {
704             for (;;) {
705                 final Node p = node.predecessor();
706                 if (p == head) {
707                     long r = tryAcquireShared(arg);
708                     if (r >= 0) {
709                         setHeadAndPropagate(node, r);
710                         p.next = null; // help GC
711
return true;
712                     }
713                 }
714                 if (nanosTimeout <= 0) {
715                     cancelAcquire(node);
716                     return false;
717                 }
718                 if (nanosTimeout > spinForTimeoutThreshold &&
719                     shouldParkAfterFailedAcquire(p, node))
720                     LockSupport.parkNanos(this, nanosTimeout);
721                 long now = System.nanoTime();
722                 nanosTimeout -= now - lastTime;
723                 lastTime = now;
724                 if (Thread.interrupted())
725                     break;
726             }
727         } catch (RuntimeException JavaDoc ex) {
728             cancelAcquire(node);
729             throw ex;
730         }
731         // Arrive here only if interrupted
732
cancelAcquire(node);
733         throw new InterruptedException JavaDoc();
734     }
735
736     // Main exported methods
737

738     /**
739      * Attempts to acquire in exclusive mode. This method should query
740      * if the state of the object permits it to be acquired in the
741      * exclusive mode, and if so to acquire it.
742      *
743      * <p>This method is always invoked by the thread performing
744      * acquire. If this method reports failure, the acquire method
745      * may queue the thread, if it is not already queued, until it is
746      * signalled by a release from some other thread. This can be used
747      * to implement method {@link Lock#tryLock()}.
748      *
749      * <p>The default
750      * implementation throws {@link UnsupportedOperationException}.
751      *
752      * @param arg the acquire argument. This value is always the one
753      * passed to an acquire method, or is the value saved on entry
754      * to a condition wait. The value is otherwise uninterpreted
755      * and can represent anything you like.
756      * @return {@code true} if successful. Upon success, this object has
757      * been acquired.
758      * @throws IllegalMonitorStateException if acquiring would place this
759      * synchronizer in an illegal state. This exception must be
760      * thrown in a consistent fashion for synchronization to work
761      * correctly.
762      * @throws UnsupportedOperationException if exclusive mode is not supported
763      */

764     protected boolean tryAcquire(long arg) {
765         throw new UnsupportedOperationException JavaDoc();
766     }
767
768     /**
769      * Attempts to set the state to reflect a release in exclusive
770      * mode.
771      *
772      * <p>This method is always invoked by the thread performing release.
773      *
774      * <p>The default implementation throws
775      * {@link UnsupportedOperationException}.
776      *
777      * @param arg the release argument. This value is always the one
778      * passed to a release method, or the current state value upon
779      * entry to a condition wait. The value is otherwise
780      * uninterpreted and can represent anything you like.
781      * @return {@code true} if this object is now in a fully released
782      * state, so that any waiting threads may attempt to acquire;
783      * and {@code false} otherwise.
784      * @throws IllegalMonitorStateException if releasing would place this
785      * synchronizer in an illegal state. This exception must be
786      * thrown in a consistent fashion for synchronization to work
787      * correctly.
788      * @throws UnsupportedOperationException if exclusive mode is not supported
789      */

790     protected boolean tryRelease(long arg) {
791         throw new UnsupportedOperationException JavaDoc();
792     }
793
794     /**
795      * Attempts to acquire in shared mode. This method should query if
796      * the state of the object permits it to be acquired in the shared
797      * mode, and if so to acquire it.
798      *
799      * <p>This method is always invoked by the thread performing
800      * acquire. If this method reports failure, the acquire method
801      * may queue the thread, if it is not already queued, until it is
802      * signalled by a release from some other thread.
803      *
804      * <p>The default implementation throws {@link
805      * UnsupportedOperationException}.
806      *
807      * @param arg the acquire argument. This value is always the one
808      * passed to an acquire method, or is the value saved on entry
809      * to a condition wait. The value is otherwise uninterpreted
810      * and can represent anything you like.
811      * @return a negative value on failure; zero if acquisition in shared
812      * mode succeeded but no subsequent shared-mode acquire can
813      * succeed; and a positive value if acquisition in shared
814      * mode succeeded and subsequent shared-mode acquires might
815      * also succeed, in which case a subsequent waiting thread
816      * must check availability. (Support for three different
817      * return values enables this method to be used in contexts
818      * where acquires only sometimes act exclusively.) Upon
819      * success, this object has been acquired.
820      * @throws IllegalMonitorStateException if acquiring would place this
821      * synchronizer in an illegal state. This exception must be
822      * thrown in a consistent fashion for synchronization to work
823      * correctly.
824      * @throws UnsupportedOperationException if shared mode is not supported
825      */

826     protected long tryAcquireShared(long arg) {
827         throw new UnsupportedOperationException JavaDoc();
828     }
829
830     /**
831      * Attempts to set the state to reflect a release in shared mode.
832      *
833      * <p>This method is always invoked by the thread performing release.
834      *
835      * <p>The default implementation throws
836      * {@link UnsupportedOperationException}.
837      *
838      * @param arg the release argument. This value is always the one
839      * passed to a release method, or the current state value upon
840      * entry to a condition wait. The value is otherwise
841      * uninterpreted and can represent anything you like.
842      * @return {@code true} if this release of shared mode may permit a
843      * waiting acquire (shared or exclusive) to succeed; and
844      * {@code false} otherwise
845      * @throws IllegalMonitorStateException if releasing would place this
846      * synchronizer in an illegal state. This exception must be
847      * thrown in a consistent fashion for synchronization to work
848      * correctly.
849      * @throws UnsupportedOperationException if shared mode is not supported
850      */

851     protected boolean tryReleaseShared(long arg) {
852         throw new UnsupportedOperationException JavaDoc();
853     }
854
855     /**
856      * Returns {@code true} if synchronization is held exclusively with
857      * respect to the current (calling) thread. This method is invoked
858      * upon each call to a non-waiting {@link ConditionObject} method.
859      * (Waiting methods instead invoke {@link #release}.)
860      *
861      * <p>The default implementation throws {@link
862      * UnsupportedOperationException}. This method is invoked
863      * internally only within {@link ConditionObject} methods, so need
864      * not be defined if conditions are not used.
865      *
866      * @return {@code true} if synchronization is held exclusively;
867      * {@code false} otherwise
868      * @throws UnsupportedOperationException if conditions are not supported
869      */

870     protected boolean isHeldExclusively() {
871         throw new UnsupportedOperationException JavaDoc();
872     }
873
874     /**
875      * Acquires in exclusive mode, ignoring interrupts. Implemented
876      * by invoking at least once {@link #tryAcquire},
877      * returning on success. Otherwise the thread is queued, possibly
878      * repeatedly blocking and unblocking, invoking {@link
879      * #tryAcquire} until success. This method can be used
880      * to implement method {@link Lock#lock}.
881      *
882      * @param arg the acquire argument. This value is conveyed to
883      * {@link #tryAcquire} but is otherwise uninterpreted and
884      * can represent anything you like.
885      */

886     public final void acquire(long arg) {
887         if (!tryAcquire(arg) &&
888             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
889             selfInterrupt();
890     }
891
892     /**
893      * Acquires in exclusive mode, aborting if interrupted.
894      * Implemented by first checking interrupt status, then invoking
895      * at least once {@link #tryAcquire}, returning on
896      * success. Otherwise the thread is queued, possibly repeatedly
897      * blocking and unblocking, invoking {@link #tryAcquire}
898      * until success or the thread is interrupted. This method can be
899      * used to implement method {@link Lock#lockInterruptibly}.
900      *
901      * @param arg the acquire argument. This value is conveyed to
902      * {@link #tryAcquire} but is otherwise uninterpreted and
903      * can represent anything you like.
904      * @throws InterruptedException if the current thread is interrupted
905      */

906     public final void acquireInterruptibly(long arg) throws InterruptedException JavaDoc {
907         if (Thread.interrupted())
908             throw new InterruptedException JavaDoc();
909         if (!tryAcquire(arg))
910             doAcquireInterruptibly(arg);
911     }
912
913     /**
914      * Attempts to acquire in exclusive mode, aborting if interrupted,
915      * and failing if the given timeout elapses. Implemented by first
916      * checking interrupt status, then invoking at least once {@link
917      * #tryAcquire}, returning on success. Otherwise, the thread is
918      * queued, possibly repeatedly blocking and unblocking, invoking
919      * {@link #tryAcquire} until success or the thread is interrupted
920      * or the timeout elapses. This method can be used to implement
921      * method {@link Lock#tryLock(long, TimeUnit)}.
922      *
923      * @param arg the acquire argument. This value is conveyed to
924      * {@link #tryAcquire} but is otherwise uninterpreted and
925      * can represent anything you like.
926      * @param nanosTimeout the maximum number of nanoseconds to wait
927      * @return {@code true} if acquired; {@code false} if timed out
928      * @throws InterruptedException if the current thread is interrupted
929      */

930     public final boolean tryAcquireNanos(long arg, long nanosTimeout) throws InterruptedException JavaDoc {
931     if (Thread.interrupted())
932         throw new InterruptedException JavaDoc();
933     return tryAcquire(arg) ||
934         doAcquireNanos(arg, nanosTimeout);
935     }
936
937     /**
938      * Releases in exclusive mode. Implemented by unblocking one or
939      * more threads if {@link #tryRelease} returns true.
940      * This method can be used to implement method {@link Lock#unlock}.
941      *
942      * @param arg the release argument. This value is conveyed to
943      * {@link #tryRelease} but is otherwise uninterpreted and
944      * can represent anything you like.
945      * @return the value returned from {@link #tryRelease}
946      */

947     public final boolean release(long arg) {
948         if (tryRelease(arg)) {
949             Node h = head;
950             if (h != null && h.waitStatus != 0)
951                 unparkSuccessor(h);
952             return true;
953         }
954         return false;
955     }
956
957     /**
958      * Acquires in shared mode, ignoring interrupts. Implemented by
959      * first invoking at least once {@link #tryAcquireShared},
960      * returning on success. Otherwise the thread is queued, possibly
961      * repeatedly blocking and unblocking, invoking {@link
962      * #tryAcquireShared} until success.
963      *
964      * @param arg the acquire argument. This value is conveyed to
965      * {@link #tryAcquireShared} but is otherwise uninterpreted
966      * and can represent anything you like.
967      */

968     public final void acquireShared(long arg) {
969         if (tryAcquireShared(arg) < 0)
970             doAcquireShared(arg);
971     }
972
973     /**
974      * Acquires in shared mode, aborting if interrupted. Implemented
975      * by first checking interrupt status, then invoking at least once
976      * {@link #tryAcquireShared}, returning on success. Otherwise the
977      * thread is queued, possibly repeatedly blocking and unblocking,
978      * invoking {@link #tryAcquireShared} until success or the thread
979      * is interrupted.
980      * @param arg the acquire argument.
981      * This value is conveyed to {@link #tryAcquireShared} but is
982      * otherwise uninterpreted and can represent anything
983      * you like.
984      * @throws InterruptedException if the current thread is interrupted
985      */

986     public final void acquireSharedInterruptibly(long arg) throws InterruptedException JavaDoc {
987         if (Thread.interrupted())
988             throw new InterruptedException JavaDoc();
989         if (tryAcquireShared(arg) < 0)
990             doAcquireSharedInterruptibly(arg);
991     }
992
993     /**
994      * Attempts to acquire in shared mode, aborting if interrupted, and
995      * failing if the given timeout elapses. Implemented by first
996      * checking interrupt status, then invoking at least once {@link
997      * #tryAcquireShared}, returning on success. Otherwise, the
998      * thread is queued, possibly repeatedly blocking and unblocking,
999      * invoking {@link #tryAcquireShared} until success or the thread
1000     * is interrupted or the timeout elapses.
1001     *
1002     * @param arg the acquire argument. This value is conveyed to
1003     * {@link #tryAcquireShared} but is otherwise uninterpreted
1004     * and can represent anything you like.
1005     * @param nanosTimeout the maximum number of nanoseconds to wait
1006     * @return {@code true} if acquired; {@code false} if timed out
1007     * @throws InterruptedException if the current thread is interrupted
1008     */

1009    public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) throws InterruptedException JavaDoc {
1010    if (Thread.interrupted())
1011        throw new InterruptedException JavaDoc();
1012    return tryAcquireShared(arg) >= 0 ||
1013        doAcquireSharedNanos(arg, nanosTimeout);
1014    }
1015
1016    /**
1017     * Releases in shared mode. Implemented by unblocking one or more
1018     * threads if {@link #tryReleaseShared} returns true.
1019     *
1020     * @param arg the release argument. This value is conveyed to
1021     * {@link #tryReleaseShared} but is otherwise uninterpreted
1022     * and can represent anything you like.
1023     * @return the value returned from {@link #tryReleaseShared}
1024     */

1025    public final boolean releaseShared(long arg) {
1026        if (tryReleaseShared(arg)) {
1027            Node h = head;
1028            if (h != null && h.waitStatus != 0)
1029                unparkSuccessor(h);
1030            return true;
1031        }
1032        return false;
1033    }
1034
1035    // Queue inspection methods
1036

1037    /**
1038     * Queries whether any threads are waiting to acquire. Note that
1039     * because cancellations due to interrupts and timeouts may occur
1040     * at any time, a {@code true} return does not guarantee that any
1041     * other thread will ever acquire.
1042     *
1043     * <p>In this implementation, this operation returns in
1044     * constant time.
1045     *
1046     * @return {@code true} if there may be other threads waiting to acquire
1047     */

1048    public final boolean hasQueuedThreads() {
1049        return head != tail;
1050    }
1051
1052    /**
1053     * Queries whether any threads have ever contended to acquire this
1054     * synchronizer; that is if an acquire method has ever blocked.
1055     *
1056     * <p>In this implementation, this operation returns in
1057     * constant time.
1058     *
1059     * @return {@code true} if there has ever been contention
1060     */

1061    public final boolean hasContended() {
1062        return head != null;
1063    }
1064
1065    /**
1066     * Returns the first (longest-waiting) thread in the queue, or
1067     * {@code null} if no threads are currently queued.
1068     *
1069     * <p>In this implementation, this operation normally returns in
1070     * constant time, but may iterate upon contention if other threads are
1071     * concurrently modifying the queue.
1072     *
1073     * @return the first (longest-waiting) thread in the queue, or
1074     * {@code null} if no threads are currently queued
1075     */

1076    public final Thread JavaDoc getFirstQueuedThread() {
1077        // handle only fast path, else relay
1078
return (head == tail)? null : fullGetFirstQueuedThread();
1079    }
1080
1081    /**
1082     * Version of getFirstQueuedThread called when fastpath fails
1083     */

1084    private Thread JavaDoc fullGetFirstQueuedThread() {
1085        /*
1086         * The first node is normally h.next. Try to get its
1087         * thread field, ensuring consistent reads: If thread
1088         * field is nulled out or s.prev is no longer head, then
1089         * some other thread(s) concurrently performed setHead in
1090         * between some of our reads. We try this twice before
1091         * resorting to traversal.
1092         */

1093        Node h, s;
1094        Thread JavaDoc st;
1095        if (((h = head) != null && (s = h.next) != null &&
1096             s.prev == head && (st = s.thread) != null) ||
1097            ((h = head) != null && (s = h.next) != null &&
1098             s.prev == head && (st = s.thread) != null))
1099            return st;
1100
1101        /*
1102         * Head's next field might not have been set yet, or may have
1103         * been unset after setHead. So we must check to see if tail
1104         * is actually first node. If not, we continue on, safely
1105         * traversing from tail back to head to find first,
1106         * guaranteeing termination.
1107         */

1108
1109        Node t = tail;
1110        Thread JavaDoc firstThread = null;
1111        while (t != null && t != head) {
1112            Thread JavaDoc tt = t.thread;
1113            if (tt != null)
1114                firstThread = tt;
1115            t = t.prev;
1116        }
1117        return firstThread;
1118    }
1119
1120    /**
1121     * Returns true if the given thread is currently queued.
1122     *
1123     * <p>This implementation traverses the queue to determine
1124     * presence of the given thread.
1125     *
1126     * @param thread the thread
1127     * @return {@code true} if the given thread is on the queue
1128     * @throws NullPointerException if the thread is null
1129     */

1130    public final boolean isQueued(Thread JavaDoc thread) {
1131        if (thread == null)
1132            throw new NullPointerException JavaDoc();
1133        for (Node p = tail; p != null; p = p.prev)
1134            if (p.thread == thread)
1135                return true;
1136        return false;
1137    }
1138
1139    /**
1140     * Return {@code true} if the apparent first queued thread, if one
1141     * exists, is not waiting in exclusive mode. Used only as a heuristic
1142     * in ReentrantReadWriteLock.
1143     */

1144    final boolean apparentlyFirstQueuedIsExclusive() {
1145        Node h, s;
1146        return ((h = head) != null && (s = h.next) != null &&
1147                s.nextWaiter != Node.SHARED);
1148    }
1149
1150    /**
1151     * Return {@code true} if the queue is empty or if the given thread
1152     * is at the head of the queue. This is reliable only if
1153     * <tt>current</tt> is actually Thread.currentThread() of caller.
1154     */

1155    final boolean isFirst(Thread JavaDoc current) {
1156        Node h, s;
1157        return ((h = head) == null ||
1158                ((s = h.next) != null && s.thread == current) ||
1159                fullIsFirst(current));
1160    }
1161
1162    final boolean fullIsFirst(Thread JavaDoc current) {
1163        // same idea as fullGetFirstQueuedThread
1164
Node h, s;
1165        Thread JavaDoc firstThread = null;
1166        if (((h = head) != null && (s = h.next) != null &&
1167             s.prev == head && (firstThread = s.thread) != null))
1168            return firstThread == current;
1169        Node t = tail;
1170        while (t != null && t != head) {
1171            Thread JavaDoc tt = t.thread;
1172            if (tt != null)
1173                firstThread = tt;
1174            t = t.prev;
1175        }
1176        return firstThread == current || firstThread == null;
1177    }
1178
1179
1180    // Instrumentation and monitoring methods
1181

1182    /**
1183     * Returns an estimate of the number of threads waiting to
1184     * acquire. The value is only an estimate because the number of
1185     * threads may change dynamically while this method traverses
1186     * internal data structures. This method is designed for use in
1187     * monitoring system state, not for synchronization
1188     * control.
1189     *
1190     * @return the estimated number of threads waiting to acquire
1191     */

1192    public final int getQueueLength() {
1193        int n = 0;
1194        for (Node p = tail; p != null; p = p.prev) {
1195            if (p.thread != null)
1196                ++n;
1197        }
1198        return n;
1199    }
1200
1201    /**
1202     * Returns a collection containing threads that may be waiting to
1203     * acquire. Because the actual set of threads may change
1204     * dynamically while constructing this result, the returned
1205     * collection is only a best-effort estimate. The elements of the
1206     * returned collection are in no particular order. This method is
1207     * designed to facilitate construction of subclasses that provide
1208     * more extensive monitoring facilities.
1209     *
1210     * @return the collection of threads
1211     */

1212    public final Collection<Thread JavaDoc> getQueuedThreads() {
1213        ArrayList<Thread JavaDoc> list = new ArrayList<Thread JavaDoc>();
1214        for (Node p = tail; p != null; p = p.prev) {
1215            Thread JavaDoc t = p.thread;
1216            if (t != null)
1217                list.add(t);
1218        }
1219        return list;
1220    }
1221
1222    /**
1223     * Returns a collection containing threads that may be waiting to
1224     * acquire in exclusive mode. This has the same properties
1225     * as {@link #getQueuedThreads} except that it only returns
1226     * those threads waiting due to an exclusive acquire.
1227     *
1228     * @return the collection of threads
1229     */

1230    public final Collection<Thread JavaDoc> getExclusiveQueuedThreads() {
1231        ArrayList<Thread JavaDoc> list = new ArrayList<Thread JavaDoc>();
1232        for (Node p = tail; p != null; p = p.prev) {
1233            if (!p.isShared()) {
1234                Thread JavaDoc t = p.thread;
1235                if (t != null)
1236                    list.add(t);
1237            }
1238        }
1239        return list;
1240    }
1241
1242    /**
1243     * Returns a collection containing threads that may be waiting to
1244     * acquire in shared mode. This has the same properties
1245     * as {@link #getQueuedThreads} except that it only returns
1246     * those threads waiting due to a shared acquire.
1247     *
1248     * @return the collection of threads
1249     */

1250    public final Collection<Thread JavaDoc> getSharedQueuedThreads() {
1251        ArrayList<Thread JavaDoc> list = new ArrayList<Thread JavaDoc>();
1252        for (Node p = tail; p != null; p = p.prev) {
1253            if (p.isShared()) {
1254                Thread JavaDoc t = p.thread;
1255                if (t != null)
1256                    list.add(t);
1257            }
1258        }
1259        return list;
1260    }
1261
1262    /**
1263     * Returns a string identifying this synchronizer, as well as its state.
1264     * The state, in brackets, includes the String {@code "State ="}
1265     * followed by the current value of {@link #getState}, and either
1266     * {@code "nonempty"} or {@code "empty"} depending on whether the
1267     * queue is empty.
1268     *
1269     * @return a string identifying this synchronizer, as well as its state
1270     */

1271    public String JavaDoc toString() {
1272        long s = getState();
1273        String JavaDoc q = hasQueuedThreads()? "non" : "";
1274        return super.toString() +
1275            "[State = " + s + ", " + q + "empty queue]";
1276    }
1277
1278
1279    // Internal support methods for Conditions
1280

1281    /**
1282     * Returns true if a node, always one that was initially placed on
1283     * a condition queue, is now waiting to reacquire on sync queue.
1284     * @param node the node
1285     * @return true if is reacquiring
1286     */

1287    final boolean isOnSyncQueue(Node node) {
1288        if (node.waitStatus == Node.CONDITION || node.prev == null)
1289            return false;
1290        if (node.next != null) // If has successor, it must be on queue
1291
return true;
1292        /*
1293         * node.prev can be non-null, but not yet on queue because
1294         * the CAS to place it on queue can fail. So we have to
1295         * traverse from tail to make sure it actually made it. It
1296         * will always be near the tail in calls to this method, and
1297         * unless the CAS failed (which is unlikely), it will be
1298         * there, so we hardly ever traverse much.
1299         */

1300        return findNodeFromTail(node);
1301    }
1302
1303    /**
1304     * Returns true if node is on sync queue by searching backwards from tail.
1305     * Called only when needed by isOnSyncQueue.
1306     * @return true if present
1307     */

1308    private boolean findNodeFromTail(Node node) {
1309        Node t = tail;
1310        for (;;) {
1311            if (t == node)
1312                return true;
1313            if (t == null)
1314                return false;
1315            t = t.prev;
1316        }
1317    }
1318
1319    /**
1320     * Transfers a node from a condition queue onto sync queue.
1321     * Returns true if successful.
1322     * @param node the node
1323     * @return true if successfully transferred (else the node was
1324     * cancelled before signal).
1325     */

1326    final boolean transferForSignal(Node node) {
1327        /*
1328         * If cannot change waitStatus, the node has been cancelled.
1329         */

1330        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
1331            return false;
1332
1333        /*
1334         * Splice onto queue and try to set waitStatus of predecessor to
1335         * indicate that thread is (probably) waiting. If cancelled or
1336         * attempt to set waitStatus fails, wake up to resync (in which
1337         * case the waitStatus can be transiently and harmlessly wrong).
1338         */

1339        Node p = enq(node);
1340        int c = p.waitStatus;
1341        if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
1342            LockSupport.unpark(node.thread);
1343        return true;
1344    }
1345
1346    /**
1347     * Transfers node, if necessary, to sync queue after a cancelled
1348     * wait. Returns true if thread was cancelled before being
1349     * signalled.
1350     * @param current the waiting thread
1351     * @param node its node
1352     * @return true if cancelled before the node was signalled.
1353     */

1354    final boolean transferAfterCancelledWait(Node node) {
1355        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
1356            enq(node);
1357            return true;
1358        }
1359        /*
1360         * If we lost out to a signal(), then we can't proceed
1361         * until it finishes its enq(). Cancelling during an
1362         * incomplete transfer is both rare and transient, so just
1363         * spin.
1364         */

1365        while (!isOnSyncQueue(node))
1366            Thread.yield();
1367        return false;
1368    }
1369
1370    /**
1371     * Invokes release with current state value; returns saved state.
1372     * Cancels node and throws exception on failure.
1373     * @param node the condition node for this wait
1374     * @return previous sync state
1375     */

1376    final long fullyRelease(Node node) {
1377        try {
1378            long savedState = getState();
1379            if (release(savedState))
1380                return savedState;
1381        } catch (RuntimeException JavaDoc ex) {
1382            node.waitStatus = Node.CANCELLED;
1383            throw ex;
1384        }
1385        // reach here if release fails
1386
node.waitStatus = Node.CANCELLED;
1387        throw new IllegalMonitorStateException JavaDoc();
1388    }
1389
1390    // Instrumentation methods for conditions
1391

1392    /**
1393     * Queries whether the given ConditionObject
1394     * uses this synchronizer as its lock.
1395     *
1396     * @param condition the condition
1397     * @return <tt>true</tt> if owned
1398     * @throws NullPointerException if the condition is null
1399     */

1400    public final boolean owns(ConditionObject condition) {
1401        if (condition == null)
1402            throw new NullPointerException JavaDoc();
1403        return condition.isOwnedBy(this);
1404    }
1405
1406    /**
1407     * Queries whether any threads are waiting on the given condition
1408     * associated with this synchronizer. Note that because timeouts
1409     * and interrupts may occur at any time, a <tt>true</tt> return
1410     * does not guarantee that a future <tt>signal</tt> will awaken
1411     * any threads. This method is designed primarily for use in
1412     * monitoring of the system state.
1413     *
1414     * @param condition the condition
1415     * @return <tt>true</tt> if there are any waiting threads
1416     * @throws IllegalMonitorStateException if exclusive synchronization
1417     * is not held
1418     * @throws IllegalArgumentException if the given condition is
1419     * not associated with this synchronizer
1420     * @throws NullPointerException if the condition is null
1421     */

1422    public final boolean hasWaiters(ConditionObject condition) {
1423        if (!owns(condition))
1424            throw new IllegalArgumentException JavaDoc("Not owner");
1425        return condition.hasWaiters();
1426    }
1427
1428    /**
1429     * Returns an estimate of the number of threads waiting on the
1430     * given condition associated with this synchronizer. Note that
1431     * because timeouts and interrupts may occur at any time, the
1432     * estimate serves only as an upper bound on the actual number of
1433     * waiters. This method is designed for use in monitoring of the
1434     * system state, not for synchronization control.
1435     *
1436     * @param condition the condition
1437     * @return the estimated number of waiting threads
1438     * @throws IllegalMonitorStateException if exclusive synchronization
1439     * is not held
1440     * @throws IllegalArgumentException if the given condition is
1441     * not associated with this synchronizer
1442     * @throws NullPointerException if the condition is null
1443     */

1444    public final int getWaitQueueLength(ConditionObject condition) {
1445        if (!owns(condition))
1446            throw new IllegalArgumentException JavaDoc("Not owner");
1447        return condition.getWaitQueueLength();
1448    }
1449
1450    /**
1451     * Returns a collection containing those threads that may be
1452     * waiting on the given condition associated with this
1453     * synchronizer. Because the actual set of threads may change
1454     * dynamically while constructing this result, the returned
1455     * collection is only a best-effort estimate. The elements of the
1456     * returned collection are in no particular order.
1457     *
1458     * @param condition the condition
1459     * @return the collection of threads
1460     * @throws IllegalMonitorStateException if exclusive synchronization
1461     * is not held
1462     * @throws IllegalArgumentException if the given condition is
1463     * not associated with this synchronizer
1464     * @throws NullPointerException if the condition is null
1465     */

1466    public final Collection<Thread JavaDoc> getWaitingThreads(ConditionObject condition) {
1467        if (!owns(condition))
1468            throw new IllegalArgumentException JavaDoc("Not owner");
1469        return condition.getWaitingThreads();
1470    }
1471
1472    /**
1473     * Condition implementation for a {@link
1474     * AbstractQueuedLongSynchronizer} serving as the basis of a {@link
1475     * Lock} implementation.
1476     *
1477     * <p>Method documentation for this class describes mechanics,
1478     * not behavioral specifications from the point of view of Lock
1479     * and Condition users. Exported versions of this class will in
1480     * general need to be accompanied by documentation describing
1481     * condition semantics that rely on those of the associated
1482     * <tt>AbstractQueuedLongSynchronizer</tt>.
1483     *
1484     * <p>This class is Serializable, but all fields are transient,
1485     * so deserialized conditions have no waiters.
1486     *
1487     * @since 1.6
1488     */

1489    public class ConditionObject implements Condition JavaDoc, java.io.Serializable JavaDoc {
1490        private static final long serialVersionUID = 1173984872572414699L;
1491        /** First node of condition queue. */
1492        private transient Node firstWaiter;
1493        /** Last node of condition queue. */
1494        private transient Node lastWaiter;
1495
1496        /**
1497         * Creates a new <tt>ConditionObject</tt> instance.
1498         */

1499        public ConditionObject() { }
1500
1501        // Internal methods
1502

1503        /**
1504         * Adds a new waiter to wait queue.
1505         * @return its new wait node
1506         */

1507        private Node addConditionWaiter() {
1508            Node t = lastWaiter;
1509            // If lastWaiter is cancelled, clean out.
1510
if (t != null && t.waitStatus != Node.CONDITION) {
1511                unlinkCancelledWaiters();
1512                t = lastWaiter;
1513            }
1514            Node node = new Node(Thread.currentThread(), Node.CONDITION);
1515            if (t == null)
1516                firstWaiter = node;
1517            else
1518                t.nextWaiter = node;
1519            lastWaiter = node;
1520            return node;
1521        }
1522
1523        /**
1524         * Removes and transfers nodes until hit non-cancelled one or
1525         * null. Split out from signal in part to encourage compilers
1526         * to inline the case of no waiters.
1527         * @param first (non-null) the first node on condition queue
1528         */

1529        private void doSignal(Node first) {
1530            do {
1531                if ( (firstWaiter = first.nextWaiter) == null)
1532                    lastWaiter = null;
1533                first.nextWaiter = null;
1534            } while (!transferForSignal(first) &&
1535                     (first = firstWaiter) != null);
1536        }
1537
1538        /**
1539         * Removes and transfers all nodes.
1540         * @param first (non-null) the first node on condition queue
1541         */

1542        private void doSignalAll(Node first) {
1543            lastWaiter = firstWaiter = null;
1544            do {
1545                Node next = first.nextWaiter;
1546                first.nextWaiter = null;
1547                transferForSignal(first);
1548                first = next;
1549            } while (first != null);
1550        }
1551
1552        /**
1553         * Unlinks cancelled waiter nodes from condition queue.
1554         * Called only while holding lock. This is called when
1555         * cancellation occurred during condition wait, and upon
1556         * insertion of a new waiter when lastWaiter is seen to have
1557         * been cancelled. This method is needed to avoid garbage
1558         * retention in the absence of signals. So even though it may
1559         * require a full traversal, it comes into play only when
1560         * timeouts or cancellations occur in the absence of
1561         * signals. It traverses all nodes rather than stopping at a
1562         * particular target to unlink all pointers to garbage nodes
1563         * without requiring many re-traversals during cancellation
1564         * storms.
1565         */

1566        private void unlinkCancelledWaiters() {
1567            Node t = firstWaiter;
1568            Node trail = null;
1569            while (t != null) {
1570                Node next = t.nextWaiter;
1571                if (t.waitStatus != Node.CONDITION) {
1572                    t.nextWaiter = null;
1573                    if (trail == null)
1574                        firstWaiter = next;
1575                    else
1576                        trail.nextWaiter = next;
1577                    if (next == null)
1578                        lastWaiter = trail;
1579                }
1580                else
1581                    trail = t;
1582                t = next;
1583            }
1584        }
1585
1586        // public methods
1587

1588        /**
1589         * Moves the longest-waiting thread, if one exists, from the
1590         * wait queue for this condition to the wait queue for the
1591         * owning lock.
1592         *
1593         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1594         * returns {@code false}
1595         */

1596        public final void signal() {
1597            if (!isHeldExclusively())
1598                throw new IllegalMonitorStateException JavaDoc();
1599            Node first = firstWaiter;
1600            if (first != null)
1601                doSignal(first);
1602        }
1603
1604        /**
1605         * Moves all threads from the wait queue for this condition to
1606         * the wait queue for the owning lock.
1607         *
1608         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1609         * returns {@code false}
1610         */

1611        public final void signalAll() {
1612            if (!isHeldExclusively())
1613                throw new IllegalMonitorStateException JavaDoc();
1614            Node first = firstWaiter;
1615            if (first != null)
1616                doSignalAll(first);
1617        }
1618
1619        /**
1620         * Implements uninterruptible condition wait.
1621         * <ol>
1622         * <li> Save lock state returned by {@link #getState}
1623         * <li> Invoke {@link #release} with
1624         * saved state as argument, throwing
1625         * IllegalMonitorStateException if it fails.
1626         * <li> Block until signalled
1627         * <li> Reacquire by invoking specialized version of
1628         * {@link #acquire} with saved state as argument.
1629         * </ol>
1630         */

1631        public final void awaitUninterruptibly() {
1632            Node node = addConditionWaiter();
1633            long savedState = fullyRelease(node);
1634            boolean interrupted = false;
1635            while (!isOnSyncQueue(node)) {
1636                LockSupport.park(this);
1637                if (Thread.interrupted())
1638                    interrupted = true;
1639            }
1640            if (acquireQueued(node, savedState) || interrupted)
1641                selfInterrupt();
1642        }
1643
1644        /*
1645         * For interruptible waits, we need to track whether to throw
1646         * InterruptedException, if interrupted while blocked on
1647         * condition, versus reinterrupt current thread, if
1648         * interrupted while blocked waiting to re-acquire.
1649         */

1650
1651        /** Mode meaning to reinterrupt on exit from wait */
1652        private static final int REINTERRUPT = 1;
1653        /** Mode meaning to throw InterruptedException on exit from wait */
1654        private static final int THROW_IE = -1;
1655
1656        /**
1657         * Checks for interrupt, returning THROW_IE if interrupted
1658         * before signalled, REINTERRUPT if after signalled, or
1659         * 0 if not interrupted.
1660         */

1661        private int checkInterruptWhileWaiting(Node node) {
1662            return (Thread.interrupted()) ?
1663                ((transferAfterCancelledWait(node))? THROW_IE : REINTERRUPT) :
1664                0;
1665        }
1666
1667        /**
1668         * Throws InterruptedException, reinterrupts current thread, or
1669         * does nothing, depending on mode.
1670         */

1671        private void reportInterruptAfterWait(int interruptMode)
1672            throws InterruptedException JavaDoc {
1673            if (interruptMode == THROW_IE)
1674                throw new InterruptedException JavaDoc();
1675            else if (interruptMode == REINTERRUPT)
1676                selfInterrupt();
1677        }
1678
1679        /**
1680         * Implements interruptible condition wait.
1681         * <ol>
1682         * <li> If current thread is interrupted, throw InterruptedException
1683         * <li> Save lock state returned by {@link #getState}
1684         * <li> Invoke {@link #release} with
1685         * saved state as argument, throwing
1686         * IllegalMonitorStateException if it fails.
1687         * <li> Block until signalled or interrupted
1688         * <li> Reacquire by invoking specialized version of
1689         * {@link #acquire} with saved state as argument.
1690         * <li> If interrupted while blocked in step 4, throw exception
1691         * </ol>
1692         */

1693        public final void await() throws InterruptedException JavaDoc {
1694            if (Thread.interrupted())
1695                throw new InterruptedException JavaDoc();
1696            Node node = addConditionWaiter();
1697            long savedState = fullyRelease(node);
1698            int interruptMode = 0;
1699            while (!isOnSyncQueue(node)) {
1700                LockSupport.park(this);
1701                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1702                    break;
1703            }
1704            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1705                interruptMode = REINTERRUPT;
1706            if (node.nextWaiter != null)
1707                unlinkCancelledWaiters();
1708            if (interruptMode != 0)
1709                reportInterruptAfterWait(interruptMode);
1710        }
1711
1712        /**
1713         * Implements timed condition wait.
1714         * <ol>
1715         * <li> If current thread is interrupted, throw InterruptedException
1716         * <li> Save lock state returned by {@link #getState}
1717         * <li> Invoke {@link #release} with
1718         * saved state as argument, throwing
1719         * IllegalMonitorStateException if it fails.
1720         * <li> Block until signalled, interrupted, or timed out
1721         * <li> Reacquire by invoking specialized version of
1722         * {@link #acquire} with saved state as argument.
1723         * <li> If interrupted while blocked in step 4, throw InterruptedException
1724         * </ol>
1725         */

1726        public final long awaitNanos(long nanosTimeout) throws InterruptedException JavaDoc {
1727            if (Thread.interrupted())
1728                throw new InterruptedException JavaDoc();
1729            Node node = addConditionWaiter();
1730            long savedState = fullyRelease(node);
1731            long lastTime = System.nanoTime();
1732            int interruptMode = 0;
1733            while (!isOnSyncQueue(node)) {
1734                if (nanosTimeout <= 0L) {
1735                    transferAfterCancelledWait(node);
1736                    break;
1737                }
1738                LockSupport.parkNanos(this, nanosTimeout);
1739                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1740                    break;
1741
1742                long now = System.nanoTime();
1743                nanosTimeout -= now - lastTime;
1744                lastTime = now;
1745            }
1746            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1747                interruptMode = REINTERRUPT;
1748            if (node.nextWaiter != null)
1749                unlinkCancelledWaiters();
1750            if (interruptMode != 0)
1751                reportInterruptAfterWait(interruptMode);
1752            return nanosTimeout - (System.nanoTime() - lastTime);
1753        }
1754
1755        /**
1756         * Implements absolute timed condition wait.
1757         * <ol>
1758         * <li> If current thread is interrupted, throw InterruptedException
1759         * <li> Save lock state returned by {@link #getState}
1760         * <li> Invoke {@link #release} with
1761         * saved state as argument, throwing
1762         * IllegalMonitorStateException if it fails.
1763         * <li> Block until signalled, interrupted, or timed out
1764         * <li> Reacquire by invoking specialized version of
1765         * {@link #acquire} with saved state as argument.
1766         * <li> If interrupted while blocked in step 4, throw InterruptedException
1767         * <li> If timed out while blocked in step 4, return false, else true
1768         * </ol>
1769         */

1770        public final boolean awaitUntil(Date deadline) throws InterruptedException JavaDoc {
1771            if (deadline == null)
1772                throw new NullPointerException JavaDoc();
1773            long abstime = deadline.getTime();
1774            if (Thread.interrupted())
1775                throw new InterruptedException JavaDoc();
1776            Node node = addConditionWaiter();
1777            long savedState = fullyRelease(node);
1778            boolean timedout = false;
1779            int interruptMode = 0;
1780            while (!isOnSyncQueue(node)) {
1781                if (System.currentTimeMillis() > abstime) {
1782                    timedout = transferAfterCancelledWait(node);
1783                    break;
1784                }
1785                LockSupport.parkUntil(this, abstime);
1786                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1787                    break;
1788            }
1789            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1790                interruptMode = REINTERRUPT;
1791            if (node.nextWaiter != null)
1792                unlinkCancelledWaiters();
1793            if (interruptMode != 0)
1794                reportInterruptAfterWait(interruptMode);
1795            return !timedout;
1796        }
1797
1798        /**
1799         * Implements timed condition wait.
1800         * <ol>
1801         * <li> If current thread is interrupted, throw InterruptedException
1802         * <li> Save lock state returned by {@link #getState}
1803         * <li> Invoke {@link #release} with
1804         * saved state as argument, throwing
1805         * IllegalMonitorStateException if it fails.
1806         * <li> Block until signalled, interrupted, or timed out
1807         * <li> Reacquire by invoking specialized version of
1808         * {@link #acquire} with saved state as argument.
1809         * <li> If interrupted while blocked in step 4, throw InterruptedException
1810         * <li> If timed out while blocked in step 4, return false, else true
1811         * </ol>
1812         */

1813        public final boolean await(long time, TimeUnit unit) throws InterruptedException JavaDoc {
1814            if (unit == null)
1815                throw new NullPointerException JavaDoc();
1816            long nanosTimeout = unit.toNanos(time);
1817            if (Thread.interrupted())
1818                throw new InterruptedException JavaDoc();
1819            Node node = addConditionWaiter();
1820            long savedState = fullyRelease(node);
1821            long lastTime = System.nanoTime();
1822            boolean timedout = false;
1823            int interruptMode = 0;
1824            while (!isOnSyncQueue(node)) {
1825                if (nanosTimeout <= 0L) {
1826                    timedout = transferAfterCancelledWait(node);
1827                    break;
1828                }
1829                LockSupport.parkNanos(this, nanosTimeout);
1830                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1831                    break;
1832                long now = System.nanoTime();
1833                nanosTimeout -= now - lastTime;
1834                lastTime = now;
1835            }
1836            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1837                interruptMode = REINTERRUPT;
1838            if (node.nextWaiter != null)
1839                unlinkCancelledWaiters();
1840            if (interruptMode != 0)
1841                reportInterruptAfterWait(interruptMode);
1842            return !timedout;
1843        }
1844
1845        // support for instrumentation
1846

1847        /**
1848         * Returns true if this condition was created by the given
1849         * synchronization object.
1850         *
1851         * @return {@code true} if owned
1852         */

1853        final boolean isOwnedBy(AbstractQueuedLongSynchronizer JavaDoc sync) {
1854            return sync == AbstractQueuedLongSynchronizer.this;
1855        }
1856
1857        /**
1858         * Queries whether any threads are waiting on this condition.
1859         * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters}.
1860         *
1861         * @return {@code true} if there are any waiting threads
1862         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1863         * returns {@code false}
1864         */

1865        protected final boolean hasWaiters() {
1866            if (!isHeldExclusively())
1867                throw new IllegalMonitorStateException JavaDoc();
1868            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1869                if (w.waitStatus == Node.CONDITION)
1870                    return true;
1871            }
1872            return false;
1873        }
1874
1875        /**
1876         * Returns an estimate of the number of threads waiting on
1877         * this condition.
1878         * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength}.
1879         *
1880         * @return the estimated number of waiting threads
1881         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1882         * returns {@code false}
1883         */

1884        protected final int getWaitQueueLength() {
1885            if (!isHeldExclusively())
1886                throw new IllegalMonitorStateException JavaDoc();
1887            int n = 0;
1888            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1889                if (w.waitStatus == Node.CONDITION)
1890                    ++n;
1891            }
1892            return n;
1893        }
1894
1895        /**
1896         * Returns a collection containing those threads that may be
1897         * waiting on this Condition.
1898         * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads}.
1899         *
1900         * @return the collection of threads
1901         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1902         * returns {@code false}
1903         */

1904        protected final Collection<Thread JavaDoc> getWaitingThreads() {
1905            if (!isHeldExclusively())
1906                throw new IllegalMonitorStateException JavaDoc();
1907            ArrayList<Thread JavaDoc> list = new ArrayList<Thread JavaDoc>();
1908            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1909                if (w.waitStatus == Node.CONDITION) {
1910                    Thread JavaDoc t = w.thread;
1911                    if (t != null)
1912                        list.add(t);
1913                }
1914            }
1915            return list;
1916        }
1917    }
1918
1919    /**
1920     * Setup to support compareAndSet. We need to natively implement
1921     * this here: For the sake of permitting future enhancements, we
1922     * cannot explicitly subclass AtomicLong, which would be
1923     * efficient and useful otherwise. So, as the lesser of evils, we
1924     * natively implement using hotspot intrinsics API. And while we
1925     * are at it, we do the same for other CASable fields (which could
1926     * otherwise be done with atomic field updaters).
1927     */

1928    private static final Unsafe unsafe = Unsafe.getUnsafe();
1929    private static final long stateOffset;
1930    private static final long headOffset;
1931    private static final long tailOffset;
1932    private static final long waitStatusOffset;
1933    private static final long nextOffset;
1934
1935    static {
1936        try {
1937            stateOffset = unsafe.objectFieldOffset
1938                (AbstractQueuedLongSynchronizer JavaDoc.class.getDeclaredField("state"));
1939            headOffset = unsafe.objectFieldOffset
1940                (AbstractQueuedLongSynchronizer JavaDoc.class.getDeclaredField("head"));
1941            tailOffset = unsafe.objectFieldOffset
1942                (AbstractQueuedLongSynchronizer JavaDoc.class.getDeclaredField("tail"));
1943            waitStatusOffset = unsafe.objectFieldOffset
1944                (Node.class.getDeclaredField("waitStatus"));
1945            nextOffset = unsafe.objectFieldOffset
1946                (Node.class.getDeclaredField("next"));
1947
1948        } catch (Exception JavaDoc ex) { throw new Error JavaDoc(ex); }
1949    }
1950
1951    /**
1952     * CAS head field. Used only by enq
1953     */

1954    private final boolean compareAndSetHead(Node update) {
1955        return unsafe.compareAndSwapObject(this, headOffset, null, update);
1956    }
1957
1958    /**
1959     * CAS tail field. Used only by enq
1960     */

1961    private final boolean compareAndSetTail(Node expect, Node update) {
1962        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
1963    }
1964
1965    /**
1966     * CAS waitStatus field of a node.
1967     */

1968    private final static boolean compareAndSetWaitStatus(Node node,
1969                                                         int expect,
1970                                                         int update) {
1971        return unsafe.compareAndSwapInt(node, waitStatusOffset,
1972                                        expect, update);
1973    }
1974
1975    /**
1976     * CAS next field of a node.
1977     */

1978    private final static boolean compareAndSetNext(Node node,
1979                                                   Node expect,
1980                                                   Node update) {
1981        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
1982    }
1983}
1984
Popular Tags