Your browser does not support JavaScript and this site utilizes JavaScript to build content and provide links to additional information. You should either enable JavaScript in your browser settings or use a browser that supports JavaScript in order to take full advantage of this site.
1 18 19 package org.logicalcobwebs.concurrent; 20 21 121 122 public class FJTaskRunnerGroup implements Executor { 123 124 125 protected final FJTaskRunner[] threads; 126 127 128 protected final LinkedQueue entryQueue = new LinkedQueue(); 129 130 131 protected int activeCount = 0; 132 133 136 protected int nstarted = 0; 137 138 144 145 static final boolean COLLECT_STATS = true; 146 148 150 151 long initTime = 0; 152 153 154 int entries = 0; 155 156 static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY + 1; 157 158 167 168 public FJTaskRunnerGroup(int groupSize) { 169 threads = new FJTaskRunner[groupSize]; 170 initializeThreads(); 171 initTime = System.currentTimeMillis(); 172 } 173 174 182 183 public void execute(Runnable r) throws InterruptedException { 184 if (r instanceof FJTask) { 185 entryQueue.put((FJTask) r); 186 } else { 187 entryQueue.put(new FJTask.Wrap(r)); 188 } 189 signalNewTask(); 190 } 191 192 193 196 public void executeTask(FJTask t) { 197 try { 198 entryQueue.put(t); 199 signalNewTask(); 200 } catch (InterruptedException ex) { 201 Thread.currentThread().interrupt(); 202 } 203 } 204 205 206 211 212 public void invoke(Runnable r) throws InterruptedException { 213 InvokableFJTask w = new InvokableFJTask(r); 214 entryQueue.put(w); 215 signalNewTask(); 216 w.awaitTermination(); 217 } 218 219 220 232 233 public void interruptAll() { 234 Thread current = Thread.currentThread(); 236 boolean stopCurrent = false; 237 238 for (int i = 0; i < threads.length; ++i) { 239 Thread t = threads[i]; 240 if (t == current) 241 stopCurrent = true; 242 else 243 t.interrupt(); 244 } 245 if (stopCurrent) 246 current.interrupt(); 247 } 248 249 250 257 public synchronized void setScanPriorities(int pri) { 258 for (int i = 0; i < threads.length; ++i) { 259 FJTaskRunner t = threads[i]; 260 t.setScanPriority(pri); 261 if (!t.active) t.setPriority(pri); 262 } 263 } 264 265 266 274 public synchronized void setRunPriorities(int pri) { 275 for (int i = 0; i < threads.length; ++i) { 276 FJTaskRunner t = threads[i]; 277 t.setRunPriority(pri); 278 if (t.active) t.setPriority(pri); 279 } 280 } 281 282 283 284 285 public int size() { 286 return threads.length; 287 } 288 289 290 297 public synchronized int getActiveCount() { 298 return activeCount; 299 } 300 301 367 368 public void stats() { 369 long time = System.currentTimeMillis() - initTime; 370 double secs = ((double) time) / 1000.0; 371 long totalRuns = 0; 372 long totalScans = 0; 373 long totalSteals = 0; 374 375 System.out.print("Thread" + 376 "\tQ Cap" + 377 "\tScans" + 378 "\tNew" + 379 "\tRuns" + 380 "\n"); 381 382 for (int i = 0; i < threads.length; ++i) { 383 FJTaskRunner t = threads[i]; 384 int truns = t.runs; 385 totalRuns += truns; 386 387 int tscans = t.scans; 388 totalScans += tscans; 389 390 int tsteals = t.steals; 391 totalSteals += tsteals; 392 393 String star = (getActive(t)) ? "*" : " "; 394 395 396 System.out.print("T" + i + star + 397 "\t" + t.deqSize() + 398 "\t" + tscans + 399 "\t" + tsteals + 400 "\t" + truns + 401 "\n"); 402 } 403 404 System.out.print("Total" + 405 "\t " + 406 "\t" + totalScans + 407 "\t" + totalSteals + 408 "\t" + totalRuns + 409 "\n"); 410 411 System.out.print("Execute: " + entries); 412 413 System.out.print("\tTime: " + secs); 414 415 long rps = 0; 416 if (secs != 0) rps = Math.round((double) (totalRuns) / secs); 417 418 System.out.println("\tRate: " + rps); 419 } 420 421 422 423 424 425 429 430 protected FJTaskRunner[] getArray() { 431 return threads; 432 } 433 434 435 439 440 protected FJTask pollEntryQueue() { 441 try { 442 FJTask t = (FJTask) (entryQueue.poll(0)); 443 return t; 444 } catch (InterruptedException ex) { Thread.currentThread().interrupt(); 446 return null; 447 } 448 } 449 450 451 456 457 protected synchronized boolean getActive(FJTaskRunner t) { 458 return t.active; 459 } 460 461 462 466 467 protected synchronized void setActive(FJTaskRunner t) { 468 if (!t.active) { 469 t.active = true; 470 ++activeCount; 471 if (nstarted < threads.length) 472 threads[nstarted++].start(); 473 else 474 notifyAll(); 475 } 476 } 477 478 481 482 protected synchronized void setInactive(FJTaskRunner t) { 483 if (t.active) { 484 t.active = false; 485 --activeCount; 486 } 487 } 488 489 501 static final long SCANS_PER_SLEEP = 15; 502 503 515 516 static final long MAX_SLEEP_TIME = 100; 517 518 535 536 protected synchronized void checkActive(FJTaskRunner t, long scans) { 537 538 setInactive(t); 539 540 try { 541 if (activeCount == 0 && entryQueue.peek() == null) { 543 wait(); 544 } else { 545 548 long msecs = scans / SCANS_PER_SLEEP; 549 if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME; 550 int nsecs = (msecs == 0) ? 1 : 0; wait(msecs, nsecs); 552 } 553 } catch (InterruptedException ex) { 554 notify(); Thread.currentThread().interrupt(); 556 } 557 } 558 559 560 561 564 565 protected synchronized void signalNewTask() { 566 if (COLLECT_STATS) ++entries; 567 if (nstarted < threads.length) 568 threads[nstarted++].start(); 569 else 570 notify(); 571 } 572 573 576 577 protected void initializeThreads() { 578 for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this); 579 } 580 581 582 586 protected static final class InvokableFJTask extends FJTask { 587 protected final Runnable wrapped; 588 protected boolean terminated = false; 589 590 protected InvokableFJTask(Runnable r) { 591 wrapped = r; 592 } 593 594 public void run() { 595 try { 596 if (wrapped instanceof FJTask) 597 FJTask.invoke((FJTask) (wrapped)); 598 else 599 wrapped.run(); 600 } finally { 601 setTerminated(); 602 } 603 } 604 605 protected synchronized void setTerminated() { 606 terminated = true; 607 notifyAll(); 608 } 609 610 protected synchronized void awaitTermination() throws InterruptedException { 611 while (!terminated) wait(); 612 } 613 } 614 615 616 } 617 618
| Popular Tags
|