1 18 19 package EDU.oswego.cs.dl.util.concurrent; 20 21 121 122 public class FJTaskRunnerGroup implements Executor { 123 124 125 protected final FJTaskRunner[] threads; 126 127 128 protected final LinkedQueue entryQueue = new LinkedQueue(); 129 130 131 protected int activeCount = 0; 132 133 136 protected int nstarted = 0; 137 138 144 145 static final boolean COLLECT_STATS = true; 146 148 150 151 long initTime = 0; 152 153 154 int entries = 0; 155 156 static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1; 157 158 167 168 public FJTaskRunnerGroup(int groupSize) { 169 threads = new FJTaskRunner[groupSize]; 170 initializeThreads(); 171 initTime = System.currentTimeMillis(); 172 } 173 174 182 183 public void execute(Runnable r) throws InterruptedException { 184 if (r instanceof FJTask) { 185 entryQueue.put((FJTask)r); 186 } 187 else { 188 entryQueue.put(new FJTask.Wrap(r)); 189 } 190 signalNewTask(); 191 } 192 193 194 197 public void executeTask(FJTask t) { 198 try { 199 entryQueue.put(t); 200 signalNewTask(); 201 } 202 catch (InterruptedException ex) { 203 Thread.currentThread().interrupt(); 204 } 205 } 206 207 208 213 214 public void invoke(Runnable r) throws InterruptedException { 215 InvokableFJTask w = new InvokableFJTask(r); 216 entryQueue.put(w); 217 signalNewTask(); 218 w.awaitTermination(); 219 } 220 221 222 234 235 public void interruptAll() { 236 Thread current = Thread.currentThread(); 238 boolean stopCurrent = false; 239 240 for (int i = 0; i < threads.length; ++i) { 241 Thread t = threads[i]; 242 if (t == current) 243 stopCurrent = true; 244 else 245 t.interrupt(); 246 } 247 if (stopCurrent) 248 current.interrupt(); 249 } 250 251 252 259 public synchronized void setScanPriorities(int pri) { 260 for (int i = 0; i < threads.length; ++i) { 261 FJTaskRunner t = threads[i]; 262 t.setScanPriority(pri); 263 if (!t.active) t.setPriority(pri); 264 } 265 } 266 267 268 276 public synchronized void setRunPriorities(int pri) { 277 for (int i = 0; i < threads.length; ++i) { 278 FJTaskRunner t = threads[i]; 279 t.setRunPriority(pri); 280 if (t.active) t.setPriority(pri); 281 } 282 } 283 284 285 286 287 288 public int size() { return threads.length; } 289 290 291 298 public synchronized int getActiveCount() { return activeCount; } 299 300 366 367 public void stats() { 368 long time = System.currentTimeMillis() - initTime; 369 double secs = ((double)time) / 1000.0; 370 long totalRuns = 0; 371 long totalScans = 0; 372 long totalSteals = 0; 373 374 System.out.print("Thread" + 375 "\tQ Cap" + 376 "\tScans" + 377 "\tNew" + 378 "\tRuns" + 379 "\n"); 380 381 for (int i = 0; i < threads.length; ++i) { 382 FJTaskRunner t = threads[i]; 383 int truns = t.runs; 384 totalRuns += truns; 385 386 int tscans = t.scans; 387 totalScans += tscans; 388 389 int tsteals = t.steals; 390 totalSteals += tsteals; 391 392 String star = (getActive(t))? "*" : " "; 393 394 395 System.out.print("T" + i + star + 396 "\t" + t.deqSize() + 397 "\t" + tscans + 398 "\t" + tsteals + 399 "\t" + truns + 400 "\n"); 401 } 402 403 System.out.print("Total" + 404 "\t " + 405 "\t" + totalScans + 406 "\t" + totalSteals + 407 "\t" + totalRuns + 408 "\n"); 409 410 System.out.print("Execute: " + entries); 411 412 System.out.print("\tTime: " + secs); 413 414 long rps = 0; 415 if (secs != 0) rps = Math.round((double)(totalRuns) / secs); 416 417 System.out.println("\tRate: " + rps); 418 } 419 420 421 422 423 424 428 429 protected FJTaskRunner[] getArray() { return threads; } 430 431 432 436 437 protected FJTask pollEntryQueue() { 438 try { 439 FJTask t = (FJTask)(entryQueue.poll(0)); 440 return t; 441 } 442 catch(InterruptedException ex) { Thread.currentThread().interrupt(); 444 return null; 445 } 446 } 447 448 449 454 455 protected synchronized boolean getActive(FJTaskRunner t) { 456 return t.active; 457 } 458 459 460 464 465 protected synchronized void setActive(FJTaskRunner t) { 466 if (!t.active) { 467 t.active = true; 468 ++activeCount; 469 if (nstarted < threads.length) 470 threads[nstarted++].start(); 471 else 472 notifyAll(); 473 } 474 } 475 476 479 480 protected synchronized void setInactive(FJTaskRunner t) { 481 if (t.active) { 482 t.active = false; 483 --activeCount; 484 } 485 } 486 487 499 static final long SCANS_PER_SLEEP = 15; 500 501 513 514 static final long MAX_SLEEP_TIME = 100; 515 516 533 534 protected synchronized void checkActive(FJTaskRunner t, long scans) { 535 536 setInactive(t); 537 538 try { 539 if (activeCount == 0 && entryQueue.peek() == null) { 541 wait(); 542 } 543 else { 544 547 long msecs = scans / SCANS_PER_SLEEP; 548 if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME; 549 int nsecs = (msecs == 0) ? 1 : 0; wait(msecs, nsecs); 551 } 552 } 553 catch (InterruptedException ex) { 554 notify(); Thread.currentThread().interrupt(); 556 } 557 } 558 559 560 561 564 565 protected synchronized void signalNewTask() { 566 if (COLLECT_STATS) ++entries; 567 if (nstarted < threads.length) 568 threads[nstarted++].start(); 569 else 570 notify(); 571 } 572 573 576 577 protected void initializeThreads() { 578 for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this); 579 } 580 581 582 583 584 588 protected static final class InvokableFJTask extends FJTask { 589 protected final Runnable wrapped; 590 protected boolean terminated = false; 591 592 protected InvokableFJTask(Runnable r) { wrapped = r; } 593 594 public void run() { 595 try { 596 if (wrapped instanceof FJTask) 597 FJTask.invoke((FJTask)(wrapped)); 598 else 599 wrapped.run(); 600 } 601 finally { 602 setTerminated(); 603 } 604 } 605 606 protected synchronized void setTerminated() { 607 terminated = true; 608 notifyAll(); 609 } 610 611 protected synchronized void awaitTermination() throws InterruptedException { 612 while (!terminated) wait(); 613 } 614 } 615 616 617 } 618 619 | Popular Tags |