KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > sf > mybatchfwk > BatchService


1 /*
2  * MyBatchFramework - Open-source batch framework.
3  * Copyright (C) 2006 Jérôme Bertèche cyberteche@users.sourceforge.net
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * Jérôme Bertèche
16  * Email: cyberteche@users.sourceforge.net
17  */

18 package net.sf.mybatchfwk;
19
20 import java.util.Date JavaDoc;
21 import java.util.Properties JavaDoc;
22 import java.util.concurrent.BlockingQueue JavaDoc;
23 import java.util.concurrent.ExecutorService JavaDoc;
24 import java.util.concurrent.Executors JavaDoc;
25 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
26 import java.util.concurrent.ScheduledExecutorService JavaDoc;
27 import java.util.concurrent.TimeUnit JavaDoc;
28 import java.util.concurrent.locks.ReentrantLock JavaDoc;
29
30 import net.sf.mybatchfwk.history.IExecutionHistory;
31 import net.sf.mybatchfwk.jmx.ManagedBatchService;
32 import net.sf.mybatchfwk.utils.PropertiesLoader;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36
37 /**
38  * The class used to launch the batch.<br><br>
39  *
40  * <i>Automatic launching</i>:<br>
41  * <tab>- define configuration into the file 'my-batch-fwk.properties'<br>
42  * <tab>- call the main method<br><br>
43  *
44  * <i>Manual launching</i>:<br>
45  * <tab>- build an instance of BatchConfiguration<br>
46  * <tab>- call the init method<br>
47  * <tab>- call the launch method<br><br>
48  *
49  * <i>Note:</i> the BatchConfiguration instance can be built from the properties of the configuration file
50  * (see PropertiesLoader.getProperties).<br><br>
51  *
52  * The execution of the batch can be managed through JMX (ObjectName=net.sf.mybatchfwk:type=BatchService).
53  *
54  * @author Jérôme Bertèche (cyberteche@users.sourceforge.net)
55  */

