1 22 package org.jboss.util.threadpool; 23 24 import java.util.Collections ; 25 import java.util.Map ; 26 27 import org.jboss.util.collection.WeakValueHashMap; 28 import org.jboss.logging.Logger; 29 30 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue; 31 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 32 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 33 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; 34 import EDU.oswego.cs.dl.util.concurrent.Heap; 35 36 43 public class BasicThreadPool implements ThreadPool, BasicThreadPoolMBean 44 { 45 47 48 private static final ThreadGroup JBOSS_THREAD_GROUP = new ThreadGroup ("JBoss Pooled Threads"); 49 50 51 private static final Map threadGroups = Collections.synchronizedMap(new WeakValueHashMap()); 52 53 54 private static final SynchronizedInt lastPoolNumber = new SynchronizedInt(0); 55 56 private static Logger log = Logger.getLogger(BasicThreadPool.class); 57 58 60 61 private String name; 62 63 64 private int poolNumber; 65 66 67 private BlockingMode blockingMode = BlockingMode.ABORT; 68 69 70 private MinPooledExecutor executor; 71 72 73 private BoundedLinkedQueue queue; 74 75 76 private ThreadGroup threadGroup; 77 78 79 private SynchronizedInt lastThreadNumber = new SynchronizedInt(0); 80 81 82 private SynchronizedBoolean stopped = new SynchronizedBoolean(false); 83 84 private Heap tasksWithTimeouts = new Heap(13); 85 86 private TimeoutMonitor timeoutTask; 87 88 private boolean trace; 89 90 92 94 97 public BasicThreadPool() 98 { 99 this("ThreadPool"); 100 } 101 102 108 public BasicThreadPool(String name) 109 { 110 this(name, JBOSS_THREAD_GROUP); 111 } 112 113 120 public BasicThreadPool(String name, ThreadGroup threadGroup) 121 { 122 trace = log.isTraceEnabled(); 123 ThreadFactory factory = new ThreadPoolThreadFactory(); 124 125 queue = new BoundedLinkedQueue(1024); 126 127 executor = new MinPooledExecutor(queue, 100); 128 executor.setMinimumPoolSize(4); 129 executor.setKeepAliveTime(60 * 1000); 130 executor.setThreadFactory(factory); 131 executor.abortWhenBlocked(); 132 133 poolNumber = lastPoolNumber.increment(); 134 setName(name); 135 this.threadGroup = threadGroup; 136 } 137 138 140 142 public void stop(boolean immediate) 143 { 144 log.debug("stop, immediate="+immediate); 145 stopped.set(true); 146 if (immediate) 147 executor.shutdownNow(); 148 else 149 executor.shutdownAfterProcessingCurrentlyQueuedTasks(); 150 } 151 152 public void waitForTasks() throws InterruptedException 153 { 154 executor.awaitTerminationAfterShutdown(); 155 } 156 public void waitForTasks(long maxWaitTime) throws InterruptedException 157 { 158 executor.awaitTerminationAfterShutdown(maxWaitTime); 159 } 160 161 public void runTaskWrapper(TaskWrapper wrapper) 162 { 163 if( trace ) 164 log.trace("runTaskWrapper, wrapper="+wrapper); 165 if (stopped.get()) 166 { 167 wrapper.rejectTask(new ThreadPoolStoppedException("Thread pool has been stopped")); 168 return; 169 } 170 171 wrapper.acceptTask(); 172 173 long completionTimeout = wrapper.getTaskCompletionTimeout(); 174 TimeoutInfo info = null; 175 if( completionTimeout > 0 ) 176 { 177 checkTimeoutMonitor(); 178 info = new TimeoutInfo(wrapper, completionTimeout); 180 tasksWithTimeouts.insert(info); 181 } 182 int waitType = wrapper.getTaskWaitType(); 183 switch (waitType) 184 { 185 case Task.WAIT_FOR_COMPLETE: 186 { 187 executeOnThread(wrapper); 188 break; 189 } 190 default: 191 { 192 execute(wrapper); 193 } 194 } 195 waitForTask(wrapper); 196 } 197 198 public void runTask(Task task) 199 { 200 BasicTaskWrapper wrapper = new BasicTaskWrapper(task); 201 runTaskWrapper(wrapper); 202 } 203 204 public void run(Runnable runnable) 205 { 206 run(runnable, 0, 0); 207 } 208 209 public void run(Runnable runnable, long startTimeout, long completeTimeout) 210 { 211 RunnableTaskWrapper wrapper = new RunnableTaskWrapper(runnable, startTimeout, completeTimeout); 212 runTaskWrapper(wrapper); 213 } 214 215 public ThreadGroup getThreadGroup() 216 { 217 return threadGroup; 218 } 219 220 222 public String getName() 223 { 224 return name; 225 } 226 227 public void setName(String name) 228 { 229 this.name = name; 230 } 231 232 public int getPoolNumber() 233 { 234 return poolNumber; 235 } 236 237 public String getThreadGroupName() 238 { 239 return threadGroup.getName(); 240 } 241 242 public void setThreadGroupName(String threadGroupName) 243 { 244 ThreadGroup group; 245 synchronized(threadGroups) 246 { 247 group = (ThreadGroup ) threadGroups.get(threadGroupName); 248 if (group == null) 249 { 250 group = new ThreadGroup (JBOSS_THREAD_GROUP, threadGroupName); 251 threadGroups.put(threadGroupName, group); 252 } 253 } 254 threadGroup = group; 255 } 256 257 public int getQueueSize() 258 { 259 return queue.size(); 260 } 261 262 public int getMaximumQueueSize() 263 { 264 return queue.capacity(); 265 } 266 267 public void setMaximumQueueSize(int size) 268 { 269 queue.setCapacity(size); 270 } 271 272 public int getPoolSize() 273 { 274 return executor.getPoolSize(); 275 } 276 277 public int getMinimumPoolSize() 278 { 279 return executor.getMinimumPoolSize(); 280 } 281 282 public void setMinimumPoolSize(int size) 283 { 284 synchronized (executor) 285 { 286 executor.setKeepAliveSize(size); 287 if (executor.getMaximumPoolSize() < size) 289 { 290 executor.setMinimumPoolSize(size); 291 executor.setMaximumPoolSize(size); 292 } 293 } 294 } 295 296 public int getMaximumPoolSize() 297 { 298 return executor.getMaximumPoolSize(); 299 } 300 301 public void setMaximumPoolSize(int size) 302 { 303 synchronized (executor) 304 { 305 executor.setMinimumPoolSize(size); 306 executor.setMaximumPoolSize(size); 307 if (executor.getKeepAliveSize() > size) 309 executor.setKeepAliveSize(size); 310 } 311 } 312 313 public long getKeepAliveTime() 314 { 315 return executor.getKeepAliveTime(); 316 } 317 318 public void setKeepAliveTime(long time) 319 { 320 executor.setKeepAliveTime(time); 321 } 322 323 public BlockingMode getBlockingMode() 324 { 325 return blockingMode; 326 } 327 328 public void setBlockingMode(BlockingMode mode) 329 { 330 blockingMode = mode; 331 332 if( blockingMode == BlockingMode.RUN ) 333 { 334 executor.runWhenBlocked(); 335 } 336 else if( blockingMode == BlockingMode.WAIT ) 337 { 338 executor.waitWhenBlocked(); 339 } 340 else if( blockingMode == BlockingMode.DISCARD ) 341 { 342 executor.discardWhenBlocked(); 343 } 344 else if( blockingMode == BlockingMode.DISCARD_OLDEST ) 345 { 346 executor.discardOldestWhenBlocked(); 347 } 348 else if( blockingMode == BlockingMode.ABORT ) 349 { 350 executor.abortWhenBlocked(); 351 } 352 else 353 { 354 throw new IllegalArgumentException ("Failed to recognize mode: "+mode); 355 } 356 } 357 358 362 public void setBlockingMode(String name) 363 { 364 blockingMode = BlockingMode.toBlockingMode(name); 365 if( blockingMode == null ) 366 blockingMode = BlockingMode.ABORT; 367 } 368 369 375 public void setBlockingModeString(String name) 376 { 377 blockingMode = BlockingMode.toBlockingMode(name); 378 if( blockingMode == null ) 379 blockingMode = BlockingMode.ABORT; 380 } 381 382 public ThreadPool getInstance() 383 { 384 return this; 385 } 386 387 public void stop() 388 { 389 stop(false); 390 } 391 392 394 public String toString() 395 { 396 return name + '(' + poolNumber + ')'; 397 } 398 399 401 403 408 protected void executeOnThread(TaskWrapper wrapper) 409 { 410 if( trace ) 411 log.trace("executeOnThread, wrapper="+wrapper); 412 wrapper.run(); 413 } 414 415 420 protected void execute(TaskWrapper wrapper) 421 { 422 if( trace ) 423 log.trace("execute, wrapper="+wrapper); 424 try 425 { 426 executor.execute(wrapper); 427 } 428 catch (Throwable t) 429 { 430 wrapper.rejectTask(new ThreadPoolFullException(t.toString())); 431 } 432 } 433 434 439 protected void waitForTask(TaskWrapper wrapper) 440 { 441 wrapper.waitForTask(); 442 } 443 444 447 protected synchronized void checkTimeoutMonitor() 448 { 449 if( timeoutTask == null ) 450 timeoutTask = new TimeoutMonitor(name, log); 451 } 452 protected TimeoutInfo getNextTimeout() 453 { 454 TimeoutInfo info = (TimeoutInfo) this.tasksWithTimeouts.extract(); 455 return info; 456 } 457 458 460 462 465 private class ThreadPoolThreadFactory implements ThreadFactory 466 { 467 public Thread newThread(Runnable runnable) 468 { 469 String threadName = BasicThreadPool.this.toString() + "-" + lastThreadNumber.increment(); 470 Thread thread = new Thread (threadGroup, runnable, threadName); 471 thread.setDaemon(true); 472 return thread; 473 } 474 } 475 476 478 private static class TimeoutInfo implements Comparable 479 { 480 long start; 481 long timeoutMS; 482 TaskWrapper wrapper; 483 boolean firstStop; 484 TimeoutInfo(TaskWrapper wrapper, long timeout) 485 { 486 this.start = System.currentTimeMillis(); 487 this.timeoutMS = start + timeout; 488 this.wrapper = wrapper; 489 } 490 public void setTimeout(long timeout) 491 { 492 this.start = System.currentTimeMillis(); 493 this.timeoutMS = start + timeout; 494 } 495 500 public int compareTo(Object o) 501 { 502 TimeoutInfo ti = (TimeoutInfo) o; 503 long to0 = timeoutMS; 504 long to1 = ti.timeoutMS; 505 int diff = (int) (to0 - to1); 506 return diff; 507 } 508 TaskWrapper getTaskWrapper() 509 { 510 return wrapper; 511 } 512 public long getTaskCompletionTimeout() 513 { 514 return wrapper.getTaskCompletionTimeout(); 515 } 516 520 public long getTaskCompletionTimeout(long now) 521 { 522 return timeoutMS - now; 523 } 524 528 public boolean stopTask() 529 { 530 wrapper.stopTask(); 531 boolean wasFirstStop = firstStop == false; 532 firstStop = true; 533 return wasFirstStop; 534 } 535 } 536 540 private class TimeoutMonitor implements Runnable 541 { 542 final Logger log; 543 TimeoutMonitor(String name, Logger log) 544 { 545 this.log = log; 546 Thread t = new Thread (this, name+" TimeoutMonitor"); 547 t.setDaemon(true); 548 t.start(); 549 } 550 564 public void run() 565 { 566 boolean isStopped = stopped.get(); 567 while( isStopped == false ) 568 { 569 boolean trace = log.isTraceEnabled(); 570 try 571 { 572 TimeoutInfo info = getNextTimeout(); 573 if( info != null ) 574 { 575 long now = System.currentTimeMillis(); 576 long timeToTimeout = info.getTaskCompletionTimeout(now); 577 if( timeToTimeout > 0 ) 578 { 579 if( trace ) 580 { 581 log.trace("Will check wrapper="+info.getTaskWrapper() 582 +" after "+timeToTimeout); 583 } 584 Thread.sleep(timeToTimeout); 585 } 586 TaskWrapper wrapper = info.getTaskWrapper(); 588 if( wrapper.isComplete() == false ) 589 { 590 if( trace ) 591 log.trace("Failed completion check for wrapper="+wrapper); 592 if( info.stopTask() == true ) 593 { 594 info.setTimeout(1000); 596 tasksWithTimeouts.insert(info); 597 if( trace ) 598 log.trace("Rescheduled completion check for wrapper="+wrapper); 599 } 600 } 601 } 602 else 603 { 604 Thread.sleep(1000); 605 } 606 } 607 catch(InterruptedException e) 608 { 609 log.debug("Timeout monitor has been interrupted", e); 610 } 611 catch(Throwable e) 612 { 613 log.debug("Timeout monitor saw unexpected error", e); 614 } 615 isStopped = stopped.get(); 616 } 617 } 618 } 619 } 620 | Popular Tags |