KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > sape > carbon > services > threadpool > DefaultThreadPoolImpl


1 /*
2  * The contents of this file are subject to the Sapient Public License
3  * Version 1.0 (the "License"); you may not use this file except in compliance
4  * with the License. You may obtain a copy of the License at
5  * http://carbon.sf.net/License.html.
6  *
7  * Software distributed under the License is distributed on an "AS IS" basis,
8  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
9  * the specific language governing rights and limitations under the License.
10  *
11  * The Original Code is The Carbon Component Framework.
12  *
13  * The Initial Developer of the Original Code is Sapient Corporation
14  *
15  * Copyright (C) 2003 Sapient Corporation. All Rights Reserved.
16  */

17
18 package org.sape.carbon.services.threadpool;
19
20 import java.util.ArrayList JavaDoc;
21 import java.util.Collections JavaDoc;
22 import java.util.List JavaDoc;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26
27 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
28 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
29 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
30
31 import org.sape.carbon.core.component.Component;
32 import org.sape.carbon.core.component.ComponentConfiguration;
33 import org.sape.carbon.core.component.lifecycle.Configurable;
34 import org.sape.carbon.core.component.lifecycle.Destroyable;
35 import org.sape.carbon.core.component.lifecycle.Initializable;
36 import org.sape.carbon.core.component.lifecycle.Startable;
37 import org.sape.carbon.core.component.lifecycle.Suspendable;
38 import org.sape.carbon.core.config.InvalidConfigurationException;
39 import org.sape.carbon.core.exception.ExceptionUtility;
40 import org.sape.carbon.core.exception.InvalidParameterException;
41
42 /**
43  * This implementation of ThreadPool relys on Doug Lea's PooledExecutor thread
44  * pool thread pool implementation. The PooledExecutor is a robust, highly
45  * configurable thread pool. Some configurations do not guarantee task
46  * execution, however. See
47  * <a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html">
48  * Overview of package util.concurrent</a> for more information about the
49  * PooledExecutor and other related classes.
50  *
51  * <br>Copyright 2002 Sapient
52  * @since carbon 2.1
53  * @author Douglas Voet, Nov 5, 2002
54  * @version $Revision: 1.8 $($Author: dvoet $ / $Date: 2003/11/20 18:49:58 $)
55  */

