1 21 22 package com.rift.coad.lib.thread.pool; 24 25 import java.util.Vector ; 27 import java.util.Iterator ; 28 import java.util.concurrent.atomic.AtomicInteger ; 29 30 import org.apache.log4j.Logger; 32 33 import com.rift.coad.lib.common.ClassUtil; 35 import com.rift.coad.lib.thread.CoadunationThread; 36 import com.rift.coad.lib.thread.ThreadStateMonitor; 37 38 44 public class ThreadPoolManager { 45 46 49 public class PoolThread extends CoadunationThread { 50 51 private ThreadStateMonitor state = new ThreadStateMonitor(); 53 private ThreadPoolManager threadPoolManager = null; 54 private Class taskClass = null; 55 56 63 public PoolThread(ThreadPoolManager threadPoolManager, Class taskClass) 64 throws Exception { 65 this.threadPoolManager = threadPoolManager; 66 this.taskClass = taskClass; 67 } 68 69 70 75 public void process() throws Exception { 76 while(!state.isTerminated()) { 77 if (!monitor()) { 78 break; 79 } 80 try { 81 Task task = (Task)taskClass.newInstance(); 82 task.process(threadPoolManager); 83 } catch (Exception ex) { 84 log.error("Failed to process a task : " + ex.getMessage(), 85 ex); 86 } 87 processing.decrementAndGet(); 88 } 89 removeThread(this); 90 log.debug("Pool thread exiting"); 91 } 92 93 94 98 public void terminate() { 99 state.terminate(true); 100 } 101 102 } 103 104 protected Logger log = 106 Logger.getLogger(ThreadPoolManager.class.getName()); 107 108 private AtomicInteger processing = new AtomicInteger (0); 110 private int currentSize = 0; 111 private int minSize = 0; 112 private int maxSize = 0; 113 private Class taskClass = null; 114 private String username = null; 115 private Vector threadList = new Vector (); 116 private ThreadStateMonitor state = new ThreadStateMonitor(); 117 private int releaseThread = 1; 118 119 127 public ThreadPoolManager(int size, Class taskClass, String username) throws 128 PoolException { 129 validateTask(taskClass); 130 this.minSize = size; 131 this.maxSize = size; 132 this.taskClass = taskClass; 133 this.username = username; 134 startThreads(minSize); 135 } 136 137 138 147 public ThreadPoolManager(int minSize, int maxSize, Class taskClass, 148 String username) throws PoolException { 149 validateTask(taskClass); 150 this.minSize = minSize; 151 this.maxSize = maxSize; 152 this.taskClass = taskClass; 153 this.username = username; 154 startThreads(minSize); 155 } 156 157 158 163 public synchronized int getMinSize() { 164 return minSize; 165 } 166 167 168 174 public synchronized void setMinSize(int minSize) throws PoolException { 175 checkState(); 176 if (minSize > maxSize) { 177 throw new PoolException("Min size must be smaller than max size."); 178 } 179 this.minSize = minSize; 180 if (currentSize < minSize) { 181 startThreads(minSize - currentSize); 182 } 183 notifyAll(); 184 } 185 186 187 192 public synchronized int getMaxSize() { 193 return maxSize; 194 } 195 196 197 202 public synchronized void setMaxSize(int maxSize) throws PoolException{ 203 checkState(); 204 if (maxSize < minSize) { 205 throw new PoolException("Max size must be greater than min size."); 206 } 207 this.maxSize = maxSize; 208 notifyAll(); 209 } 210 211 212 217 public synchronized int getSize() { 218 return maxSize; 219 } 220 221 222 228 public synchronized void setSize(int size) throws PoolException { 229 checkState(); 230 this.minSize = size; 231 this.maxSize = size; 232 if (currentSize < size) { 233 startThreads(size - currentSize); 234 } 235 notifyAll(); 236 } 237 238 239 244 public synchronized void releaseThread() throws PoolException { 245 int processing = this.processing.get(); 246 this.releaseThread++; 247 processing += releaseThread; 248 if (processing > minSize && processing <= maxSize) { 249 startThreads(1); 250 } 251 notify(); 252 } 253 254 255 258 public void terminate() throws PoolException { 259 state.terminate(true); 260 Vector threadListCopy = null; 261 synchronized(this) { 262 threadListCopy = new Vector (threadList); 263 } 264 for (Iterator iter = threadListCopy.iterator(); iter.hasNext();) { 265 CoadunationThread thread = (CoadunationThread)iter.next(); 266 thread.terminate(); 267 } 268 269 synchronized(this) { 270 notifyAll(); 271 } 272 } 273 274 275 281 private void validateTask(Class taskClass) throws PoolException { 282 if (!ClassUtil.testForParent(taskClass,Task.class)) { 283 throw new PoolException("Task class [" + taskClass.getName() + 284 "] does not inherit from [" + Task.class.getName() + "]"); 285 } 286 } 287 288 289 295 private void startThreads(int size) throws PoolException { 296 try { 297 for (int count = 0; count < size; count++) { 298 PoolThread thread = new PoolThread(this,taskClass); 299 thread.start(username); 300 addThread(thread); 301 } 302 } catch (Exception ex) { 303 log.error("Failed to start the threads : " + 304 ex.getMessage(),ex); 305 throw new PoolException("Failed to start the threads : " + 306 ex.getMessage(),ex); 307 } 308 } 309 310 311 317 private synchronized boolean monitor() throws PoolException { 318 while (true) { 319 if (currentSize > maxSize) { 320 currentSize--; 321 return false; 322 } else if (releaseThread > 0) { 323 releaseThread--; 324 processing.incrementAndGet(); 325 return true; 326 } else if (currentSize > minSize) { 327 currentSize--; 328 return false; 329 } else if (state.isTerminated()) { 330 currentSize--; 331 return false; 332 } 333 try { 334 wait(); 335 } catch (Exception ex) { 336 log.error("Wait failed : " + ex.getMessage()); 337 } 338 } 339 } 340 341 342 345 private synchronized void addThread(PoolThread thread) { 346 currentSize++; 347 threadList.add(thread); 348 } 349 350 351 356 private synchronized void removeThread(PoolThread thread) { 357 threadList.remove(thread); 358 } 359 360 361 366 private void checkState() throws PoolException { 367 if (state.isTerminated()) { 368 throw new PoolException("The thread pool has been terminated"); 369 } 370 } 371 } 372 | Popular Tags |