1 19 20 package org.openide.util; 21 22 import java.util.HashSet ; 23 import java.util.Iterator ; 24 import java.util.LinkedList ; 25 import java.util.List ; 26 import java.util.ListIterator ; 27 import java.util.Stack ; 28 import java.util.Timer ; 29 import java.util.TimerTask ; 30 import java.util.logging.Level ; 31 import java.util.logging.Logger ; 32 33 98 public final class RequestProcessor { 99 100 private static RequestProcessor DEFAULT = new RequestProcessor(); 101 102 104 105 private static RequestProcessor UNLIMITED = new RequestProcessor("Default RequestProcessor", 50); 107 108 private static Timer starterThread = new Timer (true); 109 110 111 private static Logger logger; 112 113 114 private static int counter = 0; 115 static final boolean SLOW = Boolean.getBoolean("org.openide.util.RequestProcessor.Item.SLOW"); 116 117 118 String name; 119 120 122 boolean stopped = false; 123 124 126 private Object processorLock = new Object (); 127 128 129 private HashSet <Processor> processors = new HashSet <Processor>(); 130 131 134 private List <Item> queue = new LinkedList <Item>(); 135 136 139 private int running = 0; 140 141 143 private int throughput; 144 145 146 private boolean interruptThread; 147 148 149 public RequestProcessor() { 150 this(null, 1); 151 } 152 153 155 public RequestProcessor(String name) { 156 this(name, 1); 157 } 158 159 165 public RequestProcessor(String name, int throughput) { 166 this(name, throughput, false); 167 } 168 169 194 public RequestProcessor(String name, int throughput, boolean interruptThread) { 195 this.throughput = throughput; 196 this.name = (name != null) ? name : ("OpenIDE-request-processor-" + (counter++)); 197 this.interruptThread = interruptThread; 198 } 199 200 201 218 public static RequestProcessor getDefault() { 219 return UNLIMITED; 220 } 221 222 228 public Task post(Runnable run) { 229 return post(run, 0, Thread.MIN_PRIORITY); 230 } 231 232 239 public Task post(final Runnable run, int timeToWait) { 240 return post(run, timeToWait, Thread.MIN_PRIORITY); 241 } 242 243 256 public Task post(final Runnable run, int timeToWait, int priority) { 257 RequestProcessor.Task task = new Task(run, priority); 258 task.schedule(timeToWait); 259 260 return task; 261 } 262 263 272 public Task create(Runnable run) { 273 return create(run, false); 274 } 275 276 287 public Task create(Runnable run, boolean initiallyFinished) { 288 Task t = new Task(run); 289 if (initiallyFinished) { 290 t.notifyFinished(); 291 } 292 return t; 293 } 294 295 296 304 public boolean isRequestProcessorThread() { 305 Thread c = Thread.currentThread(); 306 307 synchronized (processorLock) { 309 return processors.contains(c); 310 } 311 } 312 313 316 public void stop() { 317 if ((this == UNLIMITED) || (this == DEFAULT)) { 318 throw new IllegalArgumentException ("Can't stop shared RP's"); } 320 321 synchronized (processorLock) { 322 stopped = true; 323 324 Iterator it = processors.iterator(); 325 326 while (it.hasNext()) 327 ((Processor) it.next()).interrupt(); 328 } 329 } 330 331 335 344 @Deprecated 345 public static Task postRequest(Runnable run) { 346 return DEFAULT.post(run); 347 } 348 349 360 @Deprecated 361 public static Task postRequest(final Runnable run, int timeToWait) { 362 return DEFAULT.post(run, timeToWait); 363 } 364 365 376 @Deprecated 377 public static Task postRequest(final Runnable run, int timeToWait, int priority) { 378 return DEFAULT.post(run, timeToWait, priority); 379 } 380 381 390 @Deprecated 391 public static Task createRequest(Runnable run) { 392 return DEFAULT.create(run); 393 } 394 395 397 static Logger logger() { 398 synchronized (starterThread) { 399 if (logger == null) { 400 logger = Logger.getLogger("org.openide.util.RequestProcessor"); } 402 403 return logger; 404 } 405 } 406 407 411 415 void enqueue(Item item) { 416 Logger em = logger(); 417 boolean loggable = em.isLoggable(Level.FINE); 418 419 synchronized (processorLock) { 420 if (item.getTask() == null) { 421 if (loggable) { 422 em.fine("Null task for item " + item); } 424 return; 425 } 426 427 prioritizedEnqueue(item); 428 429 if (running < throughput) { 430 running++; 431 432 Processor proc = Processor.get(); 433 processors.add(proc); 434 proc.setName(name); 435 proc.attachTo(this); 436 } 437 } 438 if (loggable) { 439 em.fine("Item enqueued: " + item.action + " status: " + item.enqueued); } 441 } 442 443 private void prioritizedEnqueue(Item item) { 445 int iprio = item.getPriority(); 446 447 if (queue.isEmpty()) { 448 queue.add(item); 449 item.enqueued = true; 450 451 return; 452 } else if (iprio <= queue.get(queue.size() - 1).getPriority()) { 453 queue.add(item); 454 item.enqueued = true; 455 } else { 456 for (ListIterator <Item> it = queue.listIterator(); it.hasNext();) { 457 Item next = it.next(); 458 459 if (iprio > next.getPriority()) { 460 it.set(item); 461 it.add(next); 462 item.enqueued = true; 463 464 return; 465 } 466 } 467 468 throw new IllegalStateException ("Prioritized enqueue failed!"); 469 } 470 } 471 472 Task askForWork(Processor worker, String debug) { 473 if (stopped || queue.isEmpty()) { processors.remove(worker); 475 Processor.put(worker, debug); 476 running--; 477 478 return null; 479 } else { 481 Item i = queue.remove(0); 482 Task t = i.getTask(); 483 i.clear(worker); 484 485 return t; 486 } 487 } 488 489 private class EnqueueTask extends TimerTask { 490 Item itm; 491 492 EnqueueTask(Item itm) { 493 this.itm = itm; 494 } 495 496 public void run() { 497 try { 498 enqueue(itm); 499 } catch (RuntimeException e) { 500 Exceptions.printStackTrace(e); 501 } 502 } 503 } 504 505 509 public final class Task extends org.openide.util.Task implements Cancellable { 510 private Item item; 511 private int priority = Thread.MIN_PRIORITY; 512 private long time = 0; 513 private Thread lastThread = null; 514 515 519 Task(Runnable run) { 520 super(run); 521 } 522 523 Task(Runnable run, int priority) { 524 super(run); 525 526 if (priority < Thread.MIN_PRIORITY) { 527 priority = Thread.MIN_PRIORITY; 528 } 529 530 if (priority > Thread.MAX_PRIORITY) { 531 priority = Thread.MAX_PRIORITY; 532 } 533 534 this.priority = priority; 535 } 536 537 public void run() { 538 try { 539 notifyRunning(); 540 lastThread = Thread.currentThread(); 541 run.run(); 542 } finally { 543 Item scheduled = this.item; 544 if (scheduled != null && scheduled.getTask() == this) { 545 } else { 547 notifyFinished(); 548 } 549 lastThread = null; 550 } 551 } 552 553 557 public int getDelay() { 558 long delay = time - System.currentTimeMillis(); 559 560 if (delay < 0L) { 561 return 0; 562 } 563 564 if (delay > (long) Integer.MAX_VALUE) { 565 return Integer.MAX_VALUE; 566 } 567 568 return (int) delay; 569 } 570 571 578 public void schedule(int delay) { 579 if (stopped) { 580 throw new IllegalStateException ("RequestProcessor already stopped!"); } 582 583 time = System.currentTimeMillis() + delay; 584 585 final Item localItem; 586 587 synchronized (processorLock) { 588 notifyRunning(); 589 590 if (item != null) { 591 item.clear(null); 592 } 593 594 item = new Item(this, RequestProcessor.this); 595 localItem = item; 596 } 597 598 if (delay == 0) { enqueue(localItem); 600 } else { starterThread.schedule(new EnqueueTask(localItem), delay); 602 } 603 } 604 605 610 public boolean cancel() { 611 synchronized (processorLock) { 612 boolean success; 613 614 if (item == null) { 615 success = false; 616 } else { 617 Processor p = item.getProcessor(); 618 success = item.clear(null); 619 620 if (p != null) { 621 p.interruptTask(this, RequestProcessor.this); 622 item = null; 623 } 624 } 625 626 if (success) { 627 notifyFinished(); } 629 630 return success; 631 } 632 } 633 634 636 public int getPriority() { 637 return priority; 638 } 639 640 641 public void setPriority(int priority) { 642 if (this.priority == priority) { 643 return; 644 } 645 646 if (priority < Thread.MIN_PRIORITY) { 647 priority = Thread.MIN_PRIORITY; 648 } 649 650 if (priority > Thread.MAX_PRIORITY) { 651 priority = Thread.MAX_PRIORITY; 652 } 653 654 this.priority = priority; 655 656 synchronized (processorLock) { 658 if (item == null) { 659 return; 660 } 661 662 if (queue.remove(item)) { 663 prioritizedEnqueue(item); 664 } 665 } 666 } 667 668 673 public void waitFinished() { 674 if (isRequestProcessorThread()) { boolean toRun; 676 677 Logger em = logger(); 678 boolean loggable = em.isLoggable(Level.FINE); 679 680 if (loggable) { 681 em.fine("Task.waitFinished on " + this + " from other task in RP: " + Thread.currentThread().getName()); } 683 684 685 synchronized (processorLock) { 686 toRun = !isFinished() && ((item == null) || item.clear(null)); 689 if (loggable) { 690 em.fine(" ## finished: " + isFinished()); em.fine(" ## item: " + item); } 693 } 694 695 if (toRun) { 696 if (loggable) { 697 em.fine(" ## running it synchronously"); } 699 Processor processor = (Processor)Thread.currentThread(); 700 processor.doEvaluate (this, processorLock, RequestProcessor.this); 701 } else { if (loggable) { 703 em.fine(" ## not running it synchronously"); } 705 706 if (lastThread != Thread.currentThread()) { 707 if (loggable) { 708 em.fine(" ## waiting for it to be finished"); } 710 super.waitFinished(); 711 } 712 713 } 718 if (loggable) { 719 em.fine(" ## exiting waitFinished"); } 721 } else { 722 super.waitFinished(); 723 } 724 } 725 726 739 public boolean waitFinished(long timeout) throws InterruptedException { 740 if (isRequestProcessorThread()) { 741 boolean toRun; 742 743 synchronized (processorLock) { 744 toRun = !isFinished() && ((item == null) || item.clear(null)); 745 } 746 747 if (toRun) { 748 throw new InterruptedException ( 749 "Cannot wait with timeout " + timeout + " from the RequestProcessor thread for task: " + this 750 ); } else { 753 if (lastThread != Thread.currentThread()) { 754 return super.waitFinished(timeout); 755 } else { 756 return true; 757 } 758 } 759 } else { 760 return super.waitFinished(timeout); 761 } 762 } 763 764 public String toString() { 765 return "RequestProcessor.Task [" + name + ", " + priority + "] for " + super.toString(); } 767 } 768 769 770 private static class Item extends Exception { 771 private final RequestProcessor owner; 772 private Object action; 773 private boolean enqueued; 774 775 Item(Task task, RequestProcessor rp) { 776 super("Posted StackTrace"); action = task; 778 owner = rp; 779 } 780 781 Task getTask() { 782 Object a = action; 783 784 return (a instanceof Task) ? (Task) a : null; 785 } 786 787 790 boolean clear(Processor processor) { 791 synchronized (owner.processorLock) { 792 action = processor; 793 794 return enqueued ? owner.queue.remove(this) : true; 795 } 796 } 797 798 Processor getProcessor() { 799 Object a = action; 800 801 return (a instanceof Processor) ? (Processor) a : null; 802 } 803 804 int getPriority() { 805 return getTask().getPriority(); 806 } 807 808 public Throwable fillInStackTrace() { 809 return SLOW ? super.fillInStackTrace() : this; 810 } 811 } 812 813 817 822 private static class Processor extends Thread { 823 824 private static Stack <Processor> pool = new Stack <Processor>(); 825 826 827 private static final int INACTIVE_TIMEOUT = 60000; 828 829 832 833 private RequestProcessor source; 835 836 837 private RequestProcessor.Task todo; 838 private boolean idle = true; 839 840 841 private Object lock = new Object (); 842 843 public Processor() { 844 super(getTopLevelThreadGroup(), "Inactive RequestProcessor thread"); setDaemon(true); 846 } 847 848 854 static Processor get() { 855 synchronized (pool) { 856 if (pool.isEmpty()) { 857 Processor proc = new Processor(); 858 proc.idle = false; 859 proc.start(); 860 861 return proc; 862 } else { 863 Processor proc = pool.pop(); 864 proc.idle = false; 865 866 return proc; 867 } 868 } 869 } 870 871 876 static void put(Processor proc, String last) { 877 synchronized (pool) { 878 proc.setName("Inactive RequestProcessor thread [Was:" + proc.getName() + "/" + last + "]"); proc.idle = true; 880 pool.push(proc); 881 } 882 } 883 884 886 void setPrio(int priority) { 887 if (priority != getPriority()) { 888 setPriority(priority); 889 } 890 } 891 892 898 public void attachTo(RequestProcessor src) { 899 synchronized (lock) { 900 source = src; 902 lock.notify(); 903 } 904 } 905 906 909 public void run() { 910 for (;;) { 911 RequestProcessor current = null; 912 913 synchronized (lock) { 914 try { 915 if (source == null) { 916 lock.wait(INACTIVE_TIMEOUT); } 918 } catch (InterruptedException e) { 919 } 920 922 current = source; 923 source = null; 924 925 if (current == null) { 927 synchronized (pool) { 928 if (idle) { pool.remove(this); 930 931 break; } else { 934 continue; } 936 } 937 } 938 } 939 940 String debug = null; 941 942 Logger em = logger(); 943 boolean loggable = em.isLoggable(Level.INFO); 944 945 if (loggable) { 946 em.fine("Begining work " + getName()); } 948 949 for (;;) { 951 synchronized (current.processorLock) { 953 todo = current.askForWork(this, debug); 954 if (todo == null) break; 955 } 956 setPrio(todo.getPriority()); 957 958 try { 959 if (loggable) { 960 em.fine(" Executing " + todo); } 962 963 todo.run(); 964 965 if (loggable) { 966 em.fine(" Execution finished in" + getName()); } 968 969 debug = todo.debug(); 970 } catch (OutOfMemoryError oome) { 971 em.log(Level.SEVERE, null, oome); 975 } catch (StackOverflowError e) { 976 e.printStackTrace(); 978 979 doNotify(todo, e); 981 } catch (Throwable t) { 982 doNotify(todo, t); 983 } 984 985 synchronized (current.processorLock) { 987 todo = null; 989 Thread.interrupted(); 992 } 993 } 994 995 if (loggable) { 996 em.fine("Work finished " + getName()); } 998 } 999 } 1000 1001 1003 final void doEvaluate (Task t, Object processorLock, RequestProcessor src) { 1004 Task previous = todo; 1005 boolean interrupted = Thread.interrupted(); 1006 try { 1007 todo = t; 1008 t.run (); 1009 } finally { 1010 synchronized (processorLock) { 1011 todo = previous; 1012 if (interrupted || todo.item == null) { 1013 if (src.interruptThread) { 1014 Thread.currentThread().interrupt(); 1017 } 1018 } 1019 } 1020 } 1021 } 1022 1023 1024 public void interruptTask(Task t, RequestProcessor src) { 1025 if (t != todo) { 1026 return; 1028 } 1029 1030 if (src.interruptThread) { 1031 interrupt(); 1033 } 1034 } 1035 1036 1037 private static void doNotify(RequestProcessor.Task todo, Throwable ex) { 1038 logger().log(Level.SEVERE, null, ex); 1039 if (SLOW) { 1040 logger.log(Level.SEVERE, null, todo.item); 1041 } 1042 } 1043 1044 1049 static ThreadGroup getTopLevelThreadGroup() { 1050 java.security.PrivilegedAction <ThreadGroup > run = new java.security.PrivilegedAction <ThreadGroup >() { 1051 public ThreadGroup run() { 1052 ThreadGroup current = Thread.currentThread().getThreadGroup(); 1053 1054 while (current.getParent() != null) { 1055 current = current.getParent(); 1056 } 1057 1058 return current; 1059 } 1060 }; 1061 1062 return java.security.AccessController.doPrivileged(run); 1063 } 1064 } 1065} 1066 | Popular Tags |