KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > openide > util > RequestProcessor


1 /*
2  * The contents of this file are subject to the terms of the Common Development
3  * and Distribution License (the License). You may not use this file except in
4  * compliance with the License.
5  *
6  * You can obtain a copy of the License at http://www.netbeans.org/cddl.html
7  * or http://www.netbeans.org/cddl.txt.
8  *
9  * When distributing Covered Code, include this CDDL Header Notice in each file
10  * and include the License file at http://www.netbeans.org/cddl.txt.
11  * If applicable, add the following below the CDDL Header, with the fields
12  * enclosed by brackets [] replaced by your own identifying information:
13  * "Portions Copyrighted [year] [name of copyright owner]"
14  *
15  * The Original Software is NetBeans. The Initial Developer of the Original
16  * Software is Sun Microsystems, Inc. Portions Copyright 1997-2006 Sun
17  * Microsystems, Inc. All Rights Reserved.
18  */

19
20 package org.openide.util;
21
22 import java.util.HashSet JavaDoc;
23 import java.util.Iterator JavaDoc;
24 import java.util.LinkedList JavaDoc;
25 import java.util.List JavaDoc;
26 import java.util.ListIterator JavaDoc;
27 import java.util.Stack JavaDoc;
28 import java.util.Timer JavaDoc;
29 import java.util.TimerTask JavaDoc;
30 import java.util.logging.Level JavaDoc;
31 import java.util.logging.Logger JavaDoc;
32
33 /** Request processor that is capable to execute requests in dedicated threads.
34  * You can create your own instance or use the shared one.
35  *
36  * <P><A name="use_cases">There are several use cases for RequestProcessor:</A>
37  *
38  * <UL><LI>Having something done asynchronously in some other thread,
39  * not insisting on any kind of serialization of the requests:
40  * Use <CODE>RequestProcessor.{@link RequestProcessor#getDefault
41  * }.{@link #post(java.lang.Runnable) post(runnable)}</CODE>
42  * for this purpose.
43  * <LI>Having something done later in some other thread:
44  * Use <CODE>RequestProcessor.{@link RequestProcessor#getDefault
45  * }.{@link #post(java.lang.Runnable,int) post(runnable,&nbsp;delay)}</CODE>
46  * <LI>Having something done periodically in any thread: Use the
47  * {@link RequestProcessor.Task}'s ability to
48  * {@link RequestProcessor.Task#schedule schedule()}, like
49  * <PRE>
50  * static RequestProcessor.Task CLEANER = RequestProcessor.getDefault().post(runnable,DELAY);
51  * public void run() {
52  * doTheWork();
53  * CLEANER.schedule(DELAY);
54  * }
55  * </PRE>
56  * <STRONG>Note:</STRONG> Please think twice before implementing some periodic
57  * background activity. It is generally considered evil if it will run
58     regardless of user actions and the application state, even while the application
59     is minimized / not currently used.
60  * <LI>Having something done in some other thread but properly ordered:
61  * Create a private instance of the
62  * {@link RequestProcessor#RequestProcessor(java.lang.String) RequestProcessor(name)}</CODE>
63  * and use it from all places you'd like to have serialized. It works
64  * like a simple Mutex.
65  * <LI>Having some entity that will do processing in a limited
66  * number of threads paralelly: Create a private instance of the
67  * {@link RequestProcessor#RequestProcessor(java.lang.String,int) RequestProcessor(name,throughput)}</CODE>
68  * set proper throughput and use it to schedule the work.
69  * It works like a queue of requests passing through a semafore with predefined
70  * number of <CODE>DOWN()</CODE>s.
71  * </UL>
72  *
73  * <STRONG>Note:</STRONG> If you don't need to serialize your requests but
74  * you're generating them in bursts, you should use your private
75  * <CODE>RequestProcessor</CODE> instance with limited throughput (probably
76  * set to 1), NetBeans would try to run all your requests in parallel otherwise.
77  *
78  * <P>
79  * Since version 6.3 there is a conditional support for interruption of long running tasks.
80  * There always was a way how to cancel not yet running task using {@link RequestProcessor.Task#cancel }
81  * but if the task was already running, one was out of luck. Since version 6.3
82  * the thread running the task is interrupted and the Runnable can check for that
83  * and terminate its execution sooner. In the runnable one shall check for
84  * thread interruption (done from {@link RequestProcessor.Task#cancel }) and
85  * if true, return immediatelly as in this example:
86  * <PRE>
87  * public void run () {
88  * while (veryLongTimeLook) {
89  * doAPieceOfIt ();
90  *
91  * if (Thread.interrupted ()) return;
92  * }
93  * }
94  * </PRE>
95  *
96  * @author Petr Nejedly, Jaroslav Tulach
97  */

98 public final class RequestProcessor {
99     /** the static instance for users that do not want to have own processor */
100     private static RequestProcessor DEFAULT = new RequestProcessor();
101
102     // 50: a conservative value, just for case of misuse
103

104     /** the static instance for users that do not want to have own processor */
105     private static RequestProcessor UNLIMITED = new RequestProcessor("Default RequestProcessor", 50); // NOI18N
106

107     /** A shared timer used to pass timeouted tasks to pending queue */
108     private static Timer JavaDoc starterThread = new Timer JavaDoc(true);
109
110     /** logger */
111     private static Logger JavaDoc logger;
112
113     /** The counter for automatic naming of unnamed RequestProcessors */
114     private static int counter = 0;
115     static final boolean SLOW = Boolean.getBoolean("org.openide.util.RequestProcessor.Item.SLOW");
116
117     /** The name of the RequestProcessor instance */
118     String JavaDoc name;
119
120     /** If the RP was stopped, this variable will be set, every new post()
121      * will throw an exception and no task will be processed any further */

122     boolean stopped = false;
123
124     /** The lock covering following four fields. They should be accessed
125      * only while having this lock held. */

126     private Object JavaDoc processorLock = new Object JavaDoc();
127
128     /** The set holding all the Processors assigned to this RequestProcessor */
129     private HashSet JavaDoc<Processor> processors = new HashSet JavaDoc<Processor>();
130
131     /** Actualy the first item is pending to be processed.
132      * Can be accessed/trusted only under the above processorLock lock.
133      * If null, nothing is scheduled and the processor is not running. */

134     private List JavaDoc<Item> queue = new LinkedList JavaDoc<Item>();
135
136     /** Number of currently running processors. If there is a new request
137      * and this number is lower that the throughput, new Processor is asked
138      * to carry over the request. */

139     private int running = 0;
140
141     /** The maximal number of processors that can perform the requests sent
142      * to this RequestProcessors. If 1, all the requests are serialized. */

143     private int throughput;
144     
145     /** support for interrupts or not? */
146     private boolean interruptThread;
147
148     /** Creates new RequestProcessor with automatically assigned unique name. */
149     public RequestProcessor() {
150         this(null, 1);
151     }
152
153     /** Creates a new named RequestProcessor with throughput 1.
154      * @param name the name to use for the request processor thread */

155     public RequestProcessor(String JavaDoc name) {
156         this(name, 1);
157     }
158
159     /** Creates a new named RequestProcessor with defined throughput.
160      * @param name the name to use for the request processor thread
161      * @param throughput the maximal count of requests allowed to run in parallel
162      *
163      * @since OpenAPI version 2.12
164      */

165     public RequestProcessor(String JavaDoc name, int throughput) {
166         this(name, throughput, false);
167     }
168
169     /** Creates a new named RequestProcessor with defined throughput which
170      * can support interruption of the thread the processor runs in.
171      * There always was a way how to cancel not yet running task using {@link RequestProcessor.Task#cancel }
172      * but if the task was already running, one was out of luck. With this
173      * constructor one can create a {@link RequestProcessor} which threads
174      * thread running tasks are interrupted and the Runnable can check for that
175      * and terminate its execution sooner. In the runnable one shall check for
176      * thread interruption (done from {@link RequestProcessor.Task#cancel }) and
177      * if true, return immediatelly as in this example:
178      * <PRE>
179      * public void run () {
180      * while (veryLongTimeLook) {
181      * doAPieceOfIt ();
182      *
183      * if (Thread.interrupted ()) return;
184      * }
185      * }
186      * </PRE>
187      *
188      * @param name the name to use for the request processor thread
189      * @param throughput the maximal count of requests allowed to run in parallel
190      * @param interruptThread true if {@link RequestProcessor.Task#cancel} shall interrupt the thread
191      *
192      * @since 6.3
193      */

194     public RequestProcessor(String JavaDoc name, int throughput, boolean interruptThread) {
195         this.throughput = throughput;
196         this.name = (name != null) ? name : ("OpenIDE-request-processor-" + (counter++));
197         this.interruptThread = interruptThread;
198     }
199     
200     
201     /** The getter for the shared instance of the <CODE>RequestProcessor</CODE>.
202      * This instance is shared by anybody who
203      * needs a way of performing sporadic or repeated asynchronous work.
204      * Tasks posted to this instance may be canceled until they start their
205      * execution. If a there is a need to cancel a task while it is running
206      * a seperate request processor needs to be created via
207      * {@link #RequestProcessor(String, int, boolean)} constructor.
208      *
209      * @return an instance of RequestProcessor that is capable of performing
210      * "unlimited" (currently limited to 50, just for case of misuse) number
211      * of requests in parallel.
212      *
213      * @see #RequestProcessor(String, int, boolean)
214      * @see RequestProcessor.Task#cancel
215      *
216      * @since version 2.12
217      */

218     public static RequestProcessor getDefault() {
219         return UNLIMITED;
220     }
221
222     /** This methods asks the request processor to start given
223      * runnable immediately. The default priority is {@link Thread#MIN_PRIORITY}.
224      *
225      * @param run class to run
226      * @return the task to control the request
227      */

228     public Task post(Runnable JavaDoc run) {
229         return post(run, 0, Thread.MIN_PRIORITY);
230     }
231
232     /** This methods asks the request processor to start given
233     * runnable after <code>timeToWait</code> milliseconds. The default priority is {@link Thread#MIN_PRIORITY}.
234     *
235     * @param run class to run
236     * @param timeToWait to wait before execution
237     * @return the task to control the request
238     */

239     public Task post(final Runnable JavaDoc run, int timeToWait) {
240         return post(run, timeToWait, Thread.MIN_PRIORITY);
241     }
242
243     /** This methods asks the request processor to start given
244     * runnable after <code>timeToWait</code> milliseconds. Given priority is assigned to the
245     * request. <p>
246     * For request relaying please consider:
247     * <pre>
248     * post(run, timeToWait, Thread.currentThread().getPriority());
249     * </pre>
250     *
251     * @param run class to run
252     * @param timeToWait to wait before execution
253     * @param priority the priority from {@link Thread#MIN_PRIORITY} to {@link Thread#MAX_PRIORITY}
254     * @return the task to control the request
255     */

256     public Task post(final Runnable JavaDoc run, int timeToWait, int priority) {
257         RequestProcessor.Task task = new Task(run, priority);
258         task.schedule(timeToWait);
259
260         return task;
261     }
262
263     /** Creates request that can be later started by setting its delay.
264     * The request is not immediatelly put into the queue. It is planned after
265     * setting its delay by schedule method. By default the initial state of
266     * the task is <code>!isFinished()</code> so doing waitFinished() will
267     * block on and wait until the task is scheduled.
268     *
269     * @param run action to run in the process
270     * @return the task to control execution of given action
271     */

272     public Task create(Runnable JavaDoc run) {
273         return create(run, false);
274     }
275     
276     /** Creates request that can be later started by setting its delay.
277     * The request is not immediatelly put into the queue. It is planned after
278     * setting its delay by schedule method.
279     *
280     * @param run action to run in the process
281     * @param initiallyFinished should the task be marked initially finished? If
282     * so the {@link Task#waitFinished} on the task will succeeded immediatelly even
283     * the task has not yet been {@link Task#schedule}d.
284     * @return the task to control execution of given action
285     * @since 6.8
286     */

287     public Task create(Runnable JavaDoc run, boolean initiallyFinished) {
288         Task t = new Task(run);
289         if (initiallyFinished) {
290             t.notifyFinished();
291         }
292         return t;
293     }
294     
295
296     /** Tests if the current thread is request processor thread.
297     * This method could be used to prevent the deadlocks using
298     * <CODE>waitFinished</CODE> method. Any two tasks created
299     * by request processor must not wait for themself.
300     *
301     * @return <CODE>true</CODE> if the current thread is request processor
302     * thread, otherwise <CODE>false</CODE>
303     */

304     public boolean isRequestProcessorThread() {
305         Thread JavaDoc c = Thread.currentThread();
306
307         // return c instanceof Processor && ((Processor)c).source == this;
308
synchronized (processorLock) {
309             return processors.contains(c);
310         }
311     }
312
313     /** Stops processing of runnables processor.
314     * The currently running runnable is finished and no new is started.
315     */

316     public void stop() {
317         if ((this == UNLIMITED) || (this == DEFAULT)) {
318             throw new IllegalArgumentException JavaDoc("Can't stop shared RP's"); // NOI18N
319
}
320
321         synchronized (processorLock) {
322             stopped = true;
323
324             Iterator JavaDoc it = processors.iterator();
325
326             while (it.hasNext())
327                 ((Processor) it.next()).interrupt();
328         }
329     }
330
331     //
332
// Static methods communicating with default request processor
333
//
334

335     /** This methods asks the request processor to start given
336      * runnable after <code>timeToWait</code> milliseconds. The default priority is {@link Thread#MIN_PRIORITY}.
337      *
338      * @param run class to run
339      * @return the task to control the request
340      *
341      * @deprecated Sharing of one singlethreaded <CODE>RequestProcessor</CODE>
342      * among different users and posting even blocking requests is inherently
343      * deadlock-prone. See <A HREF="#use_cases">use cases</A>. */

344     @Deprecated JavaDoc
345     public static Task postRequest(Runnable JavaDoc run) {
346         return DEFAULT.post(run);
347     }
348
349     /** This methods asks the request processor to start given
350      * runnable after <code>timeToWait</code> milliseconds.
351      * The default priority is {@link Thread#MIN_PRIORITY}.
352      *
353      * @param run class to run
354      * @param timeToWait to wait before execution
355      * @return the task to control the request
356      *
357      * @deprecated Sharing of one singlethreaded <CODE>RequestProcessor</CODE>
358      * among different users and posting even blocking requests is inherently
359      * deadlock-prone. See <A HREF="#use_cases">use cases</A>. */

360     @Deprecated JavaDoc
361     public static Task postRequest(final Runnable JavaDoc run, int timeToWait) {
362         return DEFAULT.post(run, timeToWait);
363     }
364
365     /** This methods asks the request processor to start given
366      * runnable after <code>timeToWait</code> milliseconds. Given priority is assigned to the
367      * request.
368      * @param run class to run
369      * @param timeToWait to wait before execution
370      * @param priority the priority from {@link Thread#MIN_PRIORITY} to {@link Thread#MAX_PRIORITY}
371      * @return the task to control the request
372      *
373      * @deprecated Sharing of one singlethreaded <CODE>RequestProcessor</CODE>
374      * among different users and posting even blocking requests is inherently
375      * deadlock-prone. See <A HREF="#use_cases">use cases</A>. */

376     @Deprecated JavaDoc
377     public static Task postRequest(final Runnable JavaDoc run, int timeToWait, int priority) {
378         return DEFAULT.post(run, timeToWait, priority);
379     }
380
381     /** Creates request that can be later started by setting its delay.
382      * The request is not immediatelly put into the queue. It is planned after
383      * setting its delay by setDelay method.
384      * @param run action to run in the process
385      * @return the task to control execution of given action
386      *
387      * @deprecated Sharing of one singlethreaded <CODE>RequestProcessor</CODE>
388      * among different users and posting even blocking requests is inherently
389      * deadlock-prone. See <A HREF="#use_cases">use cases</A>. */

390     @Deprecated JavaDoc
391     public static Task createRequest(Runnable JavaDoc run) {
392         return DEFAULT.create(run);
393     }
394
395     /** Logger for the error manager.
396      */

397     static Logger JavaDoc logger() {
398         synchronized (starterThread) {
399             if (logger == null) {
400                 logger = Logger.getLogger("org.openide.util.RequestProcessor"); // NOI18N
401
}
402
403             return logger;
404         }
405     }
406
407     //------------------------------------------------------------------------------
408
// The pending queue management implementation
409
//------------------------------------------------------------------------------
410

411     /** Place the Task to the queue of pending tasks for immediate processing.
412      * If there is no other Task planned, this task is immediatelly processed
413      * in the Processor.
414      */

415     void enqueue(Item item) {
416         Logger JavaDoc em = logger();
417         boolean loggable = em.isLoggable(Level.FINE);
418         
419         synchronized (processorLock) {
420             if (item.getTask() == null) {
421                 if (loggable) {
422                     em.fine("Null task for item " + item); // NOI18N
423
}
424                 return;
425             }
426
427             prioritizedEnqueue(item);
428
429             if (running < throughput) {
430                 running++;
431
432                 Processor proc = Processor.get();
433                 processors.add(proc);
434                 proc.setName(name);
435                 proc.attachTo(this);
436             }
437         }
438         if (loggable) {
439             em.fine("Item enqueued: " + item.action + " status: " + item.enqueued); // NOI18N
440
}
441     }
442
443     // call it under queue lock i.e. processorLock
444
private void prioritizedEnqueue(Item item) {
445         int iprio = item.getPriority();
446
447         if (queue.isEmpty()) {
448             queue.add(item);
449             item.enqueued = true;
450
451             return;
452         } else if (iprio <= queue.get(queue.size() - 1).getPriority()) {
453             queue.add(item);
454             item.enqueued = true;
455         } else {
456             for (ListIterator JavaDoc<Item> it = queue.listIterator(); it.hasNext();) {
457                 Item next = it.next();
458
459                 if (iprio > next.getPriority()) {
460                     it.set(item);
461                     it.add(next);
462                     item.enqueued = true;
463
464                     return;
465                 }
466             }
467
468             throw new IllegalStateException JavaDoc("Prioritized enqueue failed!");
469         }
470     }
471
472     Task askForWork(Processor worker, String JavaDoc debug) {
473         if (stopped || queue.isEmpty()) { // no more work in this burst, return him
474
processors.remove(worker);
475             Processor.put(worker, debug);
476             running--;
477
478             return null;
479         } else { // we have some work for the worker, pass it
480

481             Item i = queue.remove(0);
482             Task t = i.getTask();
483             i.clear(worker);
484
485             return t;
486         }
487     }
488
489     private class EnqueueTask extends TimerTask JavaDoc {
490         Item itm;
491         
492         EnqueueTask(Item itm) {
493             this.itm = itm;
494         }
495         
496         public void run() {
497             try {
498                 enqueue(itm);
499             } catch (RuntimeException JavaDoc e) {
500                 Exceptions.printStackTrace(e);
501             }
502         }
503     }
504     
505     /**
506      * The task describing the request sent to the processor.
507      * Cancellable since 4.1.
508      */

509     public final class Task extends org.openide.util.Task implements Cancellable {
510         private Item item;
511         private int priority = Thread.MIN_PRIORITY;
512         private long time = 0;
513         private Thread JavaDoc lastThread = null;
514
515         /** @param run runnable to start
516         * @param delay amount of millis to wait
517         * @param priority the priorty of the task
518         */

519         Task(Runnable JavaDoc run) {
520             super(run);
521         }
522
523         Task(Runnable JavaDoc run, int priority) {
524             super(run);
525
526             if (priority < Thread.MIN_PRIORITY) {
527                 priority = Thread.MIN_PRIORITY;
528             }
529
530             if (priority > Thread.MAX_PRIORITY) {
531                 priority = Thread.MAX_PRIORITY;
532             }
533
534             this.priority = priority;
535         }
536
537         public void run() {
538             try {
539                 notifyRunning();
540                 lastThread = Thread.currentThread();
541                 run.run();
542             } finally {
543                 Item scheduled = this.item;
544                 if (scheduled != null && scheduled.getTask() == this) {
545                     // do not mark as finished, we are scheduled for future
546
} else {
547                     notifyFinished();
548                 }
549                 lastThread = null;
550             }
551         }
552
553         /** Getter for amount of millis till this task
554         * is started.
555         * @return amount of millis
556         */

557         public int getDelay() {
558             long delay = time - System.currentTimeMillis();
559
560             if (delay < 0L) {
561                 return 0;
562             }
563
564             if (delay > (long) Integer.MAX_VALUE) {
565                 return Integer.MAX_VALUE;
566             }
567
568             return (int) delay;
569         }
570
571         /** (Re-)schedules a task to run in the future.
572         * If the task has not been run yet, it is postponed to
573         * the new time. If it has already run and finished, it is scheduled
574         * to be started again. If it is currently running, it is nevertheless
575         * left to finish, and also scheduled to run again.
576         * @param delay time in milliseconds to wait (starting from now)
577         */

578         public void schedule(int delay) {
579             if (stopped) {
580                 throw new IllegalStateException JavaDoc("RequestProcessor already stopped!"); // NOI18N
581
}
582
583             time = System.currentTimeMillis() + delay;
584
585             final Item localItem;
586
587             synchronized (processorLock) {
588                 notifyRunning();
589
590                 if (item != null) {
591                     item.clear(null);
592                 }
593
594                 item = new Item(this, RequestProcessor.this);
595                 localItem = item;
596             }
597
598             if (delay == 0) { // Place it to pending queue immediatelly
599
enqueue(localItem);
600             } else { // Post the starter
601
starterThread.schedule(new EnqueueTask(localItem), delay);
602             }
603         }
604
605         /** Removes the task from the queue.
606         *
607         * @return true if the task has been removed from the queue,
608         * false it the task has already been processed
609         */

610         public boolean cancel() {
611             synchronized (processorLock) {
612                 boolean success;
613
614                 if (item == null) {
615                     success = false;
616                 } else {
617                     Processor p = item.getProcessor();
618                     success = item.clear(null);
619
620                     if (p != null) {
621                         p.interruptTask(this, RequestProcessor.this);
622                         item = null;
623                     }
624                 }
625
626                 if (success) {
627                     notifyFinished(); // mark it as finished
628
}
629
630                 return success;
631             }
632         }
633
634         /** Current priority of the task.
635         */

636         public int getPriority() {
637             return priority;
638         }
639
640         /** Changes the priority the task will be performed with. */
641         public void setPriority(int priority) {
642             if (this.priority == priority) {
643                 return;
644             }
645
646             if (priority < Thread.MIN_PRIORITY) {
647                 priority = Thread.MIN_PRIORITY;
648             }
649
650             if (priority > Thread.MAX_PRIORITY) {
651                 priority = Thread.MAX_PRIORITY;
652             }
653
654             this.priority = priority;
655
656             // update queue position accordingly
657
synchronized (processorLock) {
658                 if (item == null) {
659                     return;
660                 }
661
662                 if (queue.remove(item)) {
663                     prioritizedEnqueue(item);
664                 }
665             }
666         }
667
668         /** This method is an implementation of the waitFinished method
669         * in the RequestProcessor.Task. It check the current thread if it is
670         * request processor thread and in such case runs the task immediatelly
671         * to prevent deadlocks.
672         */

673         public void waitFinished() {
674             if (isRequestProcessorThread()) { //System.err.println(
675
boolean toRun;
676                 
677                 Logger JavaDoc em = logger();
678                 boolean loggable = em.isLoggable(Level.FINE);
679                 
680                 if (loggable) {
681                     em.fine("Task.waitFinished on " + this + " from other task in RP: " + Thread.currentThread().getName()); // NOI18N
682
}
683                 
684
685                 synchronized (processorLock) {
686                     // correct line: toRun = (item == null) ? !isFinished (): (item.clear() && !isFinished ());
687
// the same: toRun = !isFinished () && (item == null ? true : item.clear ());
688
toRun = !isFinished() && ((item == null) || item.clear(null));
689                     if (loggable) {
690                         em.fine(" ## finished: " + isFinished()); // NOI18N
691
em.fine(" ## item: " + item); // NOI18N
692
}
693                 }
694
695                 if (toRun) {
696                     if (loggable) {
697                         em.fine(" ## running it synchronously"); // NOI18N
698
}
699                     Processor processor = (Processor)Thread.currentThread();
700                     processor.doEvaluate (this, processorLock, RequestProcessor.this);
701                 } else { // it is already running in other thread of this RP
702
if (loggable) {
703                         em.fine(" ## not running it synchronously"); // NOI18N
704
}
705
706                     if (lastThread != Thread.currentThread()) {
707                         if (loggable) {
708                             em.fine(" ## waiting for it to be finished"); // NOI18N
709
}
710                         super.waitFinished();
711                     }
712
713                     // else {
714
//System.err.println("Thread waiting for itself!!!!! - semantics broken!!!");
715
//Thread.dumpStack();
716
// }
717
}
718                 if (loggable) {
719                     em.fine(" ## exiting waitFinished"); // NOI18N
720
}
721             } else {
722                 super.waitFinished();
723             }
724         }
725
726         /** Enhanced reimplementation of the {@link Task#waitFinished(long)}
727         * method. The added semantic is that if one calls this method from
728         * another task of the same processor, and the task has not yet been
729         * executed, the method will immediatelly detect that and throw
730         * <code>InterruptedException</code> to signal that state.
731         *
732         * @param timeout the amount of time to wait
733         * @exception InterruptedException if waiting has been interrupted or if
734         * the wait cannot succeed due to possible deadlock collision
735         * @return true if the task was finished successfully during the
736         * timeout period, false otherwise
737         * @since 5.0
738         */

739         public boolean waitFinished(long timeout) throws InterruptedException JavaDoc {
740             if (isRequestProcessorThread()) {
741                 boolean toRun;
742
743                 synchronized (processorLock) {
744                     toRun = !isFinished() && ((item == null) || item.clear(null));
745                 }
746
747                 if (toRun) {
748                     throw new InterruptedException JavaDoc(
749                         "Cannot wait with timeout " + timeout + " from the RequestProcessor thread for task: " + this
750                     ); // NOI18N
751
} else { // it is already running in other thread of this RP
752

753                     if (lastThread != Thread.currentThread()) {
754                         return super.waitFinished(timeout);
755                     } else {
756                         return true;
757                     }
758                 }
759             } else {
760                 return super.waitFinished(timeout);
761             }
762         }
763
764         public String JavaDoc toString() {
765             return "RequestProcessor.Task [" + name + ", " + priority + "] for " + super.toString(); // NOI18N
766
}
767     }
768
769     /* One item representing the task pending in the pending queue */
770     private static class Item extends Exception JavaDoc {
771         private final RequestProcessor owner;
772         private Object JavaDoc action;
773         private boolean enqueued;
774
775         Item(Task task, RequestProcessor rp) {
776             super("Posted StackTrace"); // NOI18N
777
action = task;
778             owner = rp;
779         }
780
781         Task getTask() {
782             Object JavaDoc a = action;
783
784             return (a instanceof Task) ? (Task) a : null;
785         }
786
787         /** Annulate this request iff still possible.
788          * @returns true if it was possible to skip this item, false
789          * if the item was/is already processed */

790         boolean clear(Processor processor) {
791             synchronized (owner.processorLock) {
792                 action = processor;
793
794                 return enqueued ? owner.queue.remove(this) : true;
795             }
796         }
797
798         Processor getProcessor() {
799             Object JavaDoc a = action;
800
801             return (a instanceof Processor) ? (Processor) a : null;
802         }
803
804         int getPriority() {
805             return getTask().getPriority();
806         }
807
808         public Throwable JavaDoc fillInStackTrace() {
809             return SLOW ? super.fillInStackTrace() : this;
810         }
811     }
812
813     //------------------------------------------------------------------------------
814
// The Processor management implementation
815
//------------------------------------------------------------------------------
816

817     /**
818     /** A special thread that processes timouted Tasks from a RequestProcessor.
819      * It uses the RequestProcessor as a synchronized queue (a Channel),
820      * so it is possible to run more Processors in paralel for one RequestProcessor
821      */

822     private static class Processor extends Thread JavaDoc {
823         /** A stack containing all the inactive Processors */
824         private static Stack JavaDoc<Processor> pool = new Stack JavaDoc<Processor>();
825
826         /* One minute of inactivity and the Thread will die if not assigned */
827         private static final int INACTIVE_TIMEOUT = 60000;
828
829         /** Internal variable holding the Runnable to be run.
830          * Used for passing Runnable through Thread boundaries.
831          */

832
833         //private Item task;
834
private RequestProcessor source;
835
836         /** task we are working on */
837         private RequestProcessor.Task todo;
838         private boolean idle = true;
839
840         /** Waiting lock */
841         private Object JavaDoc lock = new Object JavaDoc();
842
843         public Processor() {
844             super(getTopLevelThreadGroup(), "Inactive RequestProcessor thread"); // NOI18N
845
setDaemon(true);
846         }
847
848         /** Provide an inactive Processor instance. It will return either
849          * existing inactive processor from the pool or will create a new instance
850          * if no instance is in the pool.
851          *
852          * @return inactive Processor
853          */

854         static Processor get() {
855             synchronized (pool) {
856                 if (pool.isEmpty()) {
857                     Processor proc = new Processor();
858                     proc.idle = false;
859                     proc.start();
860
861                     return proc;
862                 } else {
863                     Processor proc = pool.pop();
864                     proc.idle = false;
865
866                     return proc;
867                 }
868             }
869         }
870
871         /** A way of returning a Processor to the inactive pool.
872          *
873          * @param proc the Processor to return to the pool. It shall be inactive.
874          * @param last the debugging string identifying the last client.
875          */

876         static void put(Processor proc, String JavaDoc last) {
877             synchronized (pool) {
878                 proc.setName("Inactive RequestProcessor thread [Was:" + proc.getName() + "/" + last + "]"); // NOI18N
879
proc.idle = true;
880                 pool.push(proc);
881             }
882         }
883
884         /** setPriority wrapper that skips setting the same priority
885          * we'return already running at */

886         void setPrio(int priority) {
887             if (priority != getPriority()) {
888                 setPriority(priority);
889             }
890         }
891
892         /**
893          * Sets an Item to be performed and notifies the performing Thread
894          * to start the processing.
895          *
896          * @param r the Item to run.
897          */

898         public void attachTo(RequestProcessor src) {
899             synchronized (lock) {
900                 //assert(source == null);
901
source = src;
902                 lock.notify();
903             }
904         }
905
906         /**
907          * The method that will repeatedly wait for a request and perform it.
908          */

909         public void run() {
910             for (;;) {
911                 RequestProcessor current = null;
912
913                 synchronized (lock) {
914                     try {
915                         if (source == null) {
916                             lock.wait(INACTIVE_TIMEOUT); // wait for the job
917
}
918                     } catch (InterruptedException JavaDoc e) {
919                     }
920                      // not interesting
921

922                     current = source;
923                     source = null;
924
925                     if (current == null) { // We've timeouted
926

927                         synchronized (pool) {
928                             if (idle) { // and we're idle
929
pool.remove(this);
930
931                                 break; // exit the thread
932
} else { // this will happen if we've been just
933

934                                 continue; // before timeout when we were assigned
935
}
936                         }
937                     }
938                 }
939
940                 String JavaDoc debug = null;
941
942                 Logger JavaDoc em = logger();
943                 boolean loggable = em.isLoggable(Level.INFO);
944
945                 if (loggable) {
946                     em.fine("Begining work " + getName()); // NOI18N
947
}
948
949                 // while we have something to do
950
for (;;) {
951                     // need the same sync as interruptTask
952
synchronized (current.processorLock) {
953                         todo = current.askForWork(this, debug);
954                         if (todo == null) break;
955                     }
956                     setPrio(todo.getPriority());
957
958                     try {
959                         if (loggable) {
960                             em.fine(" Executing " + todo); // NOI18N
961
}
962
963                         todo.run();
964
965                         if (loggable) {
966                             em.fine(" Execution finished in" + getName()); // NOI18N
967
}
968
969                         debug = todo.debug();
970                     } catch (OutOfMemoryError JavaDoc oome) {
971                         // direct notification, there may be no room for
972
// annotations and we need OOME to be processed
973
// for debugging hooks
974
em.log(Level.SEVERE, null, oome);
975                     } catch (StackOverflowError JavaDoc e) {
976                         // Try as hard as possible to get a real stack trace
977
e.printStackTrace();
978
979                         // recoverable too
980
doNotify(todo, e);
981                     } catch (Throwable JavaDoc t) {
982                         doNotify(todo, t);
983                     }
984
985                     // need the same sync as interruptTask
986
synchronized (current.processorLock) {
987                         // to improve GC
988
todo = null;
989                         // and to clear any possible interrupted state
990
// set by calling Task.cancel ()
991
Thread.interrupted();
992                     }
993                 }
994
995                 if (loggable) {
996                     em.fine("Work finished " + getName()); // NOI18N
997
}
998             }
999         }
1000        
1001        /** Evaluates given task directly.
1002         */

1003        final void doEvaluate (Task t, Object JavaDoc processorLock, RequestProcessor src) {
1004            Task previous = todo;
1005            boolean interrupted = Thread.interrupted();
1006            try {
1007                todo = t;
1008                t.run ();
1009            } finally {
1010                synchronized (processorLock) {
1011                    todo = previous;
1012                    if (interrupted || todo.item == null) {
1013                        if (src.interruptThread) {
1014                            // reinterrupt the thread if it was interrupted and
1015
// we support interrupts
1016
Thread.currentThread().interrupt();
1017                        }
1018                    }
1019                }
1020            }
1021        }
1022
1023        /** Called under the processorLock */
1024        public void interruptTask(Task t, RequestProcessor src) {
1025            if (t != todo) {
1026                // not running this task so
1027
return;
1028            }
1029            
1030            if (src.interruptThread) {
1031                // otherwise interrupt this thread
1032
interrupt();
1033            }
1034        }
1035
1036        /** @see "#20467" */
1037        private static void doNotify(RequestProcessor.Task todo, Throwable JavaDoc ex) {
1038            logger().log(Level.SEVERE, null, ex);
1039            if (SLOW) {
1040                logger.log(Level.SEVERE, null, todo.item);
1041            }
1042        }
1043
1044        /**
1045         * @return a top level ThreadGroup. The method ensures that even
1046         * Processors created by internal execution will survive the
1047         * end of the task.
1048         */

1049        static ThreadGroup JavaDoc getTopLevelThreadGroup() {
1050            java.security.PrivilegedAction JavaDoc<ThreadGroup JavaDoc> run = new java.security.PrivilegedAction JavaDoc<ThreadGroup JavaDoc>() {
1051                    public ThreadGroup JavaDoc run() {
1052                        ThreadGroup JavaDoc current = Thread.currentThread().getThreadGroup();
1053
1054                        while (current.getParent() != null) {
1055                            current = current.getParent();
1056                        }
1057
1058                        return current;
1059                    }
1060                };
1061
1062            return java.security.AccessController.doPrivileged(run);
1063        }
1064    }
1065}
1066
Popular Tags