KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > batik > util > RunnableQueue


1 /*
2
3    Copyright 2001-2003 The Apache Software Foundation
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16
17  */

18 package org.apache.batik.util;
19
20 import java.util.Iterator JavaDoc;
21
22 /**
23  * This class represents an object which queues Runnable objects for
24  * invocation in a single thread.
25  *
26  * @author <a HREF="mailto:stephane@hillion.org">Stephane Hillion</a>
27  * @version $Id: RunnableQueue.java,v 1.21 2005/02/23 11:05:54 deweese Exp $
28  */

29 public class RunnableQueue implements Runnable JavaDoc {
30
31     /**
32      * Type-safe enumeration of queue states.
33      */

34     public static class RunnableQueueState extends Object JavaDoc {
35         final String JavaDoc value;
36         private RunnableQueueState(String JavaDoc value) {
37             this.value = value.intern(); }
38         public String JavaDoc getValue() { return value; }
39         public String JavaDoc toString() {
40             return "[RunnableQueueState: " + value + "]"; }
41     }
42
43     /**
44      * The queue is in the processes of running tasks.
45      */

46     public static final RunnableQueueState RUNNING
47         = new RunnableQueueState("Running");
48
49     /**
50      * The queue may still be running tasks but as soon as possible
51      * will go to SUSPENDED state.
52      */

53     public static final RunnableQueueState SUSPENDING
54         = new RunnableQueueState("Suspending");
55
56     /**
57      * The queue is no longer running any tasks and will not
58      * run any tasks until resumeExecution is called.
59      */

60     public static final RunnableQueueState SUSPENDED
61         = new RunnableQueueState("Suspended");
62
63     /**
64      * The Suspension state of this thread.
65      */

66     protected RunnableQueueState state;
67
68     /**
69      * Object to synchronize/wait/notify for suspension
70      * issues.
71      */

72     protected Object JavaDoc stateLock = new Object JavaDoc();
73
74     /**
75      * Used to indicate if the queue was resumed while
76      * still running, so a 'resumed' event can be sent.
77      */

78     protected boolean wasResumed;
79
80     /**
81      * The Runnable objects list, also used as synchoronization point
82      * for pushing/poping runables.
83      */

84     protected DoublyLinkedList list = new DoublyLinkedList();
85
86     /**
87      * Count of preempt entries in queue, so preempt entries
88      * can be kept properly ordered.
89      */

90     protected int preemptCount = 0;
91
92     /**
93      * The object which handle run events.
94      */

95     protected RunHandler runHandler;
96
97     /**
98      * The current thread.
99      */

100     protected HaltingThread runnableQueueThread;
101
102     /**
103      * Creates a new RunnableQueue started in a new thread.
104      * @return a RunnableQueue which is garanteed to have entered its
105      * <tt>run()</tt> method.
106      */

107     public static RunnableQueue createRunnableQueue() {
108         RunnableQueue result = new RunnableQueue();
109         synchronized (result) {
110             HaltingThread ht = new HaltingThread
111                 (result, "RunnableQueue-" + threadCount++);
112             ht.setDaemon(true);
113             ht.start();
114             while (result.getThread() == null) {
115                 try {
116                     result.wait();
117                 } catch (InterruptedException JavaDoc ie) {
118                 }
119             }
120         }
121         return result;
122     }
123     private static int threadCount;
124     
125     /**
126      * Runs this queue.
127      */

128     public void run() {
129         synchronized (this) {
130             runnableQueueThread = (HaltingThread)Thread.currentThread();
131             // Wake the create method so it knows we are in
132
// our run and ready to go.
133
notify();
134         }
135
136         Link l;
137         Runnable JavaDoc rable;
138         try {
139             while (!HaltingThread.hasBeenHalted()) {
140
141                 // Mutex for suspention work.
142
synchronized (stateLock) {
143                     if (state == RUNNING) {
144
145                         if (wasResumed) {
146                             wasResumed = false;
147                             executionResumed();
148                         }
149                     } else {
150                         
151                         while (state != RUNNING) {
152                             state = SUSPENDED;
153
154                             // notify suspendExecution in case it is
155
// waiting til we shut down.
156
stateLock.notifyAll();
157
158                             executionSuspended();
159
160                             // Wait until resumeExecution called.
161
try {
162                                 stateLock.wait();
163                             } catch(InterruptedException JavaDoc ie) { }
164                         }
165
166                         wasResumed = false;
167                         executionResumed();
168                     }
169                 }
170
171                 // The following seriously stress tests the class
172
// for stuff happening between the two sync blocks.
173
//
174
// try {
175
// Thread.sleep(1);
176
// } catch (InterruptedException ie) { }
177

178                 synchronized (list) {
179                     if (state == SUSPENDING)
180                         continue;
181                     l = (Link)list.pop();
182                     if (preemptCount != 0) preemptCount--;
183                     if (l == null) {
184                         // No item to run, wait till there is one.
185
try {
186                             list.wait();
187                         } catch (InterruptedException JavaDoc ie) {
188                             // just loop again.
189
}
190                         continue; // start loop over again...
191
}
192
193                     rable = l.runnable;
194                 }
195
196                 runnableStart(rable);
197
198                 try {
199                     rable.run();
200                 } catch (ThreadDeath JavaDoc td) {
201                     // Let it kill us...
202
throw td;
203                 } catch (Throwable JavaDoc t) {
204                     // Might be nice to notify someone directly.
205
// But this is more or less what Swing does.
206
t.printStackTrace();
207                 }
208                 l.unlock();
209                 runnableInvoked(rable);
210             }
211         } catch (InterruptedException JavaDoc e) {
212         } finally {
213             synchronized (this) {
214                 runnableQueueThread = null;
215             }
216         }
217     }
218
219     /**
220      * Returns the thread in which the RunnableQueue is currently running.
221      * @return null if the RunnableQueue has not entered his
222      * <tt>run()</tt> method.
223      */

224     public HaltingThread getThread() {
225         return runnableQueueThread;
226     }
227
228     /**
229      * Schedules the given Runnable object for a later invocation, and
230      * returns.
231      * An exception is thrown if the RunnableQueue was not started.
232      * @throws IllegalStateException if getThread() is null.
233      */

234     public void invokeLater(Runnable JavaDoc r) {
235         if (runnableQueueThread == null) {
236             throw new IllegalStateException JavaDoc
237                 ("RunnableQueue not started or has exited");
238         }
239         synchronized (list) {
240             list.push(new Link(r));
241             list.notify();
242         }
243     }
244
245     /**
246      * Waits until the given Runnable's <tt>run()</tt> has returned.
247      * <em>Note: <tt>invokeAndWait()</tt> must not be called from the
248      * current thread (for example from the <tt>run()</tt> method of the
249      * argument).
250      * @throws IllegalStateException if getThread() is null or if the
251      * thread returned by getThread() is the current one.
252      */

253     public void invokeAndWait(Runnable JavaDoc r) throws InterruptedException JavaDoc {
254         if (runnableQueueThread == null) {
255             throw new IllegalStateException JavaDoc
256                 ("RunnableQueue not started or has exited");
257         }
258         if (runnableQueueThread == Thread.currentThread()) {
259             throw new IllegalStateException JavaDoc
260                 ("Cannot be called from the RunnableQueue thread");
261         }
262
263         LockableLink l = new LockableLink(r);
264         synchronized (list) {
265             list.push(l);
266             list.notify();
267         }
268         l.lock();
269     }
270
271
272     /**
273      * Schedules the given Runnable object for a later invocation, and
274      * returns. The given runnable preempts any runnable that is not
275      * currently executing (ie the next runnable started will be the
276      * one given). An exception is thrown if the RunnableQueue was
277      * not started.
278      * @throws IllegalStateException if getThread() is null.
279      */

280     public void preemptLater(Runnable JavaDoc r) {
281         if (runnableQueueThread == null) {
282             throw new IllegalStateException JavaDoc
283                 ("RunnableQueue not started or has exited");
284         }
285         synchronized (list) {
286             list.add(preemptCount, new Link(r));
287             preemptCount++;
288             list.notify();
289         }
290     }
291
292     /**
293      * Waits until the given Runnable's <tt>run()</tt> has returned.
294      * The given runnable preempts any runnable that is not currently
295      * executing (ie the next runnable started will be the one given).
296      * <em>Note: <tt>preemptAndWait()</tt> must not be called from the
297      * current thread (for example from the <tt>run()</tt> method of the
298      * argument).
299      * @throws IllegalStateException if getThread() is null or if the
300      * thread returned by getThread() is the current one.
301      */

302     public void preemptAndWait(Runnable JavaDoc r) throws InterruptedException JavaDoc {
303         if (runnableQueueThread == null) {
304             throw new IllegalStateException JavaDoc
305                 ("RunnableQueue not started or has exited");
306         }
307         if (runnableQueueThread == Thread.currentThread()) {
308             throw new IllegalStateException JavaDoc
309                 ("Cannot be called from the RunnableQueue thread");
310         }
311
312         LockableLink l = new LockableLink(r);
313         synchronized (list) {
314             list.add(preemptCount, l);
315             preemptCount++;
316             list.notify();
317         }
318         l.lock();
319     }
320
321     public RunnableQueueState getQueueState() {
322         synchronized (stateLock) {
323             return state;
324         }
325     }
326
327     /**
328      * Suspends the execution of this queue after the current runnable
329      * completes.
330      * @param waitTillSuspended if true this method will not return
331      * until the queue has suspended (no runnable in progress
332      * or about to be in progress). If resumeExecution is
333      * called while waiting will simply return (this really
334      * indicates a race condition in your code). This may
335      * return before an associated RunHandler is notified.
336      * @throws IllegalStateException if getThread() is null. */

337     public void suspendExecution(boolean waitTillSuspended) {
338         if (runnableQueueThread == null) {
339             throw new IllegalStateException JavaDoc
340                 ("RunnableQueue not started or has exited");
341         }
342         // System.err.println("Suspend Called");
343
synchronized (stateLock) {
344             wasResumed = false;
345
346             if (state == SUSPENDED) {
347                 // already suspended, notify stateLock so an event is
348
// generated.
349
stateLock.notifyAll();
350                 return;
351             }
352
353             if (state == RUNNING) {
354                 state = SUSPENDING;
355                 synchronized (list) {
356                     // Wake up run thread if it is waiting for jobs,
357
// so we go into the suspended case (notifying
358
// run-handler etc...)
359
list.notify();
360                 }
361             }
362
363             if (waitTillSuspended) {
364                 while (state == SUSPENDING) {
365                     try {
366                         stateLock.wait();
367                     } catch(InterruptedException JavaDoc ie) { }
368                 }
369             }
370         }
371     }
372
373     /**
374      * Resumes the execution of this queue.
375      * @throws IllegalStateException if getThread() is null.
376      */

377     public void resumeExecution() {
378         // System.err.println("Resume Called");
379
if (runnableQueueThread == null) {
380             throw new IllegalStateException JavaDoc
381                 ("RunnableQueue not started or has exited");
382         }
383
384         synchronized (stateLock) {
385             wasResumed = true;
386
387             if (state != RUNNING) {
388                 state = RUNNING;
389                 stateLock.notifyAll(); // wake it up.
390
}
391         }
392     }
393
394     /**
395      * Returns iterator lock to use to work with the iterator
396      * returned by iterator().
397      */

398     public Object JavaDoc getIteratorLock() {
399         return list;
400     }
401
402     /**
403      * Returns an iterator over the runnables.
404      */

405     public Iterator JavaDoc iterator() {
406         return new Iterator JavaDoc() {
407                 Link head = (Link)list.getHead();
408                 Link link;
409                 public boolean hasNext() {
410                     if (head == null) {
411                         return false;
412                     }
413                     if (link == null) {
414                         return true;
415                     }
416                     return link != head;
417                 }
418                 public Object JavaDoc next() {
419                     if (head == null || head == link) {
420                         throw new java.util.NoSuchElementException JavaDoc();
421                     }
422                     if (link == null) {
423                         link = (Link)head.getNext();
424                         return head.runnable;
425                     }
426                     Object JavaDoc result = link.runnable;
427                     link = (Link)link.getNext();
428                     return result;
429                 }
430                 public void remove() {
431                     throw new UnsupportedOperationException JavaDoc();
432                 }
433             };
434     }
435
436     /**
437      * Sets the RunHandler for this queue.
438      */

439     public synchronized void setRunHandler(RunHandler rh) {
440         runHandler = rh;
441     }
442
443     /**
444      * Returns the RunHandler or null.
445      */

446     public synchronized RunHandler getRunHandler() {
447         return runHandler;
448     }
449
450     /**
451      * Called when execution is being suspended.
452      * Currently just notifies runHandler
453      */

454     protected synchronized void executionSuspended() {
455         // System.err.println("Suspend Sent");
456
if (runHandler != null) {
457             runHandler.executionSuspended(this);
458         }
459     }
460
461     /**
462      * Called when execution is being resumed.
463      * Currently just notifies runHandler
464      */

465     protected synchronized void executionResumed() {
466         // System.err.println("Resumed Sent");
467
if (runHandler != null) {
468             runHandler.executionResumed(this);
469         }
470     }
471         
472     /**
473      * Called just prior to executing a Runnable.
474      * Currently just notifies runHandler
475      * @param rable The runnable that is about to start
476      */

477     protected synchronized void runnableStart(Runnable JavaDoc rable ) {
478         if (runHandler != null) {
479             runHandler.runnableStart(this, rable);
480         }
481     }
482
483     /**
484      * Called when a Runnable completes.
485      * Currently just notifies runHandler
486      * @param rable The runnable that just completed.
487      */

488     protected synchronized void runnableInvoked(Runnable JavaDoc rable ) {
489         if (runHandler != null) {
490             runHandler.runnableInvoked(this, rable);
491         }
492     }
493
494     /**
495      * This interface must be implemented by an object which wants to
496      * be notified of run events.
497      */

498     public interface RunHandler {
499
500         /**
501          * Called just prior to invoking the runnable
502          */

503         void runnableStart(RunnableQueue rq, Runnable JavaDoc r);
504
505         /**
506          * Called when the given Runnable has just been invoked and
507          * has returned.
508          */

509         void runnableInvoked(RunnableQueue rq, Runnable JavaDoc r);
510
511         /**
512          * Called when the execution of the queue has been suspended.
513          */

514         void executionSuspended(RunnableQueue rq);
515
516         /**
517          * Called when the execution of the queue has been resumed.
518          */

519         void executionResumed(RunnableQueue rq);
520     }
521
522     /**
523      * This is an adapter class that implements the RunHandler interface.
524      * It simply does nothing in response to the calls.
525      */

526     public static class RunHandlerAdapter implements RunHandler {
527
528         /**
529          * Called just prior to invoking the runnable
530          */

531         public void runnableStart(RunnableQueue rq, Runnable JavaDoc r) { }
532
533         /**
534          * Called when the given Runnable has just been invoked and
535          * has returned.
536          */

537         public void runnableInvoked(RunnableQueue rq, Runnable JavaDoc r) { }
538
539         /**
540          * Called when the execution of the queue has been suspended.
541          */

542         public void executionSuspended(RunnableQueue rq) { }
543
544         /**
545          * Called when the execution of the queue has been resumed.
546          */

547         public void executionResumed(RunnableQueue rq) { }
548     }
549
550     /**
551      * To store a Runnable.
552      */

553     protected static class Link extends DoublyLinkedList.Node {
554         
555         /**
556          * The Runnable.
557          */

558         public Runnable JavaDoc runnable;
559
560         /**
561          * Creates a new link.
562          */

563         public Link(Runnable JavaDoc r) {
564             runnable = r;
565         }
566
567         /**
568          * unlock link and notify locker.
569          * Basic implementation does nothing.
570          */

571         public void unlock() throws InterruptedException JavaDoc { return; }
572     }
573
574     /**
575      * To store a Runnable with an object waiting for him to be executed.
576      */

577     protected static class LockableLink extends Link {
578
579         /**
580          * Whether this link is actually locked.
581          */

582         protected boolean locked;
583
584         /**
585          * Creates a new link.
586          */

587         public LockableLink(Runnable JavaDoc r) {
588             super(r);
589         }
590
591         /**
592          * Whether the link is actually locked.
593          */

594         public boolean isLocked() {
595             return locked;
596         }
597
598         /**
599          * Locks this link.
600          */

601         public synchronized void lock() throws InterruptedException JavaDoc {
602             locked = true;
603             notify();
604             wait();
605         }
606
607         /**
608          * unlocks this link.
609          */

610         public synchronized void unlock() throws InterruptedException JavaDoc {
611             while (!locked) {
612                 // Wait until lock is called...
613
wait();
614             }
615             // Wake the locking thread...
616
notify();
617         }
618     }
619 }
620
Popular Tags