package EDU.oswego.cs.dl.util.concurrent;

import java.util.Random;

/**
 * Worker thread that executes {@link FJTask}s on behalf of an
 * {@link FJTaskRunnerGroup}.
 *
 * <p>Each runner keeps its tasks in an array-backed double-ended queue
 * ({@link #deq}). Slots are addressed as {@code index & (deq.length - 1)},
 * so the array length must stay a power of two (it starts at
 * {@link #INITIAL_CAPACITY} and is only ever doubled).
 *
 * <ul>
 *   <li>{@link #push} and {@link #pop} work at the {@link #top} end and
 *       take the runner's lock only on their slow paths
 *       ({@link #slowPush}, {@link #confirmPop}).</li>
 *   <li>{@link #put} and {@link #take} work at the {@link #base} end and
 *       are always synchronized on the runner. {@link #scan} and
 *       {@link #scanWhileIdling} call {@code take()} on <em>other</em>
 *       runners to obtain work.</li>
 * </ul>
 *
 * <p>NOTE(review): the unlocked fast paths of {@code push}/{@code pop}
 * look safe only when called from this runner's own thread -- confirm
 * against the callers in {@code FJTask} / {@code FJTaskRunnerGroup}.
 */
public class FJTaskRunner extends Thread {

  /** The group this runner belongs to; fixed at construction. */
  protected final FJTaskRunnerGroup group;

  /**
   * Creates a daemon runner attached to the given group. The thread's
   * current priority is remembered as the priority to use while running
   * tasks, and the victim-selection generator is seeded from this
   * object's identity hash code.
   *
   * @param g the owning group
   */
  protected FJTaskRunner(FJTaskRunnerGroup g) {
    group = g;
    victimRNG = new Random(System.identityHashCode(this));
    runPriority = getPriority();
    setDaemon(true);
  }

  /**
   * Returns the group this runner belongs to.
   *
   * @return the owning group
   */
  protected final FJTaskRunnerGroup getGroup() {
    return group;
  }

  /* ------------ DEQ representation ------------------- */

  /** Initial number of slots in the task deque (a power of two). */
  protected static final int INITIAL_CAPACITY = 4096;

  /**
   * Capacity limit: {@link #checkOverflow} refuses to grow the deque to
   * this many slots or more.
   */
  protected static final int MAX_CAPACITY = 1 << 30;

  /**
   * Holder for a single {@code volatile} task reference; the deque is an
   * array of these holders rather than an array of tasks.
   */
  protected static final class VolatileTaskRef {

    /** The held task, or {@code null} if the slot is clear. */
    protected volatile FJTask ref;

    /**
     * Stores a task in this slot.
     *
     * @param r the task to store
     */
    protected final void put(FJTask r) {
      ref = r;
    }

    /**
     * Returns the held task without clearing the slot.
     *
     * @return the held task, possibly {@code null}
     */
    protected final FJTask get() {
      return ref;
    }

    /**
     * Returns the held task and clears the slot.
     *
     * @return the task that was held, possibly {@code null}
     */
    protected final FJTask take() {
      FJTask r = ref;
      ref = null;
      return r;
    }

    /**
     * Allocates an array in which every element is a fresh, empty holder.
     *
     * @param cap number of slots
     * @return the fully populated array
     */
    protected static VolatileTaskRef[] newArray(int cap) {
      VolatileTaskRef[] a = new VolatileTaskRef[cap];
      for (int k = 0; k < cap; k++) {
        a[k] = new VolatileTaskRef();
      }
      return a;
    }
  }

  /** The task deque; replaced by a larger array in {@link #checkOverflow}. */
  protected VolatileTaskRef[] deq = VolatileTaskRef.newArray(INITIAL_CAPACITY);

  /**
   * Returns the current number of slots in the deque (its capacity, not
   * the number of queued tasks).
   *
   * @return {@code deq.length}
   */
  protected int deqSize() {
    return deq.length;
  }

  /**
   * Index of the next free slot at the push/pop end. Not kept masked:
   * always combine with {@code & (deq.length - 1)} before indexing.
   */
  protected volatile int top = 0;

  /**
   * Index of the oldest queued task (the put/take end). Not kept masked:
   * always combine with {@code & (deq.length - 1)} before indexing.
   */
  protected volatile int base = 0;

