1 17 18 package org.apache.tomcat.util.threads; 19 20 import java.util.*; 21 22 import org.apache.commons.logging.Log; 23 import org.apache.commons.logging.LogFactory; 24 import org.apache.tomcat.util.res.StringManager; 25 26 34 public class ThreadPool { 35 36 private static Log log = LogFactory.getLog(ThreadPool.class); 37 38 private static StringManager sm = 39 StringManager.getManager("org.apache.tomcat.util.threads.res"); 40 41 private static boolean logfull=true; 42 43 46 public static final int MAX_THREADS = 200; 47 public static final int MAX_THREADS_MIN = 10; 48 public static final int MAX_SPARE_THREADS = 50; 49 public static final int MIN_SPARE_THREADS = 4; 50 public static final int WORK_WAIT_TIMEOUT = 60*1000; 51 52 55 protected ControlRunnable[] pool = null; 56 57 60 protected MonitorRunnable monitor; 61 62 63 66 protected int maxThreads; 67 68 71 protected int minSpareThreads; 72 73 76 protected int maxSpareThreads; 77 78 81 protected int currentThreadCount; 82 83 86 protected int currentThreadsBusy; 87 88 91 protected boolean stopThePool; 92 93 94 protected boolean isDaemon=true; 95 96 99 protected Hashtable threads=new Hashtable(); 100 101 protected Vector listeners=new Vector(); 102 103 105 protected String name = "TP"; 106 107 110 protected int sequence = 1; 111 112 115 protected int threadPriority = Thread.NORM_PRIORITY; 116 117 118 121 public ThreadPool() { 122 maxThreads = MAX_THREADS; 123 maxSpareThreads = MAX_SPARE_THREADS; 124 minSpareThreads = MIN_SPARE_THREADS; 125 currentThreadCount = 0; 126 currentThreadsBusy = 0; 127 stopThePool = false; 128 } 129 130 131 137 public static ThreadPool createThreadPool(boolean jmx) { 138 return new ThreadPool(); 139 } 140 141 public synchronized void start() { 142 stopThePool=false; 143 currentThreadCount = 0; 144 currentThreadsBusy = 0; 145 146 adjustLimits(); 147 148 pool = new ControlRunnable[maxThreads]; 149 150 openThreads(minSpareThreads); 151 if (maxSpareThreads < maxThreads) { 152 monitor = new MonitorRunnable(this); 153 } 154 } 155 156 public MonitorRunnable getMonitor() { 157 return monitor; 158 } 159 160 169 public synchronized void setThreadPriority(int threadPriority) { 170 if(log.isDebugEnabled()) 171 log.debug(getClass().getName() + 172 ": setPriority(" + threadPriority + "): here."); 173 174 if (threadPriority < Thread.MIN_PRIORITY) { 175 throw new IllegalArgumentException ("new priority < MIN_PRIORITY"); 176 } else if (threadPriority > Thread.MAX_PRIORITY) { 177 throw new IllegalArgumentException ("new priority > MAX_PRIORITY"); 178 } 179 180 this.threadPriority = threadPriority; 182 183 Enumeration currentThreads = getThreads(); 184 Thread t = null; 185 while(currentThreads.hasMoreElements()) { 186 t = (Thread ) currentThreads.nextElement(); 187 t.setPriority(threadPriority); 188 } 189 } 190 191 197 public int getThreadPriority() { 198 return threadPriority; 199 } 200 201 202 public void setMaxThreads(int maxThreads) { 203 this.maxThreads = maxThreads; 204 } 205 206 public int getMaxThreads() { 207 return maxThreads; 208 } 209 210 public void setMinSpareThreads(int minSpareThreads) { 211 this.minSpareThreads = minSpareThreads; 212 } 213 214 public int getMinSpareThreads() { 215 return minSpareThreads; 216 } 217 218 public void setMaxSpareThreads(int maxSpareThreads) { 219 this.maxSpareThreads = maxSpareThreads; 220 } 221 222 public int getMaxSpareThreads() { 223 return maxSpareThreads; 224 } 225 226 public int getCurrentThreadCount() { 227 return currentThreadCount; 228 } 229 230 public int getCurrentThreadsBusy() { 231 return currentThreadsBusy; 232 } 233 234 public boolean isDaemon() { 235 return isDaemon; 236 } 237 238 public static int getDebug() { 239 return 0; 240 } 241 242 246 public void setDaemon( boolean b ) { 247 isDaemon=b; 248 } 249 250 public boolean getDaemon() { 251 return isDaemon; 252 } 253 254 public void setName(String name) { 255 this.name = name; 256 } 257 258 public String getName() { 259 return name; 260 } 261 262 public int getSequence() { 263 return sequence++; 264 } 265 266 public void addThread( Thread t, ControlRunnable cr ) { 267 threads.put( t, cr ); 268 for( int i=0; i<listeners.size(); i++ ) { 269 ThreadPoolListener tpl=(ThreadPoolListener)listeners.elementAt(i); 270 tpl.threadStart(this, t); 271 } 272 } 273 274 public void removeThread( Thread t ) { 275 threads.remove(t); 276 for( int i=0; i<listeners.size(); i++ ) { 277 ThreadPoolListener tpl=(ThreadPoolListener)listeners.elementAt(i); 278 tpl.threadEnd(this, t); 279 } 280 } 281 282 public void addThreadPoolListener( ThreadPoolListener tpl ) { 283 listeners.addElement( tpl ); 284 } 285 286 public Enumeration getThreads(){ 287 return threads.keys(); 288 } 289 290 public void run(Runnable r) { 291 ControlRunnable c = findControlRunnable(); 292 c.runIt(r); 293 } 294 295 302 305 public void runIt(ThreadPoolRunnable r) { 306 if(null == r) { 307 throw new NullPointerException (); 308 } 309 310 ControlRunnable c = findControlRunnable(); 311 c.runIt(r); 312 } 313 314 private ControlRunnable findControlRunnable() { 315 ControlRunnable c=null; 316 317 if ( stopThePool ) { 318 throw new IllegalStateException (); 319 } 320 321 synchronized(this) { 323 324 while (currentThreadsBusy == currentThreadCount) { 325 if (currentThreadCount < maxThreads) { 327 int toOpen = currentThreadCount + minSpareThreads; 330 openThreads(toOpen); 331 } else { 332 logFull(log, currentThreadCount, maxThreads); 333 try { 335 this.wait(); 336 } 337 catch(InterruptedException e) { 343 log.error("Unexpected exception", e); 344 } 345 if( log.isDebugEnabled() ) { 346 log.debug("Finished waiting: CTC="+currentThreadCount + 347 ", CTB=" + currentThreadsBusy); 348 } 349 if( stopThePool) { 351 break; 352 } 353 } 354 } 355 if(0 == currentThreadCount || stopThePool) { 357 throw new IllegalStateException (); 358 } 359 360 int pos = currentThreadCount - currentThreadsBusy - 1; 362 c = pool[pos]; 363 pool[pos] = null; 364 currentThreadsBusy++; 365 366 } 367 return c; 368 } 369 370 private static void logFull(Log loghelper, int currentThreadCount, 371 int maxThreads) { 372 if( logfull ) { 373 log.error(sm.getString("threadpool.busy", 374 new Integer (currentThreadCount), 375 new Integer (maxThreads))); 376 logfull=false; 377 } else if( log.isDebugEnabled() ) { 378 log.debug("All threads are busy " + currentThreadCount + " " + 379 maxThreads ); 380 } 381 } 382 383 386 public synchronized void shutdown() { 387 if(!stopThePool) { 388 stopThePool = true; 389 if (monitor != null) { 390 monitor.terminate(); 391 monitor = null; 392 } 393 for(int i = 0; i < currentThreadCount - currentThreadsBusy; i++) { 394 try { 395 pool[i].terminate(); 396 } catch(Throwable t) { 397 401 log.error("Ignored exception while shutting down thread pool", t); 402 } 403 } 404 currentThreadsBusy = currentThreadCount = 0; 405 pool = null; 406 notifyAll(); 407 } 408 } 409 410 413 protected synchronized void checkSpareControllers() { 414 415 if(stopThePool) { 416 return; 417 } 418 419 if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) { 420 int toFree = currentThreadCount - 421 currentThreadsBusy - 422 maxSpareThreads; 423 424 for(int i = 0 ; i < toFree ; i++) { 425 ControlRunnable c = pool[currentThreadCount - currentThreadsBusy - 1]; 426 c.terminate(); 427 pool[currentThreadCount - currentThreadsBusy - 1] = null; 428 currentThreadCount --; 429 } 430 431 } 432 433 } 434 435 439 protected synchronized void returnController(ControlRunnable c) { 440 441 if(0 == currentThreadCount || stopThePool) { 442 c.terminate(); 443 return; 444 } 445 446 currentThreadsBusy--; 448 449 pool[currentThreadCount - currentThreadsBusy - 1] = c; 450 notify(); 451 } 452 453 459 protected synchronized void notifyThreadEnd(ControlRunnable c) { 460 currentThreadsBusy--; 461 currentThreadCount --; 462 notify(); 463 } 464 465 466 471 protected void adjustLimits() { 472 if(maxThreads <= 0) { 473 maxThreads = MAX_THREADS; 474 } else if (maxThreads < MAX_THREADS_MIN) { 475 log.warn(sm.getString("threadpool.max_threads_too_low", 476 new Integer (maxThreads), 477 new Integer (MAX_THREADS_MIN))); 478 maxThreads = MAX_THREADS_MIN; 479 } 480 481 if(maxSpareThreads >= maxThreads) { 482 maxSpareThreads = maxThreads; 483 } 484 485 if(maxSpareThreads <= 0) { 486 if(1 == maxThreads) { 487 maxSpareThreads = 1; 488 } else { 489 maxSpareThreads = maxThreads/2; 490 } 491 } 492 493 if(minSpareThreads > maxSpareThreads) { 494 minSpareThreads = maxSpareThreads; 495 } 496 497 if(minSpareThreads <= 0) { 498 if(1 == maxSpareThreads) { 499 minSpareThreads = 1; 500 } else { 501 minSpareThreads = maxSpareThreads/2; 502 } 503 } 504 } 505 506 510 protected void openThreads(int toOpen) { 511 512 if(toOpen > maxThreads) { 513 toOpen = maxThreads; 514 } 515 516 for(int i = currentThreadCount ; i < toOpen ; i++) { 517 pool[i - currentThreadsBusy] = new ControlRunnable(this); 518 } 519 520 currentThreadCount = toOpen; 521 } 522 523 524 void log( String s ) { 525 log.info(s); 526 } 528 529 532 public static class MonitorRunnable implements Runnable { 533 ThreadPool p; 534 Thread t; 535 int interval=WORK_WAIT_TIMEOUT; 536 boolean shouldTerminate; 537 538 MonitorRunnable(ThreadPool p) { 539 this.p=p; 540 this.start(); 541 } 542 543 public void start() { 544 shouldTerminate = false; 545 t = new Thread (this); 546 t.setDaemon(p.getDaemon() ); 547 t.setName(p.getName() + "-Monitor"); 548 t.start(); 549 } 550 551 public void setInterval(int i ) { 552 this.interval=i; 553 } 554 555 public void run() { 556 while(true) { 557 try { 558 559 synchronized(this) { 561 this.wait(interval); 562 } 563 564 if(shouldTerminate) { 567 break; 568 } 569 570 p.checkSpareControllers(); 572 573 } catch(Throwable t) { 574 ThreadPool.log.error("Unexpected exception", t); 575 } 576 } 577 } 578 579 public void stop() { 580 this.terminate(); 581 } 582 583 585 public synchronized void terminate() { 586 shouldTerminate = true; 587 this.notify(); 588 } 589 } 590 591 595 public static class ControlRunnable implements Runnable { 596 599 private ThreadPool p; 600 601 604 private ThreadWithAttributes t; 605 606 609 610 private ThreadPoolRunnable toRun; 611 private Runnable toRunRunnable; 612 613 616 private boolean shouldTerminate; 617 618 621 private boolean shouldRun; 622 623 629 private boolean noThData; 630 631 634 ControlRunnable(ThreadPool p) { 635 toRun = null; 636 shouldTerminate = false; 637 shouldRun = false; 638 this.p = p; 639 t = new ThreadWithAttributes(p, this); 640 t.setDaemon(true); 641 t.setName(p.getName() + "-Processor" + p.getSequence()); 642 t.setPriority(p.getThreadPriority()); 643 p.addThread( t, this ); 644 noThData=true; 645 t.start(); 646 } 647 648 public void run() { 649 boolean _shouldRun = false; 650 boolean _shouldTerminate = false; 651 ThreadPoolRunnable _toRun = null; 652 try { 653 while (true) { 654 try { 655 656 synchronized (this) { 657 while (!shouldRun && !shouldTerminate) { 658 this.wait(); 659 } 660 _shouldRun = shouldRun; 661 _shouldTerminate = shouldTerminate; 662 _toRun = toRun; 663 } 664 665 if (_shouldTerminate) { 666 if (ThreadPool.log.isDebugEnabled()) 667 ThreadPool.log.debug("Terminate"); 668 break; 669 } 670 671 672 try { 673 if (noThData) { 674 if (_toRun != null) { 675 Object thData[] = _toRun.getInitData(); 676 t.setThreadData(p, thData); 677 if (ThreadPool.log.isDebugEnabled()) 678 ThreadPool.log.debug( 679 "Getting new thread data"); 680 } 681 noThData = false; 682 } 683 684 if (_shouldRun) { 685 if (_toRun != null) { 686 _toRun.runIt(t.getThreadData(p)); 687 } else if (toRunRunnable != null) { 688 toRunRunnable.run(); 689 } else { 690 if (ThreadPool.log.isDebugEnabled()) 691 ThreadPool.log.debug("No toRun ???"); 692 } 693 } 694 } catch (Throwable t) { 695 ThreadPool.log.error(sm.getString 696 ("threadpool.thread_error", t, toRun.toString())); 697 704 _shouldTerminate = true; 705 _shouldRun = false; 706 p.notifyThreadEnd(this); 707 } finally { 708 if (_shouldRun) { 709 shouldRun = false; 710 713 p.returnController(this); 714 } 715 } 716 717 721 if (_shouldTerminate) { 722 break; 723 } 724 } catch (InterruptedException ie) { 725 ThreadPool.log.error("Unexpected exception", ie); 727 } 728 } 729 } finally { 730 p.removeThread(Thread.currentThread()); 731 } 732 } 733 737 public synchronized void runIt(Runnable toRun) { 738 this.toRunRunnable = toRun; 739 shouldRun = true; 744 this.notify(); 745 } 746 747 751 public synchronized void runIt(ThreadPoolRunnable toRun) { 752 this.toRun = toRun; 753 shouldRun = true; 758 this.notify(); 759 } 760 761 public void stop() { 762 this.terminate(); 763 } 764 765 public void kill() { 766 t.stop(); 767 } 768 769 public synchronized void terminate() { 770 shouldTerminate = true; 771 this.notify(); 772 } 773 } 774 775 781 public String threadStatusString() { 782 StringBuffer sb=new StringBuffer (); 783 Iterator it=threads.keySet().iterator(); 784 sb.append("<ul>"); 785 while( it.hasNext()) { 786 sb.append("<li>"); 787 ThreadWithAttributes twa=(ThreadWithAttributes) 788 it.next(); 789 sb.append(twa.getCurrentStage(this) ).append(" "); 790 sb.append( twa.getParam(this)); 791 sb.append( "</li>\n"); 792 } 793 sb.append("</ul>"); 794 return sb.toString(); 795 } 796 797 804 public String [] getThreadStatus() { 805 String status[]=new String [ threads.size()]; 806 Iterator it=threads.keySet().iterator(); 807 for( int i=0; ( i<status.length && it.hasNext()); i++ ) { 808 ThreadWithAttributes twa=(ThreadWithAttributes) 809 it.next(); 810 status[i]=twa.getCurrentStage(this); 811 } 812 return status; 813 } 814 815 820 public String [] getThreadParam() { 821 String status[]=new String [ threads.size()]; 822 Iterator it=threads.keySet().iterator(); 823 for( int i=0; ( i<status.length && it.hasNext()); i++ ) { 824 ThreadWithAttributes twa=(ThreadWithAttributes) 825 it.next(); 826 Object o=twa.getParam(this); 827 status[i]=(o==null)? null : o.toString(); 828 } 829 return status; 830 } 831 832 835 public static interface ThreadPoolListener { 836 public void threadStart( ThreadPool tp, Thread t); 837 838 public void threadEnd( ThreadPool tp, Thread t); 839 } 840 } 841 | Popular Tags |