KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > tomcat > util > threads > ThreadPool


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17
18 package org.apache.tomcat.util.threads;
19
20 import java.util.*;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.tomcat.util.res.StringManager;
25
26 /**
27  * A thread pool that is trying to copy the apache process management.
28  *
29  * Should we remove this in favor of Doug Lea's thread package?
30  *
31  * @author Gal Shachor
32  * @author Yoav Shapira <yoavs@apache.org>
33  */

34 public class ThreadPool {
35
36     private static Log log = LogFactory.getLog(ThreadPool.class);
37
38     private static StringManager sm =
39         StringManager.getManager("org.apache.tomcat.util.threads.res");
40
41     private static boolean logfull=true;
42
43     /*
44      * Default values ...
45      */

46     public static final int MAX_THREADS = 200;
47     public static final int MAX_THREADS_MIN = 10;
48     public static final int MAX_SPARE_THREADS = 50;
49     public static final int MIN_SPARE_THREADS = 4;
50     public static final int WORK_WAIT_TIMEOUT = 60*1000;
51
52     /*
53      * Where the threads are held.
54      */

55     protected ControlRunnable[] pool = null;
56
57     /*
58      * A monitor thread that monitors the pool for idel threads.
59      */

60     protected MonitorRunnable monitor;
61
62
63     /*
64      * Max number of threads that you can open in the pool.
65      */

66     protected int maxThreads;
67
68     /*
69      * Min number of idel threads that you can leave in the pool.
70      */

71     protected int minSpareThreads;
72
73     /*
74      * Max number of idel threads that you can leave in the pool.
75      */

76     protected int maxSpareThreads;
77
78     /*
79      * Number of threads in the pool.
80      */

81     protected int currentThreadCount;
82
83     /*
84      * Number of busy threads in the pool.
85      */

86     protected int currentThreadsBusy;
87
88     /*
89      * Flag that the pool should terminate all the threads and stop.
90      */

91     protected boolean stopThePool;
92
93     /* Flag to control if the main thread is 'daemon' */
94     protected boolean isDaemon=true;
95
96     /** The threads that are part of the pool.
97      * Key is Thread, value is the ControlRunnable
98      */

99     protected Hashtable threads=new Hashtable();
100
101     protected Vector listeners=new Vector();
102
103     /** Name of the threadpool
104      */

105     protected String JavaDoc name = "TP";
106
107     /**
108      * Sequence.
109      */

110     protected int sequence = 1;
111
112     /**
113      * Thread priority.
114      */

115     protected int threadPriority = Thread.NORM_PRIORITY;
116
117
/**
 * Creates a pool configured with the default limits. No threads are
 * created here; that happens in start().
 */
public ThreadPool() {
    this.stopThePool = false;
    this.currentThreadsBusy = 0;
    this.currentThreadCount = 0;
    this.minSpareThreads = MIN_SPARE_THREADS;
    this.maxSpareThreads = MAX_SPARE_THREADS;
    this.maxThreads = MAX_THREADS;
}
129
130
131     /** Create a ThreadPool instance.
132      *
133      * @param jmx UNUSED
134      * @return ThreadPool instance. If JMX support is requested, you need to
135      * call register() in order to set a name.
136      */

137     public static ThreadPool createThreadPool(boolean jmx) {
138         return new ThreadPool();
139     }
140
141     public synchronized void start() {
142     stopThePool=false;
143         currentThreadCount = 0;
144         currentThreadsBusy = 0;
145
146         adjustLimits();
147
148         pool = new ControlRunnable[maxThreads];
149
150         openThreads(minSpareThreads);
151         if (maxSpareThreads < maxThreads) {
152             monitor = new MonitorRunnable(this);
153         }
154     }
155
156     public MonitorRunnable getMonitor() {
157         return monitor;
158     }
159   
160     /**
161      * Sets the thread priority for current
162      * and future threads in this pool.
163      *
164      * @param threadPriority The new priority
165      * @throws IllegalArgumentException If the specified
166      * priority is less than Thread.MIN_PRIORITY or
167      * more than Thread.MAX_PRIORITY
168      */

169     public synchronized void setThreadPriority(int threadPriority) {
170         if(log.isDebugEnabled())
171             log.debug(getClass().getName() +
172                       ": setPriority(" + threadPriority + "): here.");
173
174       if (threadPriority < Thread.MIN_PRIORITY) {
175         throw new IllegalArgumentException JavaDoc("new priority < MIN_PRIORITY");
176       } else if (threadPriority > Thread.MAX_PRIORITY) {
177         throw new IllegalArgumentException JavaDoc("new priority > MAX_PRIORITY");
178       }
179
180       // Set for future threads
181
this.threadPriority = threadPriority;
182
183       Enumeration currentThreads = getThreads();
184       Thread JavaDoc t = null;
185       while(currentThreads.hasMoreElements()) {
186         t = (Thread JavaDoc) currentThreads.nextElement();
187         t.setPriority(threadPriority);
188       }
189     }
190
191     /**
192      * Returns the priority level of current and
193      * future threads in this pool.
194      *
195      * @return The priority
196      */

197     public int getThreadPriority() {
198       return threadPriority;
199     }
200      
201
202     public void setMaxThreads(int maxThreads) {
203         this.maxThreads = maxThreads;
204     }
205
206     public int getMaxThreads() {
207         return maxThreads;
208     }
209
210     public void setMinSpareThreads(int minSpareThreads) {
211         this.minSpareThreads = minSpareThreads;
212     }
213
214     public int getMinSpareThreads() {
215         return minSpareThreads;
216     }
217
218     public void setMaxSpareThreads(int maxSpareThreads) {
219         this.maxSpareThreads = maxSpareThreads;
220     }
221
222     public int getMaxSpareThreads() {
223         return maxSpareThreads;
224     }
225
226     public int getCurrentThreadCount() {
227         return currentThreadCount;
228     }
229
230     public int getCurrentThreadsBusy() {
231         return currentThreadsBusy;
232     }
233
234     public boolean isDaemon() {
235         return isDaemon;
236     }
237
238     public static int getDebug() {
239         return 0;
240     }
241
242     /** The default is true - the created threads will be
243      * in daemon mode. If set to false, the control thread
244      * will not be daemon - and will keep the process alive.
245      */

246     public void setDaemon( boolean b ) {
247         isDaemon=b;
248     }
249     
250     public boolean getDaemon() {
251         return isDaemon;
252     }
253
254     public void setName(String JavaDoc name) {
255         this.name = name;
256     }
257
258     public String JavaDoc getName() {
259         return name;
260     }
261
262     public int getSequence() {
263         return sequence++;
264     }
265
266     public void addThread( Thread JavaDoc t, ControlRunnable cr ) {
267         threads.put( t, cr );
268         for( int i=0; i<listeners.size(); i++ ) {
269             ThreadPoolListener tpl=(ThreadPoolListener)listeners.elementAt(i);
270             tpl.threadStart(this, t);
271         }
272     }
273
274     public void removeThread( Thread JavaDoc t ) {
275         threads.remove(t);
276         for( int i=0; i<listeners.size(); i++ ) {
277             ThreadPoolListener tpl=(ThreadPoolListener)listeners.elementAt(i);
278             tpl.threadEnd(this, t);
279         }
280     }
281
282     public void addThreadPoolListener( ThreadPoolListener tpl ) {
283         listeners.addElement( tpl );
284     }
285
286     public Enumeration getThreads(){
287         return threads.keys();
288     }
289
290     public void run(Runnable JavaDoc r) {
291         ControlRunnable c = findControlRunnable();
292         c.runIt(r);
293     }
294     
    //
    // You may wonder what you see here ... basically I am trying
    // to maintain a stack of threads. This way locality in time
    // is kept and there is a better chance to find residues of the
    // thread in memory next time it runs.
    //
301

302     /**
303      * Executes a given Runnable on a thread in the pool, block if needed.
304      */

305     public void runIt(ThreadPoolRunnable r) {
306         if(null == r) {
307             throw new NullPointerException JavaDoc();
308         }
309
310         ControlRunnable c = findControlRunnable();
311         c.runIt(r);
312     }
313
314     private ControlRunnable findControlRunnable() {
315         ControlRunnable c=null;
316
317         if ( stopThePool ) {
318             throw new IllegalStateException JavaDoc();
319         }
320
321         // Obtain a free thread from the pool.
322
synchronized(this) {
323
324             while (currentThreadsBusy == currentThreadCount) {
325                  // All threads are busy
326
if (currentThreadCount < maxThreads) {
327                     // Not all threads were open,
328
// Open new threads up to the max number of idel threads
329
int toOpen = currentThreadCount + minSpareThreads;
330                     openThreads(toOpen);
331                 } else {
332                     logFull(log, currentThreadCount, maxThreads);
333                     // Wait for a thread to become idel.
334
try {
335                         this.wait();
336                     }
337                     // was just catch Throwable -- but no other
338
// exceptions can be thrown by wait, right?
339
// So we catch and ignore this one, since
340
// it'll never actually happen, since nowhere
341
// do we say pool.interrupt().
342
catch(InterruptedException JavaDoc e) {
343                         log.error("Unexpected exception", e);
344                     }
345             if( log.isDebugEnabled() ) {
346             log.debug("Finished waiting: CTC="+currentThreadCount +
347                   ", CTB=" + currentThreadsBusy);
348                     }
349                     // Pool was stopped. Get away of the pool.
350
if( stopThePool) {
351                         break;
352                     }
353                 }
354             }
355             // Pool was stopped. Get away of the pool.
356
if(0 == currentThreadCount || stopThePool) {
357                 throw new IllegalStateException JavaDoc();
358             }
359                     
360             // If we are here it means that there is a free thread. Take it.
361
int pos = currentThreadCount - currentThreadsBusy - 1;
362             c = pool[pos];
363             pool[pos] = null;
364             currentThreadsBusy++;
365
366         }
367         return c;
368     }
369
370     private static void logFull(Log loghelper, int currentThreadCount,
371                                 int maxThreads) {
372     if( logfull ) {
373             log.error(sm.getString("threadpool.busy",
374                                    new Integer JavaDoc(currentThreadCount),
375                                    new Integer JavaDoc(maxThreads)));
376             logfull=false;
377         } else if( log.isDebugEnabled() ) {
378             log.debug("All threads are busy " + currentThreadCount + " " +
379                       maxThreads );
380         }
381     }
382
383     /**
384      * Stop the thread pool
385      */

386     public synchronized void shutdown() {
387         if(!stopThePool) {
388             stopThePool = true;
389             if (monitor != null) {
390                 monitor.terminate();
391                 monitor = null;
392             }
393             for(int i = 0; i < currentThreadCount - currentThreadsBusy; i++) {
394                 try {
395                     pool[i].terminate();
396                 } catch(Throwable JavaDoc t) {
397                     /*
398              * Do nothing... The show must go on, we are shutting
399              * down the pool and nothing should stop that.
400              */

401             log.error("Ignored exception while shutting down thread pool", t);
402                 }
403             }
404             currentThreadsBusy = currentThreadCount = 0;
405             pool = null;
406             notifyAll();
407         }
408     }
409
410     /**
411      * Called by the monitor thread to harvest idle threads.
412      */

413     protected synchronized void checkSpareControllers() {
414
415         if(stopThePool) {
416             return;
417         }
418
419         if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {
420             int toFree = currentThreadCount -
421                          currentThreadsBusy -
422                          maxSpareThreads;
423
424             for(int i = 0 ; i < toFree ; i++) {
425                 ControlRunnable c = pool[currentThreadCount - currentThreadsBusy - 1];
426                 c.terminate();
427                 pool[currentThreadCount - currentThreadsBusy - 1] = null;
428                 currentThreadCount --;
429             }
430
431         }
432
433     }
434
435     /**
436      * Returns the thread to the pool.
437      * Called by threads as they are becoming idel.
438      */

439     protected synchronized void returnController(ControlRunnable c) {
440
441         if(0 == currentThreadCount || stopThePool) {
442             c.terminate();
443             return;
444         }
445
446         // atomic
447
currentThreadsBusy--;
448
449         pool[currentThreadCount - currentThreadsBusy - 1] = c;
450         notify();
451     }
452
453     /**
454      * Inform the pool that the specific thread finish.
455      *
456      * Called by the ControlRunnable.run() when the runnable
457      * throws an exception.
458      */

459     protected synchronized void notifyThreadEnd(ControlRunnable c) {
460         currentThreadsBusy--;
461         currentThreadCount --;
462         notify();
463     }
464
465
466     /*
467      * Checks for problematic configuration and fix it.
468      * The fix provides reasonable settings for a single CPU
469      * with medium load.
470      */

471     protected void adjustLimits() {
472         if(maxThreads <= 0) {
473             maxThreads = MAX_THREADS;
474         } else if (maxThreads < MAX_THREADS_MIN) {
475             log.warn(sm.getString("threadpool.max_threads_too_low",
476                                   new Integer JavaDoc(maxThreads),
477                                   new Integer JavaDoc(MAX_THREADS_MIN)));
478             maxThreads = MAX_THREADS_MIN;
479         }
480
481         if(maxSpareThreads >= maxThreads) {
482             maxSpareThreads = maxThreads;
483         }
484
485         if(maxSpareThreads <= 0) {
486             if(1 == maxThreads) {
487                 maxSpareThreads = 1;
488             } else {
489                 maxSpareThreads = maxThreads/2;
490             }
491         }
492
493         if(minSpareThreads > maxSpareThreads) {
494             minSpareThreads = maxSpareThreads;
495         }
496
497         if(minSpareThreads <= 0) {
498             if(1 == maxSpareThreads) {
499                 minSpareThreads = 1;
500             } else {
501                 minSpareThreads = maxSpareThreads/2;
502             }
503         }
504     }
505
506     /** Create missing threads.
507      *
508      * @param toOpen Total number of threads we'll have open
509      */

510     protected void openThreads(int toOpen) {
511
512         if(toOpen > maxThreads) {
513             toOpen = maxThreads;
514         }
515
516         for(int i = currentThreadCount ; i < toOpen ; i++) {
517             pool[i - currentThreadsBusy] = new ControlRunnable(this);
518         }
519
520         currentThreadCount = toOpen;
521     }
522
523     /** @deprecated */
524     void log( String JavaDoc s ) {
525     log.info(s);
526     //loghelper.flush();
527
}
528     
529     /**
530      * Periodically execute an action - cleanup in this case
531      */

532     public static class MonitorRunnable implements Runnable JavaDoc {
533         ThreadPool p;
534         Thread JavaDoc t;
535         int interval=WORK_WAIT_TIMEOUT;
536         boolean shouldTerminate;
537
538         MonitorRunnable(ThreadPool p) {
539             this.p=p;
540             this.start();
541         }
542
543         public void start() {
544             shouldTerminate = false;
545             t = new Thread JavaDoc(this);
546             t.setDaemon(p.getDaemon() );
547         t.setName(p.getName() + "-Monitor");
548             t.start();
549         }
550
551         public void setInterval(int i ) {
552             this.interval=i;
553         }
554
555         public void run() {
556             while(true) {
557                 try {
558
559                     // Sleep for a while.
560
synchronized(this) {
561                         this.wait(interval);
562                     }
563
564                     // Check if should terminate.
565
// termination happens when the pool is shutting down.
566
if(shouldTerminate) {
567                         break;
568                     }
569
570                     // Harvest idle threads.
571
p.checkSpareControllers();
572
573                 } catch(Throwable JavaDoc t) {
574             ThreadPool.log.error("Unexpected exception", t);
575                 }
576             }
577         }
578
579         public void stop() {
580             this.terminate();
581         }
582
583     /** Stop the monitor
584      */

585         public synchronized void terminate() {
586             shouldTerminate = true;
587             this.notify();
588         }
589     }
590
591     /**
592      * A Thread object that executes various actions ( ThreadPoolRunnable )
593      * under control of ThreadPool
594      */

595     public static class ControlRunnable implements Runnable JavaDoc {
596         /**
597      * ThreadPool where this thread will be returned
598      */

599         private ThreadPool p;
600
601     /**
602      * The thread that executes the actions
603      */

604         private ThreadWithAttributes t;
605
606     /**
607      * The method that is executed in this thread
608      */

609         
610         private ThreadPoolRunnable toRun;
611         private Runnable JavaDoc toRunRunnable;
612
613     /**
614      * Stop this thread
615      */

616     private boolean shouldTerminate;
617
618     /**
619      * Activate the execution of the action
620      */

621         private boolean shouldRun;
622
623     /**
624      * Per thread data - can be used only if all actions are
625      * of the same type.
626      * A better mechanism is possible ( that would allow association of
627      * thread data with action type ), but right now it's enough.
628      */

629     private boolean noThData;
630
631     /**
632      * Start a new thread, with no method in it
633      */

634         ControlRunnable(ThreadPool p) {
635             toRun = null;
636             shouldTerminate = false;
637             shouldRun = false;
638             this.p = p;
639             t = new ThreadWithAttributes(p, this);
640             t.setDaemon(true);
641             t.setName(p.getName() + "-Processor" + p.getSequence());
642             t.setPriority(p.getThreadPriority());
643             p.addThread( t, this );
644         noThData=true;
645             t.start();
646         }
647
648         public void run() {
649             boolean _shouldRun = false;
650             boolean _shouldTerminate = false;
651             ThreadPoolRunnable _toRun = null;
652             try {
653                 while (true) {
654                     try {
655                         /* Wait for work. */
656                         synchronized (this) {
657                             while (!shouldRun && !shouldTerminate) {
658                                 this.wait();
659                             }
660                             _shouldRun = shouldRun;
661                             _shouldTerminate = shouldTerminate;
662                             _toRun = toRun;
663                         }
664
665                         if (_shouldTerminate) {
666                             if (ThreadPool.log.isDebugEnabled())
667                                 ThreadPool.log.debug("Terminate");
668                             break;
669                         }
670
671                         /* Check if should execute a runnable. */
672                         try {
673                             if (noThData) {
674                                 if (_toRun != null) {
675                                     Object JavaDoc thData[] = _toRun.getInitData();
676                                     t.setThreadData(p, thData);
677                                     if (ThreadPool.log.isDebugEnabled())
678                                         ThreadPool.log.debug(
679                                             "Getting new thread data");
680                                 }
681                                 noThData = false;
682                             }
683
684                             if (_shouldRun) {
685                                 if (_toRun != null) {
686                                     _toRun.runIt(t.getThreadData(p));
687                                 } else if (toRunRunnable != null) {
688                                     toRunRunnable.run();
689                                 } else {
690                                     if (ThreadPool.log.isDebugEnabled())
691                                     ThreadPool.log.debug("No toRun ???");
692                                 }
693                             }
694                         } catch (Throwable JavaDoc t) {
695                             ThreadPool.log.error(sm.getString
696                                 ("threadpool.thread_error", t, toRun.toString()));
697                             /*
698                              * The runnable throw an exception (can be even a ThreadDeath),
699                              * signalling that the thread die.
700                              *
701                             * The meaning is that we should release the thread from
702                             * the pool.
703                             */

704                             _shouldTerminate = true;
705                             _shouldRun = false;
706                             p.notifyThreadEnd(this);
707                         } finally {
708                             if (_shouldRun) {
709                                 shouldRun = false;
710                                 /*
711                                 * Notify the pool that the thread is now idle.
712                                  */

713                                 p.returnController(this);
714                             }
715                         }
716
717                         /*
718                         * Check if should terminate.
719                         * termination happens when the pool is shutting down.
720                         */

721                         if (_shouldTerminate) {
722                             break;
723                         }
724                     } catch (InterruptedException JavaDoc ie) { /* for the wait operation */
725                         // can never happen, since we don't call interrupt
726
ThreadPool.log.error("Unexpected exception", ie);
727                     }
728                 }
729             } finally {
730                 p.removeThread(Thread.currentThread());
731             }
732         }
733         /** Run a task
734          *
735          * @param toRun
736          */

737         public synchronized void runIt(Runnable JavaDoc toRun) {
738         this.toRunRunnable = toRun;
739         // Do not re-init, the whole idea is to run init only once per
740
// thread - the pool is supposed to run a single task, that is
741
// initialized once.
742
// noThData = true;
743
shouldRun = true;
744             this.notify();
745         }
746
747         /** Run a task
748          *
749          * @param toRun
750          */

751         public synchronized void runIt(ThreadPoolRunnable toRun) {
752         this.toRun = toRun;
753         // Do not re-init, the whole idea is to run init only once per
754
// thread - the pool is supposed to run a single task, that is
755
// initialized once.
756
// noThData = true;
757
shouldRun = true;
758             this.notify();
759         }
760
761         public void stop() {
762             this.terminate();
763         }
764
765         public void kill() {
766             t.stop();
767         }
768
769         public synchronized void terminate() {
770             shouldTerminate = true;
771             this.notify();
772         }
773     }
774
775     /**
776      * Debug display of the stage of each thread. The return is html style,
777      * for display in the console ( it can be easily parsed too ).
778      *
779      * @return The thread status display
780      */

781     public String JavaDoc threadStatusString() {
782         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
783         Iterator it=threads.keySet().iterator();
784         sb.append("<ul>");
785         while( it.hasNext()) {
786             sb.append("<li>");
787             ThreadWithAttributes twa=(ThreadWithAttributes)
788                     it.next();
789             sb.append(twa.getCurrentStage(this) ).append(" ");
790             sb.append( twa.getParam(this));
791             sb.append( "</li>\n");
792         }
793         sb.append("</ul>");
794         return sb.toString();
795     }
796
797     /** Return an array with the status of each thread. The status
798      * indicates the current request processing stage ( for tomcat ) or
799      * whatever the thread is doing ( if the application using TP provide
800      * this info )
801      *
802      * @return The status of all threads
803      */

804     public String JavaDoc[] getThreadStatus() {
805         String JavaDoc status[]=new String JavaDoc[ threads.size()];
806         Iterator it=threads.keySet().iterator();
807         for( int i=0; ( i<status.length && it.hasNext()); i++ ) {
808             ThreadWithAttributes twa=(ThreadWithAttributes)
809                     it.next();
810             status[i]=twa.getCurrentStage(this);
811         }
812         return status;
813     }
814
815     /** Return an array with the current "param" ( XXX better name ? )
816      * of each thread. This is typically the last request.
817      *
818      * @return The params of all threads
819      */

820     public String JavaDoc[] getThreadParam() {
821         String JavaDoc status[]=new String JavaDoc[ threads.size()];
822         Iterator it=threads.keySet().iterator();
823         for( int i=0; ( i<status.length && it.hasNext()); i++ ) {
824             ThreadWithAttributes twa=(ThreadWithAttributes)
825                     it.next();
826             Object JavaDoc o=twa.getParam(this);
827             status[i]=(o==null)? null : o.toString();
828         }
829         return status;
830     }
831     
832     /** Interface to allow applications to be notified when
833      * a threads are created and stopped.
834      */

835     public static interface ThreadPoolListener {
836         public void threadStart( ThreadPool tp, Thread JavaDoc t);
837
838         public void threadEnd( ThreadPool tp, Thread JavaDoc t);
839     }
840 }
841
Popular Tags