1 16 package scriptella.execution; 17 18 import scriptella.configuration.ConfigurationEl; 19 import scriptella.configuration.ConfigurationFactory; 20 import scriptella.core.Session; 21 import scriptella.core.ThreadSafe; 22 import scriptella.interactive.ProgressCallback; 23 import scriptella.interactive.ProgressIndicator; 24 import scriptella.util.CollectionUtils; 25 import scriptella.util.IOUtils; 26 27 import java.io.File ; 28 import java.net.MalformedURLException ; 29 import java.net.URL ; 30 import java.util.Map ; 31 import java.util.logging.Level ; 32 import java.util.logging.Logger ; 33 34 35 64 public class EtlExecutor { 65 private static final Logger LOG = Logger.getLogger(EtlExecutor.class.getName()); 66 private ConfigurationEl configuration; 67 private boolean jmxEnabled; 68 69 72 public EtlExecutor() { 73 } 74 75 80 public EtlExecutor(ConfigurationEl configuration) { 81 this.configuration = configuration; 82 } 83 84 89 public ConfigurationEl getConfiguration() { 90 return configuration; 91 } 92 93 98 public void setConfiguration(final ConfigurationEl configuration) { 99 this.configuration = configuration; 100 } 101 102 103 111 public boolean isJmxEnabled() { 112 return jmxEnabled; 113 } 114 115 124 public void setJmxEnabled(boolean jmxEnabled) { 125 this.jmxEnabled = jmxEnabled; 126 } 127 128 131 @ThreadSafe 132 public ExecutionStatistics execute() throws EtlExecutorException { 133 return execute((ProgressIndicator) null); 134 } 135 136 141 @ThreadSafe 142 public ExecutionStatistics execute(final ProgressIndicator indicator) 143 throws EtlExecutorException { 144 EtlContext ctx = null; 145 JmxEtlManager etlManager = null; 146 147 try { 148 ctx = prepare(indicator); 149 if (jmxEnabled) { 150 etlManager = new JmxEtlManager(ctx); 151 etlManager.register(); 152 } 153 execute(ctx); 154 ctx.getProgressCallback().step(5, "Commiting transactions"); 155 commitAll(ctx); 156 } catch (Throwable e) { 157 if (ctx != null) { 158 rollbackAll(ctx); 159 } 160 throw new EtlExecutorException(e); 161 } finally { 162 if (ctx != null) { 163 closeAll(ctx); 164 ctx.getStatisticsBuilder().etlComplete(); 165 ctx.getProgressCallback().complete(); 166 } 167 if (etlManager != null) { 168 etlManager.unregister(); 169 } 170 } 171 172 return ctx.getStatisticsBuilder().getStatistics(); 173 } 174 175 void rollbackAll(final EtlContext ctx) { 176 try { 177 ctx.session.rollback(); 178 } catch (Exception e) { 179 LOG.log(Level.SEVERE, "Unable to rollback script", e); 180 } 181 } 182 183 void commitAll(final EtlContext ctx) { 184 ctx.session.commit(); 185 } 186 187 void closeAll(final EtlContext ctx) { 188 ctx.session.close(); 189 } 190 191 private void execute(final EtlContext ctx) { 192 final ProgressCallback oldProgress = ctx.getProgressCallback(); 193 194 final ProgressCallback p = oldProgress.fork(85, 100); 195 final ProgressCallback p2 = p.fork(100); 196 ctx.setProgressCallback(p2); 197 ctx.session.execute(ctx); 198 p.complete(); 199 ctx.setProgressCallback(oldProgress); 200 } 201 202 208 protected EtlContext prepare(final ProgressIndicator indicator) { 209 EtlContext ctx = new EtlContext(); 210 ctx.getStatisticsBuilder().etlStarted(); 211 ctx.setBaseURL(configuration.getDocumentUrl()); 212 ctx.setProgressCallback(new ProgressCallback(100, indicator)); 213 214 final ProgressCallback progress = ctx.getProgressCallback(); 215 progress.step(1, "Initializing properties"); 216 ctx.setProperties(configuration.getProperties()); 217 ctx.setProgressCallback(progress.fork(9, 100)); 218 ctx.session = new Session(configuration, ctx); 219 ctx.getProgressCallback().complete(); 220 ctx.setProgressCallback(progress); 222 return ctx; 223 } 224 225 232 public static EtlExecutor newExecutor(final File scriptFile) { 233 try { 234 return newExecutor(IOUtils.toUrl(scriptFile)); 235 } catch (MalformedURLException e) { 236 throw new IllegalArgumentException (e.getMessage(), e); 237 } 238 } 239 240 248 @ThreadSafe 249 public static EtlExecutor newExecutor(final URL scriptFileUrl) { 250 return newExecutor(scriptFileUrl, CollectionUtils.asMap(System.getProperties())); 251 } 252 253 261 @ThreadSafe 262 public static EtlExecutor newExecutor(final URL scriptFileUrl, final Map <String , ?> externalProperties) { 263 ConfigurationFactory cf = new ConfigurationFactory(); 264 cf.setResourceURL(scriptFileUrl); 265 if (externalProperties != null) { 266 cf.setExternalProperties(externalProperties); 267 } 268 return new EtlExecutor(cf.createConfiguration()); 269 } 270 271 } 272 | Popular Tags |