1 18 19 package org.logicalcobwebs.concurrent; 20 21 121 122 public class FJTaskRunnerGroup implements Executor { 123 124 125 protected final FJTaskRunner[] threads; 126 127 128 protected final LinkedQueue entryQueue = new LinkedQueue(); 129 130 131 protected int activeCount = 0; 132 133 136 protected int nstarted = 0; 137 138 144 145 static final boolean COLLECT_STATS = true; 146 148 150 151 long initTime = 0; 152 153 154 int entries = 0; 155 156 static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY + 1; 157 158 167 168 public FJTaskRunnerGroup(int groupSize) { 169 threads = new FJTaskRunner[groupSize]; 170 initializeThreads(); 171 initTime = System.currentTimeMillis(); 172 } 173 174 182 183 public void execute(Runnable r) throws InterruptedException { 184 if (r instanceof FJTask) { 185 entryQueue.put((FJTask) r); 186 } else { 187 entryQueue.put(new FJTask.Wrap(r)); 188 } 189 signalNewTask(); 190 } 191 192 193 196 public void executeTask(FJTask t) { 197 try { 198 entryQueue.put(t); 199 signalNewTask(); 200 } catch (InterruptedException ex) { 201 Thread.currentThread().interrupt(); 202 } 203 } 204 205 206 211 212 public void invoke(Runnable r) throws InterruptedException { 213 InvokableFJTask w = new InvokableFJTask(r); 214 entryQueue.put(w); 215 signalNewTask(); 216 w.awaitTermination(); 217 } 218 219 220 232 233 public void interruptAll() { 234 Thread current = Thread.currentThread(); 236 boolean stopCurrent = false; 237 238 for (int i = 0; i < threads.length; ++i) { 239 Thread t = threads[i]; 240 if (t == current) 241 stopCurrent = true; 242 else 243 t.interrupt(); 244 } 245 if (stopCurrent) 246 current.interrupt(); 247 } 248 249 250 257 public synchronized void setScanPriorities(int pri) { 258 for (int i = 0; i < threads.length; ++i) { 259 FJTaskRunner t = threads[i]; 260 t.setScanPriority(pri); 261 if (!t.active) t.setPriority(pri); 262 } 263 } 264 265 266 274 public synchronized void setRunPriorities(int pri) { 275 for (int i = 0; i < threads.length; ++i) { 276 FJTaskRunner t = threads[i]; 277 t.setRunPriority(pri); 278 if (t.active) t.setPriority(pri); 279 } 280 } 281 282 283 284 285 public int size() { 286 return threads.length; 287 } 288 289 290 297 public synchronized int getActiveCount() { 298 return activeCount; 299 } 300 301 367 368 public void stats() { 369 long time = System.currentTimeMillis() - initTime; 370 double secs = ((double) time) / 1000.0; 371 long totalRuns = 0; 372 long totalScans = 0; 373 long totalSteals = 0; 374 375 System.out.print("Thread" + 376 "\tQ Cap" + 377 "\tScans" + 378 "\tNew" + 379 "\tRuns" + 380 "\n"); 381 382 for (int i = 0; i < threads.length; ++i) { 383 FJTaskRunner t = threads[i]; 384 int truns = t.runs; 385 totalRuns += truns; 386 387 int tscans = t.scans; 388 totalScans += tscans; 389 390 int tsteals = t.steals; 391 totalSteals += tsteals; 392 393 String star = (getActive(t)) ? "*" : " "; 394 395 396 System.out.print("T" + i + star + 397 "\t" + t.deqSize() + 398 "\t" + tscans + 399 "\t" + tsteals + 400 "\t" + truns + 401 "\n"); 402 } 403 404 System.out.print("Total" + 405 "\t " + 406 "\t" + totalScans + 407 "\t" + totalSteals + 408 "\t" + totalRuns + 409 "\n"); 410 411 System.out.print("Execute: " + entries); 412 413 System.out.print("\tTime: " + secs); 414 415 long rps = 0; 416 if (secs != 0) rps = Math.round((double) (totalRuns) / secs); 417 418 System.out.println("\tRate: " + rps); 419 } 420 421 422 423 424 425 429 430 protected FJTaskRunner[] getArray() { 431 return threads; 432 } 433 434 435 439 440 protected FJTask pollEntryQueue() { 441 try { 442 FJTask t = (FJTask) (entryQueue.poll(0)); 443 return t; 444 } catch (InterruptedException ex) { Thread.currentThread().interrupt(); 446 return null; 447 } 448 } 449 450 451 456 457 protected synchronized boolean getActive(FJTaskRunner t) { 458 return t.active; 459 } 460 461 462 466 467 protected synchronized void setActive(FJTaskRunner t) { 468 if (!t.active) { 469 t.active = true; 470 ++activeCount; 471 if (nstarted < threads.length) 472 threads[nstarted++].start(); 473 else 474 notifyAll(); 475 } 476 } 477 478 481 482 protected synchronized void setInactive(FJTaskRunner t) { 483 if (t.active) { 484 t.active = false; 485 --activeCount; 486 } 487 } 488 489 501 static final long SCANS_PER_SLEEP = 15; 502 503 515 516 static final long MAX_SLEEP_TIME = 100; 517 518 535 536 protected synchronized void checkActive(FJTaskRunner t, long scans) { 537 538 setInactive(t); 539 540 try { 541 if (activeCount == 0 && entryQueue.peek() == null) { 543 wait(); 544 } else { 545 548 long msecs = scans / SCANS_PER_SLEEP; 549 if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME; 550 int nsecs = (msecs == 0) ? 1 : 0; wait(msecs, nsecs); 552 } 553 } catch (InterruptedException ex) { 554 notify(); Thread.currentThread().interrupt(); 556 } 557 } 558 559 560 561 564 565 protected synchronized void signalNewTask() { 566 if (COLLECT_STATS) ++entries; 567 if (nstarted < threads.length) 568 threads[nstarted++].start(); 569 else 570 notify(); 571 } 572 573 576 577 protected void initializeThreads() { 578 for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this); 579 } 580 581 582 586 protected static final class InvokableFJTask extends FJTask { 587 protected final Runnable wrapped; 588 protected boolean terminated = false; 589 590 protected InvokableFJTask(Runnable r) { 591 wrapped = r; 592 } 593 594 public void run() { 595 try { 596 if (wrapped instanceof FJTask) 597 FJTask.invoke((FJTask) (wrapped)); 598 else 599 wrapped.run(); 600 } finally { 601 setTerminated(); 602 } 603 } 604 605 protected synchronized void setTerminated() { 606 terminated = true; 607 notifyAll(); 608 } 609 610 protected synchronized void awaitTermination() throws InterruptedException { 611 while (!terminated) wait(); 612 } 613 } 614 615 616 } 617 618 | Popular Tags |