Code - Class EDU.oswego.cs.dl.util.concurrent.QueuedExecutor


1 /*
2   File: QueuedExecutor.java
3
4   Originally written by Doug Lea and released into the public domain.
5   This may be used for any purposes whatsoever without acknowledgment.
6   Thanks for the assistance and support of Sun Microsystems Labs,
7   and everyone contributing, testing, and using this code.
8
9   History:
10   Date Who What
11   21Jun1998 dl Create public version
12   28aug1998 dl rely on ThreadFactoryUser, restart now public
13    4may1999 dl removed redundant interrupt detect
14    7sep2000 dl new shutdown methods
15   20may2004 dl can shutdown even if thread not created yet
16 */

17
18 package EDU.oswego.cs.dl.util.concurrent;
19
20 /**
21  *
22  * An implementation of Executor that queues incoming
23  * requests until they can be processed by a single background
24  * thread.
25  * <p>
26  * The thread is not actually started until the first
27  * <code>execute</code> request is encountered. Also, if the
28  * thread is stopped for any reason (for example, after hitting
29  * an unrecoverable exception in an executing task), one is started
30  * upon encountering a new request, or if <code>restart()</code> is
31  * invoked.
32  * <p>
33  * Beware that, especially in situations
34  * where command objects themselves invoke execute, queuing can
35  * sometimes lead to lockups, since commands that might allow
36  * other threads to terminate do not run at all when they are in the queue.
37  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
38  **/

39 public class QueuedExecutor extends ThreadFactoryUser implements Executor {
40
41
42   
43   /** The thread used to process commands **/
44   protected Thread thread_;
45
46   /** Special queue element to signal termination **/
47   protected static Runnable ENDTASK = new Runnable() { public void run() {} };
48
49   /** true if thread should shut down after processing current task **/
50   protected volatile boolean shutdown_; // latches true;
51

52   /**
53    * Return the thread being used to process commands, or
54    * null if there is no such thread. You can use this
55    * to invoke any special methods on the thread, for
56    * example, to interrupt it.
57    **/

58   public synchronized Thread getThread() {
59     return thread_;
60   }
61
62   /** set thread_ to null to indicate termination **/
63   protected synchronized void clearThread() {
64     thread_ = null;
65   }
66
67
68   /** The queue **/
69   protected final Channel queue_;
70
71
72   /**
73    * The runloop is isolated in its own Runnable class
74    * just so that the main
75    * class need not implement Runnable, which would
76    * allow others to directly invoke run, which would
77    * never make sense here.
78    **/

79   protected class RunLoop implements Runnable {
80     public void run() {
81       try {
82         while (!shutdown_) {
83           Runnable task = (Runnable)(queue_.take());
84           if (task == ENDTASK) {
85             shutdown_ = true;
86             break;
87           }
88           else if (task != null) {
89             task.run();
90             task = null;
91           }
92           else
93             break;
94         }
95       }
96       catch (InterruptedException ex) {} // fallthrough
97
finally {
98         clearThread();
99       }
100     }
101   }
102
103   protected final RunLoop runLoop_;
104
105
106   /**
107    * Construct a new QueuedExecutor that uses
108    * the supplied Channel as its queue.
109    * <p>
110    * This class does not support any methods that
111    * reveal this queue. If you need to access it
112    * independently (for example to invoke any
113    * special status monitoring operations), you
114    * should record a reference to it separately.
115    **/

116
117   public QueuedExecutor(Channel queue) {
118     queue_ = queue;
119     runLoop_ = new RunLoop();
120   }
121
122   /**
123    * Construct a new QueuedExecutor that uses
124    * a BoundedLinkedQueue with the current
125    * DefaultChannelCapacity as its queue.
126    **/

127
128   public QueuedExecutor() {
129     this(new BoundedLinkedQueue());
130   }
131
132   /**
133    * Start (or restart) the background thread to process commands. It has
134    * no effect if a thread is already running. This
135    * method can be invoked if the background thread crashed
136    * due to an unrecoverable exception.
137    **/

138
139   public synchronized void restart() {
140     if (thread_ == null && !shutdown_) {
141       thread_ = threadFactory_.newThread(runLoop_);
142       thread_.start();
143     }
144   }
145
146
147   /**
148    * Arrange for execution of the command in the
149    * background thread by adding it to the queue.
150    * The method may block if the channel's put
151    * operation blocks.
152    * <p>
153    * If the background thread
154    * does not exist, it is created and started.
155    **/

156   public void execute(Runnable command) throws InterruptedException {
157     restart();
158     queue_.put(command);
159   }
160
161   /**
162    * Terminate background thread after it processes all
163    * elements currently in queue. Any tasks entered after this point will
164    * not be processed. A shut down thread cannot be restarted.
165    * This method may block if the task queue is finite and full.
166    * Also, this method
167    * does not in general apply (and may lead to comparator-based
168    * exceptions) if the task queue is a priority queue.
169    **/

170   public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks() {
171     if (!shutdown_) {
172       try { queue_.put(ENDTASK); }
173       catch (InterruptedException ex) {
174         Thread.currentThread().interrupt();
175       }
176     }
177   }
178
179
180   /**
181    * Terminate background thread after it processes the
182    * current task, removing other queued tasks and leaving them unprocessed.
183    * A shut down thread cannot be restarted.
184    **/

185   public synchronized void shutdownAfterProcessingCurrentTask() {
186     shutdown_ = true;
187     try {
188       while (queue_.poll(0) != null) ; // drain
189
queue_.put(ENDTASK);
190     }
191     catch (InterruptedException ex) {
192       Thread.currentThread().interrupt();
193     }
194   }
195
196
197   /**
198    * Terminate background thread even if it is currently processing
199    * a task. This method uses Thread.interrupt, so relies on tasks
200    * themselves responding appropriately to interruption. If the
201    * current tasks does not terminate on interruption, then the
202    * thread will not terminate until processing current task.
203    * A shut down thread cannot be restarted.
204    **/

205   public synchronized void shutdownNow() {
206     shutdown_ = true;
207     Thread t = thread_;
208     if (t != null)
209       t.interrupt();
210     shutdownAfterProcessingCurrentTask();
211   }
212 }
213

Java API By Example, From Geeks To Geeks. | Conditions of Use | About Us © 2002 - 2005, KickJava.com, or its affiliates