KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > util > ReusableThread


1 // $Id: ReusableThread.java,v 1.7 2005/01/16 01:04:52 belaban Exp $
2

3 package org.jgroups.util;
4
5
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8
9
10 /**
11  * Reusable thread class. Instead of creating a new thread per task, this instance can be reused
12  * to run different tasks in turn. This is done by looping and assigning the Runnable task objects
13  * whose <code>run</code> method is then called.<br>
14  * Tasks are Runnable objects and should be prepared to terminate when they receive an
15  * InterruptedException. This is thrown by the stop() method.<br>
16  * <p/>
17  * The following situations have to be tested:
18  * <ol>
19  * <li>ReusableThread is started. Then, brefore assigning a task, it is stopped again
20  * <li>ReusableThread is started, assigned a long running task. Then, before task is done,
21  * stop() is called
22  * <li>ReusableThread is started, assigned a task. Then waitUntilDone() is called, then stop()
23  * <li>ReusableThread is started, assigned a number of tasks (waitUntilDone() called between tasks),
24  * then stopped
25  * <li>ReusableThread is started, assigned a task
26  * </ol>
27  *
28  * @author Bela Ban
29  */

30 public class ReusableThread implements Runnable JavaDoc {
31     volatile Thread JavaDoc thread=null; // thread that works on the task
32
Runnable JavaDoc task=null; // task assigned to thread
33
String JavaDoc thread_name="ReusableThread";
34     volatile boolean suspended=false;
35     protected static final Log log=LogFactory.getLog(ReusableThread.class);
36     final long TASK_JOIN_TIME=3000; // wait 3 secs for an interrupted thread to terminate
37

38
39     public ReusableThread() {
40     }
41
42
43     public ReusableThread(String JavaDoc thread_name) {
44         this.thread_name=thread_name;
45     }
46
47
48     public boolean done() {
49         return task == null;
50     }
51
52     public boolean available() {
53         return done();
54     }
55
56     public boolean isAlive() {
57         synchronized(this) {
58             return thread != null && thread.isAlive();
59         }
60     }
61
62
63     /**
64      * Will always be called from synchronized method, no need to do our own synchronization
65      */

66     public void start() {
67         if(thread == null || (thread != null && !thread.isAlive())) {
68             thread=new Thread JavaDoc(this, thread_name);
69             thread.setDaemon(true);
70             thread.start();
71         }
72     }
73
74
75     /**
76      * Stops the thread by setting thread=null and interrupting it. The run() method catches the
77      * InterruptedException and checks whether thread==null. If this is the case, it will terminate
78      */

79     public void stop() {
80         Thread JavaDoc tmp=null;
81
82         if(log.isTraceEnabled()) log.trace("entering THIS");
83         synchronized(this) {
84             if(log.isTraceEnabled())
85                 log.trace("entered THIS (thread=" + printObj(thread) +
86                         ", task=" + printObj(task) + ", suspended=" + suspended + ')');
87             if(thread != null && thread.isAlive()) {
88                 tmp=thread;
89                 thread=null; // signals the thread to stop
90
task=null;
91                 if(log.isTraceEnabled()) log.trace("notifying thread");
92                 notifyAll();
93                 if(log.isTraceEnabled()) log.trace("notifying thread completed");
94             }
95             thread=null;
96             task=null;
97         }
98
99         if(tmp != null && tmp.isAlive()) {
100             long s1=System.currentTimeMillis(), s2=0;
101             if(log.isTraceEnabled()) log.trace("join(" + TASK_JOIN_TIME + ')');
102
103             tmp.interrupt();
104
105             try {
106                 tmp.join(TASK_JOIN_TIME);
107             }
108             catch(Exception JavaDoc e) {
109             }
110             s2=System.currentTimeMillis();
111             if(log.isTraceEnabled()) log.trace("join(" + TASK_JOIN_TIME + ") completed in " + (s2 - s1));
112             if(tmp.isAlive())
113                 if(log.isErrorEnabled()) log.error("thread is still alive");
114             tmp=null;
115         }
116     }
117
118
119     /**
120      * Suspends the thread. Does nothing if already suspended. If a thread is waiting to be assigned a task, or
121      * is currently running a (possibly long-running) task, then it will be suspended the next time it
122      * waits for suspended==false (second wait-loop in run())
123      */

124
125     public void suspend() {
126         synchronized(this) {
127             if(log.isTraceEnabled()) log.trace("suspended=" + suspended + ", task=" + printObj(task));
128             if(suspended)
129                 return; // already suspended
130
else
131                 suspended=true;
132         }
133     }
134
135
136     /**
137      * Resumes the thread. Noop if not suspended
138      */

139     public void resume() {
140         synchronized(this) {
141             suspended=false;
142             notifyAll(); // notifies run(): the wait on suspend() is released
143
}
144     }
145
146
147     /**
148      * Assigns a task to the thread. If the thread is not running, it will be started. It it is
149      * already working on a task, it will reject the new task. Returns true if task could be
150      * assigned auccessfully
151      */

152     public boolean assignTask(Runnable JavaDoc t) {
153         synchronized(this) {
154             start(); // creates and starts the thread if not yet running
155
if(task == null) {
156                 task=t;
157                 notifyAll(); // signals run() to start working (first wait-loop)
158
return true;
159             }
160             else {
161                 if(log.isErrorEnabled())
162                     log.error("already working on a thread: current_task=" + task + ", new task=" + t +
163                             ", thread=" + thread + ", is alive=" + (thread != null ? "" + thread.isAlive() : "null"));
164                 return false;
165             }
166         }
167     }
168
169
170     /**
171      * Delicate piece of code (means very important :-)). Works as follows: loops until stop is true.
172      * Waits in a loop until task is assigned. Then runs the task and notifies waiters that it's done
173      * when task is completed. Then returns to the first loop to wait for more work. Does so until
174      * stop() is called, which sets stop=true and interrupts the thread. If waiting for a task, the
175      * thread terminates. If running a task, the task is interrupted, and the thread terminates. If the
176      * task is not interrupible, the stop() method will wait for 3 secs (join on the thread), then return.
177      * This means that the run() method of the task will complete and only then will the thread be
178      * garbage-collected.
179      */

180     public void run() {
181         while(thread != null) { // Stop sets thread=null
182
try {
183                 if(log.isTraceEnabled()) log.trace("entering ASSIGN");
184                 synchronized(this) {
185                     if(log.isTraceEnabled())
186                         log.trace("entered ASSIGN (task=" + printObj(task) + ", thread=" + printObj(thread) + ')');
187
188                     while(task == null && thread != null) { // first wait-loop: wait for task to be assigned (assignTask())
189
if(log.isTraceEnabled()) log.trace("wait ASSIGN");
190                         wait();
191                         if(log.isTraceEnabled()) log.trace("wait ASSIGN completed");
192                     }
193                 }
194             }
195             catch(InterruptedException JavaDoc ex) { // on assignTask()
196
if(log.isTraceEnabled()) log.trace("interrupt on ASSIGN");
197             }
198             if(thread == null) return; // we need to terminate
199

200             try {
201                 if(log.isTraceEnabled()) log.trace("entering SUSPEND");
202                 synchronized(this) {
203                     if(log.isTraceEnabled())
204                         log.trace("entered SUSPEND (suspended=" + suspended + ", task=" + printObj(task) + ')');
205                     while(suspended && thread != null) { // second wait-loop: wait for thread to resume (resume())
206
if(log.isTraceEnabled()) log.trace("wait SUSPEND");
207                         wait();
208                         if(log.isTraceEnabled()) log.trace("wait SUSPEND completed");
209                     }
210                 }
211             }
212             catch(InterruptedException JavaDoc ex) { // on resume()
213
if(log.isTraceEnabled()) log.trace("interrupt on RESUME");
214             }
215             if(thread == null) return; // we need to terminate
216

217             if(task != null) {
218                 if(log.isTraceEnabled()) log.trace("running task");
219                 try {
220                     task.run(); //here we are actually running the task
221
}
222                 catch(Throwable JavaDoc ex) {
223                     if(log.isErrorEnabled()) log.error("failed running task", ex);
224                 }
225                 if(log.isTraceEnabled()) log.trace("task completed");
226             }
227
228             if(log.isTraceEnabled()) log.trace("entering THIS");
229             synchronized(this) {
230                 if(log.isTraceEnabled()) log.trace("entered THIS");
231                 task=null;
232                 if(log.isTraceEnabled()) log.trace("notify THIS");
233                 notifyAll();
234                 if(log.isTraceEnabled()) log.trace("notify THIS completed");
235             }
236         }
237         if(log.isTraceEnabled()) log.trace("terminated");
238     }
239
240
241     String JavaDoc printObj(Object JavaDoc obj) {
242         if(obj == null)
243             return "null";
244         else
245             return "non-null";
246     }
247
248
249     public void waitUntilDone() {
250
251         if(log.isTraceEnabled()) log.trace("entering THIS");
252         synchronized(this) {
253             if(log.isTraceEnabled()) log.trace("entered THIS (task=" + printObj(task) + ')');
254             while(task != null) {
255                 try {
256                     if(log.isTraceEnabled()) log.trace("wait THIS");
257                     wait();
258                     if(log.isTraceEnabled()) log.trace("wait THIS completed");
259                 }
260                 catch(InterruptedException JavaDoc interrupted) {
261                 }
262             }
263         }
264     }
265
266
267     public String JavaDoc toString() {
268         return "suspended=" + suspended;
269     }
270
271
272 }
273
274
275
276
277
278
279
280
Popular Tags