1 17 18 package org.apache.geronimo.timer; 19 20 import java.util.ArrayList ; 21 import java.util.Collection ; 22 import java.util.Collections ; 23 import java.util.Date ; 24 import java.util.HashMap ; 25 import java.util.Iterator ; 26 import java.util.Map ; 27 import java.util.Timer ; 28 import java.util.TimerTask ; 29 import javax.transaction.RollbackException ; 30 import javax.transaction.Status ; 31 import javax.transaction.Synchronization ; 32 import javax.transaction.SystemException ; 33 import javax.transaction.Transaction ; 34 import javax.transaction.TransactionManager ; 35 36 import edu.emory.mathcs.backport.java.util.concurrent.Executor; 37 38 import org.apache.commons.logging.Log; 39 import org.apache.commons.logging.LogFactory; 40 import org.apache.geronimo.gbean.GBeanLifecycle; 41 42 48 public class ThreadPooledTimer implements PersistentTimer, GBeanLifecycle { 49 50 private static final Log log = LogFactory.getLog(ThreadPooledTimer.class); 51 52 private final ExecutorTaskFactory executorTaskFactory; 53 private final WorkerPersistence workerPersistence; 54 private final Executor executor; 55 private final TransactionManager transactionManager; 56 57 private Timer delegate; 58 59 private final Map idToWorkInfoMap = Collections.synchronizedMap(new HashMap ()); 60 61 public ThreadPooledTimer() { 63 this(null, null, null, null); 64 } 65 66 public ThreadPooledTimer(ExecutorTaskFactory executorTaskFactory, WorkerPersistence workerPersistence, Executor executor, TransactionManager transactionManager) { 67 this.executorTaskFactory = executorTaskFactory; 68 this.workerPersistence = workerPersistence; 69 this.executor = executor; 70 this.transactionManager = transactionManager; 71 } 72 73 public void doStart() throws Exception { 74 delegate = new Timer (true); 75 } 76 77 public void doStop() { 78 if (delegate != null) { 79 delegate.cancel(); 80 delegate = null; 81 } 82 } 83 84 public void doFail() { 85 doStop(); 86 } 87 88 public WorkInfo schedule(UserTaskFactory userTaskFactory, String key, Object userId, Object userInfo, long delay) throws PersistenceException, RollbackException , SystemException { 89 if (delay < 0) { 90 throw new IllegalArgumentException ("Negative delay: " + delay); 91 } 92 Date time = new Date (System.currentTimeMillis() + delay); 93 return schedule(key, userTaskFactory, userId, userInfo, time); 94 } 95 96 public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date time) throws PersistenceException, RollbackException , SystemException { 97 if (time ==null) { 98 throw new IllegalArgumentException ("No time supplied"); 99 } 100 if (time.getTime() < 0) { 101 throw new IllegalArgumentException ("Negative time: " + time.getTime()); 102 } 103 WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, time, null, false); 104 registerSynchronization(new ScheduleSynchronization(worker.getExecutorFeedingTimerTask(), time)); 105 addWorkInfo(worker); 106 return worker; 107 } 108 109 public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userInfo, long delay, long period, Object userId) throws PersistenceException, RollbackException , SystemException { 110 if (delay < 0) { 111 throw new IllegalArgumentException ("Negative delay: " + delay); 112 } 113 if (period < 0) { 114 throw new IllegalArgumentException ("Negative period: " + period); 115 } 116 Date time = new Date (System.currentTimeMillis() + delay); 117 return schedule(key, userTaskFactory, userId, userInfo, time, period); 118 } 119 120 public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException , SystemException { 121 if (firstTime ==null) { 122 throw new IllegalArgumentException ("No time supplied"); 123 } 124 if (firstTime.getTime() < 0) { 125 throw new IllegalArgumentException ("Negative time: " + firstTime.getTime()); 126 } 127 if (period < 0) { 128 throw new IllegalArgumentException ("Negative period: " + period); 129 } 130 WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long (period), false); 131 registerSynchronization(new ScheduleRepeatedSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period)); 132 addWorkInfo(worker); 133 return worker; 134 } 135 136 public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, long delay, long period) throws PersistenceException, RollbackException , SystemException { 137 if (delay < 0) { 138 throw new IllegalArgumentException ("Negative delay: " + delay); 139 } 140 if (period < 0) { 141 throw new IllegalArgumentException ("Negative period: " + period); 142 } 143 Date time = new Date (System.currentTimeMillis() + delay); 144 return scheduleAtFixedRate(key, userTaskFactory, userId, userInfo, time, period); 145 } 146 147 public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException , SystemException { 148 if (firstTime ==null) { 149 throw new IllegalArgumentException ("No time supplied"); 150 } 151 if (firstTime.getTime() < 0) { 152 throw new IllegalArgumentException ("Negative time: " + firstTime.getTime()); 153 } 154 if (period < 0) { 155 throw new IllegalArgumentException ("Negative period: " + period); 156 } 157 WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long (period), true); 158 registerSynchronization(new ScheduleAtFixedRateSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period)); 159 addWorkInfo(worker); 160 return worker; 161 } 162 163 public Collection playback(String key, UserTaskFactory userTaskFactory) throws PersistenceException { 164 PlaybackImpl playback = new PlaybackImpl(userTaskFactory); 165 workerPersistence.playback(key, playback); 166 return playback.getWorkInfos(); 167 } 168 169 public Collection getIdsByKey(String key, Object userId) throws PersistenceException { 170 return workerPersistence.getIdsByKey(key, userId); 171 } 172 173 public WorkInfo getWorkInfo(Long id) { 174 return (WorkInfo) idToWorkInfoMap.get(id); 175 } 176 177 182 public void cancelTimerTasks(Collection ids) { 183 for (Iterator iterator = ids.iterator(); iterator.hasNext();) { 184 Long idLong = (Long ) iterator.next(); 185 WorkInfo workInfo = getWorkInfo(idLong); 186 if (workInfo != null) { 187 TimerTask timerTask = workInfo.getExecutorFeedingTimerTask(); 188 timerTask.cancel(); 189 } 190 } 191 } 192 193 void addWorkInfo(WorkInfo worker) { 194 idToWorkInfoMap.put(new Long (worker.getId()), worker); 195 } 196 197 void removeWorkInfo(WorkInfo workInfo) { 198 idToWorkInfoMap.remove(new Long (workInfo.getId())); 199 } 200 201 void workPerformed(WorkInfo workInfo) throws PersistenceException { 202 if (workInfo.isOneTime()) { 203 workerPersistence.cancel(workInfo.getId()); 204 } else if (workInfo.getAtFixedRate()) { 205 workerPersistence.fixedRateWorkPerformed(workInfo.getId()); 206 workInfo.nextTime(); 207 } else { 208 workInfo.nextInterval(); 209 workerPersistence.intervalWorkPerformed(workInfo.getId(), workInfo.getPeriod().longValue()); 210 } 211 } 212 213 Timer getTimer() { 214 if (delegate == null) { 215 throw new IllegalStateException ("Timer is stopped"); 216 } 217 return delegate; 218 } 219 220 WorkerPersistence getWorkerPersistence() { 221 return workerPersistence; 222 } 223 224 Executor getExecutor() { 225 return executor; 226 } 227 228 private WorkInfo createWorker(String key, UserTaskFactory userTaskFactory, ExecutorTaskFactory executorTaskFactory, Object userId, Object userInfo, Date time, Long period, boolean atFixedRate) throws PersistenceException { 229 if (time == null) { 230 throw new IllegalArgumentException ("Null initial time"); 231 } 232 WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate); 233 workerPersistence.save(workInfo); 235 236 Runnable userTask = userTaskFactory.newTask(workInfo.getId()); 237 ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, this); 238 ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, this); 239 workInfo.initialize(worker, executorTask); 240 return workInfo; 241 } 242 243 void registerSynchronization(Synchronization sync) throws RollbackException , SystemException { 244 Transaction transaction = transactionManager.getTransaction(); 245 int status = transaction == null ? Status.STATUS_NO_TRANSACTION : transaction.getStatus(); 246 247 if (transaction != null && status == Status.STATUS_ACTIVE || status == Status.STATUS_MARKED_ROLLBACK) { 248 transaction.registerSynchronization(sync); 249 } else { 250 sync.beforeCompletion(); 251 sync.afterCompletion(Status.STATUS_COMMITTED); 252 } 253 } 254 255 private class ScheduleSynchronization implements Synchronization { 256 257 private final ExecutorFeedingTimerTask worker; 258 private final Date time; 259 260 public ScheduleSynchronization(ExecutorFeedingTimerTask worker, Date time) { 261 this.worker = worker; 262 this.time = time; 263 } 264 265 public void beforeCompletion() { 266 } 267 268 public void afterCompletion(int status) { 269 if (status == Status.STATUS_COMMITTED) { 270 if (worker.isCancelled()) { 271 log.trace("Worker is already cancelled, not scheduling"); 272 return; 273 } 274 try { 275 getTimer().schedule(worker, time); 276 } catch (IllegalStateException e) { 277 log.warn("Couldn't schedule worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime()); 279 } 280 } 281 } 282 } 283 284 private class ScheduleRepeatedSynchronization implements Synchronization { 285 286 private final ExecutorFeedingTimerTask worker; 287 private final Date time; 288 private final long period; 289 290 public ScheduleRepeatedSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) { 291 this.worker = worker; 292 this.time = time; 293 this.period = period; 294 } 295 296 public void beforeCompletion() { 297 } 298 299 public void afterCompletion(int status) { 300 if (status == Status.STATUS_COMMITTED) { 301 if (worker.isCancelled()) { 302 log.trace("Worker is already cancelled, not scheduling/period"); 303 return; 304 } 305 try { 306 getTimer().schedule(worker, time, period); 307 } catch (Exception e) { 308 log.warn("Couldn't schedule/period worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime()); 309 } 310 } 311 } 312 } 313 314 private class ScheduleAtFixedRateSynchronization implements Synchronization { 315 316 private final ExecutorFeedingTimerTask worker; 317 private final Date time; 318 private final long period; 319 320 public ScheduleAtFixedRateSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) { 321 this.worker = worker; 322 this.time = time; 323 this.period = period; 324 } 325 326 public void beforeCompletion() { 327 } 328 329 public void afterCompletion(int status) { 330 if (status == Status.STATUS_COMMITTED) { 331 if (worker.isCancelled()) { 332 log.trace("Worker is already cancelled, not scheduleAtFixedRate"); 333 return; 334 } 335 try { 336 getTimer().scheduleAtFixedRate(worker, time, period); 337 } catch (Exception e) { 338 log.warn("Couldn't scheduleAtFixedRate worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime()); 339 } 340 } 341 } 342 } 343 344 private class PlaybackImpl implements Playback { 345 346 private final UserTaskFactory userTaskFactory; 347 348 private final Collection workInfos = new ArrayList (); 349 350 public PlaybackImpl(UserTaskFactory userTaskFactory) { 351 this.userTaskFactory = userTaskFactory; 352 } 353 354 public void schedule(WorkInfo workInfo) { 355 Runnable userTask = userTaskFactory.newTask(workInfo.getId()); 356 ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, ThreadPooledTimer.this); 357 ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, ThreadPooledTimer.this); 358 workInfo.initialize(worker, executorTask); 359 if (workInfo.getPeriod() == null) { 360 getTimer().schedule(worker, workInfo.getTime()); 361 } else if (!workInfo.getAtFixedRate()) { 362 getTimer().schedule(worker, workInfo.getTime(), workInfo.getPeriod().longValue()); 363 } else { 364 getTimer().scheduleAtFixedRate(worker, workInfo.getTime(), workInfo.getPeriod().longValue()); 365 } 366 addWorkInfo(workInfo); 367 workInfos.add(workInfo); 368 } 369 370 public Collection getWorkInfos() { 371 return workInfos; 372 } 373 374 } 375 376 } 377 | Popular Tags |