KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > rift > coad > lib > thread > pool > ThreadPoolManager


/*
 * CoadunationLib: The coadunation libraries.
 * Copyright (C) 2007 Rift IT Contracting
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * ThreadPoolManager.java
 */

21
22 // package path
23
package com.rift.coad.lib.thread.pool;
24
25 // java imports
26
import java.util.Vector JavaDoc;
27 import java.util.Iterator JavaDoc;
28 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
29         
30 // logging import
31
import org.apache.log4j.Logger;
32
33 // coadunation imports
34
import com.rift.coad.lib.common.ClassUtil;
35 import com.rift.coad.lib.thread.CoadunationThread;
36 import com.rift.coad.lib.thread.ThreadStateMonitor;
37
38 /**
39  * This object is responsible for managing a pool of threads assigned to process
40  * a task object.
41  *
42  * @author Brett Chaldecott
43  */

44 public class ThreadPoolManager {
45     
46     /**
47      * This class is responsible for processing the tasks.
48      */

49     public class PoolThread extends CoadunationThread {
50         
51         // The classes private member variables
52
private ThreadStateMonitor state = new ThreadStateMonitor();
53         private ThreadPoolManager threadPoolManager = null;
54         private Class JavaDoc taskClass = null;
55         
56         /**
57          * The constructor of pool thread.
58          *
59          * @param threadPool The reference to the thread pool.
60          * @param taskClass The task object to process.
61          * @exception Exception
62          */

63         public PoolThread(ThreadPoolManager threadPoolManager, Class JavaDoc taskClass)
64                 throws Exception JavaDoc {
65             this.threadPoolManager = threadPoolManager;
66             this.taskClass = taskClass;
67         }
68         
69         
70         /**
71          * This method replaces the run method in the BasicThread.
72          *
73          * @exception Exception
74          */

75         public void process() throws Exception JavaDoc {
76             while(!state.isTerminated()) {
77                 if (!monitor()) {
78                     break;
79                 }
80                 try {
81                     Task task = (Task)taskClass.newInstance();
82                     task.process(threadPoolManager);
83                 } catch (Exception JavaDoc ex) {
84                     log.error("Failed to process a task : " + ex.getMessage(),
85                             ex);
86                 }
87                 processing.decrementAndGet();
88             }
89             removeThread(this);
90             log.debug("Pool thread exiting");
91         }
92
93
94         /**
95          * This method will be implemented by child objects to terminate the
96          * processing of this thread.
97          */

98         public void terminate() {
99             state.terminate(true);
100         }
101         
102     }
103     
104     // the logger reference
105
protected Logger log =
106             Logger.getLogger(ThreadPoolManager.class.getName());
107     
108     // privat member variables
109
private AtomicInteger JavaDoc processing = new AtomicInteger JavaDoc(0);
110     private int currentSize = 0;
111     private int minSize = 0;
112     private int maxSize = 0;
113     private Class JavaDoc taskClass = null;
114     private String JavaDoc username = null;
115     private Vector JavaDoc threadList = new Vector JavaDoc();
116     private ThreadStateMonitor state = new ThreadStateMonitor();
117     private int releaseThread = 1;
118     
119     /**
120      * Creates a new instance of ThreadPoolManager
121      *
122      * @param size The size of this thread pool.
123      * @param taskClass The class that implements the task interface.
124      * @param username The name of the user that the threads will run as.
125      * @exception PoolException
126      */

127     public ThreadPoolManager(int size, Class JavaDoc taskClass, String JavaDoc username) throws
128             PoolException {
129         validateTask(taskClass);
130         this.minSize = size;
131         this.maxSize = size;
132         this.taskClass = taskClass;
133         this.username = username;
134         startThreads(minSize);
135     }
136     
137     
138     /**
139      * Creates a new instance of ThreadPoolManager
140      *
141      * @param minSize The minimum size of this thread pool.
142      * @param maxSize The maximum size of this thread pool.
143      * @param taskClass The class that implements the task interface.
144      * @param username The name of the user that the threads will run as.
145      * @exception PoolException
146      */

147     public ThreadPoolManager(int minSize, int maxSize, Class JavaDoc taskClass,
148             String JavaDoc username) throws PoolException {
149         validateTask(taskClass);
150         this.minSize = minSize;
151         this.maxSize = maxSize;
152         this.taskClass = taskClass;
153         this.username = username;
154         startThreads(minSize);
155     }
156     
157     
158     /**
159      * This method returns the min size.
160      *
161      * @return The minimum size of the thread pool.
162      */

163     public synchronized int getMinSize() {
164         return minSize;
165     }
166     
167     
168     /**
169      * This method sets the minum size of the thread pool.
170      *
171      * @param minSize The minimum size of the pool.
172      * @exception PoolException
173      */

174     public synchronized void setMinSize(int minSize) throws PoolException {
175         checkState();
176         if (minSize > maxSize) {
177             throw new PoolException("Min size must be smaller than max size.");
178         }
179         this.minSize = minSize;
180         if (currentSize < minSize) {
181             startThreads(minSize - currentSize);
182         }
183         notifyAll();
184     }
185     
186     
187     /**
188      * This method returns the max size of the thread pool.
189      *
190      * @return The maximum size of the thread pool.
191      */

192     public synchronized int getMaxSize() {
193         return maxSize;
194     }
195     
196     
197     /**
198      * This method sets the maximum size of the thread pool.
199      *
200      * @param maxSize The maximum size of the thread pool.
201      */

202     public synchronized void setMaxSize(int maxSize) throws PoolException{
203         checkState();
204         if (maxSize < minSize) {
205             throw new PoolException("Max size must be greater than min size.");
206         }
207         this.maxSize = maxSize;
208         notifyAll();
209     }
210     
211     
212     /**
213      * This method returns the size of the thread pool.
214      *
215      * @return The size of the thread pool.
216      */

217     public synchronized int getSize() {
218         return maxSize;
219     }
220     
221     
222     /**
223      * This method sets the size of the thread pool.
224      *
225      * @param size The size of the thread pool.
226      * @exception PoolException
227      */

228     public synchronized void setSize(int size) throws PoolException {
229         checkState();
230         this.minSize = size;
231         this.maxSize = size;
232         if (currentSize < size) {
233             startThreads(size - currentSize);
234         }
235         notifyAll();
236     }
237     
238     
239     /**
240      * This method releases threads a thread from the pool.
241      *
242      * @exception PoolException
243      */

244     public synchronized void releaseThread() throws PoolException {
245         int processing = this.processing.get();
246         this.releaseThread++;
247         processing += releaseThread;
248         if (processing > minSize && processing <= maxSize) {
249             startThreads(1);
250         }
251         notify();
252     }
253     
254     
255     /**
256      * This method is called to terminate the thread pool.
257      */

258     public void terminate() throws PoolException {
259         state.terminate(true);
260         Vector JavaDoc threadListCopy = null;
261         synchronized(this) {
262             threadListCopy = new Vector JavaDoc(threadList);
263         }
264         for (Iterator JavaDoc iter = threadListCopy.iterator(); iter.hasNext();) {
265             CoadunationThread thread = (CoadunationThread)iter.next();
266             thread.terminate();
267         }
268         
269         synchronized(this) {
270             notifyAll();
271         }
272     }
273     
274     
275     /**
276      * This method validates the task object.
277      *
278      * @param taskClass The class to test.
279      * @exception PoolException
280      */

281     private void validateTask(Class JavaDoc taskClass) throws PoolException {
282         if (!ClassUtil.testForParent(taskClass,Task.class)) {
283             throw new PoolException("Task class [" + taskClass.getName() +
284                     "] does not inherit from [" + Task.class.getName() + "]");
285         }
286     }
287     
288     
289     /**
290      * This method is called to start the threads
291      *
292      * @param size The number of threads to release.
293      * @exception PoolException
294      */

295     private void startThreads(int size) throws PoolException {
296         try {
297             for (int count = 0; count < size; count++) {
298                 PoolThread thread = new PoolThread(this,taskClass);
299                 thread.start(username);
300                 addThread(thread);
301             }
302         } catch (Exception JavaDoc ex) {
303             log.error("Failed to start the threads : " +
304                     ex.getMessage(),ex);
305             throw new PoolException("Failed to start the threads : " +
306                     ex.getMessage(),ex);
307         }
308     }
309     
310     
311     /**
312      * This method is call by the pool threads to monitor the processing.
313      *
314      * @return TRUE if processing, should continue, FALSE if not.
315      * @exception PoolException
316      */

317     private synchronized boolean monitor() throws PoolException {
318         while (true) {
319             if (currentSize > maxSize) {
320                 currentSize--;
321                 return false;
322             } else if (releaseThread > 0) {
323                 releaseThread--;
324                 processing.incrementAndGet();
325                 return true;
326             } else if (currentSize > minSize) {
327                 currentSize--;
328                 return false;
329             } else if (state.isTerminated()) {
330                 currentSize--;
331                 return false;
332             }
333             try {
334                 wait();
335             } catch (Exception JavaDoc ex) {
336                 log.error("Wait failed : " + ex.getMessage());
337             }
338         }
339     }
340     
341     
342     /**
343      * This method adds a thread to the list of threads
344      */

345     private synchronized void addThread(PoolThread thread) {
346         currentSize++;
347         threadList.add(thread);
348     }
349     
350     
351     /**
352      * This method is called to remove a thread from the list.
353      *
354      * @param thread The thread to remove
355      */

356     private synchronized void removeThread(PoolThread thread) {
357         threadList.remove(thread);
358     }
359     
360     
361     /**
362      * This method is used to check the state of this pool.
363      *
364      * @exception PoolException
365      */

366     private void checkState() throws PoolException {
367         if (state.isTerminated()) {
368             throw new PoolException("The thread pool has been terminated");
369         }
370     }
371 }
372
Popular Tags