56 public class DefaultThreadPoolImpl
57     implements ThreadPool, Configurable, Initializable, Startable, Suspendable, Destroyable {
58
59     private ThreadPoolConfiguration config;
60     private PooledExecutor threadPool = null;
61     private List JavaDoc failedTasks;
62     private Component thisComponent;
63     // explicitly a BoundedBuffer because it supports the size() method
64
private BoundedBuffer taskQueue = null;
65
66     public TaskInfo execute(Runnable JavaDoc task, String JavaDoc taskName) {
67         return execute(task, taskName, null);
68     }
69
70     public TaskInfo execute(Runnable JavaDoc task, String JavaDoc taskName, TaskCallback callback) {
71         
72         if (task == null) {
73             throw new InvalidParameterException(
74                 this.getClass(),
75                 "task cannot be null");
76         }
77         
78         try {
79             TaskInfoImpl info = new TaskInfoImpl(
80                 task,
81                 taskName,
82                 callback,
83                 this.thisComponent.getComponentName());
84                 
85             TaskRunner taskRunner = new TaskRunner(info);
86             this.threadPool.execute(taskRunner);
87             return info;
88
89         } catch (InterruptedException JavaDoc ie) {
90             Thread.currentThread().interrupt();
91             throw new ThreadPoolRuntimeException(
92                 this.getClass(),
93                 "Interrupted while queuing task [" + taskName + "]",
94                 ie);
95         }
96     }
97
98     public List JavaDoc getFailedTasks() {
99         return Collections.unmodifiableList(this.failedTasks);
100     }
101
102     public Integer JavaDoc getQueueSize() {
103         return new Integer JavaDoc(this.taskQueue.size());
104     }
105     
106     public Integer JavaDoc getPoolSize() {
107         return new Integer JavaDoc(this.threadPool.getPoolSize());
108     }
109
110     /**
111      * initializes this.failedTasks and saves thisComponent for later
112      */

113     public void initialize(Component thisComponent) throws Exception JavaDoc {
114         this.failedTasks = Collections.synchronizedList(new ArrayList JavaDoc());
115         this.thisComponent = thisComponent;
116     }
117
118     /**
119      * validates the configuration object and stores it
120      */

121     public void configure(ComponentConfiguration configuration)
122         throws Exception JavaDoc {
123
124         this.config = (ThreadPoolConfiguration) configuration;
125
126         // validate configuration
127
if (this.config.getFailureListCapacity() < 0) {
128             throw new InvalidConfigurationException(
129                 this.getClass(),
130                 configuration.getConfigurationName(),
131                 "FailureListCapacity",
132                 "Must be >= 0");
133         }
134         if (this.config.getThreadPoolSize() <= 0) {
135             throw new InvalidConfigurationException(
136                 this.getClass(),
137                 configuration.getConfigurationName(),
138                 "ThreadPoolSize",
139                 "Must be > 0");
140         }
141         if (this.config.getKeepAliveTime() < 0) {
142             throw new InvalidConfigurationException(
143                 this.getClass(),
144                 configuration.getConfigurationName(),
145                 "KeepAliveTime",
146                 "Must be >= 0");
147         }
148         if (this.config.getTaskQueueSize() <= 0) {
149             throw new InvalidConfigurationException(
150                 this.getClass(),
151                 configuration.getConfigurationName(),
152                 "TaskQueueSize",
153                 "Must be > 0");
154         }
155         if (this.config.getInitialThreadCount() < 0) {
156             throw new InvalidConfigurationException(
157                 this.getClass(),
158                 configuration.getConfigurationName(),
159                 "InitialThreadCount",
160                 "Must be >= 0");
161         }
162         if (this.config.getShutdownWaitTime() < 0) {
163             throw new InvalidConfigurationException(
164                 this.getClass(),
165                 configuration.getConfigurationName(),
166                 "ShutdownWaitTime",
167                 "Must be >= 0");
168         }
169         
170         if (this.threadPool != null) {
171             // we must be in a suspended state because stop sets threadPool = null
172
configureThreadPool();
173         }
174     }
175
176     /**
177      * Creates the thread pool object and configures it then creates the
178      * configured number of threads
179      */

180     public void start() throws Exception JavaDoc {
181         this.taskQueue = new BoundedBuffer(this.config.getTaskQueueSize());
182         this.threadPool = new PooledExecutor(this.taskQueue);
183         
184         configureThreadPool();
185
186         this.threadPool.createThreads(this.config.getInitialThreadCount());
187     }
188
189     /**
190      * Stops the thread pool and if configured, will wait for queued tasks
191      * to execute.
192      */

193     public void stop() throws Exception JavaDoc {
194         if (this.config.getDiscardQueuedTasksOnShutdown()) {
195             this.threadPool.shutdownNow();
196             
197         } else {
198             this.threadPool.shutdownAfterProcessingCurrentlyQueuedTasks();
199             this.threadPool.awaitTerminationAfterShutdown(
200                 this.config.getShutdownWaitTime());
201         }
202         
203         this.threadPool = null;
204         this.taskQueue = null;
205         this.failedTasks.clear();
206     }
207
208     /**
209      * Resumes executing queued tasks by starting a thread for each queued task
210      * up to the max number of threads for the pool.
211      */

212     public void resume() throws Exception JavaDoc {
213         this.threadPool.createThreads(this.taskQueue.size());
214     }
215
216     /** interrupts all the threads allowing them to terminate */
217     public void suspend() throws Exception JavaDoc {
218         this.threadPool.interruptAll();
219     }
220
221     /** shutsdown the pool, not waiting to complete any queued tasks */
222     public void destroy() throws Exception JavaDoc {
223         if (this.threadPool != null) {
224             this.threadPool.shutdownNow();
225         }
226     }
227
228     /** adds a task to the failed list */
229     public void taskFailed(TaskInfo task) {
230         if (this.config.getFailureListCapacity() == 0) {
231             return;
232         }
233         
234         if (this.failedTasks.size() == this.config.getFailureListCapacity()) {
235             this.failedTasks.remove(0);
236         }
237         
238         this.failedTasks.add(task);
239     }
240     
241     /** configures this.threadPool using values from this.config */
242     private void configureThreadPool() {
243         this.threadPool.setThreadFactory(new ThreadFactory() {
244             public Thread JavaDoc newThread(Runnable JavaDoc command) {
245                 String JavaDoc threadName =
246                     DefaultThreadPoolImpl.this.thisComponent.getComponentName();
247                 Thread JavaDoc newThread = new Thread JavaDoc(command, threadName);
248                 newThread.setDaemon(
249                     DefaultThreadPoolImpl.this.config.isUseDaemonThreads());
250                 return newThread;
251             }
252         });
253         
254         this.threadPool.setMinimumPoolSize(this.config.getThreadPoolSize());
255         this.threadPool.setMaximumPoolSize(this.config.getThreadPoolSize());
256         this.threadPool.setKeepAliveTime(this.config.getKeepAliveTime());
257         
258         switch (this.config.getQueueFullPolicy().getOrdinal()) {
259             case QueueFullPolicyEnum.RUN_ORDINAL :
260                 this.threadPool.runWhenBlocked();
261                 break;
262
263             case QueueFullPolicyEnum.WAIT_ORDINAL :
264                 this.threadPool.waitWhenBlocked();
265                 break;
266
267             case QueueFullPolicyEnum.ABORT_ORDINAL :
268                 this.threadPool.abortWhenBlocked();
269                 break;
270
271             case QueueFullPolicyEnum.DISCARD_ORDINAL :
272                 this.threadPool.discardWhenBlocked();
273                 break;
274
275             case QueueFullPolicyEnum.DISCARD_OLDEST_ORDINAL :
276                 this.threadPool.discardOldestWhenBlocked();
277                 break;
278         }
279     }
280     
281     /**
282      * Implementation of Runnable that wraps the tasks passed into the
283      * execute method and is responsible for tracking the task as it is
284      * queued and executed.
285      */

286     private class TaskRunner implements Runnable JavaDoc {
287         private TaskInfoImpl info;
288         
289         public TaskRunner(TaskInfoImpl info) {
290             this.info = info;
291         }
292         
293         public void run() {
294             try {
295                 info.setExecuting();
296                 this.info.getTask().run();
297                 info.setSuccess();
298                 
299             } catch(Throwable JavaDoc t) {
300                 taskFailed(info);
301                 info.setFailure(t);
302             }
303         }
304     }
305     
306 }
307
Popular Tags