1 26 27 package org.exoplatform.services.threadpool.impl; 28 29 30 import java.util.*; 31 32 import org.apache.commons.logging.Log; 33 import org.exoplatform.services.log.LogService; 34 import org.exoplatform.services.threadpool.ThreadPoolService; 35 36 public class ThreadPoolServiceImpl implements ThreadPoolService { 37 38 42 public static final int DEFAULT_MAXIMUMPOOLSIZE = 500; 43 47 public static final int DEFAULT_NORMALPOOLSIZE = 1; 48 53 public static final long DEFAULT_MAXIDLETIME = 60 * 1000; 54 protected volatile int maximumPoolSize = DEFAULT_MAXIMUMPOOLSIZE; 57 protected volatile int normalPoolSize = DEFAULT_NORMALPOOLSIZE; 58 protected long maxIdleTime = DEFAULT_MAXIDLETIME; 59 63 protected Queue handOff; 64 67 protected Object poolLock = new Object (); 68 73 protected volatile int poolSize = 0; 74 82 protected Map threads; 83 87 private ThreadFactoryIF threadFactory = new DefaultThreadFactory(); 88 89 private Log log; 90 91 94 public ThreadPoolServiceImpl(LogService logService) { 95 log = logService.getLog("org.exoplatform.services.threadpool"); 96 maximumPoolSize = DEFAULT_MAXIMUMPOOLSIZE; 97 handOff = new Queue(); 98 runWhenBlocked(); 99 threads = new HashMap(); 100 } 102 107 public int getMaximumPoolSize() { 108 return maximumPoolSize; 109 } 111 121 public void setMaximumPoolSize(int newMaximum) { 122 if (newMaximum <= 0) throw new IllegalArgumentException (); 123 maximumPoolSize = newMaximum; 124 } 126 132 public int getNormalPoolSize() { 133 return normalPoolSize; 134 } 136 145 public void setNormalPoolSize(int newNormal) { 146 if (newNormal < 0) { 147 throw new IllegalArgumentException (); 148 } normalPoolSize = newNormal; 150 } 152 156 public int getPoolSize() { 157 return poolSize; 158 } 160 163 public void setThreadFactory(ThreadFactoryIF newValue) { 164 threadFactory = newValue; 165 } 167 170 protected ThreadFactoryIF getThreadFactory() { 171 return threadFactory; 172 } 174 178 protected void addThread(Runnable task) { 179 ++poolSize; 180 Worker worker = new Worker(task); 181 Thread thread = getThreadFactory().createThread(worker); 182 threads.put(worker, thread); 183 thread.start(); 184 } 186 194 public int createThreads(int numberOfThreads) { 195 int ncreated = 0; 196 for (int i = 0; i < numberOfThreads; ++i) { 197 synchronized (poolLock) { 198 if (getPoolSize() < getMaximumPoolSize()) { 199 ++ncreated; 200 addThread(null); 201 } else { 202 break; 203 } } } return ncreated; 207 } 209 214 public void interruptAll() { 215 synchronized (poolLock) { 217 for (Iterator it = threads.values().iterator(); 218 it.hasNext();) { 219 Thread t = (Thread ) (it.next()); 220 t.interrupt(); 221 } } } 225 242 public List drain() { 243 boolean wasInterrupted = false; 244 Vector tasks = new Vector(); 245 for (; ;) { 246 try { 247 Object x = handOff.get(0); 248 if (x == null) 249 break; 250 else 251 tasks.addElement(x); 252 } catch (InterruptedException ex) { 253 wasInterrupted = true; } } if (wasInterrupted) Thread.currentThread().interrupt(); 257 return tasks; 258 } 260 261 267 public synchronized long getMaxIdleTime() { 268 return maxIdleTime; 269 } 271 277 public synchronized void setMaxIdleTime(long msecs) { 278 maxIdleTime = msecs; 279 } 281 284 protected void workerDone(Worker w) { 285 synchronized (poolLock) { 286 --poolSize; 287 threads.remove(w); 288 } } 291 294 protected Runnable getTask() throws InterruptedException { 295 long waitTime = getMaxIdleTime(); 296 if (waitTime >= 0) { 297 return (Runnable ) (handOff.get(waitTime)); 298 } else { 299 return (Runnable ) (handOff.get()); 300 } } 303 306 protected class Worker implements Runnable { 307 protected Runnable firstTask; 308 309 Worker(Runnable firstTask) { 310 this.firstTask = firstTask; 311 } 313 public void run() { 314 try { 315 Runnable task = firstTask; 316 firstTask = null; 317 if (task != null) { 318 task.run(); 319 } while (getPoolSize() <= getMaximumPoolSize()) { 322 task = getTask(); 323 if (task != null) { 324 task.run(); 325 } else { 326 break; 327 } } } catch (InterruptedException e) { 330 } finally { 333 workerDone(this); 334 } } } 338 339 347 protected interface BlockedExecutionStrategy { 348 353 public boolean blockedAction(Runnable task); 354 } 356 359 protected class RunWhenBlocked implements BlockedExecutionStrategy { 360 public boolean blockedAction(Runnable task) { 361 task.run(); 362 return true; 363 } } 366 369 protected class WaitWhenBlocked implements BlockedExecutionStrategy { 370 public boolean blockedAction(Runnable task) { 371 try { 372 handOff.put(task); 373 } catch (InterruptedException ex) { 374 Thread.currentThread().interrupt(); } return true; 377 } } 380 383 protected class DiscardWhenBlocked implements BlockedExecutionStrategy { 384 public boolean blockedAction(Runnable task) { 385 return true; 386 } } 389 392 protected BlockedExecutionStrategy blockedExecutionStrategy; 393 394 397 protected synchronized BlockedExecutionStrategy getBlockedExecutionStrategy() { 398 return blockedExecutionStrategy; 399 } 401 406 public synchronized void runWhenBlocked() { 407 blockedExecutionStrategy = new RunWhenBlocked(); 408 } 410 414 public synchronized void WhenBlocked() { 415 blockedExecutionStrategy = new WaitWhenBlocked(); 416 } 418 422 public synchronized void discardWhenBlocked() { 423 blockedExecutionStrategy = new DiscardWhenBlocked(); 424 } 426 431 public void execute(Runnable task) throws InterruptedException { 432 log.debug("execute method called"); 433 while (true) { 434 synchronized (poolLock) { 435 if (getPoolSize() < getNormalPoolSize()) { 437 addThread(task); 438 return; 439 } 441 if (handOff.put(task, 0)) { 443 return; 444 } 446 if (getPoolSize() < getMaximumPoolSize()) { 449 addThread(task); 450 return; 451 } } 454 if (getBlockedExecutionStrategy().blockedAction(task)) { 456 return; 457 } } } 461 } 462 | Popular Tags |