KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > java > util > concurrent > Semaphore


1 /*
2  * @(#)Semaphore.java 1.8 04/07/12
3  *
4  * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
5  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6  */

7
8 package java.util.concurrent;
9 import java.util.*;
10 import java.util.concurrent.locks.*;
11 import java.util.concurrent.atomic.*;
12
13 /**
14  * A counting semaphore. Conceptually, a semaphore maintains a set of
15  * permits. Each {@link #acquire} blocks if necessary until a permit is
16  * available, and then takes it. Each {@link #release} adds a permit,
17  * potentially releasing a blocking acquirer.
18  * However, no actual permit objects are used; the <tt>Semaphore</tt> just
19  * keeps a count of the number available and acts accordingly.
20  *
21  * <p>Semaphores are often used to restrict the number of threads that can
22  * access some (physical or logical) resource. For example, here is
23  * a class that uses a semaphore to control access to a pool of items:
24  * <pre>
25  * class Pool {
26  * private static final int MAX_AVAILABLE = 100;
27  * private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
28  *
29  * public Object getItem() throws InterruptedException {
30  * available.acquire();
31  * return getNextAvailableItem();
32  * }
33  *
34  * public void putItem(Object x) {
35  * if (markAsUnused(x))
36  * available.release();
37  * }
38  *
39  * // Not a particularly efficient data structure; just for demo
40  *
41  * protected Object[] items = ... whatever kinds of items being managed
42  * protected boolean[] used = new boolean[MAX_AVAILABLE];
43  *
44  * protected synchronized Object getNextAvailableItem() {
45  * for (int i = 0; i < MAX_AVAILABLE; ++i) {
46  * if (!used[i]) {
47  * used[i] = true;
48  * return items[i];
49  * }
50  * }
51  * return null; // not reached
52  * }
53  *
54  * protected synchronized boolean markAsUnused(Object item) {
55  * for (int i = 0; i < MAX_AVAILABLE; ++i) {
56  * if (item == items[i]) {
57  * if (used[i]) {
58  * used[i] = false;
59  * return true;
60  * } else
61  * return false;
62  * }
63  * }
64  * return false;
65  * }
66  *
67  * }
68  * </pre>
69  *
70  * <p>Before obtaining an item each thread must acquire a permit from
71  * the semaphore, guaranteeing that an item is available for use. When
72  * the thread has finished with the item it is returned back to the
73  * pool and a permit is returned to the semaphore, allowing another
74  * thread to acquire that item. Note that no synchronization lock is
75  * held when {@link #acquire} is called as that would prevent an item
76  * from being returned to the pool. The semaphore encapsulates the
77  * synchronization needed to restrict access to the pool, separately
78  * from any synchronization needed to maintain the consistency of the
79  * pool itself.
80  *
81  * <p>A semaphore initialized to one, and which is used such that it
82  * only has at most one permit available, can serve as a mutual
83  * exclusion lock. This is more commonly known as a <em>binary
84  * semaphore</em>, because it only has two states: one permit
85  * available, or zero permits available. When used in this way, the
86  * binary semaphore has the property (unlike many {@link Lock}
87  * implementations), that the &quot;lock&quot; can be released by a
88  * thread other than the owner (as semaphores have no notion of
89  * ownership). This can be useful in some specialized contexts, such
90  * as deadlock recovery.
91  *
92  * <p> The constructor for this class optionally accepts a
93  * <em>fairness</em> parameter. When set false, this class makes no
94  * guarantees about the order in which threads acquire permits. In
95  * particular, <em>barging</em> is permitted, that is, a thread
96  * invoking {@link #acquire} can be allocated a permit ahead of a
97  * thread that has been waiting - logically the new thread places itself at
98  * the head of the queue of waiting threads. When fairness is set true, the
99  * semaphore guarantees that threads invoking any of the {@link
100  * #acquire() acquire} methods are selected to obtain permits in the order in
101  * which their invocation of those methods was processed
102  * (first-in-first-out; FIFO). Note that FIFO ordering necessarily
103  * applies to specific internal points of execution within these
104  * methods. So, it is possible for one thread to invoke
105  * <tt>acquire</tt> before another, but reach the ordering point after
106  * the other, and similarly upon return from the method.
107  * Also note that the untimed {@link #tryAcquire() tryAcquire} methods do not
108  * honor the fairness setting, but will take any permits that are
109  * available.
110  *
111  * <p>Generally, semaphores used to control resource access should be
112  * initialized as fair, to ensure that no thread is starved out from
113  * accessing a resource. When using semaphores for other kinds of
114  * synchronization control, the throughput advantages of non-fair
115  * ordering often outweigh fairness considerations.
116  *
117  * <p>This class also provides convenience methods to {@link
118  * #acquire(int) acquire} and {@link #release(int) release} multiple
119  * permits at a time. Beware of the increased risk of indefinite
120  * postponement when these methods are used without fairness set true.
121  *
122  * @since 1.5
123  * @author Doug Lea
124  *
125  */

126
public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;

    /** All mechanics via AbstractQueuedSynchronizer subclass */
    private final Sync sync;

    /**
     * Synchronization implementation for semaphore. Uses AQS state
     * to represent permits. Subclassed into fair and nonfair
     * versions.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        /**
         * Initializes the AQS state to the given permit count.
         * @param permits the initial number of permits (may be negative)
         */
        Sync(int permits) {
            setState(permits);
        }

        /**
         * Returns the current AQS state, i.e. the number of permits.
         * @return the number of permits currently available
         */
        final int getPermits() {
            return getState();
        }

        /**
         * Attempts to take <tt>acquires</tt> permits without consulting
         * the wait queue (barging). Retries only when the CAS loses a
         * race with another thread.
         * @param acquires the number of permits requested
         * @return the number of permits left after the acquisition, or a
         * negative value if too few permits were available (in which
         * case the state is left unchanged)
         */
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        /**
         * Adds <tt>releases</tt> permits to the state.
         * @param releases the number of permits to add (non-negative)
         * @return always <tt>true</tt>, so that waiters are signalled
         * @throws Error if the permit count would exceed
         * <tt>Integer.MAX_VALUE</tt>; the state is left unchanged
         */
        @Override
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                // releases is never negative, so a smaller sum can only
                // mean the int addition wrapped around.
                if (next < current) {
                    throw new Error("Maximum permit count exceeded");
                }
                if (compareAndSetState(current, next)) {
                    return true;
                }
            }
        }

        /**
         * Subtracts <tt>reductions</tt> permits from the state without
         * blocking; the resulting count may be negative.
         * @param reductions the number of permits to remove (non-negative)
         * @throws Error if the permit count would drop below
         * <tt>Integer.MIN_VALUE</tt>; the state is left unchanged
         */
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                // reductions is never negative, so a larger difference can
                // only mean the int subtraction wrapped around.
                if (next > current) {
                    throw new Error("Permit count underflow");
                }
                if (compareAndSetState(current, next)) {
                    return;
                }
            }
        }

        /**
         * Atomically sets the state to zero, unless it already is zero.
         * @return the permit count observed immediately before it was
         * set to zero
         */
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0)) {
                    return current;
                }
            }
        }
    }

    /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        NonfairSync(int permits) {
            super(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        FairSync(int permits) {
            super(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            Thread current = Thread.currentThread();
            for (;;) {
                // Defer to any other thread that is queued ahead of us.
                Thread first = getFirstQueuedThread();
                if (first != null && first != current) {
                    return -1;
                }
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }
    }

    /**
     * Creates a <tt>Semaphore</tt> with the given number of
     * permits and nonfair fairness setting.
     * @param permits the initial number of permits available. This
     * value may be negative, in which case releases must
     * occur before any acquires will be granted.
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * Creates a <tt>Semaphore</tt> with the given number of
     * permits and the given fairness setting.
     * @param permits the initial number of permits available. This
     * value may be negative, in which case releases must
     * occur before any acquires will be granted.
     * @param fair true if this semaphore will guarantee first-in
     * first-out granting of permits under contention, else false.
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    /**
     * Acquires a permit from this semaphore, blocking until one is
     * available, or the thread is {@link Thread#interrupt interrupted}.
     *
     * <p>Acquires a permit, if one is available and returns immediately,
     * reducing the number of available permits by one.
     * <p>If no permit is available then the current thread becomes
     * disabled for thread scheduling purposes and lies dormant until
     * one of two things happens:
     * <ul>
     * <li>Some other thread invokes the {@link #release} method for this
     * semaphore and the current thread is next to be assigned a permit; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for a permit,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *
     * @see Thread#interrupt
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Acquires a permit from this semaphore, blocking until one is
     * available.
     *
     * <p>Acquires a permit, if one is available and returns immediately,
     * reducing the number of available permits by one.
     * <p>If no permit is available then the current thread becomes
     * disabled for thread scheduling purposes and lies dormant until
     * some other thread invokes the {@link #release} method for this
     * semaphore and the current thread is next to be assigned a permit.
     *
     * <p>If the current thread
     * is {@link Thread#interrupt interrupted} while waiting
     * for a permit then it will continue to wait, but the time at which
     * the thread is assigned a permit may change compared to the time it
     * would have received the permit had no interruption occurred. When the
     * thread does return from this method its interrupt status will be set.
     *
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /**
     * Acquires a permit from this semaphore, only if one is available at the
     * time of invocation.
     * <p>Acquires a permit, if one is available and returns immediately,
     * with the value <tt>true</tt>,
     * reducing the number of available permits by one.
     *
     * <p>If no permit is available then this method will return
     * immediately with the value <tt>false</tt>.
     *
     * <p>Even when this semaphore has been set to use a
     * fair ordering policy, a call to <tt>tryAcquire()</tt> <em>will</em>
     * immediately acquire a permit if one is available, whether or not
     * other threads are currently waiting.
     * This &quot;barging&quot; behavior can be useful in certain
     * circumstances, even though it breaks fairness. If you want to honor
     * the fairness setting, then use
     * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) }
     * which is almost equivalent (it also detects interruption).
     *
     * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
     * otherwise.
     */
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /**
     * Acquires a permit from this semaphore, if one becomes available
     * within the given waiting time and the
     * current thread has not been {@link Thread#interrupt interrupted}.
     * <p>Acquires a permit, if one is available and returns immediately,
     * with the value <tt>true</tt>,
     * reducing the number of available permits by one.
     * <p>If no permit is available then
     * the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until one of three things happens:
     * <ul>
     * <li>Some other thread invokes the {@link #release} method for this
     * semaphore and the current thread is next to be assigned a permit; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If a permit is acquired then the value <tt>true</tt> is returned.
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
     * a permit,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     * <p>If the specified waiting time elapses then the value <tt>false</tt>
     * is returned.
     * If the time is less than or equal to zero, the method will not wait
     * at all.
     *
     * @param timeout the maximum time to wait for a permit
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
     * if the waiting time elapsed before a permit was acquired.
     *
     * @throws InterruptedException if the current thread is interrupted
     *
     * @see Thread#interrupt
     *
     */
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * Releases a permit, returning it to the semaphore.
     * <p>Releases a permit, increasing the number of available permits
     * by one.
     * If any threads are trying to acquire a permit, then one
     * is selected and given the permit that was just released.
     * That thread is (re)enabled for thread scheduling purposes.
     * <p>There is no requirement that a thread that releases a permit must
     * have acquired that permit by calling {@link #acquire}.
     * Correct usage of a semaphore is established by programming convention
     * in the application.
     */
    public void release() {
        sync.releaseShared(1);
    }

    /**
     * Acquires the given number of permits from this semaphore,
     * blocking until all are available,
     * or the thread is {@link Thread#interrupt interrupted}.
     *
     * <p>Acquires the given number of permits, if they are available,
     * and returns immediately,
     * reducing the number of available permits by the given amount.
     *
     * <p>If insufficient permits are available then the current thread becomes
     * disabled for thread scheduling purposes and lies dormant until
     * one of two things happens:
     * <ul>
     * <li>Some other thread invokes one of the {@link #release() release}
     * methods for this semaphore, the current thread is next to be assigned
     * permits and the number of available permits satisfies this request; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting
     * for a permit,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     * Any permits that were to be assigned to this thread are instead
     * assigned to other threads trying to acquire permits, as if
     * permits had been made available by a call to {@link #release()}.
     *
     * @param permits the number of permits to acquire
     *
     * @throws InterruptedException if the current thread is interrupted
     * @throws IllegalArgumentException if permits less than zero.
     *
     * @see Thread#interrupt
     */
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.acquireSharedInterruptibly(permits);
    }

    /**
     * Acquires the given number of permits from this semaphore,
     * blocking until all are available.
     *
     * <p>Acquires the given number of permits, if they are available,
     * and returns immediately,
     * reducing the number of available permits by the given amount.
     *
     * <p>If insufficient permits are available then the current thread becomes
     * disabled for thread scheduling purposes and lies dormant until
     * some other thread invokes one of the {@link #release() release}
     * methods for this semaphore, the current thread is next to be assigned
     * permits and the number of available permits satisfies this request.
     *
     * <p>If the current thread
     * is {@link Thread#interrupt interrupted} while waiting
     * for permits then it will continue to wait and its position in the
     * queue is not affected. When the
     * thread does return from this method its interrupt status will be set.
     *
     * @param permits the number of permits to acquire
     * @throws IllegalArgumentException if permits less than zero.
     *
     */
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.acquireShared(permits);
    }

    /**
     * Acquires the given number of permits from this semaphore, only
     * if all are available at the time of invocation.
     *
     * <p>Acquires the given number of permits, if they are available, and
     * returns immediately, with the value <tt>true</tt>,
     * reducing the number of available permits by the given amount.
     *
     * <p>If insufficient permits are available then this method will return
     * immediately with the value <tt>false</tt> and the number of available
     * permits is unchanged.
     *
     * <p>Even when this semaphore has been set to use a fair ordering
     * policy, a call to <tt>tryAcquire</tt> <em>will</em>
     * immediately acquire a permit if one is available, whether or
     * not other threads are currently waiting. This
     * &quot;barging&quot; behavior can be useful in certain
     * circumstances, even though it breaks fairness. If you want to
     * honor the fairness setting, then use {@link #tryAcquire(int,
     * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) }
     * which is almost equivalent (it also detects interruption).
     *
     * @param permits the number of permits to acquire
     *
     * @return <tt>true</tt> if the permits were acquired and <tt>false</tt>
     * otherwise.
     * @throws IllegalArgumentException if permits less than zero.
     */
    public boolean tryAcquire(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    /**
     * Acquires the given number of permits from this semaphore, if all
     * become available within the given waiting time and the
     * current thread has not been {@link Thread#interrupt interrupted}.
     * <p>Acquires the given number of permits, if they are available and
     * returns immediately, with the value <tt>true</tt>,
     * reducing the number of available permits by the given amount.
     * <p>If insufficient permits are available then
     * the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until one of three things happens:
     * <ul>
     * <li>Some other thread invokes one of the {@link #release() release}
     * methods for this semaphore, the current thread is next to be assigned
     * permits and the number of available permits satisfies this request; or
     * <li>Some other thread {@link Thread#interrupt interrupts} the current
     * thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the permits are acquired then the value <tt>true</tt> is returned.
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
     * the permits,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     * Any permits that were to be assigned to this thread, are instead
     * assigned to other threads trying to acquire permits, as if
     * the permits had been made available by a call to {@link #release()}.
     *
     * <p>If the specified waiting time elapses then the value <tt>false</tt>
     * is returned.
     * If the time is
     * less than or equal to zero, the method will not wait at all.
     * Any permits that were to be assigned to this thread, are instead
     * assigned to other threads trying to acquire permits, as if
     * the permits had been made available by a call to {@link #release()}.
     *
     * @param permits the number of permits to acquire
     * @param timeout the maximum time to wait for the permits
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return <tt>true</tt> if all permits were acquired and <tt>false</tt>
     * if the waiting time elapsed before all permits were acquired.
     *
     * @throws InterruptedException if the current thread is interrupted
     * @throws IllegalArgumentException if permits less than zero.
     *
     * @see Thread#interrupt
     *
     */
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    /**
     * Releases the given number of permits, returning them to the semaphore.
     * <p>Releases the given number of permits, increasing the number of
     * available permits by that amount.
     * If any threads are trying to acquire permits, then one
     * is selected and given the permits that were just released.
     * If the number of available permits satisfies that thread's request
     * then that thread is (re)enabled for thread scheduling purposes;
     * otherwise the thread will wait until sufficient permits are available.
     * If there are still permits available
     * after this thread's request has been satisfied, then those permits
     * are assigned in turn to other threads trying to acquire permits.
     *
     * <p>There is no requirement that a thread that releases a permit must
     * have acquired that permit by calling {@link Semaphore#acquire acquire}.
     * Correct usage of a semaphore is established by programming convention
     * in the application.
     *
     * @param permits the number of permits to release
     * @throws IllegalArgumentException if permits less than zero.
     */
    public void release(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.releaseShared(permits);
    }

    /**
     * Returns the current number of permits available in this semaphore.
     * <p>This method is typically used for debugging and testing purposes.
     * @return the number of permits available in this semaphore.
     */
    public int availablePermits() {
        return sync.getPermits();
    }

    /**
     * Acquire and return all permits that are immediately available.
     * @return the number of permits
     */
    public int drainPermits() {
        return sync.drainPermits();
    }

    /**
     * Shrinks the number of available permits by the indicated
     * reduction. This method can be useful in subclasses that use
     * semaphores to track resources that become unavailable. This
     * method differs from <tt>acquire</tt> in that it does not block
     * waiting for permits to become available.
     * @param reduction the number of permits to remove
     * @throws IllegalArgumentException if reduction is negative
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) {
            throw new IllegalArgumentException();
        }
        sync.reducePermits(reduction);
    }

    /**
     * Returns true if this semaphore has fairness set true.
     * @return true if this semaphore has fairness set true.
     */
    public boolean isFair() {
        return sync instanceof FairSync;
    }

    /**
     * Queries whether any threads are waiting to acquire. Note that
     * because cancellations may occur at any time, a <tt>true</tt>
     * return does not guarantee that any other thread will ever
     * acquire. This method is designed primarily for use in
     * monitoring of the system state.
     *
     * @return true if there may be other threads waiting to acquire
     * permits.
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * Returns an estimate of the number of threads waiting to
     * acquire. The value is only an estimate because the number of
     * threads may change dynamically while this method traverses
     * internal data structures. This method is designed for use in
     * monitoring of the system state, not for synchronization
     * control.
     * @return the estimated number of threads waiting to acquire
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire. Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate. The elements of the
     * returned collection are in no particular order. This method is
     * designed to facilitate construction of subclasses that provide
     * more extensive monitoring facilities.
     * @return the collection of threads
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    /**
     * Returns a string identifying this semaphore, as well as its state.
     * The state, in brackets, includes the String
     * &quot;Permits =&quot; followed by the number of permits.
     * @return a string identifying this semaphore, as well as its
     * state
     */
    @Override
    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }

}
668
Popular Tags