Code - Class EDU.oswego.cs.dl.util.concurrent.FJTaskRunnerGroup

/*
  File: FJTaskRunnerGroup.java

  Originally written by Doug Lea and released into the public domain.
  This may be used for any purposes whatsoever without acknowledgment.
  Thanks for the assistance and support of Sun Microsystems Labs,
  and everyone contributing, testing, and using this code.

  History:
  Date       Who                What
  7Jan1999   dl                 First public release
  12Jan1999  dl                 made getActiveCount public; misc minor cleanup.
  14Jan1999  dl                 Added executeTask
  20Jan1999  dl                 Allow use of priorities; reformat stats
  6Feb1999   dl                 Lazy thread starts
  27Apr1999  dl                 Renamed
*/

package EDU.oswego.cs.dl.util.concurrent;

/**
 * A stripped down analog of a ThreadGroup used for
 * establishing and managing FJTaskRunner threads.
 * FJTaskRunnerGroups serve as the control boundary separating
 * the general world of normal threads from the specialized world
 * of FJTasks.
 * <p>
 * By intent, this class does not subclass java.lang.ThreadGroup, and
 * does not support most methods found in ThreadGroups, since they
 * would make no sense for FJTaskRunner threads. In fact, the class
 * does not deal with ThreadGroups at all. If you want to restrict
 * a FJTaskRunnerGroup to a particular ThreadGroup, you can create
 * it from within that ThreadGroup.
 * <p>
 * The main contextual parameter for a FJTaskRunnerGroup is
 * the group size, established in the constructor.
 * Groups must be of a fixed size.
 * There is no way to dynamically increase or decrease the number
 * of threads in an existing group.
 * <p>
 * In general, the group size should be equal to the number
 * of CPUs on the system. (Unfortunately, there is no portable
 * means of automatically detecting the number of CPUs on a JVM, so there is
 * no good way to automate defaults.)
 * In principle, when
 * FJTasks are used for computation-intensive tasks, having only
 * as many threads as CPUs should minimize bookkeeping overhead
 * and contention, and so maximize throughput. However, because
 * FJTaskRunners lie atop Java threads, and in turn operating system
 * thread support and scheduling policies,
 * it is very possible that using more threads
 * than CPUs will improve overall throughput even though it adds
 * to overhead. This will always be so if FJTasks are I/O bound.
 * So it may pay to experiment a bit when tuning on particular platforms.
 * You can also use <code>setRunPriorities</code> to either
 * increase or decrease the priorities of active threads, which
 * may interact with group size choice.
 * <p>
 * In any case, overestimating group sizes never
 * seriously degrades performance (at least within reasonable bounds).
 * You can also use a value
 * less than the number of CPUs in order to reserve processing
 * for unrelated threads.
 * <p>
 * There are two general styles for using a FJTaskRunnerGroup.
 * You can create one group per entire program execution, for example
 * as a static singleton, and use it for all parallel tasks:
 * <pre>
 * class Tasks {
 *   static FJTaskRunnerGroup group;
 *   public void initialize(int groupSize) {
 *     group = new FJTaskRunnerGroup(groupSize);
 *   }
 *   // ...
 * }
 * </pre>
 * Alternatively, you can make new groups on the fly and use them only for
 * particular task sets. This is more flexible,
 * and leads to more controllable and deterministic execution patterns,
 * but it encounters greater overhead on startup. Also, to reclaim
 * system resources, you should
 * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done
 * using one-shot groups. Otherwise, because FJTaskRunners set
 * <code>Thread.isDaemon</code>
 * status, they will not normally be reclaimed until program termination.
 * <p>
 * The main supported methods are <code>execute</code>,
 * which starts a task processed by FJTaskRunner threads,
 * and <code>invoke</code>, which starts one and waits for completion.
 * For example, you might extend the above <code>Tasks</code>
 * class to support a task-based computation, say, the
 * <code>Fib</code> class from the <code>FJTask</code> documentation:
 * <pre>
 * class Tasks { // continued
 *   // ...
 *   static int fib(int n) {
 *     try {
 *       Fib f = new Fib(n);
 *       group.invoke(f);
 *       return f.getAnswer();
 *     }
 *     catch (InterruptedException ex) {
 *       throw new Error("Interrupted during computation");
 *     }
 *   }
 * }
 * </pre>
 * <p>
 * Method <code>stats()</code> can be used to monitor performance.
 * Both FJTaskRunnerGroup and FJTaskRunner may be compiled with
 * the compile-time constant COLLECT_STATS set to false. In this
 * case, various simple counts reported in stats() are not collected.
 * On platforms tested,
 * this leads to such a tiny performance improvement that there is
 * very little motivation to bother.
 *
 * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
 * <p>
 * @see FJTask
 * @see FJTaskRunner
 **/

public class FJTaskRunnerGroup implements Executor {

  /** The threads in this group **/
  protected final FJTaskRunner[] threads;

  /** Group-wide queue for tasks entered via execute() **/
  protected final LinkedQueue entryQueue = new LinkedQueue();

  /** Number of threads that are not waiting for work **/
  protected int activeCount = 0;

  /** Number of threads that have been started. Used to avoid
      unnecessary contention during startup of task sets.
  **/
  protected int nstarted = 0;

  /**
   * Compile-time constant. If true, various counts of
   * runs, waits, etc., are maintained.
   * These are NOT
   * updated with synchronization, so statistics reports
   * might not be accurate.
   **/

  static final boolean COLLECT_STATS = true;
  // static final boolean COLLECT_STATS = false;

  // for stats

  /** The time at which this FJTaskRunnerGroup was constructed **/
  long initTime = 0;

  /** Total number of executes or invokes **/
  int entries = 0;

  static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1;

  /**
   * Create a FJTaskRunnerGroup with the indicated number
   * of FJTaskRunner threads. Normally, the best size to use is
   * the number of CPUs on the system.
   * <p>
   * The threads in a FJTaskRunnerGroup are created with their
   * isDaemon status set, so do not normally need to be
   * shut down manually upon program termination.
   **/

  public FJTaskRunnerGroup(int groupSize) {
    threads = new FJTaskRunner[groupSize];
    initializeThreads();
    initTime = System.currentTimeMillis();
  }

  /**
   * Arrange for execution of the given task
   * by placing it in a work queue. If the argument
   * is not of type FJTask, it is embedded in a FJTask via
   * <code>FJTask.Wrap</code>.
   * @exception InterruptedException if current Thread is
   * currently interrupted
   **/

  public void execute(Runnable r) throws InterruptedException {
    if (r instanceof FJTask) {
      entryQueue.put((FJTask)r);
    }
    else {
      entryQueue.put(new FJTask.Wrap(r));
    }
    signalNewTask();
  }


  /**
   * Specialized form of execute called only from within FJTasks
   **/
  public void executeTask(FJTask t) {
    try {
      entryQueue.put(t);
      signalNewTask();
    }
    catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
  }


  /**
   * Start a task and wait it out. Returns when the task completes.
   * @exception InterruptedException if current Thread is
   * interrupted before completion of the task.
   **/

  public void invoke(Runnable r) throws InterruptedException {
    InvokableFJTask w = new InvokableFJTask(r);
    entryQueue.put(w);
    signalNewTask();
    w.awaitTermination();
  }


  /**
   * Try to shut down all FJTaskRunner threads in this group
   * by interrupting them all. This method is designed
   * to be used during cleanup when it is somehow known
   * that all threads are idle.
   * FJTaskRunners only
   * check for interruption when they are not otherwise
   * processing a task (and its generated subtasks,
   * if any), so if any threads are active, shutdown may
   * take a while, and may lead to unpredictable
   * task processing.
   **/

  public void interruptAll() {
    // paranoically interrupt current thread last if in group.
    Thread current = Thread.currentThread();
    boolean stopCurrent = false;

    for (int i = 0; i < threads.length; ++i) {
      Thread t = threads[i];
      if (t == current)
        stopCurrent = true;
      else
        t.interrupt();
    }
    if (stopCurrent)
      current.interrupt();
  }


  /**
   * Set the priority to use while a FJTaskRunner is
   * polling for new tasks to perform. Default
   * is currently Thread.MIN_PRIORITY+1. The value
   * set may not go into effect immediately, but
   * will be used at least the next time a thread scans for work.
   **/
  public synchronized void setScanPriorities(int pri) {
    for (int i = 0; i < threads.length; ++i) {
      FJTaskRunner t = threads[i];
      t.setScanPriority(pri);
      if (!t.active) t.setPriority(pri);
    }
  }


  /**
   * Set the priority to use while a FJTaskRunner is
   * actively running tasks. Default
   * is the priority that was in effect by the thread that
   * constructed this FJTaskRunnerGroup. Setting this value
   * while threads are running may momentarily result in
   * them running at this priority even when idly waiting for work.
   **/
  public synchronized void setRunPriorities(int pri) {
    for (int i = 0; i < threads.length; ++i) {
      FJTaskRunner t = threads[i];
      t.setRunPriority(pri);
      if (t.active) t.setPriority(pri);
    }
  }



  /** Return the number of FJTaskRunner threads in this group **/

  public int size() { return threads.length; }


  /**
   * Return the number of threads that are not idly waiting for work.
   * Beware that even active threads might not be doing any useful
   * work, but just spinning waiting for other dependent tasks.
   * Also, since this is just a snapshot value, some tasks
   * may be in the process of becoming idle.
   **/
  public synchronized int getActiveCount() { return activeCount; }

  /**
   * Prints various snapshot statistics to System.out.
   * <ul>
   *   <li> For each FJTaskRunner thread (labeled as T<em>n</em>, for
   *   <em>n</em> from zero to group size - 1):
   *   <ul>
   *     <li> A star "*" is printed if the thread is currently active;
   *      that is, not sleeping while waiting for work. Because
   *      threads gradually enter sleep modes, an active thread
   *      may in fact be about to sleep (or wake up).
   *     <li> <em>Q Cap</em> The current capacity of its task queue.
   *     <li> <em>Runs</em> The total number of tasks that have been run.
   *     <li> <em>New</em> The number of these tasks that were
   *      taken from either the entry queue or from other
   *      thread queues; that is, the number of tasks run
   *      that were <em>not</em> forked by the thread itself.
   *     <li> <em>Scans</em> The number of times other task
   *      queues or the entry queue were polled for tasks.
   *   </ul>
   *   <li> <em>Execute</em> The total number of tasks entered
   *    (but not necessarily yet run) via execute or invoke.
   *   <li> <em>Time</em> Time in seconds since construction of this
   *    FJTaskRunnerGroup.
   *   <li> <em>Rate</em> The total number of tasks processed
   *    per second across all threads.
   *    This
   *    may be useful as a simple throughput indicator
   *    if all processed tasks take approximately the
   *    same time to run.
   * </ul>
   * <p>
   * Cautions: Some statistics are updated and gathered
   * without synchronization,
   * so may not be accurate. However, reported counts may be considered
   * as lower bounds of actual values.
   * Some values may be zero if classes are compiled
   * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup
   * classes can be independently compiled with different values of
   * COLLECT_STATS.) Also, the counts are maintained as ints so could
   * overflow in exceptionally long-lived applications.
   * <p>
   * These statistics can be useful when tuning algorithms or diagnosing
   * problems. For example:
   * <ul>
   *  <li> High numbers of scans may mean that there is insufficient
   *   parallelism to keep threads busy. However, high scan rates
   *   are expected if the number
   *   of Executes is also high or there is a lot of global
   *   synchronization in the application, and the system is not otherwise
   *   busy. Threads may scan
   *   for work hundreds of times upon startup, shutdown, and
   *   global synch points of task sets.
   *  <li> Large imbalances in tasks run across different threads might
   *   just reflect contention with unrelated threads on a system
   *   (possibly including JVM threads such as GC), but may also
   *   indicate some systematic bias in how you generate tasks.
   *  <li> Large task queue capacities may mean that too many tasks are being
   *   generated before they can be run.
   *   Capacities are reported rather than current numbers of tasks
   *   in queues because they are better indicators of the existence
   *   of these kinds of possibly-transient problems.
   *   Queue capacities are
   *   resized on demand from their initial value of 4096 elements,
   *   which is much more than sufficient for the kinds of
   *   applications that this framework is intended to best support.
   * </ul>
   **/

  public void stats() {
    long time = System.currentTimeMillis() - initTime;
    double secs = ((double)time) / 1000.0;
    long totalRuns = 0;
    long totalScans = 0;
    long totalSteals = 0;

    System.out.print("Thread" +
                     "\tQ Cap" +
                     "\tScans" +
                     "\tNew" +
                     "\tRuns" +
                     "\n");

    for (int i = 0; i < threads.length; ++i) {
      FJTaskRunner t = threads[i];
      int truns = t.runs;
      totalRuns += truns;

      int tscans = t.scans;
      totalScans += tscans;

      int tsteals = t.steals;
      totalSteals += tsteals;

      String star = (getActive(t))? "*" : " ";


      System.out.print("T" + i + star +
                       "\t" + t.deqSize() +
                       "\t" + tscans +
                       "\t" + tsteals +
                       "\t" + truns +
                       "\n");
    }

    System.out.print("Total" +
                     "\t " +
                     "\t" + totalScans +
                     "\t" + totalSteals +
                     "\t" + totalRuns +
                     "\n");

    System.out.print("Execute: " + entries);

    System.out.print("\tTime: " + secs);

    long rps = 0;
    if (secs != 0) rps = Math.round((double)(totalRuns) / secs);

    System.out.println("\tRate: " + rps);
  }


  /* ------------ Methods called only by FJTaskRunners ------------- */


  /**
   * Return the array of threads in this group.
   * Called only by FJTaskRunner.scan().
   **/

  protected FJTaskRunner[] getArray() { return threads; }


  /**
   * Return a task from entry queue, or null if empty.
   * Called only by FJTaskRunner.scan().
   **/

  protected FJTask pollEntryQueue() {
    try {
      FJTask t = (FJTask)(entryQueue.poll(0));
      return t;
    }
    catch(InterruptedException ex) { // re-assert interrupt status; report no task
      Thread.currentThread().interrupt();
      return null;
    }
  }


  /**
   * Return active status of t.
   * Per-thread active status can only be accessed and
   * modified via synchronized method here in the group class.
   **/

  protected synchronized boolean getActive(FJTaskRunner t) {
    return t.active;
  }


  /**
   * Set active status of thread t to true, and notify others
   * that might be waiting for work.
   **/

  protected synchronized void setActive(FJTaskRunner t) {
    if (!t.active) {
      t.active = true;
      ++activeCount;
      if (nstarted < threads.length)
        threads[nstarted++].start();
      else
        notifyAll();
    }
  }

  /**
   * Set active status of thread t to false.
   **/

  protected synchronized void setInactive(FJTaskRunner t) {
    if (t.active) {
      t.active = false;
      --activeCount;
    }
  }

  /**
   * The number of times to scan other threads for tasks
   * before transitioning to a mode where scans are
   * interleaved with sleeps (actually timed waits).
   * Upon transition, sleeps are for duration of
   * scans / SCANS_PER_SLEEP milliseconds.
   * <p>
   * This is not treated as a user-tunable parameter because
   * good values do not appear to vary much across JVMs or
   * applications. Its main role is to help avoid some
   * useless spinning and contention during task startup.
   **/
  static final long SCANS_PER_SLEEP = 15;

  /**
   * The maximum time (in msecs) to sleep when a thread is idle,
   * yet others are not, so may eventually generate work that
   * the current thread can steal. This value reflects the maximum time
   * that a thread may sleep when it possibly should not, because there
   * are other active threads that might generate work.
   * In practice,
   * designs in which some threads become stalled because others
   * are running yet not generating tasks are not likely to work
   * well in this framework anyway, so the exact value does not matter
   * too much. However, keeping it in the sub-second range does
   * help smooth out startup and shutdown effects.
   **/

  static final long MAX_SLEEP_TIME = 100;

  /**
   * Set active status of thread t to false, and
   * then wait until: (a) there is a task in the entry
   * queue, or (b) other threads are active, or (c) the current
   * thread is interrupted. Upon return, it
   * is not certain that there will be work available.
   * The thread must itself check.
   * <p>
   * The main underlying reason
   * for these mechanics is that threads do not
   * signal each other when they add elements to their queues.
   * (This would add to task overhead, reduce locality,
   * and increase contention.)
   * So we must rely on a tamed form of polling. However, tasks
   * inserted into the entry queue do result in signals, so
   * threads can wait on these if all of them are otherwise idle.
   **/

  protected synchronized void checkActive(FJTaskRunner t, long scans) {

    setInactive(t);

    try {
      // if nothing available, do a hard wait
      if (activeCount == 0 && entryQueue.peek() == null) {
        wait();
      }
      else {
        // If there is possibly some work,
        // sleep for a while before rechecking

        long msecs = scans / SCANS_PER_SLEEP;
        if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME;
        int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep
        wait(msecs, nsecs);
      }
    }
    catch (InterruptedException ex) {
      notify(); // avoid lost notifies on interrupts
      Thread.currentThread().interrupt();
    }
  }

  /* ------------ Utility methods ------------- */

  /**
   * Start or wake up any threads waiting for work
   **/

  protected synchronized void signalNewTask() {
    if (COLLECT_STATS) ++entries;
    if (nstarted < threads.length)
      threads[nstarted++].start();
    else
      notify();
  }

  /**
   * Create all FJTaskRunner threads in this group.
   **/

  protected void initializeThreads() {
    for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this);
  }




  /**
   * Wrap wait/notify mechanics around a task so that
   * invoke() can wait it out
   **/
  protected static final class InvokableFJTask extends FJTask {
    protected final Runnable wrapped;
    protected boolean terminated = false;

    protected InvokableFJTask(Runnable r) { wrapped = r; }

    public void run() {
      try {
        if (wrapped instanceof FJTask)
          FJTask.invoke((FJTask)(wrapped));
        else
          wrapped.run();
      }
      finally {
        setTerminated();
      }
    }

    protected synchronized void setTerminated() {
      terminated = true;
      notifyAll();
    }

    protected synchronized void awaitTermination() throws InterruptedException {
      while (!terminated) wait();
    }
  }


}
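
The wait/notify wrapper that invoke() relies on (InvokableFJTask in the listing) can be tried outside the framework. The following is only a minimal sketch under stated assumptions: the names AwaitableRunnable and AwaitableRunnableDemo are hypothetical and not part of util.concurrent, a plain java.lang.Thread stands in for an FJTaskRunner, and the lambda syntax assumes Java 8 or later.

```java
// Illustrative sketch only. AwaitableRunnable and AwaitableRunnableDemo are
// hypothetical names, not classes from util.concurrent; a plain Thread
// stands in for an FJTaskRunner.
class AwaitableRunnable implements Runnable {
  private final Runnable wrapped;
  private boolean terminated = false; // guarded by this object's monitor

  AwaitableRunnable(Runnable r) { wrapped = r; }

  public void run() {
    try {
      wrapped.run();
    }
    finally {
      setTerminated(); // signal completion even if wrapped.run() throws
    }
  }

  private synchronized void setTerminated() {
    terminated = true;
    notifyAll(); // wake every thread blocked in awaitTermination()
  }

  synchronized void awaitTermination() throws InterruptedException {
    while (!terminated) wait(); // loop guards against spurious wakeups
  }
}

class AwaitableRunnableDemo {
  public static void main(String[] args) throws InterruptedException {
    final int[] result = new int[1];
    AwaitableRunnable w = new AwaitableRunnable(() -> result[0] = 6 * 7);
    new Thread(w).start();   // hand the task to another thread
    w.awaitTermination();    // block until the task has finished
    System.out.println("result = " + result[0]); // prints "result = 42"
  }
}
```

Because the flag is written and read under the same monitor, the caller is guaranteed to see the task's side effects once awaitTermination() returns. Setting the flag in a finally block mirrors the listing: a waiting caller is released even when the wrapped task fails.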