1 28 29 package org.logicalcobwebs.concurrent; 30 31 import java.util.Random ; 32 33 208 209 public class FJTaskRunner extends Thread { 210 211 212 protected final FJTaskRunnerGroup group; 213 214 217 218 protected FJTaskRunner(FJTaskRunnerGroup g) { 219 group = g; 220 victimRNG = new Random (System.identityHashCode(this)); 221 runPriority = getPriority(); 222 setDaemon(true); 223 } 224 225 228 229 protected final FJTaskRunnerGroup getGroup() { 230 return group; 231 } 232 233 234 235 236 237 245 246 protected static final int INITIAL_CAPACITY = 4096; 247 248 252 253 protected static final int MAX_CAPACITY = 1 << 30; 254 255 258 259 protected final static class VolatileTaskRef { 260 261 protected volatile FJTask ref; 262 263 264 protected final void put(FJTask r) { 265 ref = r; 266 } 267 268 269 protected final FJTask get() { 270 return ref; 271 } 272 273 274 protected final FJTask take() { 275 FJTask r = ref; 276 ref = null; 277 return r; 278 } 279 280 285 protected static VolatileTaskRef[] newArray(int cap) { 286 VolatileTaskRef[] a = new VolatileTaskRef[cap]; 287 for (int k = 0; k < cap; k++) a[k] = new VolatileTaskRef(); 288 return a; 289 } 290 291 } 292 293 296 297 protected VolatileTaskRef[] deq = VolatileTaskRef.newArray(INITIAL_CAPACITY); 298 299 300 protected int deqSize() { 301 return deq.length; 302 } 303 304 314 protected volatile int top = 0; 315 316 317 321 322 protected volatile int base = 0; 323 324 325 329 330 protected final Object barrier = new Object (); 331 332 333 334 340 341 protected boolean active = false; 342 343 344 protected final Random victimRNG; 345 346 347 348 protected int scanPriority = FJTaskRunnerGroup.DEFAULT_SCAN_PRIORITY; 349 350 351 protected int runPriority; 352 353 361 protected void setScanPriority(int pri) { 362 scanPriority = pri; 363 } 364 365 366 370 protected void setRunPriority(int pri) { 371 runPriority = pri; 372 } 373 374 379 380 381 static final boolean COLLECT_STATS = true; 382 384 385 387 388 protected int runs = 0; 389 390 391 protected int scans = 0; 392 393 394 protected int steals = 0; 395 396 397 398 399 400 401 402 406 407 protected final void push(final FJTask r) { 408 int t = top; 409 410 417 418 if (t < (base & (deq.length - 1)) + deq.length) { 419 420 deq[t & (deq.length - 1)].put(r); 421 top = t + 1; 422 } else slowPush(r); } 425 426 427 430 431 protected synchronized void slowPush(final FJTask r) { 432 checkOverflow(); 433 push(r); } 435 436 437 444 445 protected final synchronized void put(final FJTask r) { 446 for (; ;) { 447 int b = base - 1; 448 if (top < b + deq.length) { 449 450 int newBase = b & (deq.length - 1); 451 deq[newBase].put(r); 452 base = newBase; 453 454 if (b != newBase) { int newTop = top & (deq.length - 1); 456 if (newTop < newBase) newTop += deq.length; 457 top = newTop; 458 } 459 return; 460 } else { 461 checkOverflow(); 462 } 464 } 465 } 466 467 479 480 protected final FJTask pop() { 481 484 485 int t = --top; 486 487 495 496 if (base + 1 < t) 497 return deq[t & (deq.length - 1)].take(); 498 else 499 return confirmPop(t); 500 501 } 502 503 504 508 509 protected final synchronized FJTask confirmPop(int provisionalTop) { 510 if (base <= provisionalTop) 511 return deq[provisionalTop & (deq.length - 1)].take(); 512 else { 518 519 top = base = 0; 520 return null; 521 } 522 } 523 524 525 529 530 531 protected final synchronized FJTask take() { 532 533 536 537 int b = base++; 538 539 if (b < top) 540 return confirmTake(b); 541 else { 542 base = b; 544 return null; 545 } 546 } 547 548 549 552 553 protected FJTask confirmTake(int oldBase) { 554 555 560 561 synchronized (barrier) { 562 if (oldBase < top) { 563 571 572 return deq[oldBase & (deq.length - 1)].get(); 573 } else { 574 base = oldBase; 575 return null; 576 } 577 } 578 } 579 580 581 587 588 protected void checkOverflow() { 589 int t = top; 590 int b = base; 591 592 if (t - b < deq.length - 1) { 594 int newBase = b & (deq.length - 1); 595 int newTop = top & (deq.length - 1); 596 if (newTop < newBase) newTop += deq.length; 597 top = newTop; 598 base = newBase; 599 600 604 605 int i = newBase; 606 while (i != newTop && deq[i].ref != null) { 607 deq[i].ref = null; 608 i = (i - 1) & (deq.length - 1); 609 } 610 611 } else { 613 int newTop = t - b; 614 int oldcap = deq.length; 615 int newcap = oldcap * 2; 616 617 if (newcap >= MAX_CAPACITY) 618 throw new Error ("FJTask queue maximum capacity exceeded"); 619 620 VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap]; 621 622 for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[b++ & (oldcap - 1)]; 624 625 for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef(); 627 628 deq = newdeq; 629 base = 0; 630 top = newTop; 631 } 632 } 633 634 635 636 637 638 650 651 protected void scan(final FJTask waitingFor) { 652 653 FJTask task = null; 654 655 boolean lowered = false; 657 658 667 668 FJTaskRunner[] ts = group.getArray(); 669 int idx = victimRNG.nextInt(ts.length); 670 671 for (int i = 0; i < ts.length; ++i) { 672 673 FJTaskRunner t = ts[idx]; 674 if (++idx >= ts.length) idx = 0; 676 if (t != null && t != this) { 677 678 if (waitingFor != null && waitingFor.isDone()) { 679 break; 680 } else { 681 if (COLLECT_STATS) ++scans; 682 task = t.take(); 683 if (task != null) { 684 if (COLLECT_STATS) ++steals; 685 break; 686 } else if (isInterrupted()) { 687 break; 688 } else if (!lowered) { lowered = true; 690 setPriority(scanPriority); 691 } else { yield(); 693 } 694 } 695 } 696 697 } 698 699 if (task == null) { 700 if (COLLECT_STATS) ++scans; 701 task = group.pollEntryQueue(); 702 if (COLLECT_STATS) if (task != null) ++steals; 703 } 704 705 if (lowered) setPriority(runPriority); 706 707 if (task != null && !task.isDone()) { 708 if (COLLECT_STATS) ++runs; 709 task.run(); 710 task.setDone(); 711 } 712 713 } 714 715 725 726 protected void scanWhileIdling() { 727 FJTask task = null; 728 729 boolean lowered = false; 730 long iters = 0; 731 732 FJTaskRunner[] ts = group.getArray(); 733 int idx = victimRNG.nextInt(ts.length); 734 735 do { 736 for (int i = 0; i < ts.length; ++i) { 737 738 FJTaskRunner t = ts[idx]; 739 if (++idx >= ts.length) idx = 0; 741 if (t != null && t != this) { 742 if (COLLECT_STATS) ++scans; 743 744 task = t.take(); 745 if (task != null) { 746 if (COLLECT_STATS) ++steals; 747 if (lowered) setPriority(runPriority); 748 group.setActive(this); 749 break; 750 } 751 } 752 } 753 754 if (task == null) { 755 if (isInterrupted()) 756 return; 757 758 if (COLLECT_STATS) ++scans; 759 task = group.pollEntryQueue(); 760 761 if (task != null) { 762 if (COLLECT_STATS) ++steals; 763 if (lowered) setPriority(runPriority); 764 group.setActive(this); 765 } else { 766 ++iters; 767 if (iters >= group.SCANS_PER_SLEEP) { 769 group.checkActive(this, iters); 770 if (isInterrupted()) 771 return; 772 } else if (!lowered) { 773 lowered = true; 774 setPriority(scanPriority); 775 } else { 776 yield(); 777 } 778 } 779 } 780 } while (task == null); 781 782 783 if (!task.isDone()) { 784 if (COLLECT_STATS) ++runs; 785 task.run(); 786 task.setDone(); 787 } 788 789 } 790 791 792 793 794 797 798 public void run() { 799 try { 800 while (!interrupted()) { 801 802 FJTask task = pop(); 803 if (task != null) { 804 if (!task.isDone()) { 805 if (COLLECT_STATS) ++runs; 807 task.run(); 808 task.setDone(); 809 } 810 } else 811 scanWhileIdling(); 812 } 813 } finally { 814 group.setInactive(this); 815 } 816 } 817 818 822 823 824 protected final void taskYield() { 825 FJTask task = pop(); 826 if (task != null) { 827 if (!task.isDone()) { 828 if (COLLECT_STATS) ++runs; 829 task.run(); 830 task.setDone(); 831 } 832 } else 833 scan(null); 834 } 835 836 837 841 842 protected final void taskJoin(final FJTask w) { 843 844 while (!w.isDone()) { 845 846 FJTask task = pop(); 847 if (task != null) { 848 if (!task.isDone()) { 849 if (COLLECT_STATS) ++runs; 850 task.run(); 851 task.setDone(); 852 if (task == w) return; } 854 } else 855 scan(w); 856 } 857 } 858 859 863 864 865 protected final void coInvoke(final FJTask w, final FJTask v) { 866 867 869 int t = top; 870 if (t < (base & (deq.length - 1)) + deq.length) { 871 872 deq[t & (deq.length - 1)].put(w); 873 top = t + 1; 874 875 877 if (!v.isDone()) { 878 if (COLLECT_STATS) ++runs; 879 v.run(); 880 v.setDone(); 881 } 882 883 885 while (!w.isDone()) { 886 FJTask task = pop(); 887 if (task != null) { 888 if (!task.isDone()) { 889 if (COLLECT_STATS) ++runs; 890 task.run(); 891 task.setDone(); 892 if (task == w) return; } 894 } else 895 scan(w); 896 } 897 } else slowCoInvoke(w, v); 899 } 900 901 902 905 906 protected void slowCoInvoke(final FJTask w, final FJTask v) { 907 push(w); FJTask.invoke(v); 909 taskJoin(w); 910 } 911 912 913 916 917 protected final void coInvoke(FJTask[] tasks) { 918 int nforks = tasks.length - 1; 919 920 922 int t = top; 923 924 if (nforks >= 0 && t + nforks < (base & (deq.length - 1)) + deq.length) { 925 for (int i = 0; i < nforks; ++i) { 926 deq[t++ & (deq.length - 1)].put(tasks[i]); 927 top = t; 928 } 929 930 FJTask v = tasks[nforks]; 932 if (!v.isDone()) { 933 if (COLLECT_STATS) ++runs; 934 v.run(); 935 v.setDone(); 936 } 937 938 940 for (int i = 0; i < nforks; ++i) { 941 FJTask w = tasks[i]; 942 while (!w.isDone()) { 943 944 FJTask task = pop(); 945 if (task != null) { 946 if (!task.isDone()) { 947 if (COLLECT_STATS) ++runs; 948 task.run(); 949 task.setDone(); 950 } 951 } else 952 scan(w); 953 } 954 } 955 } else slowCoInvoke(tasks); 957 } 958 959 962 963 protected void slowCoInvoke(FJTask[] tasks) { 964 for (int i = 0; i < tasks.length; ++i) push(tasks[i]); 965 for (int i = 0; i < tasks.length; ++i) taskJoin(tasks[i]); 966 } 967 968 } 969 970 | Popular Tags |