1 16 17 package org.springframework.scheduling.concurrent; 18 19 import java.util.concurrent.BlockingQueue ; 20 import java.util.concurrent.Executor ; 21 import java.util.concurrent.Executors ; 22 import java.util.concurrent.LinkedBlockingQueue ; 23 import java.util.concurrent.RejectedExecutionException ; 24 import java.util.concurrent.RejectedExecutionHandler ; 25 import java.util.concurrent.SynchronousQueue ; 26 import java.util.concurrent.ThreadFactory ; 27 import java.util.concurrent.ThreadPoolExecutor ; 28 import java.util.concurrent.TimeUnit ; 29 30 import org.apache.commons.logging.Log; 31 import org.apache.commons.logging.LogFactory; 32 33 import org.springframework.beans.factory.DisposableBean; 34 import org.springframework.beans.factory.InitializingBean; 35 import org.springframework.core.task.TaskRejectedException; 36 import org.springframework.scheduling.SchedulingTaskExecutor; 37 import org.springframework.util.Assert; 38 39 69 public class ThreadPoolTaskExecutor implements SchedulingTaskExecutor, Executor , InitializingBean, DisposableBean { 70 71 protected final Log logger = LogFactory.getLog(getClass()); 72 73 private final Object poolSizeMonitor = new Object (); 74 75 private int corePoolSize = 1; 76 77 private int maxPoolSize = Integer.MAX_VALUE; 78 79 private int keepAliveSeconds = 60; 80 81 private int queueCapacity = Integer.MAX_VALUE; 82 83 private ThreadFactory threadFactory = Executors.defaultThreadFactory(); 84 85 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy (); 86 87 private ThreadPoolExecutor threadPoolExecutor; 88 89 90 95 public void setCorePoolSize(int corePoolSize) { 96 synchronized (this.poolSizeMonitor) { 97 this.corePoolSize = corePoolSize; 98 if (this.threadPoolExecutor != null) { 99 this.threadPoolExecutor.setCorePoolSize(corePoolSize); 100 } 101 } 102 } 103 104 107 public int getCorePoolSize() { 108 synchronized (this.poolSizeMonitor) { 109 return this.corePoolSize; 110 } 111 } 112 113 118 public void setMaxPoolSize(int maxPoolSize) { 119 synchronized (this.poolSizeMonitor) { 120 this.maxPoolSize = maxPoolSize; 121 if (this.threadPoolExecutor != null) { 122 this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize); 123 } 124 } 125 } 126 127 130 public int getMaxPoolSize() { 131 synchronized (this.poolSizeMonitor) { 132 return this.maxPoolSize; 133 } 134 } 135 136 141 public void setKeepAliveSeconds(int keepAliveSeconds) { 142 synchronized (this.poolSizeMonitor) { 143 this.keepAliveSeconds = keepAliveSeconds; 144 if (this.threadPoolExecutor != null) { 145 this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS); 146 } 147 } 148 } 149 150 153 public int getKeepAliveSeconds() { 154 synchronized (this.poolSizeMonitor) { 155 return this.keepAliveSeconds; 156 } 157 } 158 159 167 public void setQueueCapacity(int queueCapacity) { 168 this.queueCapacity = queueCapacity; 169 } 170 171 176 public void setThreadFactory(ThreadFactory threadFactory) { 177 this.threadFactory = (threadFactory != null ? threadFactory : Executors.defaultThreadFactory()); 178 } 179 180 185 public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) { 186 this.rejectedExecutionHandler = 187 (rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy ()); 188 } 189 190 191 195 public void afterPropertiesSet() { 196 initialize(); 197 } 198 199 203 public void initialize() { 204 logger.info("Creating ThreadPoolExecutor"); 205 BlockingQueue queue = createQueue(this.queueCapacity); 206 this.threadPoolExecutor = new ThreadPoolExecutor ( 207 this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, 208 queue, this.threadFactory, this.rejectedExecutionHandler); 209 } 210 211 220 protected BlockingQueue createQueue(int queueCapacity) { 221 if (queueCapacity > 0) { 222 return new LinkedBlockingQueue (queueCapacity); 223 } 224 else { 225 return new SynchronousQueue (); 226 } 227 } 228 229 234 public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException { 235 Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized"); 236 return this.threadPoolExecutor; 237 } 238 239 240 246 public void execute(Runnable task) { 247 Executor executor = getThreadPoolExecutor(); 248 try { 249 executor.execute(task); 250 } 251 catch (RejectedExecutionException ex) { 252 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 253 } 254 } 255 256 259 public boolean prefersShortLivedTasks() { 260 return true; 261 } 262 263 264 268 public int getPoolSize() { 269 return getThreadPoolExecutor().getPoolSize(); 270 } 271 272 276 public int getActiveCount() { 277 return getThreadPoolExecutor().getActiveCount(); 278 } 279 280 281 286 public void destroy() { 287 shutdown(); 288 } 289 290 294 public void shutdown() { 295 logger.info("Shutting down ThreadPoolExecutor"); 296 this.threadPoolExecutor.shutdown(); 297 } 298 299 } 300 | Popular Tags |