1 16 package org.outerj.daisy.doctaskrunner.serverimpl; 17 18 import org.outerj.daisy.doctaskrunner.*; 19 import org.outerj.daisy.doctaskrunner.commonimpl.*; 20 import org.outerj.daisy.repository.Repository; 21 import org.outerj.daisy.repository.ExtensionRegistrar; 22 import org.outerj.daisy.repository.VariantKey; 23 import org.outerj.daisy.repository.ExtensionProvider; 24 import org.outerj.daisy.repository.user.Role; 25 import org.outerj.daisy.jdbcutil.JdbcHelper; 26 import org.outerj.daisy.jdbcutil.SqlCounter; 27 import org.outerj.daisy.backuplock.SuspendableProcess; 28 import org.outerj.daisy.backuplock.SuspendForBackupRegistrar; 29 import org.apache.avalon.framework.service.Serviceable; 30 import org.apache.avalon.framework.service.ServiceManager; 31 import org.apache.avalon.framework.service.ServiceException; 32 import org.apache.avalon.framework.activity.Initializable; 33 import org.apache.avalon.framework.activity.Disposable; 34 import org.apache.avalon.framework.activity.Startable; 35 import org.apache.avalon.framework.logger.LogEnabled; 36 import org.apache.avalon.framework.logger.Logger; 37 import org.apache.avalon.framework.configuration.Configurable; 38 import org.apache.avalon.framework.configuration.Configuration; 39 import org.apache.avalon.framework.configuration.ConfigurationException; 40 41 import javax.sql.DataSource ; 42 import java.sql.*; 43 import java.util.*; 44 import java.util.Date ; 45 46 import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock; 47 import EDU.oswego.cs.dl.util.concurrent.Sync; 48 49 52 public class CommonDocumentTaskManager implements Serviceable, Initializable, LogEnabled, Disposable, Startable, 53 Configurable, SuspendableProcess { 54 private ServiceManager serviceManager; 55 private DataSource dataSource; 56 private ExtensionRegistrar extensionRegistrar; 57 private SuspendForBackupRegistrar suspendForBackupRegistrar; 58 private JdbcHelper jdbcHelper; 59 private Logger logger; 60 private SqlCounter taskCounter; 61 private Map tasksById = Collections.synchronizedMap(new HashMap()); 62 private ExtensionProvider extensionProvider = new MyExtensionProvider(); 63 private long taskJanitorTaskMaxAge; 64 private long taskJanitorRunInterval; 65 private Thread janitorThread; 66 private WriterPreferenceReadWriteLock suspendLock = new WriterPreferenceReadWriteLock(); 67 68 73 public void service(ServiceManager serviceManager) throws ServiceException { 74 this.serviceManager = serviceManager; 75 dataSource = (DataSource )serviceManager.lookup("datasource"); 76 extensionRegistrar = (ExtensionRegistrar)serviceManager.lookup("extensionRegistrar"); 77 suspendForBackupRegistrar = (SuspendForBackupRegistrar)serviceManager.lookup("suspendForBackupRegistrar"); 78 } 79 80 public void enableLogging(Logger logger) { 81 this.logger = logger; 82 } 83 84 public void configure(Configuration configuration) throws ConfigurationException { 85 this.taskJanitorTaskMaxAge = configuration.getChild("taskJanitor").getAttributeAsLong("maxAge"); 86 this.taskJanitorRunInterval = configuration.getChild("taskJanitor").getAttributeAsLong("runInterval"); 87 } 88 89 public void initialize() throws Exception { 90 jdbcHelper = JdbcHelper.getInstance(dataSource, logger); 91 taskCounter = new SqlCounter("task_sequence", dataSource, logger); 92 93 markRunningTasksAsInterrupted(); 94 extensionRegistrar.registerExtension("DocumentTaskManager", extensionProvider); 95 96 suspendForBackupRegistrar.register("Document Task Manager", this); 101 } 102 103 public void dispose() { 104 extensionRegistrar.unregisterExtension(extensionProvider); 105 suspendForBackupRegistrar.unregister(this); 106 serviceManager.release(extensionRegistrar); 107 serviceManager.release(suspendForBackupRegistrar); 108 serviceManager.release(dataSource); 109 } 110 111 public void start() throws Exception { 112 janitorThread = new Thread (new ExpiredTasksJanitor(), "Daisy Expired Document Tasks Janitor"); 113 janitorThread.start(); 114 } 115 116 public void stop() throws Exception { 117 janitorThread.interrupt(); 118 try { janitorThread.join(); } catch (InterruptedException e) {} 119 } 120 121 class MyExtensionProvider implements ExtensionProvider { 122 public Object createExtension(Repository repository) { 123 return new DocumentTaskManagerImpl(CommonDocumentTaskManager.this, repository); 124 } 125 } 126 127 public boolean suspendExecution(long msecs) throws InterruptedException { 128 return suspendLock.writeLock().attempt(msecs); 129 } 130 131 public void resumeExecution() { 132 suspendLock.writeLock().release(); 133 } 134 135 private void markRunningTasksAsInterrupted() throws Exception { 136 Connection conn = null; 137 PreparedStatement stmt = null; 138 try { 139 conn = dataSource.getConnection(); 140 stmt = conn.prepareStatement("update document_tasks set state = ? where state = ? or state = ?"); 141 stmt.setString(1, TaskState.INTERRUPTED_BY_SHUTDOWN.getCode()); 142 stmt.setString(2, TaskState.INITIALISING.getCode()); 143 stmt.setString(3, TaskState.RUNNING.getCode()); 144 int updatedRows = stmt.executeUpdate(); 145 if (logger.isDebugEnabled()) 146 logger.debug("Number of tasks marked as 'interrupted by shutdown': " + updatedRows); 147 } catch (Throwable e) { 148 throw new Exception ("Error while marking tasks as 'interrupted by shutdown'", e); 149 } finally { 150 jdbcHelper.closeStatement(stmt); 151 jdbcHelper.closeConnection(conn); 152 } 153 } 154 155 public long runTask(DocumentSelection documentSelection, TaskSpecification taskSpecification, Repository repository) throws TaskException { 156 long taskId; 157 { 158 Connection conn = null; 159 PreparedStatement stmt = null; 160 try { 161 taskId = taskCounter.getNextId(); 162 java.util.Date now = new java.util.Date (); 163 164 conn = dataSource.getConnection(); 165 stmt = conn.prepareStatement("insert into document_tasks(id, owner, state, started_at, progress, description, script, scriptlanguage) values(?,?,?,?,?,?,?,?)"); 166 stmt.setLong(1, taskId); 167 stmt.setLong(2, repository.getUserId()); 168 stmt.setString(3, TaskState.INITIALISING.getCode()); 169 stmt.setTimestamp(4, new Timestamp(now.getTime())); 170 stmt.setString(5, "initialising"); 171 stmt.setString(6, taskSpecification.getDescription()); 172 stmt.setString(7, taskSpecification.getScript()); 173 stmt.setString(8, taskSpecification.getScriptLanguage()); 174 stmt.execute(); 175 } catch (Throwable e) { 176 throw new TaskException("Error inserting task record.", e); 177 } finally { 178 jdbcHelper.closeStatement(stmt); 179 jdbcHelper.closeConnection(conn); 180 } 181 } 182 183 try { 184 TaskContextImpl taskContext = new TaskContextImpl(taskId); 185 TaskRunner taskRunner = new TaskRunner(documentSelection, taskSpecification, taskContext, repository); 186 TaskHolder taskHolder = new TaskHolder(taskRunner, taskContext, repository.getUserId()); 187 tasksById.put(new Long (taskId), taskHolder); 188 189 Thread thread = new Thread (taskRunner); 190 thread.start(); 191 } catch (Throwable e) { 192 tasksById.remove(new Long (taskId)); 193 Connection conn = null; 194 PreparedStatement stmt = null; 195 try { 196 conn = dataSource.getConnection(); 197 stmt = conn.prepareStatement("delete from document_tasks where id = ?"); 198 stmt.setLong(1, taskId); 199 stmt.execute(); 200 } catch (Exception e2) { 201 throw new TaskException("Problem starting task and problem cleaning up afterwards: " + e.toString(), e2); 202 } finally { 203 jdbcHelper.closeStatement(stmt); 204 jdbcHelper.closeConnection(conn); 205 } 206 throw new TaskException("Problem starting task.", e); 207 } 208 209 return taskId; 210 } 211 212 static class TaskHolder { 213 private final TaskRunner taskRunner; 214 private final TaskContextImpl taskContext; 215 private final long ownerId; 216 217 public TaskHolder(TaskRunner taskRunner, TaskContextImpl taskContext, long ownerId) { 218 this.taskRunner = taskRunner; 219 this.taskContext = taskContext; 220 this.ownerId = ownerId; 221 } 222 223 public TaskRunner getTaskRunner() { 224 return taskRunner; 225 } 226 227 public TaskContextImpl getTaskContext() { 228 return taskContext; 229 } 230 231 public long getOwnerId() { 232 return ownerId; 233 } 234 } 235 236 class TaskContextImpl implements TaskContext { 237 private boolean interrupted = false; 238 private long taskId; 239 240 public TaskContextImpl(long taskId) { 241 this.taskId = taskId; 242 } 243 244 public void interrupt() { 245 this.interrupted = true; 246 } 247 248 public boolean isInterrupted() { 249 return interrupted; 250 } 251 252 public void setProgress(String progress) { 253 Connection conn = null; 254 PreparedStatement stmt = null; 255 try { 256 conn = dataSource.getConnection(); 257 stmt = conn.prepareStatement("update document_tasks set progress = ? where id = ?"); 258 stmt.setString(1, progress); 259 stmt.setLong(2, taskId); 260 stmt.execute(); 261 } catch (Throwable e) { 262 throw new RuntimeException ("Unexpected error trying to update task progress.", e); 263 } finally { 264 jdbcHelper.closeStatement(stmt); 265 jdbcHelper.closeConnection(conn); 266 } 267 } 268 269 public void initDocumentResults(VariantKey[] keys) { 270 Connection conn = null; 271 PreparedStatement stmt = null; 272 try { 273 conn = dataSource.getConnection(); 274 jdbcHelper.startTransaction(conn); 275 276 stmt = conn.prepareStatement("insert into task_doc_details(task_id, doc_id, branch_id, lang_id, seqnr, state) values(?,?,?,?,?,?)"); 277 stmt.setLong(1, taskId); 278 stmt.setString(6, DocumentExecutionState.WAITING.getCode()); 279 280 for (int i = 0; i < keys.length; i++) { 281 stmt.setLong(2, keys[i].getDocumentId()); 282 stmt.setLong(3, keys[i].getBranchId()); 283 stmt.setLong(4, keys[i].getLanguageId()); 284 stmt.setLong(5, i); 285 stmt.execute(); 286 } 287 conn.commit(); 288 } catch (Throwable e) { 289 jdbcHelper.rollback(conn); 290 throw new RuntimeException ("Unexpected error trying to initialise document states.", e); 291 } finally { 292 jdbcHelper.closeStatement(stmt); 293 jdbcHelper.closeConnection(conn); 294 } 295 } 296 297 public void setDocumentResult(VariantKey key, DocumentExecutionState state, String details) { 298 Connection conn = null; 299 PreparedStatement stmt = null; 300 try { 301 conn = dataSource.getConnection(); 302 stmt = conn.prepareStatement("update task_doc_details set state = ?, details = ? where task_id = ? and doc_id = ? and branch_id = ? and lang_id = ?"); 303 stmt.setString(1, state.getCode()); 304 stmt.setString(2, details); 305 stmt.setLong(3, taskId); 306 stmt.setLong(4, key.getDocumentId()); 307 stmt.setLong(5, key.getBranchId()); 308 stmt.setLong(6, key.getLanguageId()); 309 stmt.execute(); 310 } catch (Throwable e) { 311 throw new RuntimeException ("Unexpected error trying to update document state.", e); 312 } finally { 313 jdbcHelper.closeStatement(stmt); 314 jdbcHelper.closeConnection(conn); 315 } 316 } 317 318 public void setTaskState(TaskState state, String progress, String details) { 319 Connection conn = null; 320 PreparedStatement stmt = null; 321 try { 322 conn = dataSource.getConnection(); 323 stmt = conn.prepareStatement("update document_tasks set state = ?, progress = ?, details = ?, finished_at = ? where id = ?"); 324 stmt.setString(1, state.getCode()); 325 stmt.setString(2, progress); 326 stmt.setString(3, details); 327 stmt.setTimestamp(4, state.isStoppedState() ? new Timestamp(System.currentTimeMillis()) : null); 328 stmt.setLong(5, taskId); 329 stmt.execute(); 330 } catch (Throwable e) { 331 throw new RuntimeException ("Unexpected error trying to update task state.", e); 332 } finally { 333 jdbcHelper.closeStatement(stmt); 334 jdbcHelper.closeConnection(conn); 335 } 336 } 337 338 public void cleanup() { 339 tasksById.remove(new Long (taskId)); 340 } 341 342 public Sync getExecutionLock() { 343 return suspendLock.readLock(); 344 } 345 } 346 347 private static final String SELECT_TASK = "select id, scriptlanguage, owner, started_at, finished_at, state, progress, description, script, details from document_tasks"; 348 349 public Task getTask(long taskId, Repository repository) throws TaskException { 350 Connection conn = null; 351 PreparedStatement stmt = null; 352 try { 353 conn = dataSource.getConnection(); 354 stmt = conn.prepareStatement(SELECT_TASK + " where id = ?"); 355 stmt.setLong(1, taskId); 356 ResultSet rs = stmt.executeQuery(); 357 358 if (!rs.next()) 359 throw new TaskException("No task found with ID " + taskId); 360 361 if (!repository.isInRole(Role.ADMINISTRATOR) && rs.getLong("owner") != repository.getUserId()) 362 throw new TaskException("Access denied to task with ID " + taskId); 363 364 return getTaskFromResultSet(rs); 365 } catch (Throwable e) { 366 if (e instanceof TaskException) 367 throw (TaskException)e; 368 369 throw new TaskException("Error loading task with ID " + taskId, e); 370 } finally { 371 jdbcHelper.closeStatement(stmt); 372 jdbcHelper.closeConnection(conn); 373 } 374 } 375 376 public Tasks getTasks(Repository repository) throws TaskException { 377 Connection conn = null; 378 PreparedStatement stmt = null; 379 try { 380 conn = dataSource.getConnection(); 381 StringBuffer query = new StringBuffer (SELECT_TASK); 382 if (!repository.isInRole(Role.ADMINISTRATOR)) 383 query.append(" where owner = ?"); 384 stmt = conn.prepareStatement(query.toString()); 385 if (!repository.isInRole(Role.ADMINISTRATOR)) 386 stmt.setLong(1, repository.getUserId()); 387 ResultSet rs = stmt.executeQuery(); 388 389 List tasks = new ArrayList(); 390 391 while (rs.next()) { 392 tasks.add(getTaskFromResultSet(rs)); 393 } 394 395 return new TasksImpl((Task[])tasks.toArray(new Task[tasks.size()])); 396 } catch (Throwable e) { 397 throw new TaskException("Error loading tasks.", e); 398 } finally { 399 jdbcHelper.closeStatement(stmt); 400 jdbcHelper.closeConnection(conn); 401 } 402 } 403 404 private TaskImpl getTaskFromResultSet(ResultSet rs) throws SQLException { 405 long taskId = rs.getLong("id"); 406 String description = rs.getString("description"); 407 TaskState state = TaskState.getByCode(rs.getString("state")); 408 long ownerId = rs.getLong("owner"); 409 String progress = rs.getString("progress"); 410 String details = rs.getString("details"); 411 String script = rs.getString("script"); 412 String scriptLanguage = rs.getString("scriptlanguage"); 413 Date startedAt = rs.getTimestamp("started_at"); 414 Date finishedAt = rs.getTimestamp("finished_at"); 415 416 TaskImpl task = new TaskImpl(taskId, description, state, ownerId, progress, details, script, scriptLanguage, 417 startedAt, finishedAt); 418 419 return task; 420 } 421 422 public void deleteTask(long taskId, Repository repository) throws TaskException { 423 Connection conn = null; 424 PreparedStatement stmt = null; 425 try { 426 conn = dataSource.getConnection(); 427 jdbcHelper.startTransaction(conn); 428 429 stmt = conn.prepareStatement("select state, owner from document_tasks where id = ? " + jdbcHelper.getSharedLockClause()); 430 stmt.setLong(1, taskId); 431 ResultSet rs = stmt.executeQuery(); 432 433 if (!rs.next()) 434 throw new TaskException("No task found with ID " + taskId); 435 436 if (!repository.isInRole(Role.ADMINISTRATOR) && rs.getLong("owner") != repository.getUserId()) 437 throw new TaskException("Access denied to task with ID " + taskId); 438 439 TaskState state = TaskState.getByCode(rs.getString("state")); 440 if (state == TaskState.INITIALISING || state == TaskState.RUNNING) 441 throw new TaskException("Cannot delete task with ID " + taskId + " because it has not yet ended."); 442 443 stmt.close(); 444 445 deleteTask(taskId, conn); 446 447 conn.commit(); 448 } catch (Throwable e) { 449 jdbcHelper.rollback(conn); 450 if (e instanceof TaskException) 451 throw (TaskException)e; 452 453 throw new TaskException("Problem deleting task with ID " + taskId, e); 454 } finally { 455 jdbcHelper.closeStatement(stmt); 456 jdbcHelper.closeConnection(conn); 457 } 458 } 459 460 463 private void deleteTask(long taskId, Connection conn) throws SQLException { 464 PreparedStatement stmt = null; 465 try { 466 stmt = conn.prepareStatement("delete from task_doc_details where task_id = ?"); 467 stmt.setLong(1, taskId); 468 stmt.execute(); 469 stmt.close(); 470 471 stmt = conn.prepareStatement("delete from document_tasks where id = ?"); 472 stmt.setLong(1, taskId); 473 stmt.execute(); 474 stmt.close(); 475 } finally { 476 jdbcHelper.closeStatement(stmt); 477 } 478 479 } 480 481 public void interruptTask(long taskId, Repository repository) throws TaskException { 482 Long taskKey = new Long (taskId); 483 TaskHolder taskHolder = (TaskHolder)tasksById.get(taskKey); 484 485 if (taskHolder == null) 486 throw new TaskException("There is no task running with ID " + taskId); 487 488 if (!repository.isInRole(Role.ADMINISTRATOR) && taskHolder.getOwnerId() != repository.getUserId()) 489 throw new TaskException("You are not allowed to interrupt the task with ID " + taskId); 490 491 taskHolder.getTaskContext().interrupt(); 492 } 493 494 public TaskDocDetails getTaskDocDetails(long taskId, Repository repository) throws TaskException { 495 getTask(taskId, repository); 497 498 Connection conn = null; 499 PreparedStatement stmt = null; 500 try { 501 conn = dataSource.getConnection(); 502 stmt = conn.prepareStatement("select doc_id, branch_id, lang_id, state, details from task_doc_details where task_id = ? order by seqnr"); 503 stmt.setLong(1, taskId); 504 ResultSet rs = stmt.executeQuery(); 505 506 List taskDocDetails = new ArrayList(); 507 while (rs.next()) { 508 VariantKey variantKey = new VariantKey(rs.getLong("doc_id"), rs.getLong("branch_id"), rs.getLong("lang_id")); 509 DocumentExecutionState state = DocumentExecutionState.getByCode(rs.getString("state")); 510 String details = rs.getString("details"); 511 taskDocDetails.add(new TaskDocDetailImpl(variantKey, state, details)); 512 } 513 514 return new TaskDocDetailsImpl((TaskDocDetail[])taskDocDetails.toArray(new TaskDocDetail[taskDocDetails.size()])); 515 } catch (Throwable e) { 516 throw new TaskException("Error retrieving task document details for task " + taskId, e); 517 } finally { 518 jdbcHelper.closeStatement(stmt); 519 jdbcHelper.closeConnection(conn); 520 } 521 } 522 523 class ExpiredTasksJanitor implements Runnable { 524 public void run() { 525 while (true) { 526 try { 527 Thread.sleep(taskJanitorRunInterval); 528 } catch (InterruptedException e) { 529 logger.debug("ExpiredTaskJanitor thread was interrupted."); 530 return; 531 } 532 Connection conn = null; 533 PreparedStatement stmt = null; 534 try { 535 conn = dataSource.getConnection(); 536 jdbcHelper.startTransaction(conn); 537 538 stmt = conn.prepareStatement("select id from document_tasks where started_at < ? and state not in ('" + TaskState.INITIALISING.getCode() + "', '" + TaskState.RUNNING.getCode() + "') " + jdbcHelper.getSharedLockClause()); 541 stmt.setTimestamp(1, new Timestamp(System.currentTimeMillis() - taskJanitorTaskMaxAge)); 542 ResultSet rs = stmt.executeQuery(); 543 ArrayList taskIds = new ArrayList(); 544 while (rs.next()) { 545 taskIds.add(new Long (rs.getLong(1))); 546 } 547 stmt.close(); 548 549 Iterator taskIdsIt = taskIds.iterator(); 550 while (taskIdsIt.hasNext()) { 551 long taskId = ((Long )taskIdsIt.next()).longValue(); 552 deleteTask(taskId, conn); 553 } 554 555 conn.commit(); 556 } catch (Throwable e) { 557 jdbcHelper.rollback(conn); 558 logger.error("Expired tasks janitor: error while performing my job.", e); 559 } finally { 560 jdbcHelper.closeStatement(stmt); 561 jdbcHelper.closeConnection(conn); 562 } 563 } 564 } 565 } 566 567 } 568 | Popular Tags |