1 package org.mortbay.util; 16 17 import java.io.Serializable ; 18 19 import org.apache.commons.logging.Log; 20 import org.mortbay.log.LogFactory; 21 22 38 public class ThreadPool implements LifeCycle,Serializable 39 { 40 static Log log=LogFactory.getLog(ThreadPool.class); 41 static private int __pool=0; 42 public static final String __DAEMON="org.mortbay.util.ThreadPool.daemon"; 43 public static final String __PRIORITY="org.mortbay.util.ThreadPool.priority"; 44 45 46 private Pool _pool; 47 private Object _join=""; 48 private transient boolean _started; 49 50 51 54 public ThreadPool() 55 { 56 String name=this.getClass().getName(); 57 int ld = name.lastIndexOf('.'); 58 if (ld>=0) 59 name=name.substring(ld+1); 60 synchronized(ThreadPool.class) 61 { 62 name+=__pool++; 63 } 64 65 _pool=new Pool(); 66 _pool.setPoolClass(ThreadPool.PoolThread.class); 67 setName(name); 68 } 69 70 71 74 public String getName() 75 { 76 return _pool.getPoolName(); 77 } 78 79 80 89 public void setName(String name) 90 { 91 synchronized(Pool.class) 92 { 93 if(isStarted()) 94 { 95 if((name==null&&_pool.getPoolName()!=null)||(name!=null&&!name.equals(_pool.getPoolName()))) 96 throw new IllegalStateException ("started"); 97 return; 98 } 99 100 if(name==null) 101 { 102 if(_pool.getPoolName()!=null) 103 { 104 _pool=new Pool(); 105 _pool.setPoolName(getName()); 106 } 107 } 108 else if (!name.equals(getName())) 109 { 110 Pool pool=Pool.getPool(name); 111 if(pool==null) 112 _pool.setPoolName(name); 113 else 114 _pool=pool; 115 } 116 } 117 } 118 119 120 123 public String getPoolName() 124 { 125 return getName(); 126 } 127 128 129 132 public void setPoolName(String name) 133 { 134 setName(name); 135 } 136 137 138 141 public boolean isDaemon() 142 { 143 return _pool.getAttribute(__DAEMON)!=null; 144 } 145 146 147 150 public void setDaemon(boolean daemon) 151 { 152 _pool.setAttribute(__DAEMON,daemon?"true":null); 153 } 154 155 156 161 public boolean isStarted() 162 { 163 return _started; 164 } 165 166 167 173 public int getThreads() 174 { 175 return _pool.size(); 176 } 177 178 179 185 public int getIdleThreads() 186 { 187 return _pool.available(); 188 } 189 190 191 197 public int getMinThreads() 198 { 199 return _pool.getMinSize(); 200 } 201 202 203 209 public void setMinThreads(int minThreads) 210 { 211 _pool.setMinSize(minThreads); 212 } 213 214 215 221 public int getMaxThreads() 222 { 223 return _pool.getMaxSize(); 224 } 225 226 227 233 public void setMaxThreads(int maxThreads) 234 { 235 _pool.setMaxSize(maxThreads); 236 } 237 238 239 245 public int getMaxIdleTimeMs() 246 { 247 return _pool.getMaxIdleTimeMs(); 248 } 249 250 251 258 public void setMaxIdleTimeMs(int maxIdleTimeMs) 259 { 260 _pool.setMaxIdleTimeMs(maxIdleTimeMs); 261 } 262 263 264 269 public int getThreadsPriority() 270 { 271 int priority=Thread.NORM_PRIORITY; 272 Object o=_pool.getAttribute(__PRIORITY); 273 if(o!=null) 274 { 275 priority=((Integer )o).intValue(); 276 } 277 return priority; 278 } 279 280 281 286 public void setThreadsPriority(int priority) 287 { 288 _pool.setAttribute(__PRIORITY,new Integer (priority)); 289 } 290 291 292 297 public void setMaxStopTimeMs(int ms) 298 { 299 log.warn("setMaxStopTimeMs is deprecated. No longer required."); 300 } 301 302 303 306 public void start() throws Exception 307 { 308 _started=true; 309 _pool.start(); 310 } 311 312 313 319 public void stop() throws InterruptedException 320 { 321 _started=false; 322 _pool.stop(); 323 synchronized(_join) 324 { 325 _join.notifyAll(); 326 } 327 } 328 329 330 public void join() 331 { 332 while(isStarted()&&_pool!=null) 333 { 334 synchronized(_join) 335 { 336 try 337 { 338 if(isStarted()&&_pool!=null) 339 _join.wait(30000); 340 } 341 catch(Exception e) 342 { 343 LogSupport.ignore(log,e); 344 } 345 } 346 } 347 } 348 349 350 public void shrink() throws InterruptedException 351 { 352 _pool.shrink(); 353 } 354 355 356 362 public void run(Object job) throws InterruptedException 363 { 364 if(job==null) 365 return; 366 try 367 { 368 PoolThread thread=(PoolThread)_pool.get(getMaxIdleTimeMs()); 369 if(thread!=null) 370 thread.run(this,job); 371 else 372 { 373 log.warn("No thread for "+job); 374 stopJob(null,job); 375 } 376 } 377 catch(InterruptedException e) 378 { 379 throw e; 380 } 381 catch(Exception e) 382 { 383 log.warn(LogSupport.EXCEPTION,e); 384 } 385 } 386 387 388 396 protected void handle(Object job) throws InterruptedException 397 { 398 if(job!=null&&job instanceof Runnable ) 399 ((Runnable )job).run(); 400 else 401 log.warn("Invalid job: "+job); 402 } 403 404 405 413 protected void stopJob(Thread thread,Object job) 414 {} 415 416 417 418 419 423 public static class PoolThread extends Thread implements Pool.PondLife 424 { 425 Pool _pool; 426 ThreadPool _jobPool; 427 Object _job; 428 ThreadPool _runPool; 429 Object _run; 430 int _id; 431 String _name; 432 433 434 public void enterPool(Pool pool,int id) 435 { 436 synchronized(this) 437 { 438 _pool=pool; 439 _id=id; 440 _name=_pool.getPoolName()+"-"+id; 441 this.setName(_name); 442 this.setDaemon(pool.getAttribute(__DAEMON)!=null); 443 Object o=pool.getAttribute(__PRIORITY); 444 if(o!=null) 445 { 446 this.setPriority(((Integer )o).intValue()); 447 } 448 this.start(); 449 } 450 } 451 452 453 public int getID() 454 { 455 return _id; 456 } 457 458 459 public void poolClosing() 460 { 461 synchronized(this) 462 { 463 _pool=null; 464 if(_run==null) 465 notify(); 466 else 467 interrupt(); 468 } 469 } 470 471 472 public void leavePool() 473 { 474 synchronized(this) 475 { 476 _pool=null; 477 if(_jobPool==null&&_runPool==null) 478 notify(); 479 if(_job!=null&&_jobPool!=null) 480 { 481 _jobPool.stopJob(this,_job); 482 _job=null; 483 _jobPool=null; 484 } 485 486 if(_run!=null&&_runPool!=null) 487 { 488 _runPool.stopJob(this,_run); 489 _run=null; 490 _runPool=null; 491 } 492 } 493 } 494 495 496 public void run(ThreadPool pool,Object job) 497 { 498 synchronized(this) 499 { 500 _jobPool=pool; 501 _job=job; 502 notify(); 503 } 504 } 505 506 507 510 public void run() 511 { 512 Object run=null; 513 ThreadPool runPool=null; 514 while(_pool!=null&&_pool.isStarted()) 515 { 516 try 517 { 518 synchronized(this) 519 { 520 if(run==null&&_pool!=null&&_pool.isStarted()&&_job==null) 522 wait(_pool.getMaxIdleTimeMs()); 523 if(_job!=null) 524 { 525 run=_run=_job; 526 _job=null; 527 runPool=_runPool=_jobPool; 528 _jobPool=null; 529 } 530 } 531 532 if(run!=null && runPool!=null) 534 runPool.handle(run); 535 else if (run==null && _pool!=null) 536 _pool.shrink(); 537 } 538 catch(InterruptedException e) 539 { 540 LogSupport.ignore(log,e); 541 } 542 finally 543 { 544 synchronized(this) 545 { 546 boolean got=run!=null; 547 run=_run=null; 548 runPool=_runPool=null; 549 try 550 { 551 if(got&&_pool!=null) 552 _pool.put(this); 553 } 554 catch(InterruptedException e) 555 { 556 LogSupport.ignore(log,e); 557 } 558 } 559 } 560 } 561 } 562 563 public String toString() 564 { 565 return _name; 566 } 567 } 568 } 569 | Popular Tags |