KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > util > TimeScheduler


// $Id: TimeScheduler.java,v 1.5 2004/09/23 16:29:56 belaban Exp $


package org.jgroups.util;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;
14
15
16 /**
17  * Fixed-delay & fixed-rate single thread scheduler
18  * <p/>
19  * The scheduler supports varying scheduling intervals by asking the task
20  * every time for its next preferred scheduling interval. Scheduling can
21  * either be <i>fixed-delay</i> or <i>fixed-rate</i>. The notions are
22  * borrowed from <tt>java.util.Timer</tt> and retain the same meaning.
23  * I.e. in fixed-delay scheduling, the task's new schedule is calculated
24  * as:<br>
25  * new_schedule = time_task_starts + scheduling_interval
26  * <p/>
27  * In fixed-rate scheduling, the next schedule is calculated as:<br>
28  * new_schedule = time_task_was_supposed_to_start + scheduling_interval
29  * <p/>
30  * The scheduler internally holds a queue of tasks sorted in ascending order
31  * according to their next execution time. A task is removed from the queue
32  * if it is cancelled, i.e. if <tt>TimeScheduler.Task.isCancelled()</tt>
33  * returns true.
34  * <p/>
35  * The scheduler internally uses a <tt>java.util.SortedSet</tt> to keep tasks
36  * sorted. <tt>java.util.Timer</tt> uses an array arranged as a binary heap
37  * that doesn't shrink. It is likely that the latter arrangement is faster.
38  * <p/>
39  * Initially, the scheduler is in <tt>SUSPEND</tt>ed mode, <tt>start()</tt>
40  * need not be called: if a task is added, the scheduler gets started
41  * automatically. Calling <tt>start()</tt> starts the scheduler if it's
42  * suspended or stopped else has no effect. Once <tt>stop()</tt> is called,
43  * added tasks will not restart it: <tt>start()</tt> has to be called to
44  * restart the scheduler.
45  */

46 public class TimeScheduler {
47     /**
48      * The interface that submitted tasks must implement
49      */

50     public interface Task {
51         /**
52          * @return true if task is cancelled and shouldn't be scheduled
53          * again
54          */

55         boolean cancelled();
56
57         /**
58          * @return the next schedule interval
59          */

60         long nextInterval();
61
62         /**
63          * Execute the task
64          */

65         void run();
66     }
67
68
69     /**
70      * Internal task class.
71      */

72     private static class IntTask implements Comparable JavaDoc {
73         /**
74          * The user task
75          */

76         public final Task task;
77         /**
78          * The next execution time
79          */

80         public long sched;
81         /**
82          * Whether this task is scheduled fixed-delay or fixed-rate
83          */

84         public final boolean relative;
85
86         /**
87          * @param task the task to schedule & execute
88          * @param sched the next schedule
89          * @param relative whether scheduling for this task is soft or hard
90          * (see <tt>TimeScheduler.add()</tt>)
91          */

92         public IntTask(Task task, long sched, boolean relative) {
93             this.task=task;
94             this.sched=sched;
95             this.relative=relative;
96         }
97
98         /**
99          * @param obj the object to compare against
100          * <p/>
101          * <pre>
102          * If obj is not instance of <tt>IntTask</tt>, then return -1
103          * If obj is instance of <tt>IntTask</tt>, compare the
104          * contained tasks' next execution times. If these times are equal,
105          * then order them randomly <b>but</b> consistently!: return the diff
106          * of their <tt>hashcode()</tt> values
107          * </pre>
108          */

109         public int compareTo(Object JavaDoc obj) {
110             IntTask other;
111
112             if(!(obj instanceof IntTask)) return (-1);
113
114             other=(IntTask)obj;
115             if(sched < other.sched) return (-1);
116             if(sched > other.sched) return (1);
117             return (task.hashCode() - other.task.hashCode());
118         }
119
120         public String JavaDoc toString() {
121             if(task == null)
122                 return "<unnamed>";
123             else
124                 return task.getClass().getName();
125         }
126     }
127
128
129     /**
130      * The scheduler thread's main loop
131      */

132     private class Loop implements Runnable JavaDoc {
133         public void run() {
134             try {
135                 _run();
136             }
137             catch(Throwable JavaDoc t) {
138                 log.error("exception in loop", t);
139             }
140         }
141     }
142
143
144     /**
145      * The task queue used by the scheduler. Tasks are ordered in increasing
146      * order of their next execution time
147      */

148     private static class TaskQueue {
149         /**
150          * Sorted list of <tt>IntTask</tt>s
151          */

152         private final SortedSet JavaDoc set;
153
154         public TaskQueue() {
155             super();
156             set=new TreeSet JavaDoc();
157         }
158
159         public void add(IntTask t) {
160             set.add(t);
161         }
162
163         public void remove(IntTask t) {
164             set.remove(t);
165         }
166
167         public IntTask getFirst() {
168             return ((IntTask)set.first());
169         }
170
171         public void removeFirst() {
172             Iterator JavaDoc it=set.iterator();
173             it.next();
174             it.remove();
175         }
176
177         public void rescheduleFirst(long sched) {
178             Iterator JavaDoc it=set.iterator();
179             IntTask t=(IntTask)it.next();
180             it.remove();
181             t.sched=sched;
182             set.add(t);
183         }
184
185         public boolean isEmpty() {
186             return (set.isEmpty());
187         }
188
189         public void clear() {
190             set.clear();
191         }
192
193         public int size() {
194             return set.size();
195         }
196
197         public String JavaDoc toString() {
198             return set.toString();
199         }
200     }
201
202
203     /**
204      * Default suspend interval (ms)
205      */

206     private static final long SUSPEND_INTERVAL=2000;
207     /**
208      * Regular wake-up intervals for scheduler, in case all tasks have been
209      * cancelled and we are still waiting on the schedule time of the task
210      * at the top
211      */

212     private static final long TICK_INTERVAL=1000;
213
214     /**
215      * Thread is running
216      * <p/>
217      * A call to <code>start()</code> has no effect on the thread<br>
218      * A call to <code>stop()</code> will stop the thread<br>
219      * A call to <code>add()</code> has no effect on the thread
220      */

221     private static final int RUN=0;
222     /**
223      * Thread is suspended
224      * <p/>
225      * A call to <code>start()</code> will recreate the thread<br>
226      * A call to <code>stop()</code> will switch the state from suspended
227      * to stopped<br>
228      * A call to <code>add()</code> will recreate the thread <b>only</b>
229      * if it is suspended
230      */

231     private static final int SUSPEND=1;
232     /**
233      * A shutdown of the thread is in progress
234      * <p/>
235      * A call to <code>start()</code> has no effect on the thread<br>
236      * A call to <code>stop()</code> has no effect on the thread<br>
237      * A call to <code>add()</code> has no effect on the thread<br>
238      */

239     private static final int STOPPING=2;
240     /**
241      * Thread is stopped
242      * <p/>
243      * A call to <code>start()</code> will recreate the thread<br>
244      * A call to <code>stop()</code> has no effect on the thread<br>
245      * A call to <code>add()</code> has no effect on the thread<br>
246      */

247     private static final int STOP=3;
248
249     /**
250      * TimeScheduler thread name
251      */

252     private static final String JavaDoc THREAD_NAME="TimeScheduler.Thread";
253
254
255     /**
256      * The scheduler thread
257      */

258     private Thread JavaDoc thread=null;
259     /**
260      * The thread's running state
261      */

262     private int thread_state=SUSPEND;
263     /**
264      * Time that task queue is empty before suspending the scheduling
265      * thread
266      */

267     private long suspend_interval=SUSPEND_INTERVAL;
268     /**
269      * The task queue ordered according to task's next execution time
270      */

271     private final TaskQueue queue;
272
273     protected static final Log log=LogFactory.getLog(TimeScheduler.class);
274
275
276     /**
277      * Convert exception stack trace to string
278      */

279     private String JavaDoc _toString(Throwable JavaDoc ex) {
280         StringWriter JavaDoc sw=new StringWriter JavaDoc();
281         PrintWriter JavaDoc pw=new PrintWriter JavaDoc(sw);
282         ex.printStackTrace(pw);
283         return (sw.toString());
284     }
285
286
287     /**
288      * Set the thread state to running, create and start the thread
289      */

290     private void _start() {
291         thread_state=RUN;
292
293 // only start if not yet running
294
if(thread == null || !thread.isAlive()) {
295             thread=new Thread JavaDoc(new Loop(), THREAD_NAME);
296             thread.setDaemon(true);
297             thread.start();
298         }
299     }
300
301     /**
302      * Restart the suspended thread
303      */

304     private void _unsuspend() {
305         thread_state=RUN;
306
307 // only start if not yet running
308
if(thread == null || !thread.isAlive()) {
309             thread=new Thread JavaDoc(new Loop(), THREAD_NAME);
310             thread.setDaemon(true);
311             thread.start();
312         }
313     }
314
315     /**
316      * Set the thread state to suspended
317      */

318     private void _suspend() {
319         thread_state=SUSPEND;
320         thread=null;
321     }
322
323     /**
324      * Set the thread state to stopping
325      */

326     private void _stopping() {
327         thread_state=STOPPING;
328     }
329
330     /**
331      * Set the thread state to stopped
332      */

333     private void _stop() {
334         thread_state=STOP;
335         thread=null;
336     }
337
338
339     /**
340      * If the task queue is empty, sleep until a task comes in or if slept
341      * for too long, suspend the thread.
342      * <p/>
343      * Get the first task, if the running time hasn't been
344      * reached then wait a bit and retry. Else reschedule the task and then
345      * run it.
346      */

347     private void _run() {
348         IntTask intTask;
349         Task task;
350         long currTime, execTime, waitTime, intervalTime, schedTime;
351
352         while(true) {
353             synchronized(this) {
354                 if(thread == null || thread.isInterrupted()) return;
355             }
356
357             synchronized(queue) {
358                 while(true) {
359                     if(!queue.isEmpty()) break;
360                     try {
361                         queue.wait(suspend_interval);
362                     }
363                     catch(InterruptedException JavaDoc ex) {
364                         return;
365                     }
366                     if(!queue.isEmpty()) break;
367                     _suspend();
368                     return;
369                 }
370
371                 intTask=queue.getFirst();
372                 synchronized(intTask) {
373                     task=intTask.task;
374                     if(task.cancelled()) {
375                         queue.removeFirst();
376                         continue;
377                     }
378                     currTime=System.currentTimeMillis();
379                     execTime=intTask.sched;
380                     if((waitTime=execTime - currTime) <= 0) {
381                         // Reschedule the task
382
intervalTime=task.nextInterval();
383                         schedTime=intTask.relative ?
384                                 currTime + intervalTime : execTime + intervalTime;
385                         queue.rescheduleFirst(schedTime);
386                     }
387                 }
388                 if(waitTime > 0) {
389                     //try { queue.wait(Math.min(waitTime, TICK_INTERVAL));
390
try {
391                         queue.wait(waitTime);
392                     }
393                     catch(InterruptedException JavaDoc ex) {
394                         return;
395                     }
396                     continue;
397                 }
398             }
399
400             try {
401                 task.run();
402             }
403             catch(Exception JavaDoc ex) {
404                 log.error(_toString(ex));
405             }
406         }
407     }
408
409
410     /**
411      * Create a scheduler that executes tasks in dynamically adjustable
412      * intervals
413      *
414      * @param suspend_interval the time that the scheduler will wait for
415      * at least one task to be placed in the task queue before suspending
416      * the scheduling thread
417      */

418     public TimeScheduler(long suspend_interval) {
419         super();
420         queue=new TaskQueue();
421         this.suspend_interval=suspend_interval;
422     }
423
424     /**
425      * Create a scheduler that executes tasks in dynamically adjustable
426      * intervals
427      */

428     public TimeScheduler() {
429         this(SUSPEND_INTERVAL);
430     }
431
432
433     /**
434      * Add a task for execution at adjustable intervals
435      *
436      * @param t the task to execute
437      * @param relative scheduling scheme:
438      * <p/>
439      * <tt>true</tt>:<br>
440      * Task is rescheduled relative to the last time it <i>actually</i>
441      * started execution
442      * <p/>
443      * <tt>false</tt>:<br>
444      * Task is scheduled relative to its <i>last</i> execution schedule. This
445      * has the effect that the time between two consecutive executions of
446      * the task remains the same.
447      */

448     public void add(Task t, boolean relative) {
449         long interval, sched;
450
451         if((interval=t.nextInterval()) < 0) return;
452         sched=System.currentTimeMillis() + interval;
453
454         synchronized(queue) {
455             queue.add(new IntTask(t, sched, relative));
456             switch(thread_state) {
457                 case RUN:
458                     queue.notifyAll();
459                     break;
460                 case SUSPEND:
461                     _unsuspend();
462                     break;
463                 case STOPPING:
464                     break;
465                 case STOP:
466                     break;
467             }
468         }
469     }
470
471     /**
472      * Add a task for execution at adjustable intervals
473      *
474      * @param t the task to execute
475      */

476     public void add(Task t) {
477         add(t, true);
478     }
479
480
481     /**
482      * Start the scheduler, if it's suspended or stopped
483      */

484     public void start() {
485         synchronized(queue) {
486             switch(thread_state) {
487                 case RUN:
488                     break;
489                 case SUSPEND:
490                     _unsuspend();
491                     break;
492                 case STOPPING:
493                     break;
494                 case STOP:
495                     _start();
496                     break;
497             }
498         }
499     }
500
501
502     /**
503      * Stop the scheduler if it's running. Switch to stopped, if it's
504      * suspended. Clear the task queue.
505      *
506      * @throws InterruptedException if interrupted while waiting for thread
507      * to return
508      */

509     public void stop() throws InterruptedException JavaDoc {
510 // i. Switch to STOPPING, interrupt thread
511
// ii. Wait until thread ends
512
// iii. Clear the task queue, switch to STOPPED,
513
synchronized(queue) {
514             switch(thread_state) {
515                 case RUN:
516                     _stopping();
517                     break;
518                 case SUSPEND:
519                     _stop();
520                     return;
521                 case STOPPING:
522                     return;
523                 case STOP:
524                     return;
525             }
526             thread.interrupt();
527         }
528
529         thread.join();
530
531         synchronized(queue) {
532             queue.clear();
533             _stop();
534         }
535     }
536 }
537
Popular Tags