package org.jgroups.util;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;


/**
 * Runs {@link Task}s on a single daemon thread at the times the tasks
 * themselves request.
 * <p>
 * Every task is asked for its next interval (in milliseconds) when it is
 * added and again each time it becomes due; it keeps being rescheduled until
 * its {@link Task#cancelled()} method returns <code>true</code>.
 * <p>
 * The scheduler thread is created lazily. When the task queue stays empty for
 * the configured suspend interval the thread terminates and the scheduler
 * moves to the suspended state; the next {@link #add(Task)} or
 * {@link #start()} creates a fresh thread.
 * <p>
 * All mutable scheduler state (<code>thread</code>, <code>thread_state</code>
 * and the queue contents) is guarded by the monitor of <code>queue</code>.
 */
public class TimeScheduler {

    /**
     * A unit of work executed by the scheduler.
     */
    public interface Task {

        /**
         * @return <code>true</code> if the task must not be run any more; the
         *         scheduler then removes it from its queue
         */
        boolean cancelled();

        /**
         * @return the delay in milliseconds until the next execution
         */
        long nextInterval();

        /**
         * Executes the task. Called on the scheduler thread.
         */
        void run();
    }


    /**
     * Queue entry: a task together with its next absolute execution time.
     * Entries are ordered by execution time; entries with the same time are
     * ordered by creation sequence.
     */
    private static class IntTask implements Comparable {

        /** Source of the unique, monotonically increasing sequence numbers. */
        private static long next_seq=0;

        /** The user task. */
        public final Task task;

        /** Next execution time, in milliseconds as per System.currentTimeMillis(). */
        public long sched;

        /**
         * If <code>true</code> the next execution time is computed from the
         * time the task was picked up, otherwise from its previous scheduled
         * time.
         */
        public final boolean relative;

        /**
         * Unique tie-breaker. Without it two different entries due in the
         * same millisecond could compare as equal, and the sorted set backing
         * the queue would silently discard one of them.
         */
        private final long seq;

        /**
         * @param task     the task to run
         * @param sched    the first execution time (absolute, milliseconds)
         * @param relative whether rescheduling is relative to the pick-up time
         */
        public IntTask(Task task, long sched, boolean relative) {
            this.task=task;
            this.sched=sched;
            this.relative=relative;
            this.seq=nextSeq();
        }

        private static synchronized long nextSeq() {
            return next_seq++;
        }

        /**
         * Orders entries by execution time, then by creation sequence. Uses
         * explicit comparisons rather than subtraction so that the sign of
         * the result can never be corrupted by overflow. Returns 0 only when
         * compared with itself.
         *
         * @param obj the object to compare with
         * @return a negative value, zero or a positive value; -1 if
         *         <code>obj</code> is not an <code>IntTask</code>
         */
        public int compareTo(Object obj) {
            if(!(obj instanceof IntTask)) return -1;

            IntTask other=(IntTask)obj;
            if(sched < other.sched) return -1;
            if(sched > other.sched) return 1;
            if(seq < other.seq) return -1;
            if(seq > other.seq) return 1;
            return 0;
        }

        public String toString() {
            if(task == null)
                return "<unnamed>";
            else
                return task.getClass().getName();
        }
    }


    /**
     * Body of the scheduler thread: runs the scheduling loop and logs
     * anything that escapes from it.
     */
    private class Loop implements Runnable {
        public void run() {
            try {
                _run();
            }
            catch(Throwable t) {
                log.error("exception in loop", t);
            }
            finally {
                // If this thread dies while the scheduler still believes it is
                // running, fall back to SUSPEND so that the next add() or
                // start() creates a new thread instead of notifying nobody.
                synchronized(queue) {
                    if(thread == Thread.currentThread() && thread_state == RUN)
                        _suspend();
                }
            }
        }
    }


    /**
     * The pending tasks, sorted by execution time. Not thread-safe on its
     * own; callers hold the monitor of the queue instance.
     */
    private static class TaskQueue {

        /** Sorted set of IntTask entries; the first element is due first. */
        private final SortedSet set;

        public TaskQueue() {
            set=new TreeSet();
        }

        public void add(IntTask t) {
            set.add(t);
        }

        public void remove(IntTask t) {
            set.remove(t);
        }

        /**
         * @return the entry that is due first
         * @throws java.util.NoSuchElementException if the queue is empty
         */
        public IntTask getFirst() {
            return (IntTask)set.first();
        }

        /**
         * Removes the entry that is due first.
         *
         * @throws java.util.NoSuchElementException if the queue is empty
         */
        public void removeFirst() {
            Iterator it=set.iterator();
            it.next();
            it.remove();
        }

        /**
         * Gives the first entry a new execution time. The entry is taken out
         * of the set before its key changes and re-inserted afterwards, so
         * the set never contains an element whose ordering changed in place.
         *
         * @param sched the new execution time (absolute, milliseconds)
         * @throws java.util.NoSuchElementException if the queue is empty
         */
        public void rescheduleFirst(long sched) {
            Iterator it=set.iterator();
            IntTask t=(IntTask)it.next();
            it.remove();
            t.sched=sched;
            set.add(t);
        }

        public boolean isEmpty() {
            return set.isEmpty();
        }

        public void clear() {
            set.clear();
        }

        public int size() {
            return set.size();
        }

        public String toString() {
            return set.toString();
        }
    }


    /** Default time (ms) the thread waits on an empty queue before suspending. */
    private static final long SUSPEND_INTERVAL=2000;

    /** Not referenced anywhere in this class. */
    private static final long TICK_INTERVAL=1000;

    /** State: the scheduler thread is running. */
    private static final int RUN=0;

    /** State: no thread exists; one is created by the next add() or start(). */
    private static final int SUSPEND=1;

    /** State: stop() was called and is waiting for the thread to terminate. */
    private static final int STOPPING=2;

    /** State: stopped; only start() creates a new thread. */
    private static final int STOP=3;

    /** Name given to the scheduler thread. */
    private static final String THREAD_NAME="TimeScheduler.Thread";


    /** The scheduler thread, or null when suspended/stopped. Guarded by queue. */
    private Thread thread=null;

    /** One of RUN, SUSPEND, STOPPING, STOP. Guarded by queue. */
    private int thread_state=SUSPEND;

    /** Time (ms) the thread waits on an empty queue before suspending itself. */
    private final long suspend_interval;

    /** The pending tasks; its monitor also guards thread and thread_state. */
    private final TaskQueue queue;

    protected static final Log log=LogFactory.getLog(TimeScheduler.class);


    /**
     * @param ex the throwable to render
     * @return the stack trace of <code>ex</code> as a string
     */
    private String _toString(Throwable ex) {
        StringWriter sw=new StringWriter();
        PrintWriter pw=new PrintWriter(sw);
        ex.printStackTrace(pw);
        return sw.toString();
    }


    /**
     * Switches to RUN and makes sure a live scheduler thread exists. Caller
     * must hold the queue monitor.
     */
    private void _launch() {
        thread_state=RUN;

        if(thread == null || !thread.isAlive()) {
            thread=new Thread(new Loop(), THREAD_NAME);
            thread.setDaemon(true);
            thread.start();
        }
    }


    /** STOP -> RUN. Caller must hold the queue monitor. */
    private void _start() {
        _launch();
    }


    /** SUSPEND -> RUN. Caller must hold the queue monitor. */
    private void _unsuspend() {
        _launch();
    }


    /** RUN -> SUSPEND. Caller must hold the queue monitor. */
    private void _suspend() {
        thread_state=SUSPEND;
        thread=null;
    }


    /** RUN -> STOPPING. Caller must hold the queue monitor. */
    private void _stopping() {
        thread_state=STOPPING;
    }


    /** Any state -> STOP. Caller must hold the queue monitor. */
    private void _stop() {
        thread_state=STOP;
        thread=null;
    }


    /**
     * The scheduling loop, executed by the scheduler thread.
     * <p>
     * Each iteration, under the queue monitor: leave if this thread is no
     * longer the active scheduler thread; if the queue is empty wait up to
     * <code>suspend_interval</code> and suspend if it is still empty; drop
     * the first task if it is cancelled; if the first task is not yet due
     * wait for the remaining time; otherwise reschedule it. The task itself
     * is run after the monitor has been released.
     */
    private void _run() {
        IntTask intTask;
        Task task;
        long currTime, execTime, waitTime, intervalTime, schedTime;

        while(true) {
            synchronized(queue) {
                // Checked under the same monitor that guards every write to
                // thread/thread_state. Looking at the state (and not only at
                // the interrupt flag) makes stop() effective even when a task
                // has swallowed the interrupt.
                if(thread != Thread.currentThread() || thread_state != RUN
                        || Thread.currentThread().isInterrupted())
                    return;

                if(queue.isEmpty()) {
                    try {
                        queue.wait(suspend_interval);
                    }
                    catch(InterruptedException ex) {
                        return;
                    }
                    if(queue.isEmpty()) {
                        // Only suspend if nobody changed the state while we
                        // were waiting (stop() may have set STOPPING).
                        if(thread == Thread.currentThread() && thread_state == RUN)
                            _suspend();
                        return;
                    }
                    // Something was added: re-validate the state first.
                    continue;
                }

                intTask=queue.getFirst();
                task=intTask.task;
                if(task.cancelled()) {
                    queue.removeFirst();
                    continue;
                }

                currTime=System.currentTimeMillis();
                execTime=intTask.sched;
                waitTime=execTime - currTime;
                if(waitTime > 0) {
                    // Not due yet. add() wakes us up early when the queue
                    // changes, so the head is re-read after every wait.
                    try {
                        queue.wait(waitTime);
                    }
                    catch(InterruptedException ex) {
                        return;
                    }
                    continue;
                }

                // Due: compute the next execution time before running it.
                intervalTime=task.nextInterval();
                schedTime=intTask.relative ?
                        currTime + intervalTime : execTime + intervalTime;
                queue.rescheduleFirst(schedTime);
            }

            // Run outside the monitor so that add()/stop() are not blocked by
            // a slow task. Throwable (not just Exception) is caught so that a
            // single failing task cannot kill the scheduler thread.
            try {
                task.run();
            }
            catch(Throwable ex) {
                log.error(_toString(ex));
            }
        }
    }


    /**
     * Creates a suspended scheduler; the thread is created by the first
     * {@link #add(Task)} or {@link #start()}.
     *
     * @param suspend_interval time in milliseconds the scheduler thread waits
     *                         on an empty queue before terminating. The value
     *                         is passed to <code>Object.wait(long)</code>, for
     *                         which 0 means "wait until notified".
     */
    public TimeScheduler(long suspend_interval) {
        queue=new TaskQueue();
        this.suspend_interval=suspend_interval;
    }


    /**
     * Creates a suspended scheduler with the default suspend interval of
     * 2000 ms.
     */
    public TimeScheduler() {
        this(SUSPEND_INTERVAL);
    }


    /**
     * Schedules a task. The first execution takes place
     * <code>t.nextInterval()</code> milliseconds from now; if that value is
     * negative the task is not scheduled at all.
     * <p>
     * If the scheduler is suspended it is resumed. If it is stopping or
     * stopped the task is queued but no thread is started.
     *
     * @param t        the task to schedule
     * @param relative if <code>true</code> each following execution time is
     *                 computed from the time the task was picked up,
     *                 otherwise from its previous scheduled time
     */
    public void add(Task t, boolean relative) {
        long interval, sched;

        if((interval=t.nextInterval()) < 0) return;
        sched=System.currentTimeMillis() + interval;

        synchronized(queue) {
            queue.add(new IntTask(t, sched, relative));
            switch(thread_state) {
                case RUN:
                    // The new task may be due before the current head.
                    queue.notifyAll();
                    break;
                case SUSPEND:
                    _unsuspend();
                    break;
                case STOPPING:
                    break;
                case STOP:
                    break;
            }
        }
    }


    /**
     * Schedules a task with relative rescheduling; same as
     * <code>add(t, true)</code>.
     *
     * @param t the task to schedule
     */
    public void add(Task t) {
        add(t, true);
    }


    /**
     * Starts the scheduler thread if the scheduler is suspended or stopped.
     * Does nothing if it is running or in the middle of stopping.
     */
    public void start() {
        synchronized(queue) {
            switch(thread_state) {
                case RUN:
                    break;
                case SUSPEND:
                    _unsuspend();
                    break;
                case STOPPING:
                    break;
                case STOP:
                    _start();
                    break;
            }
        }
    }


    /**
     * Stops the scheduler. If it is running, the scheduler thread is
     * interrupted and waited for, then the queue is cleared. If it is
     * suspended it is just marked as stopped. Does nothing if it is already
     * stopping or stopped.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for the scheduler thread; the
     *                              scheduler is still moved to the stopped
     *                              state
     */
    public void stop() throws InterruptedException {
        Thread runner;

        synchronized(queue) {
            switch(thread_state) {
                case RUN:
                    _stopping();
                    break;
                case SUSPEND:
                    _stop();
                    return;
                case STOPPING:
                    return;
                case STOP:
                    return;
            }
            // Keep a local reference: the field may be cleared by the
            // scheduler thread once the monitor is released.
            runner=thread;
            if(runner != null) runner.interrupt();
        }

        try {
            if(runner != null) runner.join();
        }
        finally {
            // Always finish the transition, even if join() was interrupted;
            // otherwise the scheduler would be stuck in STOPPING for good.
            synchronized(queue) {
                queue.clear();
                _stop();
            }
        }
    }
}