1 17 18 package org.sape.carbon.services.threadpool; 19 20 import java.util.ArrayList ; 21 import java.util.Collections ; 22 import java.util.List ; 23 24 import org.apache.commons.logging.Log; 25 import org.apache.commons.logging.LogFactory; 26 27 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; 28 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 29 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; 30 31 import org.sape.carbon.core.component.Component; 32 import org.sape.carbon.core.component.ComponentConfiguration; 33 import org.sape.carbon.core.component.lifecycle.Configurable; 34 import org.sape.carbon.core.component.lifecycle.Destroyable; 35 import org.sape.carbon.core.component.lifecycle.Initializable; 36 import org.sape.carbon.core.component.lifecycle.Startable; 37 import org.sape.carbon.core.component.lifecycle.Suspendable; 38 import org.sape.carbon.core.config.InvalidConfigurationException; 39 import org.sape.carbon.core.exception.ExceptionUtility; 40 import org.sape.carbon.core.exception.InvalidParameterException; 41 42 56 public class DefaultThreadPoolImpl 57 implements ThreadPool, Configurable, Initializable, Startable, Suspendable, Destroyable { 58 59 private ThreadPoolConfiguration config; 60 private PooledExecutor threadPool = null; 61 private List failedTasks; 62 private Component thisComponent; 63 private BoundedBuffer taskQueue = null; 65 66 public TaskInfo execute(Runnable task, String taskName) { 67 return execute(task, taskName, null); 68 } 69 70 public TaskInfo execute(Runnable task, String taskName, TaskCallback callback) { 71 72 if (task == null) { 73 throw new InvalidParameterException( 74 this.getClass(), 75 "task cannot be null"); 76 } 77 78 try { 79 TaskInfoImpl info = new TaskInfoImpl( 80 task, 81 taskName, 82 callback, 83 this.thisComponent.getComponentName()); 84 85 TaskRunner taskRunner = new TaskRunner(info); 86 this.threadPool.execute(taskRunner); 87 return info; 88 89 } catch (InterruptedException ie) { 90 Thread.currentThread().interrupt(); 91 throw new ThreadPoolRuntimeException( 92 this.getClass(), 93 "Interrupted while queuing task [" + taskName + "]", 94 ie); 95 } 96 } 97 98 public List getFailedTasks() { 99 return Collections.unmodifiableList(this.failedTasks); 100 } 101 102 public Integer getQueueSize() { 103 return new Integer (this.taskQueue.size()); 104 } 105 106 public Integer getPoolSize() { 107 return new Integer (this.threadPool.getPoolSize()); 108 } 109 110 113 public void initialize(Component thisComponent) throws Exception { 114 this.failedTasks = Collections.synchronizedList(new ArrayList ()); 115 this.thisComponent = thisComponent; 116 } 117 118 121 public void configure(ComponentConfiguration configuration) 122 throws Exception { 123 124 this.config = (ThreadPoolConfiguration) configuration; 125 126 if (this.config.getFailureListCapacity() < 0) { 128 throw new InvalidConfigurationException( 129 this.getClass(), 130 configuration.getConfigurationName(), 131 "FailureListCapacity", 132 "Must be >= 0"); 133 } 134 if (this.config.getThreadPoolSize() <= 0) { 135 throw new InvalidConfigurationException( 136 this.getClass(), 137 configuration.getConfigurationName(), 138 "ThreadPoolSize", 139 "Must be > 0"); 140 } 141 if (this.config.getKeepAliveTime() < 0) { 142 throw new InvalidConfigurationException( 143 this.getClass(), 144 configuration.getConfigurationName(), 145 "KeepAliveTime", 146 "Must be >= 0"); 147 } 148 if (this.config.getTaskQueueSize() <= 0) { 149 throw new InvalidConfigurationException( 150 this.getClass(), 151 configuration.getConfigurationName(), 152 "TaskQueueSize", 153 "Must be > 0"); 154 } 155 if (this.config.getInitialThreadCount() < 0) { 156 throw new InvalidConfigurationException( 157 this.getClass(), 158 configuration.getConfigurationName(), 159 "InitialThreadCount", 160 "Must be >= 0"); 161 } 162 if (this.config.getShutdownWaitTime() < 0) { 163 throw new InvalidConfigurationException( 164 this.getClass(), 165 configuration.getConfigurationName(), 166 "ShutdownWaitTime", 167 "Must be >= 0"); 168 } 169 170 if (this.threadPool != null) { 171 configureThreadPool(); 173 } 174 } 175 176 180 public void start() throws Exception { 181 this.taskQueue = new BoundedBuffer(this.config.getTaskQueueSize()); 182 this.threadPool = new PooledExecutor(this.taskQueue); 183 184 configureThreadPool(); 185 186 this.threadPool.createThreads(this.config.getInitialThreadCount()); 187 } 188 189 193 public void stop() throws Exception { 194 if (this.config.getDiscardQueuedTasksOnShutdown()) { 195 this.threadPool.shutdownNow(); 196 197 } else { 198 this.threadPool.shutdownAfterProcessingCurrentlyQueuedTasks(); 199 this.threadPool.awaitTerminationAfterShutdown( 200 this.config.getShutdownWaitTime()); 201 } 202 203 this.threadPool = null; 204 this.taskQueue = null; 205 this.failedTasks.clear(); 206 } 207 208 212 public void resume() throws Exception { 213 this.threadPool.createThreads(this.taskQueue.size()); 214 } 215 216 217 public void suspend() throws Exception { 218 this.threadPool.interruptAll(); 219 } 220 221 222 public void destroy() throws Exception { 223 if (this.threadPool != null) { 224 this.threadPool.shutdownNow(); 225 } 226 } 227 228 229 public void taskFailed(TaskInfo task) { 230 if (this.config.getFailureListCapacity() == 0) { 231 return; 232 } 233 234 if (this.failedTasks.size() == this.config.getFailureListCapacity()) { 235 this.failedTasks.remove(0); 236 } 237 238 this.failedTasks.add(task); 239 } 240 241 242 private void configureThreadPool() { 243 this.threadPool.setThreadFactory(new ThreadFactory() { 244 public Thread newThread(Runnable command) { 245 String threadName = 246 DefaultThreadPoolImpl.this.thisComponent.getComponentName(); 247 Thread newThread = new Thread (command, threadName); 248 newThread.setDaemon( 249 DefaultThreadPoolImpl.this.config.isUseDaemonThreads()); 250 return newThread; 251 } 252 }); 253 254 this.threadPool.setMinimumPoolSize(this.config.getThreadPoolSize()); 255 this.threadPool.setMaximumPoolSize(this.config.getThreadPoolSize()); 256 this.threadPool.setKeepAliveTime(this.config.getKeepAliveTime()); 257 258 switch (this.config.getQueueFullPolicy().getOrdinal()) { 259 case QueueFullPolicyEnum.RUN_ORDINAL : 260 this.threadPool.runWhenBlocked(); 261 break; 262 263 case QueueFullPolicyEnum.WAIT_ORDINAL : 264 this.threadPool.waitWhenBlocked(); 265 break; 266 267 case QueueFullPolicyEnum.ABORT_ORDINAL : 268 this.threadPool.abortWhenBlocked(); 269 break; 270 271 case QueueFullPolicyEnum.DISCARD_ORDINAL : 272 this.threadPool.discardWhenBlocked(); 273 break; 274 275 case QueueFullPolicyEnum.DISCARD_OLDEST_ORDINAL : 276 this.threadPool.discardOldestWhenBlocked(); 277 break; 278 } 279 } 280 281 286 private class TaskRunner implements Runnable { 287 private TaskInfoImpl info; 288 289 public TaskRunner(TaskInfoImpl info) { 290 this.info = info; 291 } 292 293 public void run() { 294 try { 295 info.setExecuting(); 296 this.info.getTask().run(); 297 info.setSuccess(); 298 299 } catch(Throwable t) { 300 taskFailed(info); 301 info.setFailure(t); 302 } 303 } 304 } 305 306 } 307 | Popular Tags |