56 public class BatchService {
57     
58     private static final Log log = LogFactory.getLog(BatchService.class);
59     
60     /**
61      * The state of the batch
62      * @author Administrateur
63      */

64     public static enum STATE { INITIALIZING, WAITING_FOR_LAUNCH, RUNNING, SLEEPING, WAITING_FOR_SHUTDOWN, SHUTDOWN };
65     
66     /**
67      * In standalone mode the instance is referenced as a static member
68      */

69     private static BatchService instance = new BatchService();
70     
71     /**
72      * The batch of the user
73      */

74     private IBatch batch;
75     
76     /**
77      * The service used to execute the tasks
78      */

79     private PausableThreadPoolExecutor executorService;
80     
81     /**
82      * The service used to launch executions at fixed rate or to delay the execution
83      */

84     private ScheduledExecutorService JavaDoc scheduledExecutorService;
85     
86     /**
87      * The configuration
88      */

89     private BatchConfiguration config;
90     
91     /**
92      * The state
93      */

94     private volatile STATE state;
95     
96     /**
97      * Used to keep the consistency of the batch state
98      */

99     private ReentrantLock JavaDoc stateLock;
100     
101     /**
102      * Used to skip the launch of the batch
103      */

104     private boolean skipLaunch;
105     
106     /**
107      * Used to pause the batch service
108      */

109     private boolean pauseBatchService;
110     
111     /**
112      * Used to pause the schedule service
113      */

114     private boolean pauseScheduleService;
115     
116     /**
117      * Restart the batch again
118      */

119     private boolean restart;
120     
121     /**
122      * The execution report
123      */

124     private ExecutionReport executionReport;
125     
126     /**
127      * The MBean to manage the batch service
128      */

129     private ManagedBatchService mbean;
130     
131     /**
132      * The storage used to store the history of the execution
133      */

134     private IExecutionHistory history;
135     
136
137     /**
138      * In standalone mode the instance is referenced as a static member
139      * @return
140      */

141     public static BatchService getInstance() {
142         return instance;
143     }
144     
145     /**
146      * Main method to start the batch service in standalone mode.
147      * Configuration is load from the config file.
148      * @param args no args
149      */

150     public static void main(String JavaDoc[] args) {
151         
152         Properties JavaDoc properties = null;
153         try {
154             properties = PropertiesLoader.getProperties(BatchConfiguration.CONFIGURATION_FILENAME);
155         } catch (Exception JavaDoc e) {
156             log.fatal("an error occured during the reading of the configuration", e);
157             System.exit(1);
158         }
159         
160         BatchService manager = getInstance();
161         try {
162             BatchConfiguration config = new BatchConfiguration(properties);
163             manager.init(config);
164         } catch (Exception JavaDoc e) {
165             manager.manageFatalError(e);
166             log.fatal("an error occured during the initialization of the batch", e);
167             System.exit(2);
168         }
169         
170         try {
171             manager.launch();
172         } catch (Exception JavaDoc e) {
173             manager.manageFatalError(e);
174             log.fatal("an error occured during the execution of the batch", e);
175             System.exit(3);
176         }
177         
178         System.exit(0);
179     }
180     
181     /**
182      * Initialization method.<br>
183      * Build the batch object if needed and initialize it.
184      * @param config the configuration
185      * @throws BatchException
186      */

187     public void init(BatchConfiguration conf) throws BatchException {
188         
189         config = conf;
190         
191         setState(STATE.INITIALIZING);
192         
193         // initializing properties
194
skipLaunch = false;
195         pauseBatchService = false;
196         pauseScheduleService = false;
197         restart = true;
198         stateLock = new ReentrantLock JavaDoc();
199         
200         if (batch == null) {
201             if ((config.getBatchClassName() == null) || ("".equals(config.getBatchClassName().trim()))) {
202                 throw new BatchException("the class of the batch is missing (parameter 'mbf.batch.className' required)");
203             }
204             try {
205                 batch = (IBatch) Class.forName(config.getBatchClassName()).newInstance();
206             } catch (ClassCastException JavaDoc e) {
207                 throw new BatchException("the batch class '" + config.getBatchClassName() +
208                         "' must implements the interface '" + IBatch.class.getName() + "'", e);
209             } catch (Exception JavaDoc e) {
210                 throw new BatchException("unable to build a new intance of batch class '" + config.getBatchClassName() + "'", e);
211             }
212         }
213         
214         batch.init(config);
215         
216         if (executionReport == null) {
217             if ((config.getExecutionReportClassName() == null) || ("".equals(config.getExecutionReportClassName().trim()))) {
218                 executionReport = new ExecutionReport();
219             } else {
220                 try {
221                     executionReport = (ExecutionReport) Class.forName(config.getExecutionReportClassName()).newInstance();
222                 } catch (ClassCastException JavaDoc e) {
223                     throw new BatchException("the execution report class '" + config.getExecutionReportClassName() +
224                             "' must extends the class '" + ExecutionReport.class.getName() + "'", e);
225                 } catch (Exception JavaDoc e) {
226                     throw new BatchException("unable to build a new intance of execution report class '" + config.getExecutionReportClassName() + "'", e);
227                 }
228             }
229         }
230         
231         if ((history == null) && (config.getExecutionHistoryClassName() != null) && (!"".equals(config.getExecutionHistoryClassName().trim()))) {
232             try {
233                 history = (IExecutionHistory) Class.forName(config.getExecutionHistoryClassName()).newInstance();
234             } catch (ClassCastException JavaDoc e) {
235                 throw new BatchException("the history storage class '" + config.getExecutionReportClassName() +
236                         "' must implements the interface '" + IExecutionHistory.class.getName() + "'", e);
237             } catch (Exception JavaDoc e) {
238                 throw new BatchException("unable to build a new intance of history storage class '" + config.getExecutionHistoryClassName() + "'", e);
239             }
240         }
241         
242         if (history != null) {
243             executionReport.setHistory(history);
244             try {
245                 history.initStorage(conf);
246             } catch (BatchException e) {
247                 throw new BatchException("an error occured during the initialization of the history storage", e);
248             }
249         }
250         prepareForLaunch();
251     }
252     
253     /**
254      * Called just before the execution of the batch.<br>
255      * Check the consistency of the configuration and build the executor service.
256      * @throws BatchException
257      */

258     public void prepareForLaunch() throws BatchException {
259         
260         config.checkConsistency();
261         
262         if (config.isJmxEnable()) {
263             try {
264                 mbean = new ManagedBatchService(this);
265             } catch (Exception JavaDoc e) {
266                 throw new BatchException("unable to register the MBean 'BatchService' to the MBean server", e);
267             }
268         }
269         
270         int minSize = config.getThreadPoolMinSize();
271         int maxSize = config.getThreadPoolMaxSize();
272         
273         BlockingQueue JavaDoc<Runnable JavaDoc> queue = new LinkedBlockingQueue JavaDoc<Runnable JavaDoc>(config.getBlockingQueueCapacity());
274         if ((minSize == maxSize) && (minSize == 1)) {
275             executorService = new PausableThreadPoolExecutor(
276                     this, 1, 1, 0L, TimeUnit.MILLISECONDS, queue);
277         } else if ((minSize == maxSize) && (minSize > 1)) {
278             executorService = new PausableThreadPoolExecutor(
279                     this, minSize, minSize, 0L, TimeUnit.MILLISECONDS, queue);
280         } else {
281             executorService = new PausableThreadPoolExecutor(
282                     this, (minSize > 0)?minSize:0, (maxSize > 0)?maxSize:Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, queue);
283         }
284         
285         if (config.isDeferredStart()) {
286             setState(STATE.WAITING_FOR_LAUNCH);
287             sleepBatchService();
288         }
289     }
290     
291     private volatile long loopCounter = 0;
292     
293     /**
294      * Launch the batch
295      * @throws BatchException
296      */

297     public void launch() throws BatchException {
298         
299         while (restart) {
300             restart = false;
301             executeBatch();
302         }
303         
304         executorService = null;
305         scheduledExecutorService = null;
306         
307         setState(STATE.SHUTDOWN);
308     }
309     
310     /**
311      * Method in which the batch is executed
312      * @throws BatchException
313      */

314     protected void executeBatch() throws BatchException {
315         
316         executionReport.setBeginDate(new Date JavaDoc());
317         
318         if (!skipLaunch) {
319             setState(STATE.RUNNING);
320             
321             if (config.isScheduleEnable()) {
322                 scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
323                 loopCounter = config.getScheduleNumberOfLoop();
324                 
325                 Runnable JavaDoc runnable = new Runnable JavaDoc() {
326                     public void run() {
327                         
328                         // we manage the pause of the schedule
329
synchronized (batch) {
330                             while (pauseScheduleService) {
331                                 try {
332                                     batch.wait();
333                                 } catch (InterruptedException JavaDoc e) {
334                                     throw new RuntimeException JavaDoc("the schedule service thread has been interrupted", e);
335                                 }
336                             }
337                         }
338                         
339                         if (((config.getScheduleEndDate() != null) && (new Date JavaDoc().after(config.getScheduleEndDate()))) ||
340                                 ((config.getScheduleNumberOfLoop() > 0) && (loopCounter-- == 0))) {
341                             shutdownScheduleService();
342                         } else {
343                             try {
344                                 batch.execute(executorService);
345                             } catch (BatchException e) {
346                                 throw new RuntimeException JavaDoc(e);
347                             }
348                             
349                             // we test again shutdown conditions
350
if (((config.getScheduleEndDate() != null) && (new Date JavaDoc().after(config.getScheduleEndDate()))) ||
351                                     ((config.getScheduleNumberOfLoop() > 0) && (loopCounter == 0)) ||
352                                     (config.getSchedulePeriodTime() <= 0)) {
353                                 shutdownScheduleService();
354                             }
355                         }
356                     }
357                 };
358                 
359                 long initialDelay = 0;
360                 if (config.getScheduleStartDate() != null) {
361                     initialDelay = Math.max(config.getScheduleStartDate().getTime()- (new Date JavaDoc()).getTime(), 0);
362                 }
363                 if (config.getSchedulePeriodTime() > 0) {
364                     scheduledExecutorService.scheduleAtFixedRate(
365                             runnable, initialDelay, config.getSchedulePeriodTime(), TimeUnit.MILLISECONDS);
366                 } else {
367                     scheduledExecutorService.schedule(runnable, initialDelay, TimeUnit.MILLISECONDS);
368                 }
369                 
370                 awaitServiceTermination(scheduledExecutorService);
371                 
372             } else {
373                 batch.execute(executorService);
374                 executorService.shutdown();
375                 awaitServiceTermination(executorService);
376             }
377         }
378         
379         executionReport.setEndDate(new Date JavaDoc());
380         
381         if (history != null) {
382             try {
383                 history.closeStorage();
384             } catch (BatchException e) {
385                 throw new BatchException("an error occured during the close of the history storage", e);
386             }
387         }
388         batch.end(executionReport);
389             
390         if (config.isStayAliveAfterShutdown()) {
391             setState(STATE.WAITING_FOR_SHUTDOWN);
392             sleepBatchService();
393         }
394     }
395     
396     /**
397      * Shutdown the schedule service
398      */

399     protected void shutdownScheduleService() {
400         executorService.shutdown();
401         try {
402             awaitServiceTermination(executorService);
403         } catch (BatchException e) {
404             throw new RuntimeException JavaDoc(e);
405         }
406         scheduledExecutorService.shutdown();
407     }
408     
409     /**
410      * Put the batch service on the sleep mode.
411      * @throws BatchException
412      */

413     protected void sleepBatchService() throws BatchException {
414         synchronized (batch) {
415             pauseBatchService = true;
416             try {
417                 while (pauseBatchService) {
418                     batch.wait();
419                 }
420             } catch (InterruptedException JavaDoc e) {
421                 throw new BatchException("the batch service thread has been interrupted", e);
422             }
423         }
424     }
425     
426     /**
427      * Wait for the termination of the service
428      * @param service
429      * @throws BatchException
430      */

431     protected void awaitServiceTermination(ExecutorService JavaDoc service) throws BatchException {
432         while (!service.isTerminated()) {
433             try {
434                 service.awaitTermination(1, TimeUnit.SECONDS);
435             } catch (InterruptedException JavaDoc e) {
436                 throw new BatchException("the service thread has been interrupted", e);
437             }
438         }
439     }
440     
441     /**
442      * Set the new state of the batch and process to the notification of the new state if jmx is enable
443      * @param newState
444      */

445     protected void setState(STATE newState) {
446         this.state = newState;
447         if ((mbean != null) && (config.isJmxEnable()) && (config.isJmxEnableNotifications())) {
448             mbean.notifyBatchState(newState);
449         }
450     }
451     
452     /**
453      * Determine if a task can be executed or not
454      * @param task
455      * @return
456      * @throws BatchException
457      */

458     public boolean canBeExecuted(ITask task) throws BatchException {
459         boolean execute = true;
460         if ((history != null) && (config.isTaskFilterEnable())) {
461             if (history.isCompletedTask(task.getId())) {
462                 if (!config.isExecuteCompletedTasks()) {
463                     execute = false;
464                 }
465             } else if (history.isFailedTask(task.getId())) {
466                 if (!config.isExecuteFailedTasks()) {
467                     execute = false;
468                 }
469             } else {
470                 if (!config.isExecuteNewTasks()) {
471                     execute = false;
472                 }
473             }
474         }
475         return execute;
476     }
477     
478     /**
479      * Report an execution termination
480      * @param task
481      * @param throwable
482      */

483     protected void afterExecute(ITask task, Throwable JavaDoc throwable) {
484         if (throwable == null) {
485             executionReport.reportCompletion(task);
486             if (history != null) {
487                 try {
488                     history.storeCompletedTaskId(task.getId());
489                 } catch (BatchException e) {
490                     log.error("an error occured during the storage of a task id", e);
491                 }
492             }
493         } else {
494             executionReport.reportFailure(task, throwable);
495             if (history != null) {
496                 try {
497                     history.storeFailedTaskId(task.getId());
498                 } catch (BatchException e) {
499                     log.error("an error occured during the storage of a task id", e);
500                 }
501             }
502         }
503         batch.endOfExecution(task, throwable);
504     }
505     
506     /**
507      * Manage a fatal error during the batch execution that cause a crash
508      * @param throwable
509      */

510     public void manageFatalError(Throwable JavaDoc throwable) {
511         if (batch != null) {
512             batch.manageFatalError(executionReport, throwable);
513         }
514     }
515     
516     // ######################################################
517
// #################### JMX methods #####################
518
// ######################################################
519

520     /**
521      * Suspends the execution of the batch/executor/schedule services.
522      * Called through JMX.
523      * @throws Exception
524      */

525     public void pause() throws Exception JavaDoc {
526         stateLock.lock();
527         try {
528             if (state != STATE.RUNNING) {
529                 throw new Exception JavaDoc("unable to process the 'pause' operation, the batch must be into the 'running' state");
530             }
531             if (state == STATE.RUNNING) {
532                 // we sleep the schedule service
533
synchronized (batch) {
534                     pauseScheduleService = true;
535                 }
536                 // we pause the executor service
537
executorService.pause();
538                 setState(STATE.SLEEPING);
539             }
540         } finally {
541             stateLock.unlock();
542         }
543     }
544     
545     /**
546      * Resume the execution of the batch/executor/schedule services.
547      * Called through JMX.
548      * @throws Exception
549      */

550     public void resume() throws Exception JavaDoc {
551         stateLock.lock();
552         try {
553             if ((state != STATE.SLEEPING) && (state != STATE.WAITING_FOR_LAUNCH) && (state != STATE.WAITING_FOR_SHUTDOWN)) {
554                 throw new Exception JavaDoc("unable to process the 'resume' operation, the batch must be into the state 'waiting for launch' or 'sleeping' or 'waiting for shutdown'");
555             }
556             if (state == STATE.SLEEPING) {
557                 // we wake up the schedule service
558
synchronized (batch) {
559                     pauseScheduleService = false;
560                     batch.notifyAll();
561                 }
562                 // we wake up the executor service
563
executorService.resume();
564                 setState(STATE.RUNNING);
565             } else if ((state == STATE.WAITING_FOR_LAUNCH) || (state == STATE.WAITING_FOR_SHUTDOWN)) {
566                 // we wake up the batch and schedule services
567
synchronized (batch) {
568                     pauseBatchService = false;
569                     pauseScheduleService = false;
570                     batch.notifyAll();
571                 }
572             }
573         } finally {
574             stateLock.unlock();
575         }
576     }
577     
578     /**
579      * Stop the execution of the batch.
580      * Called through JMX.
581      * @param waitForSubmittedTasks if true, then waits for the execution of the submitted tasks before to terminate.
582      * @throws Exception
583      */

584     public void shutdown(boolean waitForSubmittedTasks) throws Exception JavaDoc {
585         stateLock.lock();
586         try {
587             if (state == STATE.WAITING_FOR_LAUNCH) {
588                 skipLaunch = false;
589             } else if (state == STATE.SLEEPING) {
590                 // we wake up the schedule service
591
synchronized (batch) {
592                     pauseScheduleService = false;
593                     batch.notifyAll();
594                 }
595             }
596             
597             if ((state == STATE.WAITING_FOR_LAUNCH) || (state == STATE.WAITING_FOR_SHUTDOWN)) {
598                 // we wake up the batch service
599
synchronized (batch) {
600                     pauseBatchService = false;
601                     batch.notifyAll();
602                 }
603             }
604             
605             if ((state == STATE.RUNNING) || (state == STATE.SLEEPING)) {
606                 // we shutdown schedule and executor services
607
if (scheduledExecutorService != null) {
608                     scheduledExecutorService.shutdown();
609                 }
610                 if (waitForSubmittedTasks) {
611                     executorService.shutdown();
612                 } else {
613                     executorService.shutdownNow();
614                     executorService.getQueue().clear();
615                 }
616                 if (scheduledExecutorService != null) {
617                     awaitServiceTermination(scheduledExecutorService);
618                 }
619                 awaitServiceTermination(executorService);
620             }
621         } finally {
622             stateLock.unlock();
623         }
624     }
625     
626     /**
627      * Restart the execution of the batch.
628      * Called through JMX.
629      * @throws BatchException
630      */

631     public void restart() throws Exception JavaDoc {
632         stateLock.lock();
633         try {
634             if (state != STATE.WAITING_FOR_SHUTDOWN) {
635                 throw new Exception JavaDoc("unable to process the 'restart' operation, the batch must be into the 'waiting for shutdown' state");
636             }
637             if (state == STATE.WAITING_FOR_SHUTDOWN) {
638                 restart = true;
639                 // we wake up the batch service
640
synchronized (batch) {
641                     pauseBatchService = false;
642                     batch.notifyAll();
643                 }
644             }
645         } finally {
646             stateLock.unlock();
647         }
648     }
649     
650     // ######################################################
651
// ################ GETTERS and SETTERS #################
652
// ######################################################
653

654     /**
655      * @return the executor used to execute tasks
656      */

657     public ITaskExecutor getThreadPoolExecutor() {
658         return executorService;
659     }
660
661     /**
662      * @return the current state of the batch service
663      */

664     public STATE getState() {
665         return state;
666     }
667
668     /**
669      * @param batch The batch to set.
670      */

671     public void setBatch(IBatch batch) {
672         this.batch = batch;
673     }
674
675     /**
676      * @return Returns the executionReport.
677      */

678     public ExecutionReport getExecutionReport() {
679         return executionReport;
680     }
681 }
682
Popular Tags