1 18 package net.sf.mybatchfwk; 19 20 import java.util.Date ; 21 import java.util.Properties ; 22 import java.util.concurrent.BlockingQueue ; 23 import java.util.concurrent.ExecutorService ; 24 import java.util.concurrent.Executors ; 25 import java.util.concurrent.LinkedBlockingQueue ; 26 import java.util.concurrent.ScheduledExecutorService ; 27 import java.util.concurrent.TimeUnit ; 28 import java.util.concurrent.locks.ReentrantLock ; 29 30 import net.sf.mybatchfwk.history.IExecutionHistory; 31 import net.sf.mybatchfwk.jmx.ManagedBatchService; 32 import net.sf.mybatchfwk.utils.PropertiesLoader; 33 34 import org.apache.commons.logging.Log; 35 import org.apache.commons.logging.LogFactory; 36 37 56 public class BatchService { 57 58 private static final Log log = LogFactory.getLog(BatchService.class); 59 60 64 public static enum STATE { INITIALIZING, WAITING_FOR_LAUNCH, RUNNING, SLEEPING, WAITING_FOR_SHUTDOWN, SHUTDOWN }; 65 66 69 private static BatchService instance = new BatchService(); 70 71 74 private IBatch batch; 75 76 79 private PausableThreadPoolExecutor executorService; 80 81 84 private ScheduledExecutorService scheduledExecutorService; 85 86 89 private BatchConfiguration config; 90 91 94 private volatile STATE state; 95 96 99 private ReentrantLock stateLock; 100 101 104 private boolean skipLaunch; 105 106 109 private boolean pauseBatchService; 110 111 114 private boolean pauseScheduleService; 115 116 119 private boolean restart; 120 121 124 private ExecutionReport executionReport; 125 126 129 private ManagedBatchService mbean; 130 131 134 private IExecutionHistory history; 135 136 137 141 public static BatchService getInstance() { 142 return instance; 143 } 144 145 150 public static void main(String [] args) { 151 152 Properties properties = null; 153 try { 154 properties = PropertiesLoader.getProperties(BatchConfiguration.CONFIGURATION_FILENAME); 155 } catch (Exception e) { 156 log.fatal("an error occured during the reading of the configuration", e); 157 System.exit(1); 158 } 159 160 BatchService manager = getInstance(); 161 try { 162 BatchConfiguration config = new BatchConfiguration(properties); 163 manager.init(config); 164 } catch (Exception e) { 165 manager.manageFatalError(e); 166 log.fatal("an error occured during the initialization of the batch", e); 167 System.exit(2); 168 } 169 170 try { 171 manager.launch(); 172 } catch (Exception e) { 173 manager.manageFatalError(e); 174 log.fatal("an error occured during the execution of the batch", e); 175 System.exit(3); 176 } 177 178 System.exit(0); 179 } 180 181 187 public void init(BatchConfiguration conf) throws BatchException { 188 189 config = conf; 190 191 setState(STATE.INITIALIZING); 192 193 skipLaunch = false; 195 pauseBatchService = false; 196 pauseScheduleService = false; 197 restart = true; 198 stateLock = new ReentrantLock (); 199 200 if (batch == null) { 201 if ((config.getBatchClassName() == null) || ("".equals(config.getBatchClassName().trim()))) { 202 throw new BatchException("the class of the batch is missing (parameter 'mbf.batch.className' required)"); 203 } 204 try { 205 batch = (IBatch) Class.forName(config.getBatchClassName()).newInstance(); 206 } catch (ClassCastException e) { 207 throw new BatchException("the batch class '" + config.getBatchClassName() + 208 "' must implements the interface '" + IBatch.class.getName() + "'", e); 209 } catch (Exception e) { 210 throw new BatchException("unable to build a new intance of batch class '" + config.getBatchClassName() + "'", e); 211 } 212 } 213 214 batch.init(config); 215 216 if (executionReport == null) { 217 if ((config.getExecutionReportClassName() == null) || ("".equals(config.getExecutionReportClassName().trim()))) { 218 executionReport = new ExecutionReport(); 219 } else { 220 try { 221 executionReport = (ExecutionReport) Class.forName(config.getExecutionReportClassName()).newInstance(); 222 } catch (ClassCastException e) { 223 throw new BatchException("the execution report class '" + config.getExecutionReportClassName() + 224 "' must extends the class '" + ExecutionReport.class.getName() + "'", e); 225 } catch (Exception e) { 226 throw new BatchException("unable to build a new intance of execution report class '" + config.getExecutionReportClassName() + "'", e); 227 } 228 } 229 } 230 231 if ((history == null) && (config.getExecutionHistoryClassName() != null) && (!"".equals(config.getExecutionHistoryClassName().trim()))) { 232 try { 233 history = (IExecutionHistory) Class.forName(config.getExecutionHistoryClassName()).newInstance(); 234 } catch (ClassCastException e) { 235 throw new BatchException("the history storage class '" + config.getExecutionReportClassName() + 236 "' must implements the interface '" + IExecutionHistory.class.getName() + "'", e); 237 } catch (Exception e) { 238 throw new BatchException("unable to build a new intance of history storage class '" + config.getExecutionHistoryClassName() + "'", e); 239 } 240 } 241 242 if (history != null) { 243 executionReport.setHistory(history); 244 try { 245 history.initStorage(conf); 246 } catch (BatchException e) { 247 throw new BatchException("an error occured during the initialization of the history storage", e); 248 } 249 } 250 prepareForLaunch(); 251 } 252 253 258 public void prepareForLaunch() throws BatchException { 259 260 config.checkConsistency(); 261 262 if (config.isJmxEnable()) { 263 try { 264 mbean = new ManagedBatchService(this); 265 } catch (Exception e) { 266 throw new BatchException("unable to register the MBean 'BatchService' to the MBean server", e); 267 } 268 } 269 270 int minSize = config.getThreadPoolMinSize(); 271 int maxSize = config.getThreadPoolMaxSize(); 272 273 BlockingQueue <Runnable > queue = new LinkedBlockingQueue <Runnable >(config.getBlockingQueueCapacity()); 274 if ((minSize == maxSize) && (minSize == 1)) { 275 executorService = new PausableThreadPoolExecutor( 276 this, 1, 1, 0L, TimeUnit.MILLISECONDS, queue); 277 } else if ((minSize == maxSize) && (minSize > 1)) { 278 executorService = new PausableThreadPoolExecutor( 279 this, minSize, minSize, 0L, TimeUnit.MILLISECONDS, queue); 280 } else { 281 executorService = new PausableThreadPoolExecutor( 282 this, (minSize > 0)?minSize:0, (maxSize > 0)?maxSize:Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, queue); 283 } 284 285 if (config.isDeferredStart()) { 286 setState(STATE.WAITING_FOR_LAUNCH); 287 sleepBatchService(); 288 } 289 } 290 291 private volatile long loopCounter = 0; 292 293 297 public void launch() throws BatchException { 298 299 while (restart) { 300 restart = false; 301 executeBatch(); 302 } 303 304 executorService = null; 305 scheduledExecutorService = null; 306 307 setState(STATE.SHUTDOWN); 308 } 309 310 314 protected void executeBatch() throws BatchException { 315 316 executionReport.setBeginDate(new Date ()); 317 318 if (!skipLaunch) { 319 setState(STATE.RUNNING); 320 321 if (config.isScheduleEnable()) { 322 scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); 323 loopCounter = config.getScheduleNumberOfLoop(); 324 325 Runnable runnable = new Runnable () { 326 public void run() { 327 328 synchronized (batch) { 330 while (pauseScheduleService) { 331 try { 332 batch.wait(); 333 } catch (InterruptedException e) { 334 throw new RuntimeException ("the schedule service thread has been interrupted", e); 335 } 336 } 337 } 338 339 if (((config.getScheduleEndDate() != null) && (new Date ().after(config.getScheduleEndDate()))) || 340 ((config.getScheduleNumberOfLoop() > 0) && (loopCounter-- == 0))) { 341 shutdownScheduleService(); 342 } else { 343 try { 344 batch.execute(executorService); 345 } catch (BatchException e) { 346 throw new RuntimeException (e); 347 } 348 349 if (((config.getScheduleEndDate() != null) && (new Date ().after(config.getScheduleEndDate()))) || 351 ((config.getScheduleNumberOfLoop() > 0) && (loopCounter == 0)) || 352 (config.getSchedulePeriodTime() <= 0)) { 353 shutdownScheduleService(); 354 } 355 } 356 } 357 }; 358 359 long initialDelay = 0; 360 if (config.getScheduleStartDate() != null) { 361 initialDelay = Math.max(config.getScheduleStartDate().getTime()- (new Date ()).getTime(), 0); 362 } 363 if (config.getSchedulePeriodTime() > 0) { 364 scheduledExecutorService.scheduleAtFixedRate( 365 runnable, initialDelay, config.getSchedulePeriodTime(), TimeUnit.MILLISECONDS); 366 } else { 367 scheduledExecutorService.schedule(runnable, initialDelay, TimeUnit.MILLISECONDS); 368 } 369 370 awaitServiceTermination(scheduledExecutorService); 371 372 } else { 373 batch.execute(executorService); 374 executorService.shutdown(); 375 awaitServiceTermination(executorService); 376 } 377 } 378 379 executionReport.setEndDate(new Date ()); 380 381 if (history != null) { 382 try { 383 history.closeStorage(); 384 } catch (BatchException e) { 385 throw new BatchException("an error occured during the close of the history storage", e); 386 } 387 } 388 batch.end(executionReport); 389 390 if (config.isStayAliveAfterShutdown()) { 391 setState(STATE.WAITING_FOR_SHUTDOWN); 392 sleepBatchService(); 393 } 394 } 395 396 399 protected void shutdownScheduleService() { 400 executorService.shutdown(); 401 try { 402 awaitServiceTermination(executorService); 403 } catch (BatchException e) { 404 throw new RuntimeException (e); 405 } 406 scheduledExecutorService.shutdown(); 407 } 408 409 413 protected void sleepBatchService() throws BatchException { 414 synchronized (batch) { 415 pauseBatchService = true; 416 try { 417 while (pauseBatchService) { 418 batch.wait(); 419 } 420 } catch (InterruptedException e) { 421 throw new BatchException("the batch service thread has been interrupted", e); 422 } 423 } 424 } 425 426 431 protected void awaitServiceTermination(ExecutorService service) throws BatchException { 432 while (!service.isTerminated()) { 433 try { 434 service.awaitTermination(1, TimeUnit.SECONDS); 435 } catch (InterruptedException e) { 436 throw new BatchException("the service thread has been interrupted", e); 437 } 438 } 439 } 440 441 445 protected void setState(STATE newState) { 446 this.state = newState; 447 if ((mbean != null) && (config.isJmxEnable()) && (config.isJmxEnableNotifications())) { 448 mbean.notifyBatchState(newState); 449 } 450 } 451 452 458 public boolean canBeExecuted(ITask task) throws BatchException { 459 boolean execute = true; 460 if ((history != null) && (config.isTaskFilterEnable())) { 461 if (history.isCompletedTask(task.getId())) { 462 if (!config.isExecuteCompletedTasks()) { 463 execute = false; 464 } 465 } else if (history.isFailedTask(task.getId())) { 466 if (!config.isExecuteFailedTasks()) { 467 execute = false; 468 } 469 } else { 470 if (!config.isExecuteNewTasks()) { 471 execute = false; 472 } 473 } 474 } 475 return execute; 476 } 477 478 483 protected void afterExecute(ITask task, Throwable throwable) { 484 if (throwable == null) { 485 executionReport.reportCompletion(task); 486 if (history != null) { 487 try { 488 history.storeCompletedTaskId(task.getId()); 489 } catch (BatchException e) { 490 log.error("an error occured during the storage of a task id", e); 491 } 492 } 493 } else { 494 executionReport.reportFailure(task, throwable); 495 if (history != null) { 496 try { 497 history.storeFailedTaskId(task.getId()); 498 } catch (BatchException e) { 499 log.error("an error occured during the storage of a task id", e); 500 } 501 } 502 } 503 batch.endOfExecution(task, throwable); 504 } 505 506 510 public void manageFatalError(Throwable throwable) { 511 if (batch != null) { 512 batch.manageFatalError(executionReport, throwable); 513 } 514 } 515 516 520 525 public void pause() throws Exception { 526 stateLock.lock(); 527 try { 528 if (state != STATE.RUNNING) { 529 throw new Exception ("unable to process the 'pause' operation, the batch must be into the 'running' state"); 530 } 531 if (state == STATE.RUNNING) { 532 synchronized (batch) { 534 pauseScheduleService = true; 535 } 536 executorService.pause(); 538 setState(STATE.SLEEPING); 539 } 540 } finally { 541 stateLock.unlock(); 542 } 543 } 544 545 550 public void resume() throws Exception { 551 stateLock.lock(); 552 try { 553 if ((state != STATE.SLEEPING) && (state != STATE.WAITING_FOR_LAUNCH) && (state != STATE.WAITING_FOR_SHUTDOWN)) { 554 throw new Exception ("unable to process the 'resume' operation, the batch must be into the state 'waiting for launch' or 'sleeping' or 'waiting for shutdown'"); 555 } 556 if (state == STATE.SLEEPING) { 557 synchronized (batch) { 559 pauseScheduleService = false; 560 batch.notifyAll(); 561 } 562 executorService.resume(); 564 setState(STATE.RUNNING); 565 } else if ((state == STATE.WAITING_FOR_LAUNCH) || (state == STATE.WAITING_FOR_SHUTDOWN)) { 566 synchronized (batch) { 568 pauseBatchService = false; 569 pauseScheduleService = false; 570 batch.notifyAll(); 571 } 572 } 573 } finally { 574 stateLock.unlock(); 575 } 576 } 577 578 584 public void shutdown(boolean waitForSubmittedTasks) throws Exception { 585 stateLock.lock(); 586 try { 587 if (state == STATE.WAITING_FOR_LAUNCH) { 588 skipLaunch = false; 589 } else if (state == STATE.SLEEPING) { 590 synchronized (batch) { 592 pauseScheduleService = false; 593 batch.notifyAll(); 594 } 595 } 596 597 if ((state == STATE.WAITING_FOR_LAUNCH) || (state == STATE.WAITING_FOR_SHUTDOWN)) { 598 synchronized (batch) { 600 pauseBatchService = false; 601 batch.notifyAll(); 602 } 603 } 604 605 if ((state == STATE.RUNNING) || (state == STATE.SLEEPING)) { 606 if (scheduledExecutorService != null) { 608 scheduledExecutorService.shutdown(); 609 } 610 if (waitForSubmittedTasks) { 611 executorService.shutdown(); 612 } else { 613 executorService.shutdownNow(); 614 executorService.getQueue().clear(); 615 } 616 if (scheduledExecutorService != null) { 617 awaitServiceTermination(scheduledExecutorService); 618 } 619 awaitServiceTermination(executorService); 620 } 621 } finally { 622 stateLock.unlock(); 623 } 624 } 625 626 631 public void restart() throws Exception { 632 stateLock.lock(); 633 try { 634 if (state != STATE.WAITING_FOR_SHUTDOWN) { 635 throw new Exception ("unable to process the 'restart' operation, the batch must be into the 'waiting for shutdown' state"); 636 } 637 if (state == STATE.WAITING_FOR_SHUTDOWN) { 638 restart = true; 639 synchronized (batch) { 641 pauseBatchService = false; 642 batch.notifyAll(); 643 } 644 } 645 } finally { 646 stateLock.unlock(); 647 } 648 } 649 650 654 657 public ITaskExecutor getThreadPoolExecutor() { 658 return executorService; 659 } 660 661 664 public STATE getState() { 665 return state; 666 } 667 668 671 public void setBatch(IBatch batch) { 672 this.batch = batch; 673 } 674 675 678 public ExecutionReport getExecutionReport() { 679 return executionReport; 680 } 681 } 682 | Popular Tags |