package org.apache.servicemix.beanflow;

import org.apache.servicemix.beanflow.support.CallablesFactory;
import org.apache.servicemix.beanflow.support.FindCallableMethods;

import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * An activity that, when started, forks a list of {@link CallableActivity}
 * children onto a {@link JoinSupport} activity and exposes their
 * {@link Future}s.
 * <p>
 * The class also offers a simple rendezvous via {@link #sync()} /
 * {@link #sync(long)}, built on a shared {@link CountDownLatch} whose initial
 * count is the number of child activities.
 *
 * @param <T> the result type produced by each child callable
 */
public class ParallelActivity<T> extends ProxyActivity {

    /** The join activity every child is forked onto; also passed to the superclass. */
    private final JoinSupport joinActivity;

    /** The child activities, in the order they will be forked. */
    private final List<CallableActivity<T>> activities;

    /** Flips to {@code true} on the first start so children are forked only once. */
    private final AtomicBoolean started = new AtomicBoolean();

    /** Monitor guarding {@link #countDownLatch} and the iteration in {@link #getFutures()}. */
    private final Object lock = new Object();

    /** Lazily created rendezvous latch; {@code null} when no rendezvous is in progress. */
    private CountDownLatch countDownLatch;

    /**
     * Creates a parallel activity for the given bean using a new
     * {@link JoinAll} as the join activity.
     *
     * @param executor the executor handed to each child {@link CallableActivity}
     * @param bean     the object passed to {@link FindCallableMethods}
     *                 (presumably the bean whose methods become the callables
     *                 - confirm against FindCallableMethods)
     * @return the new, not yet started, activity
     */
    public static ParallelActivity newParallelMethodActivity(Executor executor, Object bean) {
        return newParallelMethodActivity(new JoinAll(), executor, bean);
    }

    /**
     * Creates a parallel activity for the given bean using the supplied join
     * activity.
     *
     * @param join     the join activity the children are forked onto
     * @param executor the executor handed to each child {@link CallableActivity}
     * @param bean     the object passed to {@link FindCallableMethods}
     * @return the new, not yet started, activity
     */
    @SuppressWarnings("unchecked")
    public static ParallelActivity newParallelMethodActivity(JoinSupport join, Executor executor, Object bean) {
        return new ParallelActivity(join, executor, new FindCallableMethods(bean));
    }

    /**
     * Creates a parallel activity from the callables produced by a factory.
     *
     * @param activity         the join activity the children are forked onto
     * @param executor         the executor handed to each child activity
     * @param callablesFactory supplies the callables via
     *                         {@link CallablesFactory#createCallables()}
     */
    public ParallelActivity(JoinSupport activity, Executor executor, CallablesFactory<T> callablesFactory) {
        this(activity, executor, callablesFactory.createCallables());
    }

    /**
     * Creates a parallel activity that wraps each callable in its own
     * {@link CallableActivity}.
     *
     * @param activity  the join activity the children are forked onto
     * @param executor  the executor handed to each child activity
     * @param callables the work items, one child activity is created per element
     */
    public ParallelActivity(JoinSupport activity, Executor executor, List<Callable<T>> callables) {
        super(activity);
        this.joinActivity = activity;
        this.activities = new ArrayList<CallableActivity<T>>(callables.size());
        for (Callable<T> callable : callables) {
            activities.add(new CallableActivity<T>(executor, callable));
        }
    }

    /**
     * Creates a parallel activity over an existing list of child activities.
     * The list is stored as given (it is not copied).
     *
     * @param activity   the join activity the children are forked onto
     * @param activities the child activities to fork on start
     */
    public ParallelActivity(JoinSupport activity, List<CallableActivity<T>> activities) {
        super(activity);
        this.joinActivity = activity;
        this.activities = activities;
    }

    /**
     * Returns the futures of all child activities.
     *
     * @return a new list holding {@link CallableActivity#getFuture()} of each
     *         child, in the order of the child list
     */
    public List<Future<T>> getFutures() {
        List<Future<T>> answer = new ArrayList<Future<T>>();
        synchronized (lock) {
            for (CallableActivity<T> activity : activities) {
                answer.add(activity.getFuture());
            }
        }
        return answer;
    }

    /**
     * Starts the superclass first and then forks the children (once only).
     */
    @Override
    public void start() {
        super.start();
        init();
    }

    /**
     * Forks the children (once only) and then delegates to the superclass.
     * Note the order is the reverse of {@link #start()}.
     *
     * @param timer   passed through to the superclass
     * @param timeout passed through to the superclass
     */
    @Override
    public void startWithTimeout(Timer timer, long timeout) {
        init();
        super.startWithTimeout(timer, timeout);
    }

    /**
     * Counts down the shared latch and blocks until it reaches zero, i.e.
     * until this method (or {@link #sync(long)}) has been invoked as many
     * times as there were child activities when the latch was created.
     * <p>
     * If the waiting thread is interrupted, its interrupt flag is restored
     * and the method returns without waiting further.
     */
    public void sync() {
        try {
            CountDownLatch latch = getCountDownLatch();
            latch.countDown();
            latch.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        } finally {
            resetCountDownLatch();
        }
    }

    /**
     * Timed variant of {@link #sync()}.
     * <p>
     * NOTE(review): when the wait times out the latch has not reached zero,
     * so {@link #resetCountDownLatch()} leaves it in place with this call's
     * count-down already applied - confirm that this is the intended
     * behaviour for subsequent rendezvous.
     *
     * @param millis maximum time to wait, in milliseconds
     * @return {@code true} if the latch reached zero within the timeout;
     *         {@code false} if the wait timed out or the thread was interrupted
     */
    public boolean sync(long millis) {
        try {
            CountDownLatch latch = getCountDownLatch();
            latch.countDown();
            return latch.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            resetCountDownLatch();
        }
    }

    /**
     * Discards the current latch once it has reached zero so that the next
     * call to {@link #getCountDownLatch()} creates a fresh one.
     */
    protected void resetCountDownLatch() {
        synchronized (lock) {
            if (countDownLatch != null && countDownLatch.getCount() == 0) {
                countDownLatch = null;
            }
        }
    }

    /**
     * Returns the current rendezvous latch, creating it on demand with a
     * count equal to the current number of child activities.
     *
     * @return the shared latch, never {@code null}
     */
    protected CountDownLatch getCountDownLatch() {
        synchronized (lock) {
            if (countDownLatch == null) {
                countDownLatch = new CountDownLatch(activities.size());
            }
            return countDownLatch;
        }
    }

    /**
     * Runs {@link #doStart()} the first time it is called and does nothing on
     * later calls.
     */
    private void init() {
        if (started.compareAndSet(false, true)) {
            doStart();
        }
    }

    /**
     * Forks every child activity onto the join activity, in list order.
     */
    protected void doStart() {
        for (CallableActivity<T> activity : activities) {
            joinActivity.fork(activity);
        }
    }
}