1 16 17 package org.apache.jetspeed.services.threadpool; 18 19 import java.util.*; 21 import javax.servlet.ServletConfig ; 22 23 import org.apache.turbine.services.TurbineBaseService; 25 26 import org.apache.jetspeed.services.logging.JetspeedLogFactoryService; 28 import org.apache.jetspeed.services.logging.JetspeedLogger; 29 30 52 public class JetspeedThreadPoolService 53 extends TurbineBaseService 54 implements ThreadPoolService 55 { 56 59 protected static final JetspeedLogger logger = JetspeedLogFactoryService.getLogger(JetspeedThreadPoolService.class.getName()); 60 61 64 private int initThreads = 50; 65 66 69 private int maxThreads = 100; 70 71 74 private int minSpareThreads = 15; 75 76 79 public static final int DEFAULT_THREAD_PRIORITY = Thread.MIN_PRIORITY; 80 81 84 private Vector availableThreads = new Vector(); 85 86 87 90 private ThreadGroup tg = new ThreadGroup ("JetspeedThreadPoolService"); 91 92 95 private Queue queue = new Queue(); 96 97 100 private int count = 0; 101 102 103 108 public JetspeedThreadPoolService() 109 throws Exception 110 { 111 } 112 113 114 117 public void init( ) 118 { 119 while( !getInit() ) { 120 try { 121 Thread.sleep(500); 122 } catch (InterruptedException ie ) { 123 logger.info("ThreadPool service: Waiting for init()..." ); 124 } 125 } 126 } 127 128 133 public synchronized void init( ServletConfig config ) 134 { 135 if( getInit() ) { 136 return; 138 } 139 140 try 141 { 142 logger.info ( "JetspeedThreadPoolService early init()....starting!"); 143 initThreadpool(config); 144 setInit(true); 145 logger.info ( "JetspeedThreadPoolService early init()....finished!"); 146 } 147 catch (Exception e) 148 { 149 logger.error ( "Cannot initialize JetspeedThreadPoolService!", e ); 150 } 151 152 } 154 155 161 public void process( Runnable runnable ) { 162 163 process( runnable, Thread.MIN_PRIORITY ); 164 165 } 166 167 174 public void process( Runnable runnable, int priority ) { 175 176 RunnableThread thread = this.getAvailableThread(); 177 178 if ( thread == null ) { 179 180 this.getQueue().add( runnable ); 181 182 } else { 183 184 try { 185 synchronized ( thread ) { 186 int defaultPriority = thread.getPriority(); 188 if( defaultPriority != priority ) { 189 thread.setPriority( priority ); 192 } 193 thread.setRunnable( runnable ); 194 thread.notify(); 195 } 196 } catch ( Throwable t ) { 197 logger.error("Throwable", t); 198 } 199 200 } 201 202 203 } 204 205 210 public int getThreadCount() { 211 return this.tg.activeCount(); 212 } 213 214 219 public int getAvailableThreadCount() { 220 return this.availableThreads.size(); 221 } 222 223 228 public int getQueueLength() { 229 return this.getQueue().size(); 230 } 231 232 238 public int getThreadProcessedCount() { 239 return this.count; 240 } 241 242 247 Queue getQueue() { 248 return this.queue; 249 } 250 251 256 void release( RunnableThread thread ) { 257 258 synchronized ( this.availableThreads ) { 259 260 this.availableThreads.addElement( thread ); 261 262 ++this.count; 263 264 269 synchronized( this.getQueue() ) { 270 271 if ( this.getQueue().size() > 0 ) { 274 275 Runnable r = this.getQueue().get(); 276 277 if ( r != null ) { 278 this.process( r ); 279 } else { 280 logger.info( "JetspeedThreadPoolService: no Runnable found." ); 281 } 282 283 } 284 285 } 286 287 } 288 289 } 290 291 296 private void initThreadpool( ServletConfig config ) 297 { 298 Properties props = getProperties(); 299 300 try { 301 302 this.initThreads = Integer.parseInt( props.getProperty( "init.count" ) ); 303 this.maxThreads = Integer.parseInt( props.getProperty( "max.count" ) ); 304 this.minSpareThreads = Integer.parseInt( props.getProperty( "minspare.count" ) ); 305 306 } catch ( NumberFormatException e ) { 307 logger.error("Invalid number format in properties", e); 308 } 309 310 createThreads( this.initThreads ); 312 313 } 314 315 320 private synchronized void createThreads( int count ) { 321 322 if ( this.getThreadCount() < this.maxThreads && 326 this.getThreadCount() + count > this.maxThreads ) { 327 328 count = this.maxThreads - this.getThreadCount(); 329 330 } else if ( this.getThreadCount() >= this.maxThreads ) { 331 332 return; 333 } 334 335 logger.info( "JetspeedThreadPoolService: creating " + 336 count + 337 " more thread(s) for a total of: " + 338 ( this.getThreadCount() + count ) ); 339 340 for (int i = 0; i < count; ++i ) { 341 342 343 RunnableThread thread = new RunnableThread( this.tg); 345 thread.setPriority( DEFAULT_THREAD_PRIORITY ); 346 347 thread.start(); 350 } 351 352 } 353 354 360 private RunnableThread getAvailableThread() { 361 362 363 synchronized( this.availableThreads ) { 364 365 368 if ( this.getAvailableThreadCount() < this.minSpareThreads ) { 369 this.createThreads( this.minSpareThreads ); 370 } 371 372 if ( this.getAvailableThreadCount() == 0 ) { 374 return null; 375 } 376 377 RunnableThread thread = null; 378 379 380 381 int id = this.availableThreads.size() - 1; 383 384 385 386 thread = (RunnableThread)this.availableThreads.elementAt( id ); 387 this.availableThreads.removeElementAt( id ); 388 389 return thread; 390 } 391 392 393 } 394 395 } 396 397 406 class Queue { 407 408 412 private Vector queue = new Vector(); 413 414 419 public synchronized void add( Runnable runnable ) { 420 queue.insertElementAt( runnable, 0 ); 421 } 422 423 429 public synchronized Runnable get() { 430 431 if ( this.queue.size() == 0 ) { 432 JetspeedThreadPoolService.logger.info( "JetspeedThreadPoolService->Queue: No more Runnables left in queue. Returning null" ); 433 return null; 434 } 435 436 int id = queue.size() - 1; 437 Runnable runnable = (Runnable )queue.elementAt( id ); 438 this.queue.removeElementAt( id ); 439 440 return runnable; 441 } 442 443 448 public int size() { 449 return this.queue.size(); 450 } 451 452 } 453 | Popular Tags |