KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > beanflow > ParallelActivity


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17 package org.apache.servicemix.beanflow;
18
19 import org.apache.servicemix.beanflow.support.CallablesFactory;
20 import org.apache.servicemix.beanflow.support.FindCallableMethods;
21
22 import java.util.ArrayList JavaDoc;
23 import java.util.List JavaDoc;
24 import java.util.Timer JavaDoc;
25 import java.util.concurrent.Callable JavaDoc;
26 import java.util.concurrent.CountDownLatch JavaDoc;
27 import java.util.concurrent.Executor JavaDoc;
28 import java.util.concurrent.Future JavaDoc;
29 import java.util.concurrent.TimeUnit JavaDoc;
30 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
31
32 /**
33  * An activity which invokes a collection of {@link Callable<T>} methods.
34  *
35  * @version $Revision: $
36  */

37 public class ParallelActivity<T> extends ProxyActivity {
38     private JoinSupport joinActivity;
39     private List JavaDoc<CallableActivity<T>> activities;
40     private AtomicBoolean JavaDoc started = new AtomicBoolean JavaDoc();
41     private Object JavaDoc lock = new Object JavaDoc();
42     private CountDownLatch JavaDoc countDownLatch;
43
44     /**
45      * A helper method to create a new {@link ParallelActivity} which invokes a
46      * number of methods on a POJO in parallel and then joins on them all
47      * completing
48      */

49     public static ParallelActivity newParallelMethodActivity(Executor JavaDoc executor, Object JavaDoc bean) {
50         return newParallelMethodActivity(new JoinAll(), executor, bean);
51     }
52
53     /**
54      * A helper method to create a new {@link ParallelActivity} which invokes a
55      * number of methods on a POJO in parallel and then performs a custom join
56      */

57     @SuppressWarnings JavaDoc("unchecked")
58     public static ParallelActivity newParallelMethodActivity(JoinSupport join, Executor JavaDoc executor, Object JavaDoc bean) {
59         return new ParallelActivity(join, executor, new FindCallableMethods(bean));
60     }
61
62     public ParallelActivity(JoinSupport activity, Executor JavaDoc executor, CallablesFactory<T> callablesFactory) {
63         this(activity, executor, callablesFactory.createCallables());
64     }
65
66     public ParallelActivity(JoinSupport activity, Executor JavaDoc executor, List JavaDoc<Callable JavaDoc<T>> callables) {
67         super(activity);
68         this.joinActivity = activity;
69         this.activities = new ArrayList JavaDoc<CallableActivity<T>>();
70         for (Callable JavaDoc<T> callable : callables) {
71             activities.add(new CallableActivity<T>(executor, callable));
72         }
73     }
74
75     public ParallelActivity(JoinSupport activity, List JavaDoc<CallableActivity<T>> activities) {
76         super(activity);
77         this.joinActivity = activity;
78         this.activities = activities;
79     }
80
81     public List JavaDoc<Future JavaDoc<T>> getFutures() {
82         List JavaDoc<Future JavaDoc<T>> answer = new ArrayList JavaDoc<Future JavaDoc<T>>();
83         synchronized (lock) {
84             for (CallableActivity<T> activity : activities) {
85                 answer.add(activity.getFuture());
86             }
87         }
88         return answer;
89     }
90
91     @Override JavaDoc
92     public void start() {
93         super.start();
94         init();
95     }
96
97     @Override JavaDoc
98     public void startWithTimeout(Timer JavaDoc timer, long timeout) {
99         init();
100         super.startWithTimeout(timer, timeout);
101     }
102
103     public void sync() {
104         try {
105             CountDownLatch JavaDoc latch = getCountDownLatch();
106             latch.countDown();
107             latch.await();
108         }
109         catch (InterruptedException JavaDoc e) {
110             Thread.currentThread().interrupt();
111         }
112         finally {
113             resetCountDownLatch();
114         }
115     }
116
117     public boolean sync(long millis) {
118         try {
119             CountDownLatch JavaDoc latch = getCountDownLatch();
120             latch.countDown();
121             return latch.await(millis, TimeUnit.MILLISECONDS);
122         }
123         catch (InterruptedException JavaDoc e) {
124             Thread.currentThread().interrupt();
125             return false;
126         }
127         finally {
128             resetCountDownLatch();
129         }
130     }
131
132
133     // Implementation methods
134
// -------------------------------------------------------------------------
135
protected void resetCountDownLatch() {
136         synchronized (lock) {
137             if (countDownLatch != null && countDownLatch.getCount() == 0) {
138                 countDownLatch = null;
139             }
140         }
141         
142     }
143
144     protected CountDownLatch JavaDoc getCountDownLatch() {
145         synchronized (lock) {
146             int count = activities.size();
147             if (countDownLatch == null) {
148                 countDownLatch = new CountDownLatch JavaDoc(count);
149             }
150             return countDownLatch;
151         }
152     }
153
154     private void init() {
155         if (started.compareAndSet(false, true)) {
156             doStart();
157         }
158     }
159
160     protected void doStart() {
161         for (CallableActivity<T> activity : activities) {
162             joinActivity.fork(activity);
163         }
164     }
165
166 }
167
Popular Tags