1 17 package org.apache.servicemix.beanflow; 18 19 import org.apache.commons.logging.Log; 20 import org.apache.commons.logging.LogFactory; 21 import org.apache.servicemix.beanflow.support.EnumHelper; 22 import org.apache.servicemix.beanflow.support.Interpreter; 23 import org.apache.servicemix.beanflow.support.ReflectionInterpreter; 24 25 import java.util.Timer ; 26 import java.util.concurrent.BlockingQueue ; 27 import java.util.concurrent.Executor ; 28 import java.util.concurrent.Executors ; 29 import java.util.concurrent.LinkedBlockingQueue ; 30 import java.util.concurrent.atomic.AtomicBoolean ; 31 32 38 public class Workflow<T> extends JoinSupport { 39 private static final Log log = LogFactory.getLog(Workflow.class); 40 41 private Executor executor; 42 private Interpreter interpreter; 43 private Timer timer = new Timer (); 44 private AtomicBoolean suspended = new AtomicBoolean (); 45 private BlockingQueue <T> queue = new LinkedBlockingQueue <T>(); 46 47 52 53 @SuppressWarnings ("unchecked") 54 public Workflow(Class <T> enumType) { 55 this((T) getFirstStep(enumType)); 56 } 57 58 public Workflow(T firstStep) { 59 this(Executors.newSingleThreadExecutor(), firstStep); 60 } 61 62 public Workflow(Executor executor, T firstStep) { 63 this(executor, new ReflectionInterpreter(), firstStep); 64 } 65 66 public Workflow(Executor executor, Interpreter interpreter, T firstStep) { 67 this.executor = executor; 68 this.interpreter = interpreter; 69 70 if (firstStep instanceof Enum ) { 71 validateStepsExist(firstStep.getClass()); 72 } 73 addStep(firstStep); 74 } 75 76 79 public T getNextStep() { 80 return queue.peek(); 81 } 82 83 86 public void addStep(T stepName) { 87 suspended.set(false); 88 queue.add(stepName); 89 executor.execute(this); 90 } 91 92 public void run() { 93 while (!isStopped()) { 94 try { 95 T stepToExecute = queue.poll(); 96 if (stepToExecute != null) { 97 if (log.isDebugEnabled()) { 98 log.debug("About to execute step: " + stepToExecute); 99 } 100 interpreter.executeStep(stepToExecute, this); 101 } 102 else { 103 break; 104 } 105 } 106 catch (RuntimeException e) { 107 log.warn("Caught: " + e, e); 108 } 109 } 110 } 111 112 115 public void fork(TimeoutActivity... activities) { 116 for (TimeoutActivity activity : activities) { 117 activity.start(); 118 } 119 } 120 121 124 public void fork(long timeout, TimeoutActivity... activities) { 125 for (TimeoutActivity activity : activities) { 126 activity.scheduleTimeout(timer, timeout); 127 activity.start(); 128 } 129 } 130 131 135 public void joinAll(T joinedStep, long timeout, Activity... activities) { 136 JoinAll joinFlow = new JoinAll(activities); 137 join(joinFlow, joinedStep, timeout); 138 } 139 140 145 public void join(JoinSupport joinFlow, T joinedStep, long timeout) { 146 joinFlow.onStop(createGoToStepTask(joinedStep)); 148 149 fork(timeout, joinFlow); 151 } 152 153 157 public void suspend() { 158 suspended.set(true); 159 } 160 161 165 public boolean isSuspended() { 166 return suspended.get(); 167 } 168 169 172 public boolean isNextStepAvailable() { 173 return !queue.isEmpty(); 174 } 175 176 179 public Runnable createGoToStepTask(final T joinedStep) { 180 return new Runnable () { 181 public void run() { 182 addStep(joinedStep); 183 } 184 }; 185 } 186 187 190 public void onStepException(String stepName, Exception e) { 191 log.warn("Step failed: " + stepName + ". Reason: " + e, e); 192 suspend(); 193 fail("Failed to execute step: " + stepName + ". Reason: " + e, e); 194 } 195 196 @Override 197 protected void onChildStateChange(int childCount, int stoppedCount, int failedCount) { 198 } 199 200 205 protected void validateStepsExist(Class enumType) { 206 Object [] values = null; 207 try { 208 values = EnumHelper.getEnumValues(enumType); 209 } 210 catch (Exception e) { 211 fail("Cannot get the values of the enumeration: " + enumType.getName(), e); 212 } 213 if (values != null) { 214 interpreter.validateStepsExist(values, this); 215 } 216 } 217 218 protected static Object getFirstStep(Class enumType) { 219 try { 220 Object [] values = EnumHelper.getEnumValues(enumType); 221 return values[0]; 222 } 223 catch (Exception e) { 224 throw new IllegalArgumentException ("Could not find the values for the enumeration: " + enumType.getName() + ". Reason: " + e, e); 225 } 226 } 227 228 } 229 | Popular Tags |