  /**
   * Secondary lock acquired by {@link #confirmTake} before it re-reads
   * {@link #top}.
   */
  protected final Object barrier = new Object();

  /* ------------ Other bookkeeping ------------------- */

  /**
   * Activity flag. Not read or written in this class;
   * NOTE(review): presumably maintained by the group's
   * setActive/setInactive/checkActive -- confirm.
   */
  protected boolean active = false;

  /** Picks the index at which a scan over the group's runners starts. */
  protected final Random victimRNG;

  /** Thread priority adopted while scanning other runners for work. */
  protected int scanPriority = FJTaskRunnerGroup.DEFAULT_SCAN_PRIORITY;

  /** Thread priority restored once work has been found. */
  protected int runPriority;

  /**
   * Sets the priority used while scanning for work.
   *
   * @param pri the new scan priority
   */
  protected void setScanPriority(int pri) {
    scanPriority = pri;
  }

  /**
   * Sets the priority used while running tasks.
   *
   * @param pri the new run priority
   */
  protected void setRunPriority(int pri) {
    runPriority = pri;
  }

  /** Compile-time switch guarding every update of the counters below. */
  static final boolean COLLECT_STATS = true;

  /** Number of tasks this runner has executed. */
  protected int runs = 0;

  /** Number of attempts to obtain a task from elsewhere. */
  protected int scans = 0;

  /** Number of those attempts that produced a task. */
  protected int steals = 0;

  /* ------------ DEQ operations ------------------- */

  /**
   * Adds a task at the top of the deque. The fast path takes no lock;
   * if the index test fails, the work is handed to {@link #slowPush}.
   *
   * @param r the task to enqueue
   */
  protected final void push(final FJTask r) {
    int t = top;

    // One comparison covers both "array is full" and "indices have run
    // far enough ahead that they need to be renormalised".
    if (t < (base & (deq.length - 1)) + deq.length) {
      deq[t & (deq.length - 1)].put(r);
      top = t + 1;
    } else {
      slowPush(r);
    }
  }

  /**
   * Slow path of {@link #push}: under the runner's lock, renormalises or
   * grows the deque and then retries the push.
   *
   * @param r the task to enqueue
   */
  protected synchronized void slowPush(final FJTask r) {
    checkOverflow();
    push(r);
  }

  /**
   * Adds a task at the base of the deque, i.e. in front of the task that
   * {@link #take} would otherwise return next. Retries after
   * {@link #checkOverflow} when there is no room.
   *
   * @param r the task to enqueue
   */
  protected final synchronized void put(final FJTask r) {
    for (;;) {
      int b = base - 1;
      if (top < b + deq.length) {
        int newBase = b & (deq.length - 1);
        deq[newBase].put(r);
        base = newBase;

        // b was negative (base underflowed): base is now a masked
        // index, so bring top into the same range.
        if (b != newBase) {
          int newTop = top & (deq.length - 1);
          if (newTop < newBase) {
            newTop += deq.length;
          }
          top = newTop;
        }
        return;
      } else {
        checkOverflow();
        // ... and retry
      }
    }
  }

  /**
   * Removes and returns the task at the top of the deque.
   *
   * @return the most recently pushed task, or {@code null} if none was
   *     obtained
   */
  protected final FJTask pop() {
    // Claim the slot first by decrementing top.
    int t = --top;

    // Only take without the lock when at least two tasks appear to be
    // queued; otherwise re-check under the lock.
    // NOTE(review): the margin presumably keeps an unlocked pop from
    // colliding with a concurrent take() on the same slot -- confirm.
    if (base + 1 < t) {
      return deq[t & (deq.length - 1)].take();
    } else {
      return confirmPop(t);
    }
  }

  /**
   * Locked completion of {@link #pop}. If the provisional slot is still
   * at or above {@code base} its task is taken; otherwise the deque is
   * treated as empty and both indices are reset to zero.
   *
   * @param provisionalTop the already-decremented top index
   * @return the task in that slot, or {@code null} if the deque was empty
   */
  protected final synchronized FJTask confirmPop(int provisionalTop) {
    if (base <= provisionalTop) {
      return deq[provisionalTop & (deq.length - 1)].take();
    } else {
      // Was empty: restart both indices from zero.
      top = base = 0;
      return null;
    }
  }

  /**
   * Removes and returns the task at the base of the deque. This is the
   * operation other runners use (from {@link #scan} and
   * {@link #scanWhileIdling}) to obtain work from this runner.
   *
   * @return the oldest queued task, or {@code null} if none was obtained
   */
  protected final synchronized FJTask take() {
    // Claim the slot first by advancing base.
    int b = base++;

    if (b < top) {
      return confirmTake(b);
    } else {
      // Nothing there: undo the claim.
      base = b;
      return null;
    }
  }

  /**
   * Second-stage check for {@link #take}, performed while holding
   * {@link #barrier}. On failure, {@code base} is restored.
   *
   * @param oldBase the index claimed by {@code take}
   * @return the task at that index, or {@code null} if {@code top} no
   *     longer lies above it
   */
  protected FJTask confirmTake(int oldBase) {
    synchronized (barrier) {
      if (oldBase < top) {
        // Deliberately get(), not take(): the slot keeps its reference.
        // Such leftovers are cleared later by checkOverflow() or
        // overwritten by subsequent pushes.
        return deq[oldBase & (deq.length - 1)].get();
      } else {
        base = oldBase;
        return null;
      }
    }
  }

  /**
   * Makes room in the deque. Called (with the runner's lock held by
   * {@link #slowPush} or {@link #put}) when an index test failed.
   *
   * <p>If the deque is not actually full, the indices are merely
   * renormalised into array range and stale references left behind by
   * {@link #confirmTake} are cleared. Otherwise the array is doubled.
   *
   * @throws Error if doubling would reach {@link #MAX_CAPACITY}
   */
  protected void checkOverflow() {
    int t = top;
    int b = base;

    if (t - b < deq.length - 1) {
      // Not full: just renormalise the indices.
      int newBase = b & (deq.length - 1);
      int newTop = top & (deq.length - 1);
      if (newTop < newBase) {
        newTop += deq.length;
      }
      top = newTop;
      base = newBase;

      // Clear references to tasks that were already handed out by
      // take(). Those occupy the slots just BELOW base, so the sweep
      // starts at base - 1 and walks downwards. It stops at the first
      // slot that is already clear, or on reaching the slot that top
      // refers to. The live tasks occupy base .. top-1 (cyclically)
      // and are never touched.
      //
      // Starting at base itself, or comparing the masked cursor with an
      // unmasked newTop, would discard queued tasks that have not run.
      final int mask = deq.length - 1;
      final int stop = newTop & mask;
      int i = (newBase - 1) & mask;
      while (i != stop && deq[i].ref != null) {
        deq[i].ref = null;
        i = (i - 1) & mask;
      }
    } else {
      // Full: grow by doubling the array.
      int newTop = t - b;
      int oldcap = deq.length;
      int newcap = oldcap * 2;

      if (newcap >= MAX_CAPACITY) {
        throw new Error("FJTask queue maximum capacity exceeded");
      }

      VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap];

      // Lower half: the existing holders, re-based so that the old
      // base slot lands at index 0.
      for (int j = 0; j < oldcap; ++j) {
        newdeq[j] = deq[b++ & (oldcap - 1)];
      }

      // Upper half: fresh empty holders.
      for (int j = oldcap; j < newcap; ++j) {
        newdeq[j] = new VolatileTaskRef();
      }

      deq = newdeq;
      base = 0;
      top = newTop;
    }
  }

  /* ------------ Scheduling ------------------- */

  /**
   * Makes one pass over the group's other runners, starting at a random
   * position, trying to take a task from each; if that yields nothing,
   * polls the group's entry queue. Any task obtained that is not already
   * done is run.
   *
   * <p>The pass stops early when a task is obtained, when
   * {@code waitingFor} is found to be done, or when this thread is
   * interrupted. After the first failed attempt the thread drops to
   * {@link #scanPriority}; after each later failure it yields. The run
   * priority is restored before returning.
   *
   * @param waitingFor a task whose completion ends the pass early, or
   *     {@code null} for none
   */
  protected void scan(final FJTask waitingFor) {
    FJTask task = null;

    // True once this call has lowered the thread priority.
    boolean lowered = false;

    FJTaskRunner[] ts = group.getArray();
    int idx = victimRNG.nextInt(ts.length);

    for (int i = 0; i < ts.length; ++i) {
      FJTaskRunner t = ts[idx];
      if (++idx >= ts.length) {
        idx = 0; // circular traversal
      }

      if (t != null && t != this) {
        if (waitingFor != null && waitingFor.isDone()) {
          break;
        } else {
          if (COLLECT_STATS) {
            ++scans;
          }
          task = t.take();
          if (task != null) {
            if (COLLECT_STATS) {
              ++steals;
            }
            break;
          } else if (isInterrupted()) {
            break;
          } else if (!lowered) {
            // First failure: drop priority instead of yielding.
            lowered = true;
            setPriority(scanPriority);
          } else {
            // Qualified call: a bare yield() does not compile on
            // Java 14+, where 'yield' is a restricted identifier.
            Thread.yield();
          }
        }
      }
    }

    if (task == null) {
      if (COLLECT_STATS) {
        ++scans;
      }
      task = group.pollEntryQueue();
      if (COLLECT_STATS) {
        if (task != null) {
          ++steals;
        }
      }
    }

    if (lowered) {
      setPriority(runPriority);
    }

    if (task != null && !task.isDone()) {
      if (COLLECT_STATS) {
        ++runs;
      }
      task.run();
      task.setDone();
    }
  }

  /**
   * Like {@link #scan}, but keeps cycling over the other runners and the
   * group's entry queue until a task is obtained or this thread is
   * interrupted. Once a task is found the run priority is restored (if
   * it had been lowered), the group is told this runner is active, and
   * the task is run unless it is already done.
   *
   * <p>Each fruitless cycle is counted; when the count reaches
   * {@code group.SCANS_PER_SLEEP}, {@code group.checkActive(this, iters)}
   * is called on that and every later fruitless cycle.
   * NOTE(review): checkActive presumably blocks or sleeps the idle
   * runner -- confirm in FJTaskRunnerGroup.
   */
  protected void scanWhileIdling() {
    FJTask task = null;

    boolean lowered = false;
    long iters = 0;

    FJTaskRunner[] ts = group.getArray();
    int idx = victimRNG.nextInt(ts.length);

    do {
      for (int i = 0; i < ts.length; ++i) {
        FJTaskRunner t = ts[idx];
        if (++idx >= ts.length) {
          idx = 0; // circular traversal
        }

        if (t != null && t != this) {
          if (COLLECT_STATS) {
            ++scans;
          }

          task = t.take();
          if (task != null) {
            if (COLLECT_STATS) {
              ++steals;
            }
            if (lowered) {
              setPriority(runPriority);
            }
            group.setActive(this);
            break;
          }
        }
      }

      if (task == null) {
        if (isInterrupted()) {
          return;
        }

        if (COLLECT_STATS) {
          ++scans;
        }
        task = group.pollEntryQueue();

        if (task != null) {
          if (COLLECT_STATS) {
            ++steals;
          }
          if (lowered) {
            setPriority(runPriority);
          }
          group.setActive(this);
        } else {
          ++iters;
          if (iters >= group.SCANS_PER_SLEEP) {
            group.checkActive(this, iters);
            if (isInterrupted()) {
              return;
            }
          } else if (!lowered) {
            lowered = true;
            setPriority(scanPriority);
          } else {
            // Qualified for Java 14+ (see scan()).
            Thread.yield();
          }
        }
      }
    } while (task == null);

    if (!task.isDone()) {
      if (COLLECT_STATS) {
        ++runs;
      }
      task.run();
      task.setDone();
    }
  }

  /* ------------ composite operations ------------------- */

  /**
   * Main loop: until the thread is interrupted, pops and runs local
   * tasks, falling back to {@link #scanWhileIdling} when the local deque
   * yields nothing. The group is always told the runner is inactive on
   * the way out.
   */
  public void run() {
    try {
      // Thread.interrupted() is static and clears the interrupt flag.
      while (!Thread.interrupted()) {
        FJTask task = pop();
        if (task != null) {
          if (!task.isDone()) {
            if (COLLECT_STATS) {
              ++runs;
            }
            task.run();
            task.setDone();
          }
        } else {
          scanWhileIdling();
        }
      }
    } finally {
      group.setInactive(this);
    }
  }

  /**
   * Runs one other task if one can be found: the top of the local deque
   * first, otherwise the result of a single {@link #scan}.
   */
  protected final void taskYield() {
    FJTask task = pop();
    if (task != null) {
      if (!task.isDone()) {
        if (COLLECT_STATS) {
          ++runs;
        }
        task.run();
        task.setDone();
      }
    } else {
      scan(null);
    }
  }

  /**
   * Keeps running other tasks (local ones first, then scanned ones)
   * until {@code w} is done.
   *
   * @param w the task to wait for
   */
  protected final void taskJoin(final FJTask w) {
    while (!w.isDone()) {
      FJTask task = pop();
      if (task != null) {
        if (!task.isDone()) {
          if (COLLECT_STATS) {
            ++runs;
          }
          task.run();
          task.setDone();
          if (task == w) {
            return; // fast exit: we just ran the task we were joining
          }
        }
      } else {
        scan(w);
      }
    }
  }

  /**
   * Queues {@code w}, runs {@code v} in the calling thread, then helps
   * until {@code w} is done. Uses the same fast-path test and store as
   * {@link #push}, written inline; falls back to
   * {@link #slowCoInvoke(FJTask, FJTask)} when that test fails.
   *
   * @param w the task to queue and later join
   * @param v the task to run directly
   */
  protected final void coInvoke(final FJTask w, final FJTask v) {
    // Inline push of w.
    int t = top;
    if (t < (base & (deq.length - 1)) + deq.length) {
      deq[t & (deq.length - 1)].put(w);
      top = t + 1;

      // Inline invoke of v.
      if (!v.isDone()) {
        if (COLLECT_STATS) {
          ++runs;
        }
        v.run();
        v.setDone();
      }

      // Inline taskJoin(w).
      while (!w.isDone()) {
        FJTask task = pop();
        if (task != null) {
          if (!task.isDone()) {
            if (COLLECT_STATS) {
              ++runs;
            }
            task.run();
            task.setDone();
            if (task == w) {
              return; // fast exit
            }
          }
        } else {
          scan(w);
        }
      }
    } else {
      slowCoInvoke(w, v);
    }
  }

  /**
   * Fallback for {@link #coInvoke(FJTask, FJTask)}: pushes {@code w}
   * through the normal (possibly growing) path, invokes {@code v}, then
   * joins {@code w}.
   *
   * @param w the task to queue and later join
   * @param v the task to run directly
   */
  protected void slowCoInvoke(final FJTask w, final FJTask v) {
    push(w);
    FJTask.invoke(v);
    taskJoin(w);
  }

  /**
   * Array form of coInvoke: queues every task except the last, runs the
   * last one in the calling thread, then helps until each queued task is
   * done. Falls back to {@link #slowCoInvoke(FJTask[])} when the array
   * is empty or the inline index test fails.
   *
   * @param tasks the tasks to run
   */
  protected final void coInvoke(FJTask[] tasks) {
    int nforks = tasks.length - 1;

    // Inline bulk push of all but the last task.
    int t = top;

    if (nforks >= 0 && t + nforks < (base & (deq.length - 1)) + deq.length) {
      for (int i = 0; i < nforks; ++i) {
        deq[t++ & (deq.length - 1)].put(tasks[i]);
        top = t;
      }

      // Inline invoke of the last task.
      FJTask v = tasks[nforks];
      if (!v.isDone()) {
        if (COLLECT_STATS) {
          ++runs;
        }
        v.run();
        v.setDone();
      }

      // Inline join of each queued task, in array order.
      for (int i = 0; i < nforks; ++i) {
        FJTask w = tasks[i];
        while (!w.isDone()) {
          FJTask task = pop();
          if (task != null) {
            if (!task.isDone()) {
              if (COLLECT_STATS) {
                ++runs;
              }
              task.run();
              task.setDone();
            }
          } else {
            scan(w);
          }
        }
      }
    } else {
      slowCoInvoke(tasks);
    }
  }

  /**
   * Fallback for {@link #coInvoke(FJTask[])}: pushes every task, then
   * joins every task.
   *
   * @param tasks the tasks to run
   */
  protected void slowCoInvoke(FJTask[] tasks) {
    for (int i = 0; i < tasks.length; ++i) {
      push(tasks[i]);
    }
    for (int i = 0; i < tasks.length; ++i) {
      taskJoin(tasks[i]);
    }
  }
}