KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > util > Scheduler


1 // $Id: Scheduler.java,v 1.12 2004/09/23 16:29:56 belaban Exp $
2

3 package org.jgroups.util;
4
5
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8
9
10 /**
11  * Implementation of a priority scheduler. The scheduler maintains a queue to the end of which
12  * all tasks are added. It continually looks at the first queue element, assigns a thread to
13  * it, runs the thread and waits for completion. When a new <em>priority task</em> is added,
14  * it will be added to the head of the queue and the scheduler will be interrupted. In this
15  * case, the currently handled task is suspended, and the one at the head of the queue
16  * handled. This is recursive: a priority task can always be interrupted by another priority
17  * task. Resursion ends when no more priority tasks are added, or when the thread pool is
18  * exhausted.
19  *
20  * @author Bela Ban
21  */

22 public class Scheduler implements Runnable JavaDoc {
23     final Queue queue=new Queue();
24     Thread JavaDoc sched_thread=null;
25     Task current_task=null;
26     ThreadPool pool=null;
27     SchedulerListener listener=null;
28
29     protected static final Log log=LogFactory.getLog(Scheduler.class);
30
31     /** Process items on the queue concurrently. The default is to wait until the processing of an item
32      * has completed before fetching the next item from the queue. Note that setting this to true
33      * may destroy the properties of a protocol stack, e.g total or causal order may not be
34      * guaranteed. Set this to true only if you know what you're doing ! */

35     boolean concurrent_processing=false;
36
37     /** max number of threads, will only be allocated when needed */
38     int NUM_THREADS=128;
39
40     static final int WAIT_FOR_THREAD_AVAILABILITY=3000;
41     static final int THREAD_JOIN_TIMEOUT=1000;
42
43
44
45
46
47
48     public Scheduler() {
49         // PropertyPermission not granted if running in an untrusted environment with JNLP.
50
try {
51             this.NUM_THREADS=Integer.parseInt(System.getProperty("scheduler.max.threads", "128"));
52         }
53         catch (SecurityException JavaDoc ex){
54           //The default value specified above is used.
55
}
56     }
57
58
59     public Scheduler(int num_threads) {
60         this.NUM_THREADS=num_threads;
61     }
62
63
64     public void setListener(SchedulerListener l) {
65         listener=l;
66     }
67
68
69     public boolean getConcurrentProcessing() {
70         return concurrent_processing;
71     }
72
73     public void setConcurrentProcessing(boolean process_concurrently) {
74         this.concurrent_processing=process_concurrently;
75     }
76
77     public void run() {
78         while(sched_thread != null) {
79             if(queue.closed()) break;
80             try {
81                 current_task=(Task)queue.peek(); // get the first task in the queue (blocks until available)
82
if(current_task == null) { // @remove
83
if(log.isWarnEnabled()) log.warn("current task is null, queue.size()=" + queue.size() +
84                             ", queue.closed()=" + queue.closed() + ", continuing");
85                     continue;
86                 }
87
88                 if(current_task.suspended) {
89                     current_task.suspended=false;
90                     current_task.thread.resume();
91                     if(listener != null) listener.resumed(current_task.target);
92                 }
93                 else {
94                     if(current_task.thread == null) {
95                         current_task.thread=pool.getThread();
96                         if(current_task.thread == null) { // thread pool exhausted
97
if(log.isWarnEnabled()) log.warn("thread pool exhausted, waiting for " +
98                                     WAIT_FOR_THREAD_AVAILABILITY + "ms before retrying");
99                             Util.sleep(WAIT_FOR_THREAD_AVAILABILITY);
100                             continue;
101                         }
102                     }
103
104                     // if we get here, current_task.thread and current_task.target are guaranteed to be non-null
105
if(listener != null) listener.started(current_task.target);
106                     if(current_task.thread.assignTask(current_task.target) == false)
107                         continue;
108                 }
109
110                 if(sched_thread.isInterrupted()) { // will continue at "catch(InterruptedException)" below
111
// sched_thread.interrupt();
112

113                     // changed on suggestion from Victor Cardoso: sched_thread.interrupt() does *not* throw an
114
// InterruptedException, so we don't land in the catch clause, but rather execute the code below
115
// (which we don't want) - bela April 15 2004
116

117                     throw new InterruptedException JavaDoc();
118                 }
119
120                 if(concurrent_processing == false) { // this is the default: process serially
121
synchronized(current_task.thread) {
122                         while(!current_task.thread.done() && !current_task.thread.suspended)
123                             current_task.thread.wait();
124                     }
125                     if(listener != null) listener.stopped(current_task.target);
126                 }
127                 queue.removeElement(current_task);
128             }
129             catch(InterruptedException JavaDoc interrupted) {
130                 if(sched_thread == null || queue.closed()) break;
131                 if(current_task.thread != null) {
132                     current_task.thread.suspend();
133                     if(listener != null) listener.suspended(current_task.target);
134                     current_task.suspended=true;
135                 }
136                 Thread.interrupted(); // clears the interrupt-flag
137
continue;
138             }
139             catch(QueueClosedException closed_ex) {
140                 return;
141             }
142             catch(Throwable JavaDoc ex) {
143                 if(log.isErrorEnabled()) log.error("exception=" + Util.print(ex));
144                 continue;
145             }
146         }
147          if(log.isTraceEnabled()) log.trace("scheduler thread terminated");
148     }
149
150
151     public void addPrio(Runnable JavaDoc task) {
152         Task new_task=new Task(task);
153         boolean do_interrupt=false;
154
155         try {
156             synchronized(queue) { // sync against add()
157
if(queue.size() == 0)
158                     queue.add(new_task);
159                 else {
160                     queue.addAtHead(new_task);
161                     do_interrupt=true;
162                 }
163             }
164             if(do_interrupt) // moved out of 'synchronized(queue)' to minimize lock contention
165
sched_thread.interrupt();
166         }
167         catch(Throwable JavaDoc e) {
168             if(log.isErrorEnabled()) log.error("exception=" + e);
169         }
170     }
171
172
173     public void add(Runnable JavaDoc task) {
174         Task new_task=new Task(task);
175
176         try {
177             synchronized(queue) { // sync against addPrio()
178
queue.add(new_task);
179             }
180         }
181         catch(Exception JavaDoc e) {
182             if(log.isErrorEnabled()) log.error("exception=" + e);
183         }
184     }
185
186
187     public void start() {
188         if(queue.closed())
189             queue.reset();
190         if(sched_thread == null) {
191             pool=new ThreadPool(NUM_THREADS);
192             sched_thread=new Thread JavaDoc(this, "Scheduler main thread");
193             sched_thread.setDaemon(true);
194             sched_thread.start();
195         }
196     }
197
198
199     /**
200      * Stop the scheduler thread. The thread may be waiting for its next task (queue.peek()) or it may be waiting on
201      * the currently executing thread. In the first case, closing the queue will throw a QueueClosed exception which
202      * terminates the scheduler thread. In the second case, after closing the queue, we interrupt the scheduler thread,
203      * which then checks whether the queue is closed. If this is the case, the scheduler thread terminates.
204      */

205     public void stop() {
206         Thread JavaDoc tmp=null;
207
208         // 1. Close the queue
209
queue.close(false); // will stop thread at next peek();
210

211         // 2. Interrupt the scheduler thread
212
if(sched_thread != null && sched_thread.isAlive()) {
213             tmp=sched_thread;
214             sched_thread=null;
215             tmp.interrupt();
216             try {
217                 tmp.join(THREAD_JOIN_TIMEOUT);
218             }
219             catch(Exception JavaDoc ex) {
220             }
221
222             if(tmp.isAlive())
223                 if(log.isErrorEnabled()) log.error("scheduler thread is still not dead !!!");
224         }
225         sched_thread=null;
226
227         // 3. Delete the thread pool
228
if(pool != null) {
229             pool.destroy();
230             pool=null;
231         }
232     }
233
234
235
236     public class Task {
237         ReusableThread thread=null;
238         Runnable JavaDoc target=null;
239         boolean suspended=false;
240
241         Task(Runnable JavaDoc target) {
242             this.target=target;
243         }
244
245         public String JavaDoc toString() {
246             return "[thread=" + thread + ", target=" + target + ", suspended=" + suspended + ']';
247         }
248     }
249
250
251 }
252
Popular Tags