1 16 17 package org.springframework.scheduling.backportconcurrent; 18 19 import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue; 20 import edu.emory.mathcs.backport.java.util.concurrent.Executor; 21 import edu.emory.mathcs.backport.java.util.concurrent.Executors; 22 import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; 23 import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException; 24 import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler; 25 import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue; 26 import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory; 27 import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; 28 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 32 import org.springframework.beans.factory.DisposableBean; 33 import org.springframework.beans.factory.InitializingBean; 34 import org.springframework.core.task.TaskRejectedException; 35 import org.springframework.scheduling.SchedulingTaskExecutor; 36 import org.springframework.util.Assert; 37 38 71 public class ThreadPoolTaskExecutor implements SchedulingTaskExecutor, Executor, InitializingBean, DisposableBean { 72 73 protected final Log logger = LogFactory.getLog(getClass()); 74 75 private final Object poolSizeMonitor = new Object (); 76 77 private int corePoolSize = 1; 78 79 private int maxPoolSize = Integer.MAX_VALUE; 80 81 private int keepAliveSeconds = 60; 82 83 private int queueCapacity = Integer.MAX_VALUE; 84 85 private ThreadFactory threadFactory = Executors.defaultThreadFactory(); 86 87 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); 88 89 private ThreadPoolExecutor threadPoolExecutor; 90 91 92 97 public void setCorePoolSize(int corePoolSize) { 98 synchronized (this.poolSizeMonitor) { 99 this.corePoolSize = corePoolSize; 100 if (this.threadPoolExecutor != null) { 101 this.threadPoolExecutor.setCorePoolSize(corePoolSize); 102 } 103 } 104 } 105 106 109 public int getCorePoolSize() { 110 synchronized (this.poolSizeMonitor) { 111 return this.corePoolSize; 112 } 113 } 114 115 120 public void setMaxPoolSize(int maxPoolSize) { 121 synchronized (this.poolSizeMonitor) { 122 this.maxPoolSize = maxPoolSize; 123 if (this.threadPoolExecutor != null) { 124 this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize); 125 } 126 } 127 } 128 129 132 public int getMaxPoolSize() { 133 synchronized (this.poolSizeMonitor) { 134 return this.maxPoolSize; 135 } 136 } 137 138 143 public void setKeepAliveSeconds(int keepAliveSeconds) { 144 synchronized (this.poolSizeMonitor) { 145 this.keepAliveSeconds = keepAliveSeconds; 146 if (this.threadPoolExecutor != null) { 147 this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS); 148 } 149 } 150 } 151 152 155 public int getKeepAliveSeconds() { 156 synchronized (this.poolSizeMonitor) { 157 return this.keepAliveSeconds; 158 } 159 } 160 161 169 public void setQueueCapacity(int queueCapacity) { 170 this.queueCapacity = queueCapacity; 171 } 172 173 178 public void setThreadFactory(ThreadFactory threadFactory) { 179 this.threadFactory = (threadFactory != null ? threadFactory : Executors.defaultThreadFactory()); 180 } 181 182 187 public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) { 188 this.rejectedExecutionHandler = 189 (rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy()); 190 } 191 192 193 197 public void afterPropertiesSet() { 198 initialize(); 199 } 200 201 205 public void initialize() { 206 logger.info("Creating ThreadPoolExecutor"); 207 BlockingQueue queue = createQueue(this.queueCapacity); 208 this.threadPoolExecutor = new ThreadPoolExecutor( 209 this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, 210 queue, this.threadFactory, this.rejectedExecutionHandler); 211 } 212 213 222 protected BlockingQueue createQueue(int queueCapacity) { 223 if (queueCapacity > 0) { 224 return new LinkedBlockingQueue(queueCapacity); 225 } 226 else { 227 return new SynchronousQueue(); 228 } 229 } 230 231 236 public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException { 237 Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized"); 238 return this.threadPoolExecutor; 239 } 240 241 242 248 public void execute(Runnable task) { 249 Executor executor = getThreadPoolExecutor(); 250 try { 251 executor.execute(task); 252 } 253 catch (RejectedExecutionException ex) { 254 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 255 } 256 } 257 258 261 public boolean prefersShortLivedTasks() { 262 return true; 263 } 264 265 266 270 public int getPoolSize() { 271 return getThreadPoolExecutor().getPoolSize(); 272 } 273 274 278 public int getActiveCount() { 279 return getThreadPoolExecutor().getActiveCount(); 280 } 281 282 283 288 public void destroy() { 289 shutdown(); 290 } 291 292 296 public void shutdown() { 297 logger.info("Shutting down ThreadPoolExecutor"); 298 this.threadPoolExecutor.shutdown(); 299 } 300 301 } 302 | Popular Tags |