1 package org.apache.slide.projector.engine; 2 3 import java.io.IOException ; 4 import java.util.ArrayList ; 5 import java.util.Collection ; 6 import java.util.HashMap ; 7 import java.util.Iterator ; 8 import java.util.List ; 9 import java.util.Map ; 10 import java.util.Timer ; 11 import java.util.logging.Level ; 12 import java.util.logging.Logger ; 13 14 import org.apache.slide.projector.Context; 15 import org.apache.slide.projector.Processor; 16 import org.apache.slide.projector.Projector; 17 import org.apache.slide.projector.SystemContext; 18 import org.apache.slide.projector.URI; 19 import org.apache.slide.projector.application.Application; 20 import org.apache.slide.projector.application.ApplicationListener; 21 import org.apache.slide.projector.processor.process.Process; 22 import org.apache.slide.projector.processor.process.Step; 23 import org.apache.slide.projector.value.DocumentValue; 24 import org.apache.slide.projector.value.StreamableValue; 25 import org.apache.slide.projector.value.StringValue; 26 import org.apache.slide.projector.value.URIValue; 27 import org.apache.webdav.lib.Subscriber; 28 import org.jdom.Document; 29 import org.jdom.Element; 30 import org.jdom.xpath.XPath; 31 32 import de.zeigermann.xml.XMLStringWriter; 33 import de.zeigermann.xml.XMLWriter; 34 35 public class Scheduler implements ApplicationListener { 36 private final static Logger logger = Logger.getLogger(Scheduler.class.getName()); 37 38 public final static String JOBS = "jobs.xml"; 39 public final static String JOB_DEFINITIONS = "jobDefinitions.xml"; 40 41 private final static String STARTUP = "startup"; 42 private final static String SESSION = "session"; 43 private final static String REQUEST = "request"; 44 private final static String STARTUP_IDENTIFIER = "startup:"; 45 46 private final Context context = new SystemContext(); 47 48 private static Scheduler scheduler = new Scheduler(); 49 private static ThreadLocal threadContext = new ThreadLocal (); 50 private static Timer timer = new Timer (); 51 52 private List jobs = new ArrayList (); 53 private List sessionJobs = new ArrayList (); 54 private List requestJobs = new ArrayList (); 55 private List restoredJobs = new ArrayList (); 56 private List installedJobNames = new ArrayList (); 57 58 private Map installedJobs = new HashMap (); 59 60 private Scheduler() { 61 } 62 63 public static Scheduler getInstance() { 64 return scheduler; 65 } 66 67 public void install(String type, URI applicationUri, URI configurationUri) { 68 if ( type == Application.JOBS ) { 69 install(configurationUri, false); 70 } 71 } 72 73 public void uninstall(String type, URI applicationUri, URI configurationUri) { 74 if ( type == Application.JOBS ) { 75 uninstall(configurationUri); 76 } 77 } 78 79 public void update(String type, URI applicationUri, URI configurationUri) { 80 if ( type == Application.JOBS ) { 81 update(configurationUri); 82 } 83 } 84 85 public void install(URI jobsUri, boolean restoreRunningJobs) { 86 logger.log(Level.FINE, "Installing scheduled jobs of application '"+jobsUri+"'"); 87 try { 88 List applicationJobs = new ArrayList (); 89 StreamableValue jobsResource = (StreamableValue)Projector.getRepository().getResource(jobsUri, Projector.getCredentials()); 90 if ( jobsResource != null ) { 92 DocumentValue documentResource = new DocumentValue(jobsResource); 93 Document document = documentResource.getDocument(); 94 Element rootElement = document.getRootElement(); 95 List jobElements = XPath.newInstance("/jobs/job").selectNodes(rootElement); 96 List startupJobs = new ArrayList (); 97 for ( Iterator i = jobElements.iterator(); i.hasNext(); ) { 98 Element jobElement = (Element)i.next(); 99 Step job = new Step(); 100 job.configure(jobElement); 101 String trigger = jobElement.getAttributeValue("trigger"); 102 if ( trigger == null || trigger.equals(STARTUP) ) { 103 if ( !installedJobNames.contains(STARTUP_IDENTIFIER+job.getName()) ) { 104 startupJobs.add(job); 105 installedJobNames.add(job.getName()); 106 } 107 } else if ( trigger.equals(REQUEST) ) { 108 requestJobs.add(job); 109 } else if ( trigger.equals(SESSION) ) { 110 sessionJobs.add(job); 111 } 112 } 113 for ( Iterator i = startupJobs.iterator(); i.hasNext(); ) { 115 Step job = (Step)i.next(); 116 if ( job.getName().startsWith(STARTUP_IDENTIFIER) ) { 117 context.setStep(job.getName()); 118 } else { 119 context.setStep(STARTUP_IDENTIFIER+job.getName()); 120 } 121 launchJob(job, context); 122 if ( !restoreRunningJobs ) { 123 Projector.getRepository().subscribe("Update", jobsUri, 0, 124 new Subscriber() { 125 public void notify(String uri, Map information) { 126 update(new URIValue(uri)); 127 } 128 }, Projector.getCredentials()); 129 } 130 } 131 } else { 132 logger.log(Level.FINE, "Configured jobs resource '"+jobsUri+"' not found!"); 133 } 134 installedJobs.put(jobsUri, applicationJobs); 136 } catch (Exception exception) { 137 logger.log(Level.SEVERE, "Error while parsing messages", exception); 138 } 139 } 140 141 public void uninstall(URI jobsUri) { 142 logger.log(Level.FINE, "Uninstalling jobs '"+jobsUri+"'"); 143 Collection jobKeys = (Collection )installedJobs.get(jobs); 144 for ( Iterator i = jobKeys.iterator(); i.hasNext(); ) { 145 String jobKey = (String )i.next(); 146 jobs.remove(jobKey); 147 logger.log(Level.FINE, "Removing job '"+jobKey+"'"); 148 } 149 installedJobs.remove(jobsUri); 150 } 151 152 public void update(URI jobsUri) { 153 uninstall(jobsUri); 154 install(jobsUri, true); 155 } 156 157 public void launchSessionJobs(Context context) throws Exception { 158 for ( Iterator i = sessionJobs.iterator(); i.hasNext(); ) { 159 Step job = (Step)i.next(); 160 launchJob(job, context); 161 } 162 } 163 164 public void launchRequestJobs(Context context) throws Exception { 165 for ( Iterator i = requestJobs.iterator(); i.hasNext(); ) { 166 Step job = (Step)i.next(); 167 launchJob(job, context); 168 } 169 } 170 171 private void launchJob(Step job, Context context) throws Exception { 172 logger.log(Level.FINE, "Launching job: '"+job.getName()+"'"); 173 Processor processor = ProcessorManager.getInstance().getProcessor(job.getProcessorURI()); 174 Map processorParameters = Process.loadParameters(job, processor, context); 175 ProcessorManager.process(processor, processorParameters, context); 176 } 177 178 181 public static Timer getTimer() { 182 return timer; 183 } 184 185 public void notify(final Job job) { 186 if ( job.getRemainingCondition().evaluate() ) { 187 job.launch(); 188 jobs.remove(job); 189 if ( job.repeat() ) { 190 job.restart(); 191 registerJob(job); 192 } 193 } 194 if ( job.isPersistent() ) { 195 saveJobs(); 196 } 197 } 198 199 public void registerJob(Job job) { 200 synchronized ( jobs ) { 201 jobs.add(job); 202 job.activate(); 203 } 204 } 205 206 public static void setContext(Context context) { 207 threadContext.set(context); 208 } 209 210 public static Context getContext() { 211 if ( threadContext.get() == null ) { 212 threadContext.set(new SystemContext()); 213 } 214 return (Context)threadContext.get(); 215 } 216 217 public void saveJobs() { 218 XMLStringWriter writer = XMLStringWriter.create(); 219 writer.writeXMLDeclaration(); 220 writer.writeStartTag(XMLWriter.createStartTag("jobs")); 221 synchronized ( jobs ) { 222 for ( Iterator i = jobs.iterator(); i.hasNext(); ) { 223 Job job = (Job)i.next(); 224 if (job.isPersistent()) { 225 job.save(writer); 226 } 227 } 228 } 229 writer.writeEndTag(XMLWriter.createEndTag("jobs")); 230 try { 231 Projector.getRepository().setResource(new URIValue(Projector.getWorkDir() + JOBS), new StringValue(writer.toString()), Projector.getCredentials()); 232 } catch (IOException exception) { 233 logger.log(Level.SEVERE, "Exception occured while writing jobs to repository", exception); 234 } 235 } 236 } | Popular Tags |