KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > geronimo > timer > ThreadPooledTimer


1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17
18 package org.apache.geronimo.timer;
19
20 import java.util.ArrayList JavaDoc;
21 import java.util.Collection JavaDoc;
22 import java.util.Collections JavaDoc;
23 import java.util.Date JavaDoc;
24 import java.util.HashMap JavaDoc;
25 import java.util.Iterator JavaDoc;
26 import java.util.Map JavaDoc;
27 import java.util.Timer JavaDoc;
28 import java.util.TimerTask JavaDoc;
29 import javax.transaction.RollbackException JavaDoc;
30 import javax.transaction.Status JavaDoc;
31 import javax.transaction.Synchronization JavaDoc;
32 import javax.transaction.SystemException JavaDoc;
33 import javax.transaction.Transaction JavaDoc;
34 import javax.transaction.TransactionManager JavaDoc;
35
36 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.geronimo.gbean.GBeanLifecycle;
41
42 /**
43  *
44  *
45  * @version $Rev: 476049 $ $Date: 2006-11-16 23:35:17 -0500 (Thu, 16 Nov 2006) $
46  *
47  * */

48 public class ThreadPooledTimer implements PersistentTimer, GBeanLifecycle {
49
50     private static final Log log = LogFactory.getLog(ThreadPooledTimer.class);
51
52     private final ExecutorTaskFactory executorTaskFactory;
53     private final WorkerPersistence workerPersistence;
54     private final Executor executor;
55     private final TransactionManager JavaDoc transactionManager;
56
57     private Timer JavaDoc delegate;
58
59     private final Map JavaDoc idToWorkInfoMap = Collections.synchronizedMap(new HashMap JavaDoc());
60
61     //default constructor for use as reference endpoint.
62
public ThreadPooledTimer() {
63         this(null, null, null, null);
64     }
65
66     public ThreadPooledTimer(ExecutorTaskFactory executorTaskFactory, WorkerPersistence workerPersistence, Executor executor, TransactionManager JavaDoc transactionManager) {
67         this.executorTaskFactory = executorTaskFactory;
68         this.workerPersistence = workerPersistence;
69         this.executor = executor;
70         this.transactionManager = transactionManager;
71     }
72
73     public void doStart() throws Exception JavaDoc {
74         delegate = new Timer JavaDoc(true);
75     }
76
77     public void doStop() {
78         if (delegate != null) {
79             delegate.cancel();
80             delegate = null;
81         }
82     }
83
84     public void doFail() {
85         doStop();
86     }
87
88     public WorkInfo schedule(UserTaskFactory userTaskFactory, String JavaDoc key, Object JavaDoc userId, Object JavaDoc userInfo, long delay) throws PersistenceException, RollbackException JavaDoc, SystemException JavaDoc {
89         if (delay < 0) {
90             throw new IllegalArgumentException JavaDoc("Negative delay: " + delay);
91         }
92         Date JavaDoc time = new Date JavaDoc(System.currentTimeMillis() + delay);
93         return schedule(key, userTaskFactory, userId, userInfo, time);
94     }
95
96     public WorkInfo schedule(String JavaDoc key, UserTaskFactory userTaskFactory, Object JavaDoc userId, Object JavaDoc userInfo, Date JavaDoc time) throws PersistenceException, RollbackException JavaDoc, SystemException JavaDoc {
97         if (time ==null) {
98             throw new IllegalArgumentException JavaDoc("No time supplied");
99         }
100         if (time.getTime() < 0) {
101             throw new IllegalArgumentException JavaDoc("Negative time: " + time.getTime());
102         }
103         WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, time, null, false);
104         registerSynchronization(new ScheduleSynchronization(worker.getExecutorFeedingTimerTask(), time));
105         addWorkInfo(worker);
106         return worker;
107     }
108
109     public WorkInfo schedule(String JavaDoc key, UserTaskFactory userTaskFactory, Object JavaDoc userInfo, long delay, long period, Object JavaDoc userId) throws PersistenceException, RollbackException JavaDoc, SystemException JavaDoc {
110         if (delay < 0) {
111             throw new IllegalArgumentException JavaDoc("Negative delay: " + delay);
112         }
113         if (period < 0) {
114             throw new IllegalArgumentException JavaDoc("Negative period: " + period);
115         }
116         Date JavaDoc time = new Date JavaDoc(System.currentTimeMillis() + delay);
117         return schedule(key, userTaskFactory, userId, userInfo, time, period);
118     }
119
120     public WorkInfo schedule(String JavaDoc key, UserTaskFactory userTaskFactory, Object JavaDoc userId, Object JavaDoc userInfo, Date JavaDoc firstTime, long period) throws PersistenceException, RollbackException JavaDoc, SystemException JavaDoc {
121         if (firstTime ==null) {
122             throw new IllegalArgumentException JavaDoc("No time supplied");
123         }
124         if (firstTime.getTime() < 0) {
125             throw new IllegalArgumentException JavaDoc("Negative time: " + firstTime.getTime());
126         }
127         if (period < 0) {
128             throw new IllegalArgumentException JavaDoc("Negative period: " + period);
129         }
130         WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long JavaDoc(period), false);
131         registerSynchronization(new ScheduleRepeatedSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period));
132         addWorkInfo(worker);
133         return worker;
134     }
135
136     public WorkInfo scheduleAtFixedRate(String JavaDoc key, UserTaskFactory userTaskFactory, Object JavaDoc userId, Object JavaDoc userInfo, long delay, long period) throws PersistenceException, RollbackException JavaDoc, SystemException JavaDoc {
137         if (delay < 0) {
138             throw new IllegalArgumentException JavaDoc("Negative delay: " + delay);
139         }
140         if (period < 0) {
141             throw new IllegalArgumentException JavaDoc("Negative period: " + period);
142         }
143         Date JavaDoc time = new Date JavaDoc(System.currentTimeMillis() + delay);
144         return scheduleAtFixedRate(key, userTaskFactory, userId, userInfo, time, period);
145     }
146
147     public WorkInfo scheduleAtFixedRate(String JavaDoc key, UserTaskFactory userTaskFactory, Object JavaDoc userId, Object JavaDoc userInfo, Date JavaDoc firstTime, long period) throws PersistenceException, RollbackException JavaDoc, SystemException JavaDoc {
148         if (firstTime ==null) {
149             throw new IllegalArgumentException JavaDoc("No time supplied");
150         }
151         if (firstTime.getTime() < 0) {
152             throw new IllegalArgumentException JavaDoc("Negative time: " + firstTime.getTime());
153         }
154         if (period < 0) {
155             throw new IllegalArgumentException JavaDoc("Negative period: " + period);
156         }
157         WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long JavaDoc(period), true);
158         registerSynchronization(new ScheduleAtFixedRateSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period));
159         addWorkInfo(worker);
160         return worker;
161     }
162
163     public Collection JavaDoc playback(String JavaDoc key, UserTaskFactory userTaskFactory) throws PersistenceException {
164         PlaybackImpl playback = new PlaybackImpl(userTaskFactory);
165         workerPersistence.playback(key, playback);
166         return playback.getWorkInfos();
167     }
168
169     public Collection JavaDoc getIdsByKey(String JavaDoc key, Object JavaDoc userId) throws PersistenceException {
170         return workerPersistence.getIdsByKey(key, userId);
171     }
172
173     public WorkInfo getWorkInfo(Long JavaDoc id) {
174         return (WorkInfo) idToWorkInfoMap.get(id);
175     }
176
177     /**
178      * Called when client, eg. ejb container, is stopped and needs to cancel its timertasks without
179      * affecting persisted timer data.
180      * @param ids list of ids to have their corresponding workInfo timertasks cancelled.
181      */

182     public void cancelTimerTasks(Collection JavaDoc ids) {
183         for (Iterator JavaDoc iterator = ids.iterator(); iterator.hasNext();) {
184             Long JavaDoc idLong = (Long JavaDoc) iterator.next();
185             WorkInfo workInfo = getWorkInfo(idLong);
186             if (workInfo != null) {
187                 TimerTask JavaDoc timerTask = workInfo.getExecutorFeedingTimerTask();
188                 timerTask.cancel();
189             }
190         }
191     }
192
193     void addWorkInfo(WorkInfo worker) {
194         idToWorkInfoMap.put(new Long JavaDoc(worker.getId()), worker);
195     }
196
197     void removeWorkInfo(WorkInfo workInfo) {
198         idToWorkInfoMap.remove(new Long JavaDoc(workInfo.getId()));
199     }
200
201     void workPerformed(WorkInfo workInfo) throws PersistenceException {
202         if (workInfo.isOneTime()) {
203             workerPersistence.cancel(workInfo.getId());
204         } else if (workInfo.getAtFixedRate()) {
205             workerPersistence.fixedRateWorkPerformed(workInfo.getId());
206             workInfo.nextTime();
207         } else {
208             workInfo.nextInterval();
209             workerPersistence.intervalWorkPerformed(workInfo.getId(), workInfo.getPeriod().longValue());
210         }
211     }
212
213     Timer JavaDoc getTimer() {
214         if (delegate == null) {
215             throw new IllegalStateException JavaDoc("Timer is stopped");
216         }
217         return delegate;
218     }
219
220     WorkerPersistence getWorkerPersistence() {
221         return workerPersistence;
222     }
223
224     Executor getExecutor() {
225         return executor;
226     }
227
228     private WorkInfo createWorker(String JavaDoc key, UserTaskFactory userTaskFactory, ExecutorTaskFactory executorTaskFactory, Object JavaDoc userId, Object JavaDoc userInfo, Date JavaDoc time, Long JavaDoc period, boolean atFixedRate) throws PersistenceException {
229         if (time == null) {
230             throw new IllegalArgumentException JavaDoc("Null initial time");
231         }
232         WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate);
233         //save and assign id
234
workerPersistence.save(workInfo);
235
236         Runnable JavaDoc userTask = userTaskFactory.newTask(workInfo.getId());
237         ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, this);
238         ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, this);
239         workInfo.initialize(worker, executorTask);
240         return workInfo;
241     }
242
243     void registerSynchronization(Synchronization JavaDoc sync) throws RollbackException JavaDoc, SystemException JavaDoc {
244         Transaction JavaDoc transaction = transactionManager.getTransaction();
245         int status = transaction == null ? Status.STATUS_NO_TRANSACTION : transaction.getStatus();
246
247         if (transaction != null && status == Status.STATUS_ACTIVE || status == Status.STATUS_MARKED_ROLLBACK) {
248             transaction.registerSynchronization(sync);
249         } else {
250             sync.beforeCompletion();
251             sync.afterCompletion(Status.STATUS_COMMITTED);
252         }
253     }
254
255     private class ScheduleSynchronization implements Synchronization JavaDoc {
256
257         private final ExecutorFeedingTimerTask worker;
258         private final Date JavaDoc time;
259
260         public ScheduleSynchronization(ExecutorFeedingTimerTask worker, Date JavaDoc time) {
261             this.worker = worker;
262             this.time = time;
263         }
264
265         public void beforeCompletion() {
266         }
267
268         public void afterCompletion(int status) {
269             if (status == Status.STATUS_COMMITTED) {
270                 if (worker.isCancelled()) {
271                     log.trace("Worker is already cancelled, not scheduling");
272                     return;
273                 }
274                 try {
275                     getTimer().schedule(worker, time);
276                 } catch (IllegalStateException JavaDoc e) {
277                     //TODO consider again if catching this exception is appropriate
278
log.warn("Couldn't schedule worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
279                 }
280             }
281         }
282     }
283
284     private class ScheduleRepeatedSynchronization implements Synchronization JavaDoc {
285
286         private final ExecutorFeedingTimerTask worker;
287         private final Date JavaDoc time;
288         private final long period;
289
290         public ScheduleRepeatedSynchronization(ExecutorFeedingTimerTask worker, Date JavaDoc time, long period) {
291             this.worker = worker;
292             this.time = time;
293             this.period = period;
294         }
295
296         public void beforeCompletion() {
297         }
298
299         public void afterCompletion(int status) {
300             if (status == Status.STATUS_COMMITTED) {
301                 if (worker.isCancelled()) {
302                     log.trace("Worker is already cancelled, not scheduling/period");
303                     return;
304                 }
305                 try {
306                     getTimer().schedule(worker, time, period);
307                 } catch (Exception JavaDoc e) {
308                     log.warn("Couldn't schedule/period worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
309                 }
310             }
311         }
312     }
313
314     private class ScheduleAtFixedRateSynchronization implements Synchronization JavaDoc {
315
316         private final ExecutorFeedingTimerTask worker;
317         private final Date JavaDoc time;
318         private final long period;
319
320         public ScheduleAtFixedRateSynchronization(ExecutorFeedingTimerTask worker, Date JavaDoc time, long period) {
321             this.worker = worker;
322             this.time = time;
323             this.period = period;
324         }
325
326         public void beforeCompletion() {
327         }
328
329         public void afterCompletion(int status) {
330             if (status == Status.STATUS_COMMITTED) {
331                 if (worker.isCancelled()) {
332                     log.trace("Worker is already cancelled, not scheduleAtFixedRate");
333                     return;
334                 }
335                 try {
336                     getTimer().scheduleAtFixedRate(worker, time, period);
337                 } catch (Exception JavaDoc e) {
338                     log.warn("Couldn't scheduleAtFixedRate worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
339                 }
340             }
341         }
342     }
343
344     private class PlaybackImpl implements Playback {
345
346         private final UserTaskFactory userTaskFactory;
347
348         private final Collection JavaDoc workInfos = new ArrayList JavaDoc();
349
350         public PlaybackImpl(UserTaskFactory userTaskFactory) {
351             this.userTaskFactory = userTaskFactory;
352         }
353
354         public void schedule(WorkInfo workInfo) {
355             Runnable JavaDoc userTask = userTaskFactory.newTask(workInfo.getId());
356             ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, ThreadPooledTimer.this);
357             ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, ThreadPooledTimer.this);
358             workInfo.initialize(worker, executorTask);
359             if (workInfo.getPeriod() == null) {
360                 getTimer().schedule(worker, workInfo.getTime());
361             } else if (!workInfo.getAtFixedRate()) {
362                 getTimer().schedule(worker, workInfo.getTime(), workInfo.getPeriod().longValue());
363             } else {
364                 getTimer().scheduleAtFixedRate(worker, workInfo.getTime(), workInfo.getPeriod().longValue());
365             }
366             addWorkInfo(workInfo);
367             workInfos.add(workInfo);
368         }
369
370         public Collection JavaDoc getWorkInfos() {
371             return workInfos;
372         }
373
374     }
375
376 }
377
Popular Tags