1 22 23 24 package com.mchange.v2.async; 25 26 import java.util.*; 27 import com.mchange.v2.log.*; 28 29 import java.io.StringWriter ; 30 import java.io.PrintWriter ; 31 import java.io.IOException ; 32 import java.lang.reflect.Method ; 33 import com.mchange.v2.io.IndentedWriter; 34 import com.mchange.v2.util.ResourceClosedException; 35 36 public final class ThreadPoolAsynchronousRunner implements AsynchronousRunner 37 { 38 final static MLogger logger = MLog.getLogger( ThreadPoolAsynchronousRunner.class ); 39 40 final static int POLL_FOR_STOP_INTERVAL = 5000; 42 final static int DFLT_DEADLOCK_DETECTOR_INTERVAL = 10000; final static int DFLT_INTERRUPT_DELAY_AFTER_APPARENT_DEADLOCK = 60000; final static int DFLT_MAX_INDIVIDUAL_TASK_TIME = 0; 46 final static int DFLT_MAX_EMERGENCY_THREADS = 10; 47 48 int deadlock_detector_interval; 49 int interrupt_delay_after_apparent_deadlock; 50 int max_individual_task_time; 51 52 int num_threads; 53 boolean daemon; 54 HashSet managed; 55 HashSet available; 56 LinkedList pendingTasks; 57 58 Timer myTimer; 59 boolean should_cancel_timer; 60 61 TimerTask deadlockDetector = new DeadlockDetector(); 62 TimerTask replacedThreadInterruptor = null; 63 64 Map stoppedThreadsToStopDates = new HashMap(); 65 66 private ThreadPoolAsynchronousRunner( int num_threads, 67 boolean daemon, 68 int max_individual_task_time, 69 int deadlock_detector_interval, 70 int interrupt_delay_after_apparent_deadlock, 71 Timer myTimer, 72 boolean should_cancel_timer ) 73 { 74 this.num_threads = num_threads; 75 this.daemon = daemon; 76 this.max_individual_task_time = max_individual_task_time; 77 this.deadlock_detector_interval = deadlock_detector_interval; 78 this.interrupt_delay_after_apparent_deadlock = interrupt_delay_after_apparent_deadlock; 79 this.myTimer = myTimer; 80 this.should_cancel_timer = should_cancel_timer; 81 82 recreateThreadsAndTasks(); 83 84 myTimer.schedule( deadlockDetector, deadlock_detector_interval, deadlock_detector_interval ); 85 86 } 87 88 89 public ThreadPoolAsynchronousRunner( int num_threads, 90 boolean daemon, 91 int max_individual_task_time, 92 int deadlock_detector_interval, 93 int interrupt_delay_after_apparent_deadlock, 94 Timer myTimer ) 95 { 96 this( num_threads, 97 daemon, 98 max_individual_task_time, 99 deadlock_detector_interval, 100 interrupt_delay_after_apparent_deadlock, 101 myTimer, 102 false ); 103 } 104 105 public ThreadPoolAsynchronousRunner( int num_threads, 106 boolean daemon, 107 int max_individual_task_time, 108 int deadlock_detector_interval, 109 int interrupt_delay_after_apparent_deadlock ) 110 { 111 this( num_threads, 112 daemon, 113 max_individual_task_time, 114 deadlock_detector_interval, 115 interrupt_delay_after_apparent_deadlock, 116 new Timer( true ), 117 true ); 118 } 119 120 public ThreadPoolAsynchronousRunner( int num_threads, boolean daemon, Timer sharedTimer ) 121 { 122 this( num_threads, 123 daemon, 124 DFLT_MAX_INDIVIDUAL_TASK_TIME, 125 DFLT_DEADLOCK_DETECTOR_INTERVAL, 126 DFLT_INTERRUPT_DELAY_AFTER_APPARENT_DEADLOCK, 127 sharedTimer, 128 false ); 129 } 130 131 public ThreadPoolAsynchronousRunner( int num_threads, boolean daemon ) 132 { 133 this( num_threads, 134 daemon, 135 DFLT_MAX_INDIVIDUAL_TASK_TIME, 136 DFLT_DEADLOCK_DETECTOR_INTERVAL, 137 DFLT_INTERRUPT_DELAY_AFTER_APPARENT_DEADLOCK, 138 new Timer( true ), 139 true ); } 140 141 public synchronized void postRunnable(Runnable r) 142 { 143 try 144 { 145 pendingTasks.add( r ); 146 this.notifyAll(); 147 } 148 catch ( NullPointerException e ) 149 { 150 if ( Debug.DEBUG ) 152 { 153 if ( logger.isLoggable( MLevel.FINE ) ) 154 logger.log( MLevel.FINE, "NullPointerException while posting Runnable -- Probably we're closed.", e ); 155 } 156 throw new ResourceClosedException("Attempted to use a ThreadPoolAsynchronousRunner in a closed or broken state."); 157 } 158 } 159 160 public synchronized int getThreadCount() 161 { return managed.size(); } 162 163 public void close( boolean skip_remaining_tasks ) 164 { 165 synchronized ( this ) 166 { 167 if (managed == null) return; 168 deadlockDetector.cancel(); 169 if (should_cancel_timer) 171 myTimer.cancel(); 172 myTimer = null; 173 for (Iterator ii = managed.iterator(); ii.hasNext(); ) 174 { 175 PoolThread stopMe = (PoolThread) ii.next(); 176 stopMe.gentleStop(); 177 if (skip_remaining_tasks) 178 stopMe.interrupt(); 179 } 180 managed = null; 181 182 if (!skip_remaining_tasks) 183 { 184 for (Iterator ii = pendingTasks.iterator(); ii.hasNext(); ) 185 { 186 Runnable r = (Runnable ) ii.next(); 187 new Thread (r).start(); 188 ii.remove(); 189 } 190 } 191 available = null; 192 pendingTasks = null; 193 } 194 } 195 196 public void close() 197 { close( true ); } 198 199 public synchronized int getActiveCount() 200 { return managed.size() - available.size(); } 201 202 public synchronized int getIdleCount() 203 { return available.size(); } 204 205 public synchronized int getPendingTaskCount() 206 { return pendingTasks.size(); } 207 208 public synchronized String getStatus() 209 { 210 217 218 return getMultiLineStatusString(); 219 } 220 221 public synchronized String getStackTraces() 223 { return getStackTraces(0); } 224 225 private String getStackTraces(int initial_indent) 228 { 229 if (managed == null) 230 return null; 231 232 try 233 { 234 Method m = Thread .class.getMethod("getStackTrace", null); 235 236 StringWriter sw = new StringWriter (2048); 237 IndentedWriter iw = new IndentedWriter( sw ); 238 for (int i = 0; i < initial_indent; ++i) 239 iw.upIndent(); 240 for (Iterator ii = managed.iterator(); ii.hasNext(); ) 241 { 242 Object poolThread = ii.next(); 243 Object [] stackTraces = (Object []) m.invoke( poolThread, null ); 244 iw.println( poolThread ); 245 iw.upIndent(); 246 for (int i = 0, len = stackTraces.length; i < len; ++i) 247 iw.println( stackTraces[i] ); 248 iw.downIndent(); 249 } 250 for (int i = 0; i < initial_indent; ++i) 251 iw.downIndent(); 252 iw.flush(); String out = sw.toString(); 254 iw.close(); return out; 256 } 257 catch (NoSuchMethodException e) 258 { 259 if ( logger.isLoggable( MLevel.FINE ) ) 260 logger.fine( this + ": strack traces unavailable because this is a pre-Java 1.5 VM."); 261 return null; 262 } 263 catch (Exception e) 264 { 265 if ( logger.isLoggable( MLevel.FINE ) ) 266 logger.log( MLevel.FINE, this + ": An Exception occurred while trying to extract PoolThread stack traces.", e); 267 return null; 268 } 269 } 270 271 public synchronized String getMultiLineStatusString() 272 { return this.getMultiLineStatusString(0); } 273 274 private String getMultiLineStatusString(int initial_indent) 277 { 278 try 279 { 280 StringWriter sw = new StringWriter (2048); 281 IndentedWriter iw = new IndentedWriter( sw ); 282 283 for (int i = 0; i < initial_indent; ++i) 284 iw.upIndent(); 285 286 if (managed == null) 287 { 288 iw.print("["); 289 iw.print( this ); 290 iw.println(" closed.]"); 291 } 292 else 293 { 294 HashSet active = (HashSet) managed.clone(); 295 active.removeAll( available ); 296 297 iw.print("Managed Threads: "); 298 iw.println( managed.size() ); 299 iw.print("Active Threads: "); 300 iw.println( active.size() ); 301 iw.println("Active Tasks: "); 302 iw.upIndent(); 303 for (Iterator ii = active.iterator(); ii.hasNext(); ) 304 { 305 PoolThread pt = (PoolThread) ii.next(); 306 iw.print( pt.getCurrentTask() ); 307 iw.print( " ("); 308 iw.print( pt.getName() ); 309 iw.println(')'); 310 } 311 iw.downIndent(); 312 iw.println("Pending Tasks: "); 313 iw.upIndent(); 314 for (int i = 0, len = pendingTasks.size(); i < len; ++i) 315 iw.println( pendingTasks.get( i ) ); 316 iw.downIndent(); 317 } 318 319 for (int i = 0; i < initial_indent; ++i) 320 iw.downIndent(); 321 iw.flush(); String out = sw.toString(); 323 iw.close(); return out; 325 } 326 catch (IOException e) 327 { 328 if (logger.isLoggable( MLevel.WARNING )) 329 logger.log( MLevel.WARNING, "Huh? An IOException when working with a StringWriter?!?", e); 330 throw new RuntimeException ("Huh? An IOException when working with a StringWriter?!? " + e); 331 } 332 } 333 334 private void appendStatusString( StringBuffer sb ) 337 { 338 if (managed == null) 339 sb.append( "[closed]" ); 340 else 341 { 342 HashSet active = (HashSet) managed.clone(); 343 active.removeAll( available ); 344 sb.append("[num_managed_threads: "); 345 sb.append( managed.size() ); 346 sb.append(", num_active: "); 347 sb.append( active.size() ); 348 sb.append("; activeTasks: "); 349 boolean first = true; 350 for (Iterator ii = active.iterator(); ii.hasNext(); ) 351 { 352 if (first) 353 first = false; 354 else 355 sb.append(", "); 356 PoolThread pt = (PoolThread) ii.next(); 357 sb.append( pt.getCurrentTask() ); 358 sb.append( " ("); 359 sb.append( pt.getName() ); 360 sb.append(')'); 361 } 362 sb.append("; pendingTasks: "); 363 for (int i = 0, len = pendingTasks.size(); i < len; ++i) 364 { 365 if (i != 0) sb.append(", "); 366 sb.append( pendingTasks.get( i ) ); 367 } 368 sb.append(']'); 369 } 370 } 371 372 private void recreateThreadsAndTasks() 375 { 376 if ( this.managed != null) 377 { 378 Date aboutNow = new Date(); 379 for (Iterator ii = managed.iterator(); ii.hasNext(); ) 380 { 381 PoolThread pt = (PoolThread) ii.next(); 382 pt.gentleStop(); 383 stoppedThreadsToStopDates.put( pt, aboutNow ); 384 ensureReplacedThreadsProcessing(); 385 } 386 } 387 388 this.managed = new HashSet(); 389 this.available = new HashSet(); 390 this.pendingTasks = new LinkedList(); 391 for (int i = 0; i < num_threads; ++i) 392 { 393 Thread t = new PoolThread(i, daemon); 394 managed.add( t ); 395 available.add( t ); 396 t.start(); 397 } 398 } 399 400 private void processReplacedThreads() 403 { 404 long about_now = System.currentTimeMillis(); 405 for (Iterator ii = stoppedThreadsToStopDates.keySet().iterator(); ii.hasNext(); ) 406 { 407 PoolThread pt = (PoolThread) ii.next(); 408 if (! pt.isAlive()) 409 ii.remove(); 410 else 411 { 412 Date d = (Date) stoppedThreadsToStopDates.get( pt ); 413 if ((about_now - d.getTime()) > interrupt_delay_after_apparent_deadlock) 414 { 415 if (logger.isLoggable(MLevel.WARNING)) 416 logger.log(MLevel.WARNING, 417 "Task " + pt.getCurrentTask() + " (in deadlocked PoolThread) failed to complete in maximum time " + 418 interrupt_delay_after_apparent_deadlock + "ms. Trying interrupt()."); 419 pt.interrupt(); 420 ii.remove(); 421 } 422 } 424 if (stoppedThreadsToStopDates.isEmpty()) 425 stopReplacedThreadsProcessing(); 426 } 427 } 428 429 private void ensureReplacedThreadsProcessing() 432 { 433 if (replacedThreadInterruptor == null) 434 { 435 if (logger.isLoggable( MLevel.FINE )) 436 logger.fine("Apparently some threads have been replaced. Replacement thread processing enabled."); 437 438 this.replacedThreadInterruptor = new ReplacedThreadInterruptor(); 439 int replacedThreadProcessDelay = interrupt_delay_after_apparent_deadlock / 4; 440 myTimer.schedule( replacedThreadInterruptor, replacedThreadProcessDelay, replacedThreadProcessDelay ); 441 } 442 } 443 444 private void stopReplacedThreadsProcessing() 447 { 448 if (this.replacedThreadInterruptor != null) 449 { 450 this.replacedThreadInterruptor.cancel(); 451 this.replacedThreadInterruptor = null; 452 453 if (logger.isLoggable( MLevel.FINE )) 454 logger.fine("Apparently all replaced threads have either completed their tasks or been interrupted(). " + 455 "Replacement thread processing cancelled."); 456 } 457 } 458 459 private void shuttingDown( PoolThread pt ) 462 { 463 if (managed != null && managed.contains( pt )) { 465 managed.remove( pt ); 466 available.remove( pt ); 467 PoolThread replacement = new PoolThread( pt.getIndex(), daemon ); 468 managed.add( replacement ); 469 available.add( replacement ); 470 replacement.start(); 471 } 472 } 473 474 475 class PoolThread extends Thread 476 { 477 Runnable currentTask; 479 480 boolean should_stop; 482 483 int index; 485 486 TimerTask maxIndividualTaskTimeEnforcer = null; 488 489 PoolThread(int index, boolean daemon) 490 { 491 this.setName( this.getClass().getName() + "-#" + index); 492 this.setDaemon( daemon ); 493 this.index = index; 494 } 495 496 public int getIndex() 497 { return index; } 498 499 void gentleStop() 502 { should_stop = true; } 503 504 Runnable getCurrentTask() 507 { return currentTask; } 508 509 private void setMaxIndividualTaskTimeEnforcer() 511 { 512 this.maxIndividualTaskTimeEnforcer = new MaxIndividualTaskTimeEnforcer( this ); 513 myTimer.schedule( maxIndividualTaskTimeEnforcer, max_individual_task_time ); 514 } 515 516 private void cancelMaxIndividualTaskTimeEnforcer() 518 { 519 this.maxIndividualTaskTimeEnforcer.cancel(); 520 this.maxIndividualTaskTimeEnforcer = null; 521 } 522 523 public void run() 524 { 525 try 526 { 527 thread_loop: 528 while (true) 529 { 530 Runnable myTask; 531 synchronized ( ThreadPoolAsynchronousRunner.this ) 532 { 533 while ( !should_stop && pendingTasks.size() == 0 ) 534 ThreadPoolAsynchronousRunner.this.wait( POLL_FOR_STOP_INTERVAL ); 535 if (should_stop) 536 break thread_loop; 537 538 if (! available.remove( this ) ) 539 throw new InternalError ("An unavailable PoolThread tried to check itself out!!!"); 540 myTask = (Runnable ) pendingTasks.remove(0); 541 currentTask = myTask; 542 } 543 try 544 { 545 if (max_individual_task_time > 0) 546 setMaxIndividualTaskTimeEnforcer(); 547 myTask.run(); 548 } 549 catch ( RuntimeException e ) 550 { 551 if ( logger.isLoggable( MLevel.WARNING ) ) 552 logger.log(MLevel.WARNING, this + " -- caught unexpected Exception while executing posted task.", e); 553 } 555 finally 556 { 557 if ( maxIndividualTaskTimeEnforcer != null ) 558 cancelMaxIndividualTaskTimeEnforcer(); 559 560 synchronized ( ThreadPoolAsynchronousRunner.this ) 561 { 562 if (should_stop) 563 break thread_loop; 564 565 if ( available != null && ! available.add( this ) ) 566 throw new InternalError ("An apparently available PoolThread tried to check itself in!!!"); 567 currentTask = null; 568 } 569 } 570 } 571 } 572 catch ( InterruptedException exc ) 573 { 574 577 if ( Debug.TRACE > Debug.TRACE_NONE && logger.isLoggable( MLevel.FINE ) ) 578 logger.fine(this + " interrupted. Shutting down."); 579 } 580 581 synchronized ( ThreadPoolAsynchronousRunner.this ) 582 { ThreadPoolAsynchronousRunner.this.shuttingDown( this ); } 583 } 584 } 585 586 class DeadlockDetector extends TimerTask 587 { 588 LinkedList last = null; 589 LinkedList current = null; 590 591 public void run() 592 { 593 boolean run_stray_tasks = false; 594 synchronized ( ThreadPoolAsynchronousRunner.this ) 595 { 596 if (pendingTasks.size() == 0) 597 { 598 last = null; 599 return; 600 } 601 602 current = (LinkedList) pendingTasks.clone(); 603 if ( current.equals( last ) ) 604 { 605 if ( logger.isLoggable( MLevel.WARNING ) ) 607 { 608 logger.warning(this + " -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending tasks!"); 609 StringWriter sw = new StringWriter ( 4096 ); 610 PrintWriter pw = new PrintWriter ( sw ); 611 pw.print( this ); 615 pw.println( " -- APPARENT DEADLOCK!!! Complete Status: "); 616 pw.print( ThreadPoolAsynchronousRunner.this.getMultiLineStatusString( 1 ) ); 617 pw.println("Pool thread stack traces:"); 618 String stackTraces = getStackTraces( 1 ); 619 if (stackTraces == null) 620 pw.println("\t[Stack traces of deadlocked task threads not available.]"); 621 else 622 pw.println( stackTraces ); 623 pw.flush(); logger.warning( sw.toString() ); 625 pw.close(); } 627 recreateThreadsAndTasks(); 628 run_stray_tasks = true; 629 } 630 } 631 if (run_stray_tasks) 632 { 633 AsynchronousRunner ar = new ThreadPerTaskAsynchronousRunner( DFLT_MAX_EMERGENCY_THREADS, max_individual_task_time ); 634 for ( Iterator ii = current.iterator(); ii.hasNext(); ) 635 ar.postRunnable( (Runnable ) ii.next() ); 636 ar.close( false ); last = null; 638 } 639 else 640 last = current; 641 642 current = null; 647 } 648 } 649 650 class MaxIndividualTaskTimeEnforcer extends TimerTask 651 { 652 PoolThread pt; 653 Thread interruptMe; 654 String threadStr; 655 String fixedTaskStr; 656 657 MaxIndividualTaskTimeEnforcer(PoolThread pt) 658 { 659 this.pt = pt; 660 this.interruptMe = pt; 661 this.threadStr = pt.toString(); 662 this.fixedTaskStr = null; 663 } 664 665 MaxIndividualTaskTimeEnforcer(Thread interruptMe, String threadStr, String fixedTaskStr) 666 { 667 this.pt = null; 668 this.interruptMe = interruptMe; 669 this.threadStr = threadStr; 670 this.fixedTaskStr = fixedTaskStr; 671 } 672 673 public void run() 674 { 675 String taskStr; 676 677 if (fixedTaskStr != null) 678 taskStr = fixedTaskStr; 679 else if (pt != null) 680 { 681 synchronized (ThreadPoolAsynchronousRunner.this) 682 { taskStr = String.valueOf( pt.getCurrentTask() ); } 683 } 684 else 685 taskStr = "Unknown task?!"; 686 687 if (logger.isLoggable( MLevel.WARNING )) 688 logger.warning("A task has exceeded the maximum allowable task time. Will interrupt() thread [" + threadStr 689 + "], with current task: " + taskStr); 690 691 interruptMe.interrupt(); 692 693 if (logger.isLoggable( MLevel.WARNING )) 694 logger.warning("Thread [" + threadStr + "] interrupted."); 695 } 696 } 697 698 private void runInEmergencyThread( final Runnable r ) 700 { 701 final Thread t = new Thread ( r ); 702 t.start(); 703 if (max_individual_task_time > 0) 704 { 705 TimerTask maxIndividualTaskTimeEnforcer = new MaxIndividualTaskTimeEnforcer(t, t + " [One-off emergency thread!!!]", r.toString()); 706 myTimer.schedule( maxIndividualTaskTimeEnforcer, max_individual_task_time ); 707 } 708 } 709 710 class ReplacedThreadInterruptor extends TimerTask 711 { 712 public void run() 713 { 714 synchronized (ThreadPoolAsynchronousRunner.this) 715 { processReplacedThreads(); } 716 } 717 } 718 } 719 | Popular Tags |