1 3 package org.jgroups.util; 4 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 9 10 22 public class Scheduler implements Runnable { 23 final Queue queue=new Queue(); 24 Thread sched_thread=null; 25 Task current_task=null; 26 ThreadPool pool=null; 27 SchedulerListener listener=null; 28 29 protected static final Log log=LogFactory.getLog(Scheduler.class); 30 31 35 boolean concurrent_processing=false; 36 37 38 int NUM_THREADS=128; 39 40 static final int WAIT_FOR_THREAD_AVAILABILITY=3000; 41 static final int THREAD_JOIN_TIMEOUT=1000; 42 43 44 45 46 47 48 public Scheduler() { 49 try { 51 this.NUM_THREADS=Integer.parseInt(System.getProperty("scheduler.max.threads", "128")); 52 } 53 catch (SecurityException ex){ 54 } 56 } 57 58 59 public Scheduler(int num_threads) { 60 this.NUM_THREADS=num_threads; 61 } 62 63 64 public void setListener(SchedulerListener l) { 65 listener=l; 66 } 67 68 69 public boolean getConcurrentProcessing() { 70 return concurrent_processing; 71 } 72 73 public void setConcurrentProcessing(boolean process_concurrently) { 74 this.concurrent_processing=process_concurrently; 75 } 76 77 public void run() { 78 while(sched_thread != null) { 79 if(queue.closed()) break; 80 try { 81 current_task=(Task)queue.peek(); if(current_task == null) { if(log.isWarnEnabled()) log.warn("current task is null, queue.size()=" + queue.size() + 84 ", queue.closed()=" + queue.closed() + ", continuing"); 85 continue; 86 } 87 88 if(current_task.suspended) { 89 current_task.suspended=false; 90 current_task.thread.resume(); 91 if(listener != null) listener.resumed(current_task.target); 92 } 93 else { 94 if(current_task.thread == null) { 95 current_task.thread=pool.getThread(); 96 if(current_task.thread == null) { if(log.isWarnEnabled()) log.warn("thread pool exhausted, waiting for " + 98 WAIT_FOR_THREAD_AVAILABILITY + "ms before retrying"); 99 Util.sleep(WAIT_FOR_THREAD_AVAILABILITY); 100 continue; 101 } 102 } 103 104 if(listener != null) listener.started(current_task.target); 106 if(current_task.thread.assignTask(current_task.target) == false) 107 continue; 108 } 109 110 if(sched_thread.isInterrupted()) { 113 117 throw new InterruptedException (); 118 } 119 120 if(concurrent_processing == false) { synchronized(current_task.thread) { 122 while(!current_task.thread.done() && !current_task.thread.suspended) 123 current_task.thread.wait(); 124 } 125 if(listener != null) listener.stopped(current_task.target); 126 } 127 queue.removeElement(current_task); 128 } 129 catch(InterruptedException interrupted) { 130 if(sched_thread == null || queue.closed()) break; 131 if(current_task.thread != null) { 132 current_task.thread.suspend(); 133 if(listener != null) listener.suspended(current_task.target); 134 current_task.suspended=true; 135 } 136 Thread.interrupted(); continue; 138 } 139 catch(QueueClosedException closed_ex) { 140 return; 141 } 142 catch(Throwable ex) { 143 if(log.isErrorEnabled()) log.error("exception=" + Util.print(ex)); 144 continue; 145 } 146 } 147 if(log.isTraceEnabled()) log.trace("scheduler thread terminated"); 148 } 149 150 151 public void addPrio(Runnable task) { 152 Task new_task=new Task(task); 153 boolean do_interrupt=false; 154 155 try { 156 synchronized(queue) { if(queue.size() == 0) 158 queue.add(new_task); 159 else { 160 queue.addAtHead(new_task); 161 do_interrupt=true; 162 } 163 } 164 if(do_interrupt) sched_thread.interrupt(); 166 } 167 catch(Throwable e) { 168 if(log.isErrorEnabled()) log.error("exception=" + e); 169 } 170 } 171 172 173 public void add(Runnable task) { 174 Task new_task=new Task(task); 175 176 try { 177 synchronized(queue) { queue.add(new_task); 179 } 180 } 181 catch(Exception e) { 182 if(log.isErrorEnabled()) log.error("exception=" + e); 183 } 184 } 185 186 187 public void start() { 188 if(queue.closed()) 189 queue.reset(); 190 if(sched_thread == null) { 191 pool=new ThreadPool(NUM_THREADS); 192 sched_thread=new Thread (this, "Scheduler main thread"); 193 sched_thread.setDaemon(true); 194 sched_thread.start(); 195 } 196 } 197 198 199 205 public void stop() { 206 Thread tmp=null; 207 208 queue.close(false); 211 if(sched_thread != null && sched_thread.isAlive()) { 213 tmp=sched_thread; 214 sched_thread=null; 215 tmp.interrupt(); 216 try { 217 tmp.join(THREAD_JOIN_TIMEOUT); 218 } 219 catch(Exception ex) { 220 } 221 222 if(tmp.isAlive()) 223 if(log.isErrorEnabled()) log.error("scheduler thread is still not dead !!!"); 224 } 225 sched_thread=null; 226 227 if(pool != null) { 229 pool.destroy(); 230 pool=null; 231 } 232 } 233 234 235 236 public class Task { 237 ReusableThread thread=null; 238 Runnable target=null; 239 boolean suspended=false; 240 241 Task(Runnable target) { 242 this.target=target; 243 } 244 245 public String toString() { 246 return "[thread=" + thread + ", target=" + target + ", suspended=" + suspended + ']'; 247 } 248 } 249 250 251 } 252 | Popular Tags |