KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > outerj > daisy > doctaskrunner > serverimpl > CommonDocumentTaskManager


1 /*
2  * Copyright 2004 Outerthought bvba and Schaubroeck nv
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16 package org.outerj.daisy.doctaskrunner.serverimpl;
17
18 import org.outerj.daisy.doctaskrunner.*;
19 import org.outerj.daisy.doctaskrunner.commonimpl.*;
20 import org.outerj.daisy.repository.Repository;
21 import org.outerj.daisy.repository.ExtensionRegistrar;
22 import org.outerj.daisy.repository.VariantKey;
23 import org.outerj.daisy.repository.ExtensionProvider;
24 import org.outerj.daisy.repository.user.Role;
25 import org.outerj.daisy.jdbcutil.JdbcHelper;
26 import org.outerj.daisy.jdbcutil.SqlCounter;
27 import org.outerj.daisy.backuplock.SuspendableProcess;
28 import org.outerj.daisy.backuplock.SuspendForBackupRegistrar;
29 import org.apache.avalon.framework.service.Serviceable;
30 import org.apache.avalon.framework.service.ServiceManager;
31 import org.apache.avalon.framework.service.ServiceException;
32 import org.apache.avalon.framework.activity.Initializable;
33 import org.apache.avalon.framework.activity.Disposable;
34 import org.apache.avalon.framework.activity.Startable;
35 import org.apache.avalon.framework.logger.LogEnabled;
36 import org.apache.avalon.framework.logger.Logger;
37 import org.apache.avalon.framework.configuration.Configurable;
38 import org.apache.avalon.framework.configuration.Configuration;
39 import org.apache.avalon.framework.configuration.ConfigurationException;
40
41 import javax.sql.DataSource JavaDoc;
42 import java.sql.*;
43 import java.util.*;
44 import java.util.Date JavaDoc;
45
46 import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
47 import EDU.oswego.cs.dl.util.concurrent.Sync;
48
49 /**
50  * @avalon.component version="1.0" name="documentTaskManager" lifestyle="singleton"
51  */

52 public class CommonDocumentTaskManager implements Serviceable, Initializable, LogEnabled, Disposable, Startable,
53         Configurable, SuspendableProcess {
54     private ServiceManager serviceManager;
55     private DataSource JavaDoc dataSource;
56     private ExtensionRegistrar extensionRegistrar;
57     private SuspendForBackupRegistrar suspendForBackupRegistrar;
58     private JdbcHelper jdbcHelper;
59     private Logger logger;
60     private SqlCounter taskCounter;
61     private Map tasksById = Collections.synchronizedMap(new HashMap());
62     private ExtensionProvider extensionProvider = new MyExtensionProvider();
63     private long taskJanitorTaskMaxAge;
64     private long taskJanitorRunInterval;
65     private Thread JavaDoc janitorThread;
66     private WriterPreferenceReadWriteLock suspendLock = new WriterPreferenceReadWriteLock();
67
68     /**
69      * @avalon.dependency key="datasource" type="javax.sql.DataSource"
70      * @avalon.dependency key="extensionRegistrar" type="org.outerj.daisy.repository.ExtensionRegistrar"
71      * @avalon.dependency key="suspendForBackupRegistrar" type="org.outerj.daisy.backuplock.SuspendForBackupRegistrar"
72      */

73     public void service(ServiceManager serviceManager) throws ServiceException {
74         this.serviceManager = serviceManager;
75         dataSource = (DataSource JavaDoc)serviceManager.lookup("datasource");
76         extensionRegistrar = (ExtensionRegistrar)serviceManager.lookup("extensionRegistrar");
77         suspendForBackupRegistrar = (SuspendForBackupRegistrar)serviceManager.lookup("suspendForBackupRegistrar");
78     }
79
80     public void enableLogging(Logger logger) {
81         this.logger = logger;
82     }
83
84     public void configure(Configuration configuration) throws ConfigurationException {
85         this.taskJanitorTaskMaxAge = configuration.getChild("taskJanitor").getAttributeAsLong("maxAge");
86         this.taskJanitorRunInterval = configuration.getChild("taskJanitor").getAttributeAsLong("runInterval");
87     }
88
89     public void initialize() throws Exception JavaDoc {
90         jdbcHelper = JdbcHelper.getInstance(dataSource, logger);
91         taskCounter = new SqlCounter("task_sequence", dataSource, logger);
92
93         markRunningTasksAsInterrupted();
94         extensionRegistrar.registerExtension("DocumentTaskManager", extensionProvider);
95
96         // The document task manager registers itself for suspending its active while
97
// a backup is being taken. Only the actual execution of the scripts will be
98
// blocked while in suspended state, other operations will keep working
99
// (including adding new document tasks).
100
suspendForBackupRegistrar.register("Document Task Manager", this);
101     }
102
103     public void dispose() {
104         extensionRegistrar.unregisterExtension(extensionProvider);
105         suspendForBackupRegistrar.unregister(this);
106         serviceManager.release(extensionRegistrar);
107         serviceManager.release(suspendForBackupRegistrar);
108         serviceManager.release(dataSource);
109     }
110
111     public void start() throws Exception JavaDoc {
112         janitorThread = new Thread JavaDoc(new ExpiredTasksJanitor(), "Daisy Expired Document Tasks Janitor");
113         janitorThread.start();
114     }
115
116     public void stop() throws Exception JavaDoc {
117         janitorThread.interrupt();
118         try { janitorThread.join(); } catch (InterruptedException JavaDoc e) {}
119     }
120
121     class MyExtensionProvider implements ExtensionProvider {
122         public Object JavaDoc createExtension(Repository repository) {
123             return new DocumentTaskManagerImpl(CommonDocumentTaskManager.this, repository);
124         }
125     }
126
127     public boolean suspendExecution(long msecs) throws InterruptedException JavaDoc {
128         return suspendLock.writeLock().attempt(msecs);
129     }
130
131     public void resumeExecution() {
132         suspendLock.writeLock().release();
133     }
134
135     private void markRunningTasksAsInterrupted() throws Exception JavaDoc {
136         Connection conn = null;
137         PreparedStatement stmt = null;
138         try {
139             conn = dataSource.getConnection();
140             stmt = conn.prepareStatement("update document_tasks set state = ? where state = ? or state = ?");
141             stmt.setString(1, TaskState.INTERRUPTED_BY_SHUTDOWN.getCode());
142             stmt.setString(2, TaskState.INITIALISING.getCode());
143             stmt.setString(3, TaskState.RUNNING.getCode());
144             int updatedRows = stmt.executeUpdate();
145             if (logger.isDebugEnabled())
146                 logger.debug("Number of tasks marked as 'interrupted by shutdown': " + updatedRows);
147         } catch (Throwable JavaDoc e) {
148             throw new Exception JavaDoc("Error while marking tasks as 'interrupted by shutdown'", e);
149         } finally {
150             jdbcHelper.closeStatement(stmt);
151             jdbcHelper.closeConnection(conn);
152         }
153     }
154
155     public long runTask(DocumentSelection documentSelection, TaskSpecification taskSpecification, Repository repository) throws TaskException {
156         long taskId;
157         {
158             Connection conn = null;
159             PreparedStatement stmt = null;
160             try {
161                 taskId = taskCounter.getNextId();
162                 java.util.Date JavaDoc now = new java.util.Date JavaDoc();
163
164                 conn = dataSource.getConnection();
165                 stmt = conn.prepareStatement("insert into document_tasks(id, owner, state, started_at, progress, description, script, scriptlanguage) values(?,?,?,?,?,?,?,?)");
166                 stmt.setLong(1, taskId);
167                 stmt.setLong(2, repository.getUserId());
168                 stmt.setString(3, TaskState.INITIALISING.getCode());
169                 stmt.setTimestamp(4, new Timestamp(now.getTime()));
170                 stmt.setString(5, "initialising");
171                 stmt.setString(6, taskSpecification.getDescription());
172                 stmt.setString(7, taskSpecification.getScript());
173                 stmt.setString(8, taskSpecification.getScriptLanguage());
174                 stmt.execute();
175             } catch (Throwable JavaDoc e) {
176                 throw new TaskException("Error inserting task record.", e);
177             } finally {
178                 jdbcHelper.closeStatement(stmt);
179                 jdbcHelper.closeConnection(conn);
180             }
181         }
182
183         try {
184             TaskContextImpl taskContext = new TaskContextImpl(taskId);
185             TaskRunner taskRunner = new TaskRunner(documentSelection, taskSpecification, taskContext, repository);
186             TaskHolder taskHolder = new TaskHolder(taskRunner, taskContext, repository.getUserId());
187             tasksById.put(new Long JavaDoc(taskId), taskHolder);
188
189             Thread JavaDoc thread = new Thread JavaDoc(taskRunner);
190             thread.start();
191         } catch (Throwable JavaDoc e) {
192             tasksById.remove(new Long JavaDoc(taskId));
193             Connection conn = null;
194             PreparedStatement stmt = null;
195             try {
196                 conn = dataSource.getConnection();
197                 stmt = conn.prepareStatement("delete from document_tasks where id = ?");
198                 stmt.setLong(1, taskId);
199                 stmt.execute();
200             } catch (Exception JavaDoc e2) {
201                 throw new TaskException("Problem starting task and problem cleaning up afterwards: " + e.toString(), e2);
202             } finally {
203                 jdbcHelper.closeStatement(stmt);
204                 jdbcHelper.closeConnection(conn);
205             }
206             throw new TaskException("Problem starting task.", e);
207         }
208
209         return taskId;
210     }
211
212     static class TaskHolder {
213         private final TaskRunner taskRunner;
214         private final TaskContextImpl taskContext;
215         private final long ownerId;
216
217         public TaskHolder(TaskRunner taskRunner, TaskContextImpl taskContext, long ownerId) {
218             this.taskRunner = taskRunner;
219             this.taskContext = taskContext;
220             this.ownerId = ownerId;
221         }
222
223         public TaskRunner getTaskRunner() {
224             return taskRunner;
225         }
226
227         public TaskContextImpl getTaskContext() {
228             return taskContext;
229         }
230
231         public long getOwnerId() {
232             return ownerId;
233         }
234     }
235
236     class TaskContextImpl implements TaskContext {
237         private boolean interrupted = false;
238         private long taskId;
239
240         public TaskContextImpl(long taskId) {
241             this.taskId = taskId;
242         }
243
244         public void interrupt() {
245             this.interrupted = true;
246         }
247
248         public boolean isInterrupted() {
249             return interrupted;
250         }
251
252         public void setProgress(String JavaDoc progress) {
253             Connection conn = null;
254             PreparedStatement stmt = null;
255             try {
256                 conn = dataSource.getConnection();
257                 stmt = conn.prepareStatement("update document_tasks set progress = ? where id = ?");
258                 stmt.setString(1, progress);
259                 stmt.setLong(2, taskId);
260                 stmt.execute();
261             } catch (Throwable JavaDoc e) {
262                 throw new RuntimeException JavaDoc("Unexpected error trying to update task progress.", e);
263             } finally {
264                 jdbcHelper.closeStatement(stmt);
265                 jdbcHelper.closeConnection(conn);
266             }
267         }
268
269         public void initDocumentResults(VariantKey[] keys) {
270             Connection conn = null;
271             PreparedStatement stmt = null;
272             try {
273                 conn = dataSource.getConnection();
274                 jdbcHelper.startTransaction(conn);
275
276                 stmt = conn.prepareStatement("insert into task_doc_details(task_id, doc_id, branch_id, lang_id, seqnr, state) values(?,?,?,?,?,?)");
277                 stmt.setLong(1, taskId);
278                 stmt.setString(6, DocumentExecutionState.WAITING.getCode());
279
280                 for (int i = 0; i < keys.length; i++) {
281                     stmt.setLong(2, keys[i].getDocumentId());
282                     stmt.setLong(3, keys[i].getBranchId());
283                     stmt.setLong(4, keys[i].getLanguageId());
284                     stmt.setLong(5, i);
285                     stmt.execute();
286                 }
287                 conn.commit();
288             } catch (Throwable JavaDoc e) {
289                 jdbcHelper.rollback(conn);
290                 throw new RuntimeException JavaDoc("Unexpected error trying to initialise document states.", e);
291             } finally {
292                 jdbcHelper.closeStatement(stmt);
293                 jdbcHelper.closeConnection(conn);
294             }
295         }
296
297         public void setDocumentResult(VariantKey key, DocumentExecutionState state, String JavaDoc details) {
298             Connection conn = null;
299             PreparedStatement stmt = null;
300             try {
301                 conn = dataSource.getConnection();
302                 stmt = conn.prepareStatement("update task_doc_details set state = ?, details = ? where task_id = ? and doc_id = ? and branch_id = ? and lang_id = ?");
303                 stmt.setString(1, state.getCode());
304                 stmt.setString(2, details);
305                 stmt.setLong(3, taskId);
306                 stmt.setLong(4, key.getDocumentId());
307                 stmt.setLong(5, key.getBranchId());
308                 stmt.setLong(6, key.getLanguageId());
309                 stmt.execute();
310             } catch (Throwable JavaDoc e) {
311                 throw new RuntimeException JavaDoc("Unexpected error trying to update document state.", e);
312             } finally {
313                 jdbcHelper.closeStatement(stmt);
314                 jdbcHelper.closeConnection(conn);
315             }
316         }
317
318         public void setTaskState(TaskState state, String JavaDoc progress, String JavaDoc details) {
319             Connection conn = null;
320             PreparedStatement stmt = null;
321             try {
322                 conn = dataSource.getConnection();
323                 stmt = conn.prepareStatement("update document_tasks set state = ?, progress = ?, details = ?, finished_at = ? where id = ?");
324                 stmt.setString(1, state.getCode());
325                 stmt.setString(2, progress);
326                 stmt.setString(3, details);
327                 stmt.setTimestamp(4, state.isStoppedState() ? new Timestamp(System.currentTimeMillis()) : null);
328                 stmt.setLong(5, taskId);
329                 stmt.execute();
330             } catch (Throwable JavaDoc e) {
331                 throw new RuntimeException JavaDoc("Unexpected error trying to update task state.", e);
332             } finally {
333                 jdbcHelper.closeStatement(stmt);
334                 jdbcHelper.closeConnection(conn);
335             }
336         }
337
338         public void cleanup() {
339             tasksById.remove(new Long JavaDoc(taskId));
340         }
341
342         public Sync getExecutionLock() {
343             return suspendLock.readLock();
344         }
345     }
346
347     private static final String JavaDoc SELECT_TASK = "select id, scriptlanguage, owner, started_at, finished_at, state, progress, description, script, details from document_tasks";
348
349     public Task getTask(long taskId, Repository repository) throws TaskException {
350         Connection conn = null;
351         PreparedStatement stmt = null;
352         try {
353             conn = dataSource.getConnection();
354             stmt = conn.prepareStatement(SELECT_TASK + " where id = ?");
355             stmt.setLong(1, taskId);
356             ResultSet rs = stmt.executeQuery();
357
358             if (!rs.next())
359                 throw new TaskException("No task found with ID " + taskId);
360
361             if (!repository.isInRole(Role.ADMINISTRATOR) && rs.getLong("owner") != repository.getUserId())
362                 throw new TaskException("Access denied to task with ID " + taskId);
363
364             return getTaskFromResultSet(rs);
365         } catch (Throwable JavaDoc e) {
366             if (e instanceof TaskException)
367                 throw (TaskException)e;
368
369             throw new TaskException("Error loading task with ID " + taskId, e);
370         } finally {
371             jdbcHelper.closeStatement(stmt);
372             jdbcHelper.closeConnection(conn);
373         }
374     }
375
376     public Tasks getTasks(Repository repository) throws TaskException {
377         Connection conn = null;
378         PreparedStatement stmt = null;
379         try {
380             conn = dataSource.getConnection();
381             StringBuffer JavaDoc query = new StringBuffer JavaDoc(SELECT_TASK);
382             if (!repository.isInRole(Role.ADMINISTRATOR))
383                 query.append(" where owner = ?");
384             stmt = conn.prepareStatement(query.toString());
385             if (!repository.isInRole(Role.ADMINISTRATOR))
386                 stmt.setLong(1, repository.getUserId());
387             ResultSet rs = stmt.executeQuery();
388
389             List tasks = new ArrayList();
390
391             while (rs.next()) {
392                 tasks.add(getTaskFromResultSet(rs));
393             }
394
395             return new TasksImpl((Task[])tasks.toArray(new Task[tasks.size()]));
396         } catch (Throwable JavaDoc e) {
397             throw new TaskException("Error loading tasks.", e);
398         } finally {
399             jdbcHelper.closeStatement(stmt);
400             jdbcHelper.closeConnection(conn);
401         }
402     }
403
404     private TaskImpl getTaskFromResultSet(ResultSet rs) throws SQLException {
405         long taskId = rs.getLong("id");
406         String JavaDoc description = rs.getString("description");
407         TaskState state = TaskState.getByCode(rs.getString("state"));
408         long ownerId = rs.getLong("owner");
409         String JavaDoc progress = rs.getString("progress");
410         String JavaDoc details = rs.getString("details");
411         String JavaDoc script = rs.getString("script");
412         String JavaDoc scriptLanguage = rs.getString("scriptlanguage");
413         Date JavaDoc startedAt = rs.getTimestamp("started_at");
414         Date JavaDoc finishedAt = rs.getTimestamp("finished_at");
415
416         TaskImpl task = new TaskImpl(taskId, description, state, ownerId, progress, details, script, scriptLanguage,
417                 startedAt, finishedAt);
418
419         return task;
420     }
421
422     public void deleteTask(long taskId, Repository repository) throws TaskException {
423         Connection conn = null;
424         PreparedStatement stmt = null;
425         try {
426             conn = dataSource.getConnection();
427             jdbcHelper.startTransaction(conn);
428
429             stmt = conn.prepareStatement("select state, owner from document_tasks where id = ? " + jdbcHelper.getSharedLockClause());
430             stmt.setLong(1, taskId);
431             ResultSet rs = stmt.executeQuery();
432
433             if (!rs.next())
434                 throw new TaskException("No task found with ID " + taskId);
435
436             if (!repository.isInRole(Role.ADMINISTRATOR) && rs.getLong("owner") != repository.getUserId())
437                 throw new TaskException("Access denied to task with ID " + taskId);
438
439             TaskState state = TaskState.getByCode(rs.getString("state"));
440             if (state == TaskState.INITIALISING || state == TaskState.RUNNING)
441                 throw new TaskException("Cannot delete task with ID " + taskId + " because it has not yet ended.");
442
443             stmt.close();
444
445             deleteTask(taskId, conn);
446
447             conn.commit();
448         } catch (Throwable JavaDoc e) {
449             jdbcHelper.rollback(conn);
450             if (e instanceof TaskException)
451                 throw (TaskException)e;
452
453             throw new TaskException("Problem deleting task with ID " + taskId, e);
454         } finally {
455             jdbcHelper.closeStatement(stmt);
456             jdbcHelper.closeConnection(conn);
457         }
458     }
459
460     /**
461      * Pefroms actual deletion of task, assumes necessary locks are taken and transaction is started.
462      */

463     private void deleteTask(long taskId, Connection conn) throws SQLException {
464         PreparedStatement stmt = null;
465         try {
466             stmt = conn.prepareStatement("delete from task_doc_details where task_id = ?");
467             stmt.setLong(1, taskId);
468             stmt.execute();
469             stmt.close();
470
471             stmt = conn.prepareStatement("delete from document_tasks where id = ?");
472             stmt.setLong(1, taskId);
473             stmt.execute();
474             stmt.close();
475         } finally {
476             jdbcHelper.closeStatement(stmt);
477         }
478
479     }
480
481     public void interruptTask(long taskId, Repository repository) throws TaskException {
482         Long JavaDoc taskKey = new Long JavaDoc(taskId);
483         TaskHolder taskHolder = (TaskHolder)tasksById.get(taskKey);
484
485         if (taskHolder == null)
486             throw new TaskException("There is no task running with ID " + taskId);
487
488         if (!repository.isInRole(Role.ADMINISTRATOR) && taskHolder.getOwnerId() != repository.getUserId())
489             throw new TaskException("You are not allowed to interrupt the task with ID " + taskId);
490
491         taskHolder.getTaskContext().interrupt();
492     }
493
494     public TaskDocDetails getTaskDocDetails(long taskId, Repository repository) throws TaskException {
495         // Do a call to getTask so that existence and access permissions are verified
496
getTask(taskId, repository);
497
498         Connection conn = null;
499         PreparedStatement stmt = null;
500         try {
501             conn = dataSource.getConnection();
502             stmt = conn.prepareStatement("select doc_id, branch_id, lang_id, state, details from task_doc_details where task_id = ? order by seqnr");
503             stmt.setLong(1, taskId);
504             ResultSet rs = stmt.executeQuery();
505
506             List taskDocDetails = new ArrayList();
507             while (rs.next()) {
508                 VariantKey variantKey = new VariantKey(rs.getLong("doc_id"), rs.getLong("branch_id"), rs.getLong("lang_id"));
509                 DocumentExecutionState state = DocumentExecutionState.getByCode(rs.getString("state"));
510                 String JavaDoc details = rs.getString("details");
511                 taskDocDetails.add(new TaskDocDetailImpl(variantKey, state, details));
512             }
513
514             return new TaskDocDetailsImpl((TaskDocDetail[])taskDocDetails.toArray(new TaskDocDetail[taskDocDetails.size()]));
515         } catch (Throwable JavaDoc e) {
516             throw new TaskException("Error retrieving task document details for task " + taskId, e);
517         } finally {
518             jdbcHelper.closeStatement(stmt);
519             jdbcHelper.closeConnection(conn);
520         }
521     }
522
523     class ExpiredTasksJanitor implements Runnable JavaDoc {
524         public void run() {
525             while (true) {
526                 try {
527                     Thread.sleep(taskJanitorRunInterval);
528                 } catch (InterruptedException JavaDoc e) {
529                     logger.debug("ExpiredTaskJanitor thread was interrupted.");
530                     return;
531                 }
532                 Connection conn = null;
533                 PreparedStatement stmt = null;
534                 try {
535                     conn = dataSource.getConnection();
536                     jdbcHelper.startTransaction(conn);
537
538                     // Note: the search is performed on started_at and not on finished_at because finished_at may
539
// not always have a value (e.g. when the task was interrupted by shutdown)
540
stmt = conn.prepareStatement("select id from document_tasks where started_at < ? and state not in ('" + TaskState.INITIALISING.getCode() + "', '" + TaskState.RUNNING.getCode() + "') " + jdbcHelper.getSharedLockClause());
541                     stmt.setTimestamp(1, new Timestamp(System.currentTimeMillis() - taskJanitorTaskMaxAge));
542                     ResultSet rs = stmt.executeQuery();
543                     ArrayList taskIds = new ArrayList();
544                     while (rs.next()) {
545                         taskIds.add(new Long JavaDoc(rs.getLong(1)));
546                     }
547                     stmt.close();
548
549                     Iterator taskIdsIt = taskIds.iterator();
550                     while (taskIdsIt.hasNext()) {
551                         long taskId = ((Long JavaDoc)taskIdsIt.next()).longValue();
552                         deleteTask(taskId, conn);
553                     }
554
555                     conn.commit();
556                 } catch (Throwable JavaDoc e) {
557                     jdbcHelper.rollback(conn);
558                     logger.error("Expired tasks janitor: error while performing my job.", e);
559                 } finally {
560                     jdbcHelper.closeStatement(stmt);
561                     jdbcHelper.closeConnection(conn);
562                 }
563             }
564         }
565     }
566
567 }
568
Popular Tags