KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jsmtpd > generic > threadpool > GrowingThreadPool


/*
 *
 * Jsmtpd, Java SMTP daemon
 * Copyright (C) 2006 Jean-Francois POUX, jf.poux@laposte.net
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 */

package org.jsmtpd.generic.threadpool;

import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

32 /**
33  * This needs testing.
34  *
35  *
36  * A growable thread pool.
37  * It's like the generic thread pool, but :
38  * Starts at 10% of initial threads provided (or whatever you set with growthRatio)
39  * When the pool is exhausted and a new thread is required, it will start
40  * numThreads*growRatio new threads, and place them in the pool, if the pool
41  * has still lower than numThreads
42  *
43  * Each 5000 ms, (or whatever you set), a "control thread" will look for
44  * unused threads and decrease the size of the pool as needed. There must have been no
45  * creation since 4* cycleTime.
46  *
47  * @author Jean-Francois POUX
48  * @see org.jsmtpd.generic.threadpool.IThreadedClass
49  */

50 public class GrowingThreadPool extends Thread JavaDoc implements ThreadPool {
51     private Log log = LogFactory.getLog(GrowingThreadPool.class);
52     private List JavaDoc<ThreadWorker> threads = new LinkedList JavaDoc<ThreadWorker>();
53     private int maxSize;
54     private int minimumThreads;
55     private int growth;
56     private String JavaDoc displayThreadName;
57     private String JavaDoc threadClassName;
58     private boolean running=true;
59     private long lastUp;
60     private float growthRatio=0.2f;
61     private float minRatio=0.1f;
62     private int cycleTime=5000;
63     
64     /**
65      *
66      * @param numThreads number of threads to be spawned
67      * @param threadClassName name of the class to be threaded, must impletement IThreadedClass
68      * @throws InstantiationException
69      * @throws IllegalAccessException
70      * @throws ClassNotFoundException
71      */

72     public GrowingThreadPool(int numThreads, String JavaDoc threadClassName,String JavaDoc displayThreadName) throws InstantiationException JavaDoc, IllegalAccessException JavaDoc, ClassNotFoundException JavaDoc {
73         ThreadWorker tmp;
74         IThreadedClass cls;
75         
76         this.displayThreadName=displayThreadName;
77         this.threadClassName=threadClassName;
78         maxSize=numThreads;
79         // Start at ratio of max size
80
minimumThreads=(int)Math.round(numThreads*minRatio);
81         if (minimumThreads==0)
82             minimumThreads=1;
83         
84         // Grow or lower by growthRatio
85
growth = (int) Math.round(numThreads*growthRatio);
86         if (growth==0)
87             growth=1;
88         
89         log.info("Starting initial pool of "+minimumThreads+" threads");
90         for (int i = 0; i < minimumThreads; i++) {
91             tmp = new ThreadWorker();
92             cls = (IThreadedClass) Class.forName(threadClassName).newInstance();
93             tmp.setWorker(cls);
94             tmp.setName(displayThreadName+"-"+tmp.getId());
95             tmp.start();
96             while (!tmp.isFree()) {
97                 //wait for thread to be up if not
98
Thread.yield();
99             }
100             log.debug("Thread "+tmp.getName()+" started");
101             threads.add(tmp);
102         }
103         lastUp=System.currentTimeMillis();
104         start();
105         
106     }
107     
108     /**
109      * Will gracefully shutdown each running thread
110      *
111      */

112     public synchronized void gracefullShutdown() {
113         running=false;
114         this.interrupt();
115         log.debug("Gracefull shutdown ...");
116         ThreadWorker tmp;
117         for (int i = 0; i < threads.size(); i++) {
118             tmp = (ThreadWorker) threads.get(i);
119             tmp.gracefullShutdown();
120         }
121     }
122
123     /**
124      * Will force each thread to shutdown
125      *
126      */

127     public synchronized void forceShutdown() {
128         running=false;
129         this.interrupt();
130         log.debug("Forcing shutdown ...");
131         ThreadWorker tmp;
132         for (int i = 0; i < threads.size(); i++) {
133             tmp = (ThreadWorker) threads.get(i);
134             tmp.forceShutdown();
135         }
136     }
137
138     /**
139      *
140      * @return true if any free thread
141      */

142     public synchronized boolean hasFreeThread() {
143         for (Iterator JavaDoc iter = threads.iterator(); iter.hasNext();) {
144             ThreadWorker element = (ThreadWorker) iter.next();
145             if (element.isFree())
146                 return true;
147         }
148         if ((threads.size()+growth)<=maxSize)
149             return true;
150         return false;
151     }
152
153     /**
154      *
155      */

156     public synchronized int countFreeThread() {
157         int count = 0;
158         for (Iterator JavaDoc iter = threads.iterator(); iter.hasNext();) {
159             ThreadWorker element = (ThreadWorker) iter.next();
160             if (element.isFree())
161                 count++;
162         }
163         return count;
164     }
165
166     /**
167      * passes the obj parameter to the thread instance, and runs its doJob mehtod
168      * @param obj the object to pass
169      * @throws BusyThreadPoolException when the pool is exhausted
170      */

171     public synchronized void assignFreeThread(Object JavaDoc obj) throws BusyThreadPoolException {
172         if (countFreeThread()==0) {
173             growPool ();
174         }
175         int i = 0;
176         for (ThreadWorker worker : threads) {
177             if (worker.isFree()) {
178                 log.debug("Worker "+worker.getName()+" is free, assigning job");
179                 worker.setParam(obj);
180                 worker.wake();
181                 return;
182             }
183             i++;
184         }
185         log.warn("Thread pool exhausted !");
186         throw new BusyThreadPoolException();
187     }
188     
189     public synchronized void growPool () {
190         log.debug("Trying to grow thread pool");
191         ThreadWorker tmp;
192         IThreadedClass cls;
193       
194         if ((threads.size()+growth)<=maxSize) {
195             try {
196                 lastUp=System.currentTimeMillis();
197                 log.info("Increasing number of threads by "+growth+", starting from "+threads.size());
198                 for (int i=0;i<growth;i++) {
199                     tmp = new ThreadWorker();
200                     cls = (IThreadedClass) Class.forName(threadClassName).newInstance();
201                     tmp.setWorker(cls);
202                     tmp.setName(displayThreadName+"#"+tmp.getId());
203                     tmp.start();
204                     while (!tmp.isFree()) {
205                         //wait for thread to be up
206
Thread.yield();
207                     }
208                     threads.add(tmp);
209                     log.debug("Thread "+tmp.getName()+" started");
210                 }
211             } catch (Exception JavaDoc e) {
212                 log.error("Could not increase pool",e);
213             }
214         } else {
215             log.error("Thread pool too low to assign a thread");
216         }
217     }
218     
219     public synchronized void lowerPool() {
220         long diff = System.currentTimeMillis()-lastUp;
221         if ((diff>(cycleTime*4)) && (threads.size()>minimumThreads) && (countFreeThread()>growth)) {
222             // remove growth size
223
Set JavaDoc<ThreadWorker> toRemove=new HashSet JavaDoc<ThreadWorker>();
224             for (int i=threads.size()-1;i>0;i--) { // loop backward to avoid destroy first threads
225
ThreadWorker worker = threads.get(i);
226                 if (worker.isFree())
227                     toRemove.add(worker);
228                 if (toRemove.size()==growth)
229                     break;
230             }
231             log.info("Reducing number of threads by "+growth+", starting from "+threads.size());
232             for (ThreadWorker worker : toRemove) {
233                 log.debug("Shutting down thread "+worker.getName());
234                 worker.gracefullShutdown();
235                 threads.remove(worker);
236             }
237         }
238     }
239     
240     public void run () {
241         setName("C-"+getId());
242         log.info("Control thread started");
243         while (running) {
244             try {
245                 Thread.sleep(cycleTime);
246             } catch (InterruptedException JavaDoc e) {
247             }
248             log.debug("Current size "+threads.size()+"/"+maxSize+", "+countFreeThread()+" free threads");
249             lowerPool();
250         }
251         log.info("Control thread ended");
252     }
253
254     /**
255      * Time in ms of wait time of control thread
256      */

257     public void setCycleTime(int cycleTime) {
258         this.cycleTime = cycleTime;
259     }
260
261     /**
262      * Ratio between 0.1 and 0.9 of grow rate of the pool
263      */

264     public void setGrowthRatio(float growthRatio) {
265         this.growthRatio = growthRatio;
266     }
267
268     /**
269      * Minimal ratio of pool (between 0.1 and 1)
270      */

271     public void setMinRatio(float minRatio) {
272         this.minRatio = minRatio;
273     }
274     
275 }
Popular Tags