KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > sapia > taskman > TaskManager


1 package org.sapia.taskman;
2
3 import org.sapia.taskman.transaction.Transaction;
4 import org.sapia.taskman.transaction.TransactionIdFactory;
5 import org.sapia.taskman.transaction.TransactionListener;
6
7 import java.io.IOException JavaDoc;
8 import java.io.ObjectInput JavaDoc;
9 import java.io.ObjectOutput JavaDoc;
10
11 import java.util.ArrayList JavaDoc;
12 import java.util.Collections JavaDoc;
13 import java.util.List JavaDoc;
14
15 /**
16  * An instance of this class executes <code>Task</code> instances. An new task
17  * is added to the task manager through a <code>TaskDescriptor</code>. The
18  * following excerpt shows how to use a task manager:
19  *
20  * <pre>
21  *
22  * // The following adds a periodic task that is executed every 5 seconds.
23  *
24  * TaskManager mgr = new TaskManager();
25  * TaskDescriptor periodicTask = new PeriodicTaskDescriptor(&quot;periodic&quot;, 5000,
26  * new MyTask());
27  * mgr.execTaskFor(periodicTask);
28  * mgr.start();
29  *
30  * // Tasks can be added after the task manager has started:
31  *
32  * TaskDescriptor transientTask = new TransientTaskDescriptor(&quot;transient task&quot;,
33  * new MyTask());
34  * mgr.execTaskFor(transientTask);
35  *
36  * // The above adds a &quot;transient task&quot; - a task that will be executed only once.
37  *
38  *
39  * </pre>
40  *
41  * The above code demonstrates how to add asynchronous tasks: tasks are added,
42  * and "eventually" executed. Yet, tasks can also be executed synchronously:
43  *
44  * <pre>
45  *
46  *
47  * TaskManager mgr = new TaskManager();
48  * mgr.start();
49  * mgr.execSyncTask(&quot;synchronous&quot;, new MyTask());
50  *
51  * // or
52  *
53  * mgr.execSyncTask(&quot;synchronous&quot;, new MyTask(), myOutput);
54  *
55  *
56  * </pre>
57  *
58  * In the above snippet, the second case shows that it is possible for the
59  * caller to pass an instance of <code>TaskOutput</code> as a parameter. This
60  * allows a caller to trap task output information and send it where it pleases.
61  *
62  * <p>
63  * A task manager internally processes its list of tasks to execute and executes
64  * them one by one. Once the list has been processed, the task manager goes to
65  * sleep (in order not to consume precious cpu cycles if no task needs being
66  * executed for a given while) - this sleeping delay can be configured through
67  * the <code>setRunInterval()</code> method.
68  * <p>
69  * More specifically, the task manager behaves in the following way: if its task
70  * list is empty, the task manager waits for a new task to be added before
71  * waking up; if the task list is not empty, the task manager waits for the next
72  * time a task is ready for execution, or for the addition of a new task.
73  *
74  * <p>
75  * In addition, this class has support for "transactions": a series of tasks is executed
76  * has part of the same transaction; execution of tasks that are part of the same
77  * transaction is started when the <code>commit()</code> method is called on the corresponding
78  * transaction object:
79  * <pre>
80  * MyTransactionListener listener = new MyTransactionListener();
81  * Transaction tx = tm.newTransaction(listener);
82  * tm.execSyncTask("t1", task1);
83  * tm.execSyncTask("t2", task2);
84  * tm.execSyncTask("t3", task3);
85  * tx.commit();
86  * </pre>
87  * In the above snippet, tasks are executed only <b>once</b> <code>commit()</code> is called.
88  * The outcome of the transaction (success or failure) can be checked by using a
89  * <code>TransactionListener</code> implementation.
90  * <p>
91  * The transactions in question here are rather "pseudo-transactions": they cannot be rolled back, so
92  * behavior is not "ACID". Therefore, such transactions are more logical groups of tasks executed
93  * sequentially. The party that triggers the task is interested in the failure or success of these
94  * tasks as a group. For example, in the case of a set of tasks pertaining to a deployment, such a
95  * deployment could consist of 1) upload, 2) remove current deployment, 3) start new deployment.
96  * Provided that one of the tasks fails, the whole procedure should be interpreted as a failure.
97  * Since such transactions are not made of atomic deterministic tasks (as the read and write of databases),
98  * recovery behavior is therefore application-specific. In our deployment case, a DeploymentListener could
99  * be implemented that would provide the recovery logic pertaining to failed deployments.
100  *
101  * @author Yanick Duchesne 15-Apr-2003
102  * <dl>
103  * <dt><b>Copyright: </b>
104  * <dd>Copyright &#169; 2002-2004 <a
105  * HREF="http://www.sapia-oss.org">Sapia Open Source Software </a>. All
106  * Rights Reserved.</dd>
107  * </dt>
108  * <dt><b>License: </b>
109  * <dd>Read the license.txt file of the jar or visit the <a
110  * HREF="http://www.sapia-oss.org/license.html">license page </a> at the
111  * Sapia OSS web site</dd>
112  * </dt>
113  * </dl>
114  */

115 public class TaskManager extends Thread JavaDoc implements java.io.Externalizable JavaDoc {
116
117   static final long serialVersionUID = 1L;
118
119   public static final long RUN_INTERVAL = 250;
120   private static final byte RUNNING = 0;
121   private static final byte SHUTDOWN_REQUESTED = 1;
122   private static final byte SHUT_DOWN = 2;
123   protected List JavaDoc _transactions = Collections
124                                                        .synchronizedList(new ArrayList JavaDoc());
125   private long _interval = RUN_INTERVAL;
126   private transient byte _status;
127   private transient ThreadLocal JavaDoc _currentTx = new ThreadLocal JavaDoc();
128   private transient TransactionIdFactory _txIdFactory = new DefaultTxIdFactory();
129
130   /**
131    * Creates a new <code>TaskManager</code>.
132    */

133   public TaskManager() {
134     super("TaskManager");
135   }
136
137   /**
138    * Creates an instance of this class with the given name.
139    *
140    * @param name
141    * a name.
142    */

143   public TaskManager(String JavaDoc name) {
144     super(name);
145   }
146
147   /**
148    * Adds the given list of task descriptors to this instance. <p/>If the
149    * calling thread is associated with a not-committed transaction, the given
150    * list of tasks is added to that transaction.
151    *
152    * @param a
153    * <code>List</code> of <code>TaskDescriptor</code>s.
154    */

155   public void addTaskDescriptors(List JavaDoc tasks) {
156     TaskmanTransaction tx = (TaskmanTransaction) _currentTx.get();
157
158     if((tx != null) && (tx.getStatus() == Transaction.STATUS_INITIAL)) {
159       for(int i = 0; i < tasks.size(); i++) {
160         tx.registerTask((TaskDescriptor) tasks.get(i));
161       }
162     } else {
163       for(int i = 0; i < tasks.size(); i++) {
164         _transactions.add(wrap((TaskDescriptor) tasks.get(i)));
165       }
166     }
167   }
168
169   /**
170    * Returns this instance's internal list of transactions in another list.
171    *
172    * @return the <code>List</code> of current <code>Transactions</code> that
173    * this instance holds
174    */

175   public synchronized List JavaDoc getTransactions() {
176     synchronized(_transactions) {
177       List JavaDoc toReturn = new ArrayList JavaDoc(_transactions.size());
178       toReturn.addAll(_transactions);
179
180       return toReturn;
181     }
182   }
183
184   /**
185    * Sets the interval at which this instance will wake up to process its list
186    * of internal tasks.
187    *
188    * @param millis
189    * a time interval in milliseconds.
190    */

191   public synchronized void setRunInterval(long millis) {
192     _interval = millis;
193   }
194
195   /**
196    * Asynchronously executes the task corresponding to the given descriptor.
197    * <p/>If the calling thread is associated with a not-committed transaction,
198    * the given task is added to that transaction.
199    *
200    * @param desc
201    * a <code>TaskDescriptor</code>
202    */

203   public void execTaskFor(TaskDescriptor desc) {
204     addTaskDescriptor(desc.setRoot(true));
205   }
206
207   /**
208    * Executes the given task synchronously. <p/>If the calling thread is
209    * associated with a not-committed transaction, the given task is added to
210    * that transaction.
211    *
212    * @param name
213    * the name of the task - will be passed to the
214    * <code>TaskOutput</code> that will be associated with the task.
215    * @param t
216    * a <code>Task</code>.
217    */

218   public void execSyncTask(String JavaDoc name, Task t) {
219     execSyncTask(name, t, newTaskOutput(name));
220   }
221
222   /**
223    * Executes the given task synchronously. <p/>If the calling thread is
224    * associated with a not-committed transaction, the given task is added to
225    * that transaction.
226    *
227    * @param name
228    * the name of the task.
229    * @param t
230    * a <code>Task</code>.
231    * @param out
232    * a <code>TaskOutput</code>.
233    */

234   public void execSyncTask(String JavaDoc name, Task t, TaskOutput out) {
235     // trying to get current tx
236
// or tx of currently executed task
237
TaskmanTransaction tx = (TaskmanTransaction) _currentTx.get();
238
239     if((tx != null) && (tx.getStatus() == Transaction.STATUS_INITIAL)) {
240       // creating new task descriptor for task
241
TransientTaskDescriptor td = new TransientTaskDescriptor(name, 0, t);
242       td.setTaskOutput(out/* new TxTaskOutput(out, tx) */);
243
244       // adding to tx
245
tx.registerTask(td);
246     } else {
247       // not part of tx; exec right away
248
TransientTaskDescriptor desc = new TransientTaskDescriptor(name, 0, t);
249       tx = wrap(desc);
250       desc.setTaskOutput(out/* new TxTaskOutput(out, tx) */);
251       desc.setRoot(true);
252       tx.execute();
253     }
254   }
255
256   /**
257    * Executes the given task asynchronously. The calling thread will return
258    * immediately, the task will be executed as soon as possible, logging its
259    * output to the given <code>TaskOutput</code>.
260    * <p>
261    * The calling thread can thus receive task output asynchronously. <p/>If the
262    * calling thread is associated with a not-committed transaction, the given
263    * task is added to that transaction.
264    *
265    * @param name
266    * the name of the task.
267    * @param t
268    * a <code>Task</code>.
269    * @param out
270    * a <code>TaskOutput</code>.
271    */

272   public void execAsyncTask(String JavaDoc name, Task t, TaskOutput out) {
273     // trying to get current tx
274
// or tx of currently executed task
275
TaskmanTransaction tx = (TaskmanTransaction) _currentTx.get();
276
277     if((tx != null) && (tx.getStatus() == Transaction.STATUS_INITIAL)) {
278       // creating new task descriptor for task
279
TransientTaskDescriptor td = new TransientTaskDescriptor(name, 0, t);
280       td.setTaskOutput(new TxTaskOutput(out, tx));
281
282       // adding to tx
283
tx.registerTask(td);
284     } else {
285       // not part of tx.
286
TransientTaskDescriptor desc = new TransientTaskDescriptor(name, 0, t);
287       desc.setTaskOutput(out);
288       addTaskDescriptor(desc.setRoot(true));
289     }
290   }
291
292   /**
293    * @see #newTransaction(TransactionListener)
294    */

295   public Transaction newTransaction() {
296     return newTransaction(null);
297   }
298
299   /**
300    * Internally creates a new transaction and associates it with the current
301    * thread. <p/>
302    *
303    * @param listener
304    * a <code>TransactionListener</code>.
305    * @return a <code>Transaction</code>
306    */

307   public Transaction newTransaction(TransactionListener listener) {
308     if(_currentTx.get() != null) {
309       if(((TaskmanTransaction) _currentTx.get()).getStatus() != Transaction.STATUS_COMMITTED) {
310         throw new IllegalStateException JavaDoc(
311             "Current thread already registered with existing transaction");
312       }
313     }
314
315     TaskmanTransaction tx = new TaskmanTransaction(_txIdFactory.newTxId(), this, listener);
316     _currentTx.set(tx);
317     _transactions.add(tx);
318
319     return tx;
320   }
321
322   /**
323    * Registers the current thread with the given transaction.
324    *
325    * @param tx
326    * a <code>Transaction</code>
327    */

328   public void registerTransaction(Transaction tx) {
329     if(!(tx instanceof TaskmanTransaction)) {
330       throw new IllegalArgumentException JavaDoc("Transaction is not an instance of: "
331           + TaskmanTransaction.class.getName());
332     }
333
334     if(_currentTx.get() != null) {
335       if(((TaskmanTransaction) _currentTx.get()).getStatus() != Transaction.STATUS_COMMITTED) {
336         throw new IllegalStateException JavaDoc(
337             "Current thread already registered with existing transaction");
338       }
339     }
340
341     _currentTx.set(tx);
342   }
343
344   /**
345    * Unregisters the current thread from its transaction.
346    */

347   public void unregisterTransaction() {
348     _currentTx.set(null);
349   }
350
351   /**
352    * @return the <code>Transaction</code> to which the current thread is
353    * associated.
354    */

355   public Transaction currentTransaction() {
356     Transaction tx = (Transaction) _currentTx.get();
357
358     if(tx == null) {
359       throw new IllegalStateException JavaDoc(
360           "Current thread not registered with a transaction");
361     }
362
363     return tx;
364   }
365
366   /**
367    * @return <code>true</code> if the current thread is "in transaction".
368    */

369   public boolean isInTransaction() {
370     if(_currentTx.get() == null) {
371       return false;
372     } else {
373       return ((TaskmanTransaction) _currentTx.get()).getStatus() != Transaction.STATUS_COMMITTED;
374     }
375   }
376
377   /**
378    * Shuts down this instance. This method waits for the currently executing
379    * task to complete, then exits.
380    */

381   public synchronized void shutdown() {
382     _status = SHUTDOWN_REQUESTED;
383     notifyAll();
384
385     while((_status != SHUT_DOWN) && (_transactions.size() > 0)) {
386       try {
387         wait();
388       } catch(InterruptedException JavaDoc e) {
389         break;
390       }
391     }
392   }
393
394   /**
395    * @see java.io.Externalizable#readExternal(ObjectInput)
396    */

397   public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc,
398       ClassNotFoundException JavaDoc {
399     _transactions = (List JavaDoc) in.readObject();
400     TaskmanTransaction tx;
401     for(int i = 0; i < _transactions.size(); i++) {
402       tx = (TaskmanTransaction) _transactions.get(i);
403       tx.setTaskManager(this);
404     }
405     _interval = in.readLong();
406     _txIdFactory = new DefaultTxIdFactory();
407     _currentTx = new ThreadLocal JavaDoc();
408   }
409   
410   /**
411    * @param factory the <code>TransactionIdFactory</code> that this
412    * instance uses.
413    */

414   public void setTransactionIdFactory(TransactionIdFactory factory){
415     _txIdFactory = factory;
416   }
417
418   /**
419    * @see java.io.Externalizable#writeExternal(ObjectOutput)
420    */

421   public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
422     out.writeObject(_transactions);
423     out.writeLong(_interval);
424   }
425
426   /**
427    * This template method is called every time a <code>TaskOutput</code> is
428    * created and associated with a <code>Task</code>.
429    *
430    * @param taskName
431    * the name of the task for which to create a <code>TaskOutput</code>.
432    */

433   protected TaskOutput newTaskOutput(String JavaDoc taskName) {
434     return new DefaultTaskOutput(taskName, DefaultTaskOutput.DEBUG);
435   }
436
437   void execSyncTask(String JavaDoc name, Task t, TaskContext ctx) {
438     TransientTaskDescriptor desc = new TransientTaskDescriptor(name, 0, t);
439     ctx.getTaskOutput().setTaskName(name);
440     desc.setTaskOutput(ctx.getTaskOutput());
441     desc.setContextVals(ctx.getVals());
442     TaskmanTransaction tx = ctx.getTransaction();
443     tx.registerTask(desc);
444     tx.execute();
445   }
446
447   void execAsyncTask(String JavaDoc name, Task t, TaskContext ctx) {
448     TransientTaskDescriptor desc = new TransientTaskDescriptor(name, 0, t);
449     desc.setContextVals(ctx.getVals());
450     ctx.getTransaction().registerTask(desc);
451   }
452
453   /**
454    * @see java.lang.Thread#run()
455    */

456   public void run() {
457     TaskmanTransaction current;
458
459     while(true) {
460       long next = -1;
461
462       for(int i = 0; i < _transactions.size(); i++) {
463         current = (TaskmanTransaction) _transactions.get(i);
464
465         if(current.getStatus() == Transaction.STATUS_COMMITTED) {
466           _transactions.remove(i--);
467         } else if(current.getStatus() == Transaction.STATUS_INITIAL) {
468           // noop
469
} else {
470           if(System.currentTimeMillis() >= current.nextExecTime()) {
471             current.execute();
472
473             if(next == -1) {
474               next = current.nextExecTime();
475             } else if(current.nextExecTime() < next) {
476               next = current.nextExecTime();
477             }
478
479             // giving a chance to other threads...
480
Thread.yield();
481           }
482         }
483       }
484
485       if(_status == SHUTDOWN_REQUESTED) {
486         notifyShutDown();
487
488         break;
489       }
490
491       try {
492         waitForTx(next);
493
494         if(_status == SHUTDOWN_REQUESTED) {
495           notifyShutDown();
496
497           break;
498         }
499       } catch(InterruptedException JavaDoc e) {
500         notifyShutDown();
501
502         break;
503       }
504     }
505   }
506
507   protected synchronized void addTaskDescriptor(TaskDescriptor desc) {
508     TaskmanTransaction tx = (TaskmanTransaction) _currentTx.get();
509
510     if((tx != null) && (tx.getStatus() == Transaction.STATUS_INITIAL)) {
511       tx.registerTask(desc);
512     } else {
513       tx = wrap(desc);
514       _transactions.add(tx);
515       tx.commit();
516     }
517   }
518
519   synchronized void wakeUp() {
520     notify();
521   }
522
523   private synchronized void waitForTx(long nextTime)
524       throws InterruptedException JavaDoc {
525     long start = System.currentTimeMillis();
526
527     if(_transactions.size() == 0) {
528       while((_transactions.size() == 0) && (_status != SHUTDOWN_REQUESTED)) {
529         wait();
530       }
531     } else if(nextTime <= 0) {
532       wait(_interval);
533     } else {
534       long delay = nextTime - System.currentTimeMillis();
535
536       if(delay <= 0) {
537         wait(_interval);
538       } else {
539         wait(delay);
540       }
541     }
542   }
543
544   private TaskmanTransaction wrap(TaskDescriptor td) {
545     TaskmanTransaction tx = new TaskmanTransaction(_txIdFactory.newTxId(), this);
546     tx.registerTask(td);
547
548     return tx;
549   }
550
551   private synchronized void notifyShutDown() {
552     _status = SHUT_DOWN;
553     notifyAll();
554   }
555 }
556
Popular Tags