KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > springframework > scheduling > concurrent > ThreadPoolTaskExecutor


1 /*
2  * Copyright 2002-2007 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.springframework.scheduling.concurrent;
18
19 import java.util.concurrent.BlockingQueue JavaDoc;
20 import java.util.concurrent.Executor JavaDoc;
21 import java.util.concurrent.Executors JavaDoc;
22 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
23 import java.util.concurrent.RejectedExecutionException JavaDoc;
24 import java.util.concurrent.RejectedExecutionHandler JavaDoc;
25 import java.util.concurrent.SynchronousQueue JavaDoc;
26 import java.util.concurrent.ThreadFactory JavaDoc;
27 import java.util.concurrent.ThreadPoolExecutor JavaDoc;
28 import java.util.concurrent.TimeUnit JavaDoc;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32
33 import org.springframework.beans.factory.DisposableBean;
34 import org.springframework.beans.factory.InitializingBean;
35 import org.springframework.core.task.TaskRejectedException;
36 import org.springframework.scheduling.SchedulingTaskExecutor;
37 import org.springframework.util.Assert;
38
39 /**
40  * JavaBean that allows for configuring a JDK 1.5 {@link java.util.concurrent.ThreadPoolExecutor}
41  * in bean style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds", "queueCapacity"
42  * properties), exposing it as a Spring {@link org.springframework.core.task.TaskExecutor}.
43  * This is an alternative to configuring a ThreadPoolExecutor instance directly using
44  * constructor injection, with a separate {@link ConcurrentTaskExecutor} adapter wrapping it.
45  *
46  * <p>For any custom needs, in particular for defining a
47  * {@link java.util.concurrent.ScheduledThreadPoolExecutor}, it is recommended to
48  * use a straight definition of the Executor instance or a factory method definition
49  * that points to the JDK 1.5 {@link java.util.concurrent.Executors} class.
50  * To expose such a raw Executor as a Spring {@link org.springframework.core.task.TaskExecutor},
51  * simply wrap it with a {@link ConcurrentTaskExecutor} adapter.
52  *
53  * <p><b>NOTE:</b> This class implements Spring's
54  * {@link org.springframework.core.task.TaskExecutor} interface as well as the JDK 1.5
55  * {@link java.util.concurrent.Executor} interface, with the former being the primary
56  * interface, the other just serving as secondary convenience. For this reason, the
57  * exception handling follows the TaskExecutor contract rather than the Executor contract,
58  * in particular regarding the {@link org.springframework.core.task.TaskRejectedException}.
59  *
60  * @author Juergen Hoeller
61  * @since 2.0
62  * @see org.springframework.core.task.TaskExecutor
63  * @see java.util.concurrent.Executor
64  * @see java.util.concurrent.ThreadPoolExecutor
65  * @see java.util.concurrent.ScheduledThreadPoolExecutor
66  * @see java.util.concurrent.Executors
67  * @see ConcurrentTaskExecutor
68  */

69 public class ThreadPoolTaskExecutor implements SchedulingTaskExecutor, Executor JavaDoc, InitializingBean, DisposableBean {
70
71     protected final Log logger = LogFactory.getLog(getClass());
72
73     private final Object JavaDoc poolSizeMonitor = new Object JavaDoc();
74
75     private int corePoolSize = 1;
76
77     private int maxPoolSize = Integer.MAX_VALUE;
78
79     private int keepAliveSeconds = 60;
80
81     private int queueCapacity = Integer.MAX_VALUE;
82
83     private ThreadFactory JavaDoc threadFactory = Executors.defaultThreadFactory();
84
85     private RejectedExecutionHandler JavaDoc rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy JavaDoc();
86
87     private ThreadPoolExecutor JavaDoc threadPoolExecutor;
88
89
90     /**
91      * Set the ThreadPoolExecutor's core pool size.
92      * Default is 1.
93      * <p><b>This setting can be modified at runtime, for example through JMX.</b>
94      */

95     public void setCorePoolSize(int corePoolSize) {
96         synchronized (this.poolSizeMonitor) {
97             this.corePoolSize = corePoolSize;
98             if (this.threadPoolExecutor != null) {
99                 this.threadPoolExecutor.setCorePoolSize(corePoolSize);
100             }
101         }
102     }
103
104     /**
105      * Return the ThreadPoolExecutor's core pool size.
106      */

107     public int getCorePoolSize() {
108         synchronized (this.poolSizeMonitor) {
109             return this.corePoolSize;
110         }
111     }
112
113     /**
114      * Set the ThreadPoolExecutor's maximum pool size.
115      * Default is <code>Integer.MAX_VALUE</code>.
116      * <p><b>This setting can be modified at runtime, for example through JMX.</b>
117      */

118     public void setMaxPoolSize(int maxPoolSize) {
119         synchronized (this.poolSizeMonitor) {
120             this.maxPoolSize = maxPoolSize;
121             if (this.threadPoolExecutor != null) {
122                 this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
123             }
124         }
125     }
126
127     /**
128      * Return the ThreadPoolExecutor's maximum pool size.
129      */

130     public int getMaxPoolSize() {
131         synchronized (this.poolSizeMonitor) {
132             return this.maxPoolSize;
133         }
134     }
135
136     /**
137      * Set the ThreadPoolExecutor's keep-alive seconds.
138      * Default is 60.
139      * <p><b>This setting can be modified at runtime, for example through JMX.</b>
140      */

141     public void setKeepAliveSeconds(int keepAliveSeconds) {
142         synchronized (this.poolSizeMonitor) {
143             this.keepAliveSeconds = keepAliveSeconds;
144             if (this.threadPoolExecutor != null) {
145                 this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
146             }
147         }
148     }
149
150     /**
151      * Return the ThreadPoolExecutor's keep-alive seconds.
152      */

153     public int getKeepAliveSeconds() {
154         synchronized (this.poolSizeMonitor) {
155             return this.keepAliveSeconds;
156         }
157     }
158
159     /**
160      * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
161      * Default is <code>Integer.MAX_VALUE</code>.
162      * <p>Any positive value will lead to a LinkedBlockingQueue instance;
163      * any other value will lead to a SynchronousQueue instance.
164      * @see java.util.concurrent.LinkedBlockingQueue
165      * @see java.util.concurrent.SynchronousQueue
166      */

167     public void setQueueCapacity(int queueCapacity) {
168         this.queueCapacity = queueCapacity;
169     }
170
171     /**
172      * Set the ThreadFactory to use for the ThreadPoolExecutor's thread pool.
173      * Default is the ThreadPoolExecutor's default thread factory.
174      * @see java.util.concurrent.Executors#defaultThreadFactory()
175      */

176     public void setThreadFactory(ThreadFactory JavaDoc threadFactory) {
177         this.threadFactory = (threadFactory != null ? threadFactory : Executors.defaultThreadFactory());
178     }
179
180     /**
181      * Set the RejectedExecutionHandler to use for the ThreadPoolExecutor.
182      * Default is the ThreadPoolExecutor's default abort policy.
183      * @see java.util.concurrent.ThreadPoolExecutor.AbortPolicy
184      */

185     public void setRejectedExecutionHandler(RejectedExecutionHandler JavaDoc rejectedExecutionHandler) {
186         this.rejectedExecutionHandler =
187                 (rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy JavaDoc());
188     }
189
190
191     /**
192      * Calls <code>initialize()</code> after the container applied all property values.
193      * @see #initialize()
194      */

195     public void afterPropertiesSet() {
196         initialize();
197     }
198
199     /**
200      * Creates the BlockingQueue and the ThreadPoolExecutor.
201      * @see #createQueue
202      */

203     public void initialize() {
204         logger.info("Creating ThreadPoolExecutor");
205         BlockingQueue JavaDoc queue = createQueue(this.queueCapacity);
206         this.threadPoolExecutor = new ThreadPoolExecutor JavaDoc(
207                 this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
208                 queue, this.threadFactory, this.rejectedExecutionHandler);
209     }
210
211     /**
212      * Create the BlockingQueue to use for the ThreadPoolExecutor.
213      * <p>A LinkedBlockingQueue instance will be created for a positive
214      * capacity value; a SynchronousQueue else.
215      * @param queueCapacity the specified queue capacity
216      * @return the BlockingQueue instance
217      * @see java.util.concurrent.LinkedBlockingQueue
218      * @see java.util.concurrent.SynchronousQueue
219      */

220     protected BlockingQueue JavaDoc createQueue(int queueCapacity) {
221         if (queueCapacity > 0) {
222             return new LinkedBlockingQueue JavaDoc(queueCapacity);
223         }
224         else {
225             return new SynchronousQueue JavaDoc();
226         }
227     }
228
229     /**
230      * Return the underlying ThreadPoolExecutor for native access.
231      * @return the underlying ThreadPoolExecutor (never <code>null</code>)
232      * @throws IllegalStateException if the ThreadPoolTaskExecutor hasn't been initialized yet
233      */

234     public ThreadPoolExecutor JavaDoc getThreadPoolExecutor() throws IllegalStateException JavaDoc {
235         Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
236         return this.threadPoolExecutor;
237     }
238
239
240     /**
241      * Implementation of both the JDK 1.5 Executor interface and the Spring
242      * TaskExecutor interface, delegating to the ThreadPoolExecutor instance.
243      * @see java.util.concurrent.Executor#execute(Runnable)
244      * @see org.springframework.core.task.TaskExecutor#execute(Runnable)
245      */

246     public void execute(Runnable JavaDoc task) {
247         Executor JavaDoc executor = getThreadPoolExecutor();
248         try {
249             executor.execute(task);
250         }
251         catch (RejectedExecutionException JavaDoc ex) {
252             throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
253         }
254     }
255
256     /**
257      * This task executor prefers short-lived work units.
258      */

259     public boolean prefersShortLivedTasks() {
260         return true;
261     }
262
263
264     /**
265      * Return the current pool size.
266      * @see java.util.concurrent.ThreadPoolExecutor#getPoolSize()
267      */

268     public int getPoolSize() {
269         return getThreadPoolExecutor().getPoolSize();
270     }
271
272     /**
273      * Return the number of currently active threads.
274      * @see java.util.concurrent.ThreadPoolExecutor#getActiveCount()
275      */

276     public int getActiveCount() {
277         return getThreadPoolExecutor().getActiveCount();
278     }
279
280
281     /**
282      * Calls <code>shutdown</code> when the BeanFactory destroys
283      * the task executor instance.
284      * @see #shutdown()
285      */

286     public void destroy() {
287         shutdown();
288     }
289
290     /**
291      * Perform a shutdown on the ThreadPoolExecutor.
292      * @see java.util.concurrent.ThreadPoolExecutor#shutdown()
293      */

294     public void shutdown() {
295         logger.info("Shutting down ThreadPoolExecutor");
296         this.threadPoolExecutor.shutdown();
297     }
298
299 }
300
Popular Tags