1 43 package org.exolab.jms.scheduler; 44 45 import java.util.HashMap ; 46 import java.util.LinkedList ; 47 48 import org.apache.commons.logging.Log; 49 import org.apache.commons.logging.LogFactory; 50 51 import org.exolab.jms.service.BasicService; 52 import org.exolab.jms.service.ServiceException; 53 import org.exolab.jms.common.threads.ThreadPool; 54 import org.exolab.jms.config.Configuration; 55 import org.exolab.jms.config.ConfigurationManager; 56 import org.exolab.jms.config.SchedulerConfiguration; 57 import org.exolab.jms.threads.ThreadPoolManager; 58 59 60 77 public class Scheduler extends BasicService { 78 79 82 private ThreadPool _threads = null; 83 84 87 private LinkedList _queue = new LinkedList ();; 88 89 93 private HashMap _references = new HashMap (); 94 95 98 private volatile boolean _stop = false; 99 100 105 private int _threadCount = 6; 106 107 112 private final static int MIN_THREAD_COUNT = 2; 113 114 117 private static final String SCHEDULER_NAME = "Scheduler"; 118 119 122 private static Scheduler _instance = null; 123 124 127 private static final Log _log = LogFactory.getLog(Scheduler.class); 128 129 134 public static Scheduler createInstance() throws ServiceException { 135 _instance = new Scheduler(); 136 return _instance; 137 } 138 139 145 public static Scheduler instance() { 146 return _instance; 147 } 148 149 155 private Scheduler() throws ServiceException { 156 super(SCHEDULER_NAME); 157 158 Configuration config = ConfigurationManager.getConfig(); 160 SchedulerConfiguration sched_config = 161 config.getSchedulerConfiguration(); 162 163 int count = sched_config.getMaxThreads(); 164 if (count < MIN_THREAD_COUNT) { 165 count = MIN_THREAD_COUNT; 166 } 167 _threadCount = count; 168 169 _threads = ThreadPoolManager.instance().createThreadPool( 171 SCHEDULER_NAME, _threadCount); 172 } 173 174 180 public void add(Runnable runner) { 181 synchronized (_queue) { 182 _queue.addLast(runner); 183 addReference(runner); 184 _queue.notify(); 185 } 186 } 187 188 196 public boolean remove(Runnable runner) { 197 boolean result = false; 198 synchronized (_queue) { 199 result = _queue.remove(runner); 200 } 201 return result; 202 } 203 204 212 public boolean contains(Runnable runner) { 213 boolean result = false; 214 synchronized (_queue) { 215 result = (_references.get(runner) != null); 216 } 217 return result; 218 } 219 220 225 public boolean isEmpty() { 226 boolean result = false; 227 synchronized (_queue) { 228 result = _queue.isEmpty(); 229 } 230 return result; 231 } 232 233 237 public void run() { 238 while (!_stop) { 239 Runnable runner = next(); 240 if (!_stop && runner != null) { 241 try { 242 _threads.execute(runner); 243 } catch (Exception exception) { 244 _log.error(exception); 245 } 246 } 247 } 248 } 249 250 public void stop() throws ServiceException { 252 _threads.stopRequestAllWorkers(); 254 _stop = true; 255 super.stop(); 256 } 257 258 264 protected Runnable next() { 265 Runnable result = null; 266 synchronized (_queue) { 267 while (!_stop && _queue.isEmpty()) { 268 try { 269 _queue.wait(); 270 } catch (InterruptedException ignore) { 271 } 273 } 274 if (!_stop) { 275 result = (Runnable ) _queue.removeFirst(); 276 removeReference(result); 277 } 278 } 279 return result; 280 } 281 282 286 private void addReference(Runnable runner) { 287 Integer count = (Integer ) _references.get(runner); 288 if (count != null) { 289 count = new Integer (count.intValue() + 1); 290 _references.put(runner, count); 291 } else { 292 _references.put(runner, new Integer (1)); 293 } 294 } 295 296 300 private void removeReference(Runnable runner) { 301 Integer count = (Integer ) _references.get(runner); 304 if (count.intValue() <= 1) { 305 _references.remove(runner); 306 } else { 307 _references.put(runner, new Integer (count.intValue() - 1)); 308 } 309 } 310 311 } 312 | Popular Tags |