1 package org.sapia.taskman; 2 3 import org.sapia.taskman.transaction.Transaction; 4 import org.sapia.taskman.transaction.TransactionIdFactory; 5 import org.sapia.taskman.transaction.TransactionListener; 6 7 import java.io.IOException ; 8 import java.io.ObjectInput ; 9 import java.io.ObjectOutput ; 10 11 import java.util.ArrayList ; 12 import java.util.Collections ; 13 import java.util.List ; 14 15 115 public class TaskManager extends Thread implements java.io.Externalizable { 116 117 static final long serialVersionUID = 1L; 118 119 public static final long RUN_INTERVAL = 250; 120 private static final byte RUNNING = 0; 121 private static final byte SHUTDOWN_REQUESTED = 1; 122 private static final byte SHUT_DOWN = 2; 123 protected List _transactions = Collections 124 .synchronizedList(new ArrayList ()); 125 private long _interval = RUN_INTERVAL; 126 private transient byte _status; 127 private transient ThreadLocal _currentTx = new ThreadLocal (); 128 private transient TransactionIdFactory _txIdFactory = new DefaultTxIdFactory(); 129 130 133 public TaskManager() { 134 super("TaskManager"); 135 } 136 137 143 public TaskManager(String name) { 144 super(name); 145 } 146 147 155 public void addTaskDescriptors(List tasks) { 156 TaskmanTransaction tx = (TaskmanTransaction) _currentTx.get(); 157 158 if((tx != null) && (tx.getStatus() == Transaction.STATUS_INITIAL)) { 159 for(int i = 0; i < tasks.size(); i++) { 160 tx.registerTask((TaskDescriptor) tasks.get(i)); 161 } 162 } else { 163 for(int i = 0; i < tasks.size(); i++) { 164 _transactions.add(wrap((TaskDescriptor) tasks.get(i))); 165 } 166 } 167 } 168 169 175 public synchronized List getTransactions() { 176 synchronized(_transactions) { 177 List toReturn = new ArrayList (_transactions.size()); 178 toReturn.addAll(_transactions); 179 180 return toReturn; 181 } 182 } 183 184 191 public synchronized void setRunInterval(long millis) { 192 _interval = millis; 193 } 194 195 203 public void execTaskFor(TaskDescriptor desc) { 204 addTaskDescriptor(desc.setRoot(true)); 205 } 206 207 218 public void execSyncTask(String name, Task t) { 219 execSyncTask(name, t, newTaskOutput(name)); 220 } 221 222 234 public void execSyncTask(String name, Task t, TaskOutput out) { 235 TaskmanTransaction tx = (TaskmanTransaction) _currentTx.get(); 238 239 if((tx != null) && (tx.getStatus() == Transaction.STATUS_INITIAL)) { 240 TransientTaskDescriptor td = new TransientTaskDescriptor(name, 0, t); 242 td.setTaskOutput(out); 243 244 tx.registerTask(td); 246 } else { 247 TransientTaskDescriptor desc = new TransientTaskDescriptor(name, 0, t); 249 tx = wrap(desc); 250 desc.setTaskOutput(out); 251 desc.setRoot(true); 252 tx.execute(); 253 } 254 } 255 256 272 public void execAsyncTask(String name, Task t, TaskOutput out) { 273 TaskmanTransaction tx = (TaskmanTransaction) _currentTx.get(); 276 277 if((tx != null) && (tx.getStatus() == Transaction.STATUS_INITIAL)) { 278 TransientTaskDescriptor td = new TransientTaskDescriptor(name, 0, t); 280 td.setTaskOutput(new TxTaskOutput(out, tx)); 281 282 tx.registerTask(td); 284 } else { 285 TransientTaskDescriptor desc = new TransientTaskDescriptor(name, 0, t); 287 desc.setTaskOutput(out); 288 addTaskDescriptor(desc.setRoot(true)); 289 } 290 } 291 292 295 public Transaction newTransaction() { 296 return newTransaction(null); 297 } 298 299 307 public Transaction newTransaction(TransactionListener listener) { 308 if(_currentTx.get() != null) { 309 if(((TaskmanTransaction) _currentTx.get()).getStatus() != Transaction.STATUS_COMMITTED) { 310 throw new IllegalStateException ( 311 "Current thread already registered with existing transaction"); 312 } 313 } 314 315 TaskmanTransaction tx = new TaskmanTransaction(_txIdFactory.newTxId(), this, listener); 316 _currentTx.set(tx); 317 _transactions.add(tx); 318 319 return tx; 320 } 321 322 328 public void registerTransaction(Transaction tx) { 329 if(!(tx instanceof TaskmanTransaction)) { 330 throw new IllegalArgumentException ("Transaction is not an instance of: " 331 + TaskmanTransaction.class.getName()); 332 } 333 334 if(_currentTx.get() != null) { 335 if(((TaskmanTransaction) _currentTx.get()).getStatus() != Transaction.STATUS_COMMITTED) { 336 throw new IllegalStateException ( 337 "Current thread already registered with existing transaction"); 338 } 339 } 340 341 _currentTx.set(tx); 342 } 343 344 347 public void unregisterTransaction() { 348 _currentTx.set(null); 349 } 350 351 355 public Transaction currentTransaction() { 356 Transaction tx = (Transaction) _currentTx.get(); 357 358 if(tx == null) { 359 throw new IllegalStateException ( 360 "Current thread not registered with a transaction"); 361 } 362 363 return tx; 364 } 365 366 369 public boolean isInTransaction() { 370 if(_currentTx.get() == null) { 371 return false; 372 } else { 373 return ((TaskmanTransaction) _currentTx.get()).getStatus() != Transaction.STATUS_COMMITTED; 374 } 375 } 376 377 381 public synchronized void shutdown() { 382 _status = SHUTDOWN_REQUESTED; 383 notifyAll(); 384 385 while((_status != SHUT_DOWN) && (_transactions.size() > 0)) { 386 try { 387 wait(); 388 } catch(InterruptedException e) { 389 break; 390 } 391 } 392 } 393 394 397 public void readExternal(ObjectInput in) throws IOException , 398 ClassNotFoundException { 399 _transactions = (List ) in.readObject(); 400 TaskmanTransaction tx; 401 for(int i = 0; i < _transactions.size(); i++) { 402 tx = (TaskmanTransaction) _transactions.get(i); 403 tx.setTaskManager(this); 404 } 405 _interval = in.readLong(); 406 _txIdFactory = new DefaultTxIdFactory(); 407 _currentTx = new ThreadLocal (); 408 } 409 410 414 public void setTransactionIdFactory(TransactionIdFactory factory){ 415 _txIdFactory = factory; 416 } 417 418 421 public void writeExternal(ObjectOutput out) throws IOException { 422 out.writeObject(_transactions); 423 out.writeLong(_interval); 424 } 425 426 433 protected TaskOutput newTaskOutput(String taskName) { 434 return new DefaultTaskOutput(taskName, DefaultTaskOutput.DEBUG); 435 } 436 437 void execSyncTask(String name, Task t, TaskContext ctx) { 438 TransientTaskDescriptor desc = new TransientTaskDescriptor(name, 0, t); 439 ctx.getTaskOutput().setTaskName(name); 440 desc.setTaskOutput(ctx.getTaskOutput()); 441 desc.setContextVals(ctx.getVals()); 442 TaskmanTransaction tx = ctx.getTransaction(); 443 tx.registerTask(desc); 444 tx.execute(); 445 } 446 447 void execAsyncTask(String name, Task t, TaskContext ctx) { 448 TransientTaskDescriptor desc = new TransientTaskDescriptor(name, 0, t); 449 desc.setContextVals(ctx.getVals()); 450 ctx.getTransaction().registerTask(desc); 451 } 452 453 456 public void run() { 457 TaskmanTransaction current; 458 459 while(true) { 460 long next = -1; 461 462 for(int i = 0; i < _transactions.size(); i++) { 463 current = (TaskmanTransaction) _transactions.get(i); 464 465 if(current.getStatus() == Transaction.STATUS_COMMITTED) { 466 _transactions.remove(i--); 467 } else if(current.getStatus() == Transaction.STATUS_INITIAL) { 468 } else { 470 if(System.currentTimeMillis() >= current.nextExecTime()) { 471 current.execute(); 472 473 if(next == -1) { 474 next = current.nextExecTime(); 475 } else if(current.nextExecTime() < next) { 476 next = current.nextExecTime(); 477 } 478 479 Thread.yield(); 481 } 482 } 483 } 484 485 if(_status == SHUTDOWN_REQUESTED) { 486 notifyShutDown(); 487 488 break; 489 } 490 491 try { 492 waitForTx(next); 493 494 if(_status == SHUTDOWN_REQUESTED) { 495 notifyShutDown(); 496 497 break; 498 } 499 } catch(InterruptedException e) { 500 notifyShutDown(); 501 502 break; 503 } 504 } 505 } 506 507 protected synchronized void addTaskDescriptor(TaskDescriptor desc) { 508 TaskmanTransaction tx = (TaskmanTransaction) _currentTx.get(); 509 510 if((tx != null) && (tx.getStatus() == Transaction.STATUS_INITIAL)) { 511 tx.registerTask(desc); 512 } else { 513 tx = wrap(desc); 514 _transactions.add(tx); 515 tx.commit(); 516 } 517 } 518 519 synchronized void wakeUp() { 520 notify(); 521 } 522 523 private synchronized void waitForTx(long nextTime) 524 throws InterruptedException { 525 long start = System.currentTimeMillis(); 526 527 if(_transactions.size() == 0) { 528 while((_transactions.size() == 0) && (_status != SHUTDOWN_REQUESTED)) { 529 wait(); 530 } 531 } else if(nextTime <= 0) { 532 wait(_interval); 533 } else { 534 long delay = nextTime - System.currentTimeMillis(); 535 536 if(delay <= 0) { 537 wait(_interval); 538 } else { 539 wait(delay); 540 } 541 } 542 } 543 544 private TaskmanTransaction wrap(TaskDescriptor td) { 545 TaskmanTransaction tx = new TaskmanTransaction(_txIdFactory.newTxId(), this); 546 tx.registerTask(td); 547 548 return tx; 549 } 550 551 private synchronized void notifyShutDown() { 552 _status = SHUT_DOWN; 553 notifyAll(); 554 } 555 } 556 | Popular Tags |