1 45 package org.exolab.jms.events; 46 47 import java.util.Comparator ; 48 import java.util.HashMap ; 49 import java.util.Iterator ; 50 51 import org.apache.commons.logging.Log; 52 import org.apache.commons.logging.LogFactory; 53 54 import org.exolab.jms.service.BasicService; 55 import org.exolab.jms.service.ServiceException; 56 import org.exolab.jms.service.ServiceState; 57 import org.exolab.jms.common.threads.ThreadPool; 58 import org.exolab.jms.common.util.OrderedQueue; 59 import org.exolab.jms.threads.ThreadPoolExistsException; 60 import org.exolab.jms.threads.ThreadPoolManager; 61 62 63 96 public class BasicEventManager 97 extends BasicService 98 implements EventManager { 99 100 public transient static final String NAME = "EventManager"; 102 103 public transient static final int MAX_THREADS = 5; 105 106 109 private HashMap _events = new HashMap (); 110 111 114 private transient ThreadPool _pool; 115 116 119 private transient Object _queueSync = new Object (); 120 121 124 private transient OrderedQueue _queue = new OrderedQueue(_queueComparator); 125 126 129 private transient long _seed; 130 131 134 private static final String EVENT_MANAGER_THREAD_NAME = 135 "EventManagerThread"; 136 137 140 transient static private BasicEventManager _instance = null; 141 142 145 private static final Log _log = LogFactory.getLog(BasicEventManager.class); 146 147 148 153 public static BasicEventManager instance() { 154 if (_instance == null) 155 _instance = new BasicEventManager(); 156 157 return _instance; 158 } 159 160 protected BasicEventManager() { 161 super(EVENT_MANAGER_THREAD_NAME); 162 } 163 164 184 public String registerEvent(Event event, long absolute) 185 throws IllegalEventDefinedException { 186 synchronized (_queueSync) { 187 QueueEntry entry = new QueueEntry(event, absolute, generateId()); 188 189 _queue.add(entry); 191 _events.put(entry.id, entry); 192 193 _queueSync.notifyAll(); 195 return entry.id; 196 } 197 } 198 199 216 public String registerEventRelative(Event event, long relative) 217 throws IllegalEventDefinedException { 218 return registerEvent(event, System.currentTimeMillis() + relative); 219 } 220 221 227 public void unregisterEvent(String id) { 228 synchronized (_queueSync) { 229 Object obj = _events.remove(id); 231 if (obj == null) 232 return; 233 _queue.remove(obj); 235 } 236 } 237 238 public void run() { 240 synchronized (_queueSync) { 241 QueueEntry entry; 242 long currentTime; 243 while (getState() != ServiceState.STOPPED) { 244 currentTime = System.currentTimeMillis(); 245 try { 246 entry = (QueueEntry) _queue.firstElement(); 247 } catch (java.util.NoSuchElementException ex) { 248 try { 250 _queueSync.wait(); 251 } catch (InterruptedException ex1) { 252 break; 253 } 254 continue; 255 } 256 257 if (entry.absolute <= currentTime) { 258 try { 260 getThreadPool().execute(entry); 261 } catch (InterruptedException ex) { 262 } 263 _queue.removeFirstElement(); 264 _events.remove(entry.id); 265 } else { 266 try { 269 _queueSync.wait(entry.absolute - currentTime); 270 } catch (InterruptedException ex) { 271 } 273 } 274 } 275 } 276 } 277 278 281 private synchronized String generateId() { 282 return Long.toString(++_seed); 283 } 284 285 public void start() throws ServiceException { 286 super.start(); 287 } 288 289 295 private ThreadPool getThreadPool() { 296 if (_pool == null) { 297 try { 304 _pool = ThreadPoolManager.instance().createThreadPool 305 (NAME, MAX_THREADS); 306 } catch (ThreadPoolExistsException err) { 307 _log.error("Thread pool " + NAME + " already exists"); 308 } 309 } 310 311 return _pool; 312 } 313 314 317 private transient static final Comparator _queueComparator = 318 new Comparator () { 319 320 public int compare(Object obj1, Object obj2) { 321 QueueEntry qe1 = (QueueEntry) obj1; 322 QueueEntry qe2 = (QueueEntry) obj2; 323 324 if (qe1.absolute < qe2.absolute) 325 return -1; 326 if (qe1.absolute > qe2.absolute) 327 return 1; 328 return 0; 329 } 330 331 public boolean equals(Object that) { 332 return (this == that); 333 } 334 }; 335 336 339 class QueueEntry implements Runnable { 340 341 QueueEntry(Event event, long absolute, String id) { 342 this.absolute = absolute; 343 this.event = event; 344 this.id = id; 345 } 346 347 private long absolute; 348 private Event event; 349 private String id; 350 351 public void run() { 352 event.getEventListener().handleEvent(event.getEventType(), 353 event.getCallbackObject(), System.currentTimeMillis()); 354 } 355 } 356 357 } 358 | Popular Tags |