KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > go > trove > util > ThreadPool


1 /* ====================================================================
2  * Trove - Copyright (c) 1997-2000 Walt Disney Internet Group
3  * ====================================================================
4  * The Tea Software License, Version 1.1
5  *
6  * Copyright (c) 2000 Walt Disney Internet Group. All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Walt Disney Internet Group (http://opensource.go.com/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Tea", "TeaServlet", "Kettle", "Trove" and "BeanDoc" must
28  * not be used to endorse or promote products derived from this
29  * software without prior written permission. For written
30  * permission, please contact opensource@dig.com.
31  *
32  * 5. Products derived from this software may not be called "Tea",
33  * "TeaServlet", "Kettle" or "Trove", nor may "Tea", "TeaServlet",
34  * "Kettle", "Trove" or "BeanDoc" appear in their name, without prior
35  * written permission of the Walt Disney Internet Group.
36  *
37  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
38  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
39  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
40  * DISCLAIMED. IN NO EVENT SHALL THE WALT DISNEY INTERNET GROUP OR ITS
41  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
42  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
43  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
44  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
45  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
46  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
47  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
48  * ====================================================================
49  *
50  * For more information about Tea, please see http://opensource.go.com/.
51  */

52
53 package com.go.trove.util;
54
55 import java.util.*;
56
57 /******************************************************************************
58  * A ThreadPool contains a collection of re-usable threads. There is a slight
59  * performance overhead in creating new threads, and so a ThreadPool can
60  * improve performance in systems that create short-lived threads. Pooled
61  * threads operate on Runnable targets and return back to the pool when the
62  * Runnable.run method exits.
63  *
64  * @author Brian S O'Neill
65  * @version
66  * <!--$$Revision:--> 8 <!-- $-->, <!--$$JustDate:--> 01/03/12 <!-- $-->
67  */

68 public class ThreadPool extends ThreadGroup JavaDoc {
69     private static int cThreadID;
70
71     private synchronized static int nextThreadID() {
72         return cThreadID++;
73     }
74
75     // Fields that use the monitor of this instance.
76

77     private long mTimeout = -1;
78     private long mIdleTimeout = -1;
79
80     // Fields that use the mListeners monitor.
81

82     private Collection mListeners = new LinkedList();
83
84     // Fields that use the mPool monitor.
85

86     // Pool is accessed like a stack.
87
private LinkedList mPool;
88     private int mMax;
89     private int mActive;
90     private boolean mDaemon;
91     private int mPriority;
92     private boolean mClosed;
93
94     /**
95      * Create a ThreadPool of daemon threads.
96      *
97      * @param name Name of ThreadPool
98      * @param max The maximum allowed number of threads
99      *
100      * @throws IllegalArgumentException
101      */

102     public ThreadPool(String JavaDoc name, int max)
103         throws IllegalArgumentException JavaDoc {
104
105         this(name, max, true);
106     }
107
108     /**
109      * Create a ThreadPool of daemon threads.
110      *
111      * @param parent Parent ThreadGroup
112      * @param name Name of ThreadPool
113      * @param max The maximum allowed number of threads
114      *
115      * @throws IllegalArgumentException
116      */

117     public ThreadPool(ThreadGroup JavaDoc parent, String JavaDoc name, int max)
118         throws IllegalArgumentException JavaDoc {
119
120         this(parent, name, max, true);
121     }
122
123     /**
124      * Create a ThreadPool.
125      *
126      * @param name Name of ThreadPool
127      * @param max The maximum allowed number of threads
128      * @param daemon Set to true to create ThreadPool of daemon threads
129      *
130      * @throws IllegalArgumentException
131      */

132     public ThreadPool(String JavaDoc name, int max, boolean daemon)
133         throws IllegalArgumentException JavaDoc {
134
135         super(name);
136
137         init(max, daemon);
138     }
139
140     /**
141      * Create a ThreadPool.
142      *
143      * @param parent Parent ThreadGroup
144      * @param name Name of ThreadPool
145      * @param max The maximum allowed number of threads
146      * @param daemon Set to true to create ThreadPool of daemon threads
147      *
148      * @throws IllegalArgumentException
149      */

150     public ThreadPool(ThreadGroup JavaDoc parent, String JavaDoc name, int max,boolean daemon)
151         throws IllegalArgumentException JavaDoc {
152
153         super(parent, name);
154
155         init(max, daemon);
156     }
157
158     private void init(int max, boolean daemon)
159         throws IllegalArgumentException JavaDoc {
160
161         if (max <= 0) {
162             throw new IllegalArgumentException JavaDoc
163                 ("Maximum number of threads must be greater than zero: " +
164                  max);
165         }
166
167         mMax = max;
168
169         mDaemon = daemon;
170         mPriority = Thread.currentThread().getPriority();
171         mClosed = false;
172
173         mPool = new LinkedList();
174     }
175
176     /**
177      * Sets the timeout (in milliseconds) for getting threads from the pool
178      * or for closing the pool. A negative value specifies an infinite timeout.
179      * Calling the start method that accepts a timeout value will override
180      * this setting.
181      */

182     public synchronized void setTimeout(long timeout) {
183         mTimeout = timeout;
184     }
185
186     /**
187      * Returns the timeout (in milliseconds) for getting threads from the pool.
188      * The default value is negative, which indicates an infinite wait.
189      */

190     public synchronized long getTimeout() {
191         return mTimeout;
192     }
193
194     /**
195      * Sets the timeout (in milliseconds) for idle threads to exit. A negative
196      * value specifies that an idle thread never exits.
197      */

198     public synchronized void setIdleTimeout(long timeout) {
199         mIdleTimeout = timeout;
200     }
201
202     /**
203      * Returns the idle timeout (in milliseconds) for threads to exit. The
204      * default value is negative, which indicates that idle threads never exit.
205      */

206     public synchronized long getIdleTimeout() {
207         return mIdleTimeout;
208     }
209
210     public void addThreadPoolListener(ThreadPoolListener listener) {
211         synchronized (mListeners) {
212             mListeners.add(listener);
213         }
214     }
215
216     public void removeThreadPoolListener(ThreadPoolListener listener) {
217         synchronized (mListeners) {
218             mListeners.remove(listener);
219         }
220     }
221
222     /**
223      * Returns the initial priority given to each thread in the pool. The
224      * default value is that of the thread that created the ThreadPool.
225      */

226     public int getPriority() {
227         synchronized (mPool) {
228             return mPriority;
229         }
230     }
231     
232     /**
233      * Sets the priority given to each thread in the pool.
234      *
235      * @throws IllegalArgumentException if priority is out of range
236      */

237     public void setPriority(int priority) throws IllegalArgumentException JavaDoc {
238         if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
239             throw new IllegalArgumentException JavaDoc
240                 ("Priority out of range: " + priority);
241         }
242
243         synchronized (mPool) {
244             mPriority = priority;
245         }
246     }
247
248     /**
249      * @return The maximum allowed number of threads.
250      */

251     public int getMaximumAllowed() {
252         synchronized (mPool) {
253             return mMax;
254         }
255     }
256
257     /**
258      * @return The number of currently available threads in the pool.
259      */

260     public int getAvailableCount() {
261         synchronized (mPool) {
262             return mPool.size();
263         }
264     }
265
266     /**
267      * @return The total number of threads in the pool that are either
268      * available or in use.
269      */

270     public int getPooledCount() {
271         synchronized (mPool) {
272             return mActive;
273         }
274     }
275
276     /**
277      * @return The total number of threads in the ThreadGroup.
278      */

279     public int getThreadCount() {
280         return activeCount();
281     }
282
283     /**
284      * @return Each thread that is active in the entire ThreadGroup.
285      */

286     public Thread JavaDoc[] getAllThreads() {
287         int count = activeCount();
288         Thread JavaDoc[] threads = new Thread JavaDoc[count];
289         count = enumerate(threads);
290         if (count >= threads.length) {
291             return sort(threads);
292         }
293         else {
294             Thread JavaDoc[] newThreads = new Thread JavaDoc[count];
295             System.arraycopy(threads, 0, newThreads, 0, count);
296             return sort(newThreads);
297         }
298     }
299
300     private Thread JavaDoc[] sort(Thread JavaDoc[] threads) {
301         Comparator c = BeanComparator.forClass(Thread JavaDoc.class)
302             .orderBy("threadGroup.name")
303             .orderBy("name")
304             .orderBy("priority");
305         Arrays.sort(threads, c);
306         return threads;
307     }
308
309     /**
310      * Waits for a Thread to become available and starts a Runnable in it.
311      * If there are no available threads and the number of active threads is
312      * less than the maximum allowed, then a newly created thread is returned.
313      *
314      * @param target The Runnable instance that gets started by the returned
315      * thread.
316      * @exception NoThreadException If no thread could be obtained.
317      * @exception InterruptedException If interrupted while waiting for a
318      * thread to become available.
319      * @return A Thread that has been started on the given Runnable.
320      */

321     public Thread JavaDoc start(Runnable JavaDoc target)
322         throws NoThreadException, InterruptedException JavaDoc
323     {
324         try {
325             return start0(target, getTimeout(), null);
326         }
327         catch (NoThreadException e) {
328             e.fillInStackTrace();
329             throw e;
330         }
331     }
332
333     /**
334      * Waits for a Thread to become available and starts a Runnable in it.
335      * If there are no available threads and the number of active threads is
336      * less than the maximum allowed, then a newly created thread is returned.
337      *
338      * @param target The Runnable instance that gets started by the returned
339      * thread.
340      * @param timeout Milliseconds to wait for a thread to become
341      * available. If zero, don't wait at all. If negative, wait forever.
342      * @exception NoThreadException If no thread could be obtained.
343      * @exception InterruptedException If interrupted while waiting for a
344      * thread to become available.
345      * @return A Thread that has been started on the given Runnable.
346      */

347     public Thread JavaDoc start(Runnable JavaDoc target, long timeout)
348         throws NoThreadException, InterruptedException JavaDoc
349     {
350         try {
351             return start0(target, timeout, null);
352         }
353         catch (NoThreadException e) {
354             e.fillInStackTrace();
355             throw e;
356         }
357     }
358
359
360     /**
361      * Waits for a Thread to become available and starts a Runnable in it.
362      * If there are no available threads and the number of active threads is
363      * less than the maximum allowed, then a newly created thread is returned.
364      *
365      * @param target The Runnable instance that gets started by the returned
366      * thread.
367      * @param name The name to give the thread.
368      * @exception NoThreadException If no thread could be obtained.
369      * @exception InterruptedException If interrupted while waiting for a
370      * thread to become available.
371      * @return A Thread that has been started on the given Runnable.
372      */

373     public Thread JavaDoc start(Runnable JavaDoc target, String JavaDoc name)
374         throws NoThreadException, InterruptedException JavaDoc
375     {
376         try {
377             return start0(target, getTimeout(), name);
378         }
379         catch (NoThreadException e) {
380             e.fillInStackTrace();
381             throw e;
382         }
383     }
384
385     /**
386      * Waits for a Thread to become available and starts a Runnable in it.
387      * If there are no available threads and the number of active threads is
388      * less than the maximum allowed, then a newly created thread is returned.
389      *
390      * @param target The Runnable instance that gets started by the returned
391      * thread.
392      * @param timeout Milliseconds to wait for a thread to become
393      * @param name The name to give the thread.
394      * available. If zero, don't wait at all. If negative, wait forever.
395      * @exception NoThreadException If no thread could be obtained.
396      * @exception InterruptedException If interrupted while waiting for a
397      * thread to become available.
398      * @return A Thread that has been started on the given Runnable.
399      */

400     public Thread JavaDoc start(Runnable JavaDoc target, long timeout, String JavaDoc name)
401         throws NoThreadException, InterruptedException JavaDoc
402     {
403         try {
404             return start0(target, timeout, name);
405         }
406         catch (NoThreadException e) {
407             e.fillInStackTrace();
408             throw e;
409         }
410     }
411
412     private Thread JavaDoc start0(Runnable JavaDoc target, long timeout, String JavaDoc name)
413         throws NoThreadException, InterruptedException JavaDoc
414     {
415         PooledThread thread;
416
417         while (true) {
418             synchronized (mPool) {
419                 closeCheck();
420
421                 // Obtain a thread from the pool if non-empty.
422
if (mPool.size() > 0) {
423                     thread = (PooledThread)mPool.removeLast();
424                 }
425                 else {
426                     // Create a new thread if the number of active threads
427
// is less than the maximum allowed.
428
if (mActive < mMax) {
429                         return startThread(target, name);
430                     }
431                     else {
432                         break;
433                     }
434                 }
435             }
436
437             if (name != null) {
438                 thread.setName(name);
439             }
440             
441             if (thread.setTarget(target)) {
442                 return thread;
443             }
444             
445             // Couldn't set the target because the pooled thread is exiting.
446
// Wait for it to exit to ensure that the active count is less
447
// than the maximum and try to obtain another thread.
448
thread.join();
449         }
450         
451         if (timeout == 0) {
452             throw new NoThreadException("No thread available from " + this);
453         }
454
455         // Wait for a thread to become available in the pool.
456
synchronized (mPool) {
457             closeCheck();
458
459             if (timeout < 0) {
460                 while (mPool.size() <= 0) {
461                     mPool.wait(0);
462                     closeCheck();
463                 }
464             }
465             else {
466                 long expireTime = System.currentTimeMillis() + timeout;
467                 while (mPool.size() <= 0) {
468                     mPool.wait(timeout);
469                     closeCheck();
470
471                     // Thread could have been notified, but another thread may
472
// have stolen the thread away.
473
if (mPool.size() <= 0 &&
474                         System.currentTimeMillis() > expireTime) {
475                         
476                         throw new NoThreadException
477                             ("No thread available after waiting " +
478                              timeout + " milliseconds: " + this);
479                     }
480                 }
481             }
482         
483             thread = (PooledThread)mPool.removeLast();
484             if (name != null) {
485                 thread.setName(name);
486             }
487         
488             if (thread.setTarget(target)) {
489                 return thread;
490             }
491         }
492         
493         // Couldn't set the target because the pooled thread is exiting.
494
// Wait for it to exit to ensure that the active count is less
495
// than the maximum and create a new thread.
496
thread.join();
497         return startThread(target, name);
498     }
499
500     public boolean isClosed() {
501         return mClosed;
502     }
503
504     /**
505      * Will close down all the threads in the pool as they become
506      * available. This method may block forever if any threads are
507      * never returned to the thread pool.
508      */

509     public void close() throws InterruptedException JavaDoc {
510         close(getTimeout());
511     }
512
513     /**
514      * Will close down all the threads in the pool as they become
515      * available. If all the threads cannot become available within the
516      * specified timeout, any active threads not yet returned to the
517      * thread pool are interrupted.
518      *
519      * @param timeout Milliseconds to wait before unavailable threads
520      * are interrupted. If zero, don't wait at all. If negative, wait forever.
521      */

522     public void close(long timeout) throws InterruptedException JavaDoc {
523         synchronized (mPool) {
524             mClosed = true;
525             mPool.notifyAll();
526             
527             if (timeout != 0) {
528                 if (timeout < 0) {
529                     while (mActive > 0) {
530                         // Infinite wait for notification.
531
mPool.wait(0);
532                     }
533                 }
534                 else {
535                     long expireTime = System.currentTimeMillis() + timeout;
536                     while (mActive > 0) {
537                         mPool.wait(timeout);
538                         if (System.currentTimeMillis() > expireTime) {
539                             break;
540                         }
541                     }
542                 }
543             }
544         }
545
546         interrupt();
547     }
548
549     private PooledThread startThread(Runnable JavaDoc target, String JavaDoc name) {
550         PooledThread thread;
551
552         synchronized (mPool) {
553             mActive++;
554             thread = new PooledThread(getName() + ' ' + nextThreadID());
555             thread.setPriority(mPriority);
556             thread.setDaemon(mDaemon);
557             
558             if (name != null) {
559                 thread.setName(name);
560             }
561
562             thread.setTarget(target);
563             thread.start();
564         }
565
566         ThreadPoolEvent event = new ThreadPoolEvent(this, thread);
567         synchronized (mListeners) {
568             for (Iterator it = mListeners.iterator(); it.hasNext();) {
569                 ((ThreadPoolListener)it.next()).threadStarted(event);
570             }
571         }
572
573         return thread;
574     }
575
576     private void closeCheck() throws NoThreadException {
577         if (mClosed) {
578             throw new NoThreadException("Thread pool is closed", true);
579         }
580     }
581
582     void threadAvailable(PooledThread thread) {
583         synchronized (mPool) {
584             if (thread.getPriority() != mPriority) {
585                 thread.setPriority(mPriority);
586             }
587             mPool.addLast(thread);
588             mPool.notify();
589         }
590     }
591
592     void threadExiting(PooledThread thread) {
593         synchronized (mPool) {
594             if (mPool.remove(thread)) {
595                 mActive--;
596                 
597                 ThreadPoolEvent event = new ThreadPoolEvent(this, thread);
598                 synchronized (mListeners) {
599                     for (Iterator it = mListeners.iterator(); it.hasNext();) {
600                         ((ThreadPoolListener)it.next()).threadExiting(event);
601                     }
602                 }
603                 
604                 mPool.notify();
605             }
606         }
607     }
608
609     private class PooledThread extends Thread JavaDoc {
610         private String JavaDoc mOriginalName;
611         private Runnable JavaDoc mTarget;
612         private boolean mExiting;
613
614         public PooledThread(String JavaDoc name) {
615             super(ThreadPool.this, name);
616             mOriginalName = name;
617         }
618
619         synchronized boolean setTarget(Runnable JavaDoc target) {
620             if (mTarget != null) {
621                 throw new IllegalStateException JavaDoc
622                     ("Target runnable in pooled thread is already set");
623             }
624
625             if (mExiting) {
626                 return false;
627             }
628             else {
629                 mTarget = target;
630                 notify();
631                 return true;
632             }
633         }
634
635         private synchronized Runnable JavaDoc waitForTarget() {
636             Runnable JavaDoc target;
637             
638             if ((target = mTarget) == null) {
639                 long idle = getIdleTimeout();
640                 
641                 if ((target = mTarget) == null) {
642                     if (idle != 0) {
643                         try {
644                             if (idle < 0) {
645                                 wait(0);
646                             }
647                             else {
648                                 wait(idle);
649                             }
650                         }
651                         catch (InterruptedException JavaDoc e) {
652                         }
653                     }
654                     
655                     if ((target = mTarget) == null) {
656                         mExiting = true;
657                     }
658                 }
659             }
660
661             return target;
662         }
663
664         public void run() {
665             try {
666                 while (!isClosed()) {
667                     if (Thread.interrupted()) {
668                         continue;
669                     }
670
671                     Runnable JavaDoc target;
672
673                     if ((target = waitForTarget()) == null) {
674                         break;
675                     }
676
677                     try {
678                         target.run();
679                     }
680                     catch (ThreadDeath JavaDoc death) {
681                         break;
682                     }
683                     catch (Throwable JavaDoc e) {
684                         uncaughtException(Thread.currentThread(), e);
685                         e = null;
686                     }
687
688                     // Allow the garbage collector to reclaim target from
689
// stack while we wait for another target.
690
target = null;
691
692                     mTarget = null;
693                     setName(mOriginalName);
694                     threadAvailable(this);
695                 }
696             }
697             finally {
698                 threadExiting(this);
699             }
700         }
701     }
702 }
703
Popular Tags