1 16 package org.apache.cocoon.components.thread; 17 18 import org.apache.avalon.framework.activity.Disposable; 19 import org.apache.avalon.framework.activity.Startable; 20 import org.apache.avalon.framework.configuration.Configurable; 21 import org.apache.avalon.framework.configuration.Configuration; 22 import org.apache.avalon.framework.configuration.ConfigurationException; 23 import org.apache.avalon.framework.logger.AbstractLogEnabled; 24 import org.apache.avalon.framework.logger.Logger; 25 import org.apache.avalon.framework.thread.ThreadSafe; 26 27 import java.util.HashMap ; 28 import java.util.Iterator ; 29 import java.util.Map ; 30 import java.util.SortedSet ; 31 import java.util.TreeSet ; 32 33 69 public class DefaultRunnableManager 70 extends AbstractLogEnabled 71 implements RunnableManager, Configurable, Disposable, Runnable , Startable, 72 ThreadSafe 73 { 74 76 77 public static final String DEFAULT_THREAD_FACTORY = 78 DefaultThreadFactory.class.getName( ); 79 80 81 public static final int DEFAULT_QUEUE_SIZE = -1; 82 83 84 public static final int DEFAULT_MAX_POOL_SIZE = 5; 85 86 87 public static final int DEFAULT_MIN_POOL_SIZE = 5; 88 89 90 public static final String DEFAULT_THREAD_PRIORITY = "NORM"; 91 92 93 public static final boolean DEFAULT_DAEMON_MODE = false; 94 95 96 public static final long DEFAULT_KEEP_ALIVE_TIME = 60000L; 97 98 99 public static final boolean DEFAULT_SHUTDOWN_GRACEFUL = false; 100 101 102 public static final int DEFAULT_SHUTDOWN_WAIT_TIME = -1; 103 104 105 public static final String DEFAULT_THREADPOOL_NAME = "default"; 106 107 109 113 protected SortedSet m_commandStack = new TreeSet ( ); 114 115 116 final Map m_pools = new HashMap ( ); 117 118 119 private Class m_defaultThreadFactoryClass; 120 121 122 private boolean m_keepRunning = false; 123 124 126 129 public void configure( final Configuration config ) 130 throws ConfigurationException 131 { 132 final String defaultThreadFactoryName = 133 config.getChild( "thread-factory" ).getValue( DEFAULT_THREAD_FACTORY ); 134 135 try 136 { 137 m_defaultThreadFactoryClass = 138 Thread.currentThread( ).getContextClassLoader( ).loadClass( defaultThreadFactoryName ); 139 } 140 catch( final Exception ex ) 141 { 142 throw new ConfigurationException( "Cannot create instance of default thread factory " + 143 defaultThreadFactoryName, ex ); 144 } 145 146 final Configuration [] threadpools = 147 config.getChild( "thread-pools" ).getChildren( "thread-pool" ); 148 149 for( int i = 0; i < threadpools.length; i++ ) 150 { 151 final DefaultThreadPool pool = configThreadPool( threadpools[ i ] ); 152 } 153 154 final ThreadPool defaultThreadPool = 156 (ThreadPool)m_pools.get( DEFAULT_THREADPOOL_NAME ); 157 158 if( null == defaultThreadPool ) 159 { 160 createPool( DEFAULT_THREADPOOL_NAME, DEFAULT_QUEUE_SIZE, 161 DEFAULT_MAX_POOL_SIZE, DEFAULT_MIN_POOL_SIZE, 162 getPriority( DEFAULT_THREAD_PRIORITY ), 163 DEFAULT_DAEMON_MODE, DEFAULT_KEEP_ALIVE_TIME, 164 DefaultThreadPool.POLICY_DEFAULT, 165 DEFAULT_SHUTDOWN_GRACEFUL, DEFAULT_SHUTDOWN_WAIT_TIME ); 166 } 167 } 168 169 191 public void createPool( final String name, 192 final int queueSize, 193 final int maxPoolSize, 194 final int minPoolSize, 195 final int priority, 196 final boolean isDaemon, 197 final long keepAliveTime, 198 final String blockPolicy, 199 final boolean shutdownGraceful, 200 final int shutdownWaitTime ) 201 { 202 if( null != m_pools.get( name ) ) 203 { 204 throw new IllegalArgumentException ( "ThreadPool \"" + name + 205 "\" already exists" ); 206 } 207 208 createPool( new DefaultThreadPool( ), name, queueSize, maxPoolSize, 209 minPoolSize, priority, isDaemon, keepAliveTime, 210 blockPolicy, shutdownGraceful, shutdownWaitTime ); 211 } 212 213 234 public ThreadPool createPool( final int queueSize, 235 final int maxPoolSize, 236 final int minPoolSize, 237 final int priority, 238 final boolean isDaemon, 239 final long keepAliveTime, 240 final String blockPolicy, 241 final boolean shutdownGraceful, 242 final int shutdownWaitTime ) 243 { 244 final DefaultThreadPool pool = new DefaultThreadPool( ); 245 final String name = "anon-" + pool.hashCode( ); 246 247 return createPool( pool, name, queueSize, maxPoolSize, minPoolSize, 248 priority, isDaemon, keepAliveTime, blockPolicy, 249 shutdownGraceful, shutdownWaitTime ); 250 } 251 252 255 public void dispose( ) 256 { 257 if( getLogger( ).isDebugEnabled( ) ) 258 { 259 getLogger( ).debug( "Disposing all thread pools" ); 260 } 261 262 for( final Iterator i = m_pools.keySet( ).iterator( ); i.hasNext( ); ) 263 { 264 final String poolName = (String )i.next( ); 265 final DefaultThreadPool pool = 266 (DefaultThreadPool)m_pools.get( poolName ); 267 268 if( getLogger( ).isDebugEnabled( ) ) 269 { 270 getLogger( ).debug( "Disposing thread pool " + 271 pool.getName( ) ); 272 } 273 274 pool.shutdown( ); 275 276 if( getLogger( ).isDebugEnabled( ) ) 277 { 278 getLogger( ).debug( "Thread pool " + pool.getName( ) + 279 " disposed" ); 280 } 281 } 282 283 try 284 { 285 m_pools.clear( ); 286 } 287 catch( final Throwable t ) 288 { 289 getLogger( ).error( "Cannot dispose", t ); 290 } 291 } 292 293 303 public void execute( final String threadPoolName, 304 final Runnable command, 305 final long delay, 306 long interval ) 307 { 308 if( delay < 0 ) 309 { 310 throw new IllegalArgumentException ( "delay < 0" ); 311 } 312 313 if( interval < 0 ) 314 { 315 throw new IllegalArgumentException ( "interval < 0" ); 316 } 317 318 ThreadPool pool = (ThreadPool)m_pools.get( threadPoolName ); 319 320 if( null == pool ) 321 { 322 getLogger( ).warn( "ThreadPool \"" + threadPoolName + 323 "\" is not known. Will use ThreadPool \"" + 324 DEFAULT_THREADPOOL_NAME + "\"" ); 325 pool = (ThreadPool)m_pools.get( DEFAULT_THREADPOOL_NAME ); 326 } 327 328 if( getLogger( ).isDebugEnabled( ) ) 329 { 330 getLogger( ).debug( "Command entered: " + command.toString( ) + 331 ", pool=" + pool.getName( ) + ", delay=" + 332 delay + ", interval=" + interval ); 333 } 334 335 new ExecutionInfo( pool, command, delay, interval, getLogger( ) ); 336 } 337 338 345 public void execute( final Runnable command, 346 final long delay, 347 final long interval ) 348 { 349 execute( DEFAULT_THREADPOOL_NAME, command, delay, interval ); 350 } 351 352 358 public void execute( final Runnable command, 359 final long delay ) 360 { 361 execute( DEFAULT_THREADPOOL_NAME, command, delay, 0 ); 362 } 363 364 369 public void execute( final Runnable command ) 370 { 371 execute( DEFAULT_THREADPOOL_NAME, command, 0, 0 ); 372 } 373 374 381 public void execute( final String threadPoolName, 382 final Runnable command, 383 final long delay ) 384 { 385 execute( threadPoolName, command, delay, 0 ); 386 } 387 388 394 public void execute( final String threadPoolName, 395 final Runnable command ) 396 { 397 execute( threadPoolName, command, 0, 0 ); 398 } 399 400 405 public void remove( Runnable command ) 406 { 407 synchronized( m_commandStack ) 408 { 409 for( final Iterator i = m_commandStack.iterator( ); i.hasNext( ); ) 410 { 411 final ExecutionInfo info = (ExecutionInfo)i.next( ); 412 413 if( info.m_command == command ) 414 { 415 i.remove( ); 416 m_commandStack.notifyAll( ); 417 418 return; 419 } 420 } 421 } 422 423 getLogger( ).warn( "Could not find command " + command + 424 " for removal" ); 425 } 426 427 430 public void run( ) 431 { 432 if( getLogger( ).isDebugEnabled( ) ) 433 { 434 getLogger( ).debug( "Entering loop" ); 435 } 436 437 while( m_keepRunning ) 438 { 439 synchronized( m_commandStack ) 440 { 441 try 442 { 443 if( m_commandStack.size( ) > 0 ) 444 { 445 final ExecutionInfo info = 446 (ExecutionInfo)m_commandStack.first( ); 447 final long delay = 448 info.m_nextRun - System.currentTimeMillis( ); 449 450 if( delay > 0 ) 451 { 452 m_commandStack.wait( delay ); 453 } 454 } 455 else 456 { 457 if( getLogger( ).isDebugEnabled( ) ) 458 { 459 getLogger( ).debug( "No commands available. Will just wait for one" ); 460 } 461 462 m_commandStack.wait( ); 463 } 464 } 465 catch( final InterruptedException ie ) 466 { 467 if( getLogger( ).isDebugEnabled( ) ) 468 { 469 getLogger( ).debug( "I've been interrupted" ); 470 } 471 } 472 473 if( m_keepRunning ) 474 { 475 if( m_commandStack.size( ) > 0 ) 476 { 477 final ExecutionInfo info = 478 (ExecutionInfo)m_commandStack.first( ); 479 final long delay = 480 info.m_nextRun - System.currentTimeMillis( ); 481 482 if( delay < 0 ) 483 { 484 info.execute( ); 485 } 486 } 487 } 488 } 489 } 490 491 if( getLogger( ).isDebugEnabled( ) ) 492 { 493 getLogger( ).debug( "Exiting loop" ); 494 } 495 } 496 497 502 public void start( ) throws Exception 503 { 504 if( getLogger( ).isDebugEnabled( ) ) 505 { 506 getLogger( ).debug( "Starting the heart" ); 507 } 508 509 m_keepRunning = true; 510 ( (ThreadPool) m_pools.get( DEFAULT_THREADPOOL_NAME ) ).execute( this ); 511 } 512 513 518 public void stop( ) 519 throws Exception 520 { 521 m_keepRunning = false; 522 523 synchronized( m_commandStack ) 524 { 525 m_commandStack.notifyAll( ); 526 } 527 } 528 529 536 private int getPriority( final String priority ) 537 { 538 if( "MIN".equalsIgnoreCase( priority ) ) 539 { 540 return Thread.MIN_PRIORITY; 541 } 542 else if( "NORM".equalsIgnoreCase( priority ) ) 543 { 544 return Thread.NORM_PRIORITY; 545 } 546 else if( "MAX".equalsIgnoreCase( priority ) ) 547 { 548 return Thread.MAX_PRIORITY; 549 } 550 else 551 { 552 getLogger( ).warn( "Unknown thread priority \"" + priority + 553 "\". Set to \"NORM\"." ); 554 555 return Thread.NORM_PRIORITY; 556 } 557 } 558 559 568 private DefaultThreadPool configThreadPool( final Configuration config ) 569 throws ConfigurationException 570 { 571 final String name = config.getChild( "name" ).getValue( ); 572 final int queueSize = 573 config.getChild( "queue-size" ).getValueAsInteger( DEFAULT_QUEUE_SIZE ); 574 final int maxPoolSize = 575 config.getChild( "max-pool-size" ).getValueAsInteger( DEFAULT_MAX_POOL_SIZE ); 576 int minPoolSize = 577 config.getChild( "min-pool-size" ).getValueAsInteger( DEFAULT_MIN_POOL_SIZE ); 578 579 if( DEFAULT_THREADPOOL_NAME.equals( name ) && 582 ( ( minPoolSize > 0 ) && ( minPoolSize < DEFAULT_MIN_POOL_SIZE ) ) ) 583 { 584 minPoolSize = DEFAULT_MIN_POOL_SIZE; 585 } 586 587 final String priority = 588 config.getChild( "priority" ).getValue( DEFAULT_THREAD_PRIORITY ); 589 final boolean isDaemon = 590 config.getChild( "daemon" ).getValueAsBoolean( DEFAULT_DAEMON_MODE ); 591 final long keepAliveTime = 592 config.getChild( "keep-alive-time-ms" ).getValueAsLong( DEFAULT_KEEP_ALIVE_TIME ); 593 final String blockPolicy = 594 config.getChild( "block-policy" ).getValue( DefaultThreadPool.POLICY_DEFAULT ); 595 final boolean shutdownGraceful = 596 config.getChild( "shutdown-graceful" ).getValueAsBoolean( DEFAULT_SHUTDOWN_GRACEFUL ); 597 final int shutdownWaitTime = 598 config.getChild( "shutdown-wait-time-ms" ).getValueAsInteger( DEFAULT_SHUTDOWN_WAIT_TIME ); 599 600 return createPool( new DefaultThreadPool( ), name, queueSize, 601 maxPoolSize, minPoolSize, getPriority( priority ), 602 isDaemon, keepAliveTime, blockPolicy, 603 shutdownGraceful, shutdownWaitTime ); 604 } 605 606 629 private DefaultThreadPool createPool( final DefaultThreadPool pool, 630 final String name, 631 final int queueSize, 632 final int maxPoolSize, 633 final int minPoolSize, 634 final int priority, 635 final boolean isDaemon, 636 final long keepAliveTime, 637 final String blockPolicy, 638 final boolean shutdownGraceful, 639 final int shutdownWaitTime ) 640 { 641 pool.enableLogging( getLogger( ).getChildLogger( name ) ); 642 pool.setName( name ); 643 644 ThreadFactory factory = null; 645 try 646 { 647 factory = 648 (ThreadFactory)m_defaultThreadFactoryClass.newInstance( ); 649 } 650 catch( final Exception ex ) 651 { 652 getLogger( ).warn( "Cannot instantiate a ThreadFactory from class " + 653 m_defaultThreadFactoryClass.getName( ) + 654 ". Will use a " + 655 DefaultThreadFactory.class.getName( ), ex ); 656 factory = new DefaultThreadFactory( ); 657 } 658 659 factory.setPriority( priority ); 660 factory.setDaemon( isDaemon ); 661 pool.setThreadFactory( factory ); 662 pool.setQueue( queueSize ); 663 pool.setMaximumPoolSize( ( maxPoolSize < 0 ) ? Integer.MAX_VALUE 664 : maxPoolSize ); 665 666 if( minPoolSize < 1 ) 667 { 668 getLogger( ).warn( "min-pool-size < 1 for pool \"" + 669 name + "\". Set to 1" ); 670 } 671 672 pool.setMinimumPoolSize( ( minPoolSize < 1 ) ? 1 : minPoolSize ); 673 674 if( keepAliveTime < 0 ) 675 { 676 getLogger( ).warn( "keep-alive-time-ms < 0 for pool \"" + 677 name + "\". Set to 1000" ); 678 } 679 680 pool.setKeepAliveTime( ( keepAliveTime < 0 ) ? 1000 : keepAliveTime ); 681 pool.setBlockPolicy( blockPolicy ); 682 pool.setShutdownGraceful( shutdownGraceful ); 683 pool.setShutdownWaitTimeMs( shutdownWaitTime ); 684 685 synchronized( m_pools ) 686 { 687 m_pools.put( name, pool ); 688 } 689 690 printPoolInfo( pool ); 691 return pool; 692 } 693 694 699 private void printPoolInfo( final DefaultThreadPool pool ) 700 { 701 if( getLogger( ).isInfoEnabled( ) ) 702 { 703 if( pool.isQueued( ) ) 704 { 705 final StringBuffer msg = new StringBuffer ( ); 706 msg.append( "ThreadPool named \"" ).append( pool.getName( ) ); 707 msg.append( "\" created with maximum queue-size=" ); 708 msg.append( pool.getMaxQueueSize( ) ); 709 msg.append( ",max-pool-size=" ).append( pool.getMaximumPoolSize( ) ); 710 msg.append( ",min-pool-size=" ).append( pool.getMinimumPoolSize( ) ); 711 msg.append( ",priority=" ).append( pool.getPriority( ) ); 712 msg.append( ",isDaemon=" ).append( ( (ThreadFactory)pool.getThreadFactory( ) ).isDaemon( ) ); 713 msg.append( ",keep-alive-time-ms=" ).append( pool.getKeepAliveTime( ) ); 714 msg.append( ",block-policy=\"" ).append( pool.getBlockPolicy( ) ); 715 msg.append( "\",shutdown-wait-time-ms=" ).append( pool.getShutdownWaitTimeMs( ) ); 716 getLogger( ).info( msg.toString( ) ); 717 } 718 else 719 { 720 final StringBuffer msg = new StringBuffer ( ); 721 msg.append( "ThreadPool named \"" ).append( pool.getName( ) ); 722 msg.append( "\" created with no queue,max-pool-size=" ).append( pool.getMaximumPoolSize( ) ); 723 msg.append( ",min-pool-size=" ).append( pool.getMinimumPoolSize( ) ); 724 msg.append( ",priority=" ).append( pool.getPriority( ) ); 725 msg.append( ",isDaemon=" ).append( ( (ThreadFactory)pool.getThreadFactory( ) ).isDaemon( ) ); 726 msg.append( ",keep-alive-time-ms=" ).append( pool.getKeepAliveTime( ) ); 727 msg.append( ",block-policy=" ).append( pool.getBlockPolicy( ) ); 728 msg.append( ",shutdown-wait-time-ms=" ).append( pool.getShutdownWaitTimeMs( ) ); 729 getLogger( ).info( msg.toString( ) ); 730 } 731 } 732 } 733 734 736 742 private class ExecutionInfo implements Comparable 743 { 744 746 747 final Logger m_logger; 748 749 750 final Runnable m_command; 751 752 753 final ThreadPool m_pool; 754 755 756 final long m_delay; 757 758 759 final long m_interval; 760 761 762 long m_nextRun = 0; 763 764 766 775 ExecutionInfo( final ThreadPool pool, 776 final Runnable command, 777 final long delay, 778 final long interval, 779 final Logger logger ) 780 { 781 m_pool = pool; 782 m_command = command; 783 m_delay = delay; 784 m_interval = interval; 785 m_logger = logger; 786 m_nextRun = System.currentTimeMillis( ) + delay; 787 788 synchronized( m_commandStack ) 789 { 790 m_commandStack.add( this ); 791 m_commandStack.notifyAll( ); 792 } 793 Thread.yield(); } 795 796 798 805 public int compareTo( final Object other ) 806 { 807 final ExecutionInfo otherInfo = (ExecutionInfo)other; 808 int diff = (int)( m_nextRun - otherInfo.m_nextRun ); 809 if (diff == 0) { 810 if (this == other) { 811 return 0; 813 } else { 814 return System.identityHashCode(this) - System.identityHashCode(other); 816 } 817 } 818 return diff; 819 } 820 821 824 void execute( ) 825 { 826 if( m_logger.isDebugEnabled( ) ) 827 { 828 m_logger.debug( "Executing command " + m_command + " in pool \"" + 829 m_pool.getName( ) + "\", schedule with interval=" + m_interval ); 830 } 831 832 synchronized( m_commandStack ) 833 { 834 m_commandStack.remove( this ); 835 if( m_interval > 0 ) 836 { 837 m_nextRun = System.currentTimeMillis( ) + m_interval; 838 m_commandStack.add( this ); 839 } 840 } 841 842 try 843 { 844 m_pool.execute( m_command ); 845 } 846 catch( final InterruptedException ie ) 847 { 848 if( m_logger.isDebugEnabled( ) ) 849 { 850 m_logger.debug( "Interrupted executing command + " + m_command ); 851 } 852 } 853 catch( final Throwable t ) 854 { 855 m_logger.error( "Exception executing command " + m_command, t ); 856 } 857 } 858 } 859 } 860 | Popular Tags |