KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > eip > support > AbstractAggregator


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17 package org.apache.servicemix.eip.support;
18
19 import java.util.Date JavaDoc;
20
21 import javax.jbi.messaging.ExchangeStatus;
22 import javax.jbi.messaging.InOnly;
23 import javax.jbi.messaging.MessageExchange;
24 import javax.jbi.messaging.NormalizedMessage;
25 import javax.jbi.messaging.RobustInOnly;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.servicemix.eip.EIPEndpoint;
30 import org.apache.servicemix.jbi.util.MessageUtil;
31 import org.apache.servicemix.timers.Timer;
32 import org.apache.servicemix.timers.TimerListener;
33
34 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
35 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap;
36 import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
37
38 /**
39  * Aggregator can be used to wait and combine several messages.
40  * This component implements the
41  * <a HREF="http://www.enterpriseintegrationpatterns.com/Aggregator.html">Aggregator</a>
42  * pattern.
43  *
44  * TODO: keep list of closed aggregations for a certain time
45  * TODO: distributed lock manager
46  * TODO: persistent / transactional timer
47  *
48  * @author gnodet
49  * @version $Revision: 376451 $
50  */

51 public abstract class AbstractAggregator extends EIPEndpoint {
52
53     private static final Log log = LogFactory.getLog(AbstractAggregator.class);
54
55     private ExchangeTarget target;
56     
57     private boolean rescheduleTimeouts;
58     
59     private boolean synchronous;
60
61     private ConcurrentMap closedAggregates = new ConcurrentHashMap();
62     
63     /**
64      * @return the synchronous
65      */

66     public boolean isSynchronous() {
67         return synchronous;
68     }
69
70     /**
71      * @param synchronous the synchronous to set
72      */

73     public void setSynchronous(boolean synchronous) {
74         this.synchronous = synchronous;
75     }
76
77     /**
78      * @return the rescheduleTimeouts
79      */

80     public boolean isRescheduleTimeouts() {
81         return rescheduleTimeouts;
82     }
83
84     /**
85      * @param rescheduleTimeouts the rescheduleTimeouts to set
86      */

87     public void setRescheduleTimeouts(boolean rescheduleTimeouts) {
88         this.rescheduleTimeouts = rescheduleTimeouts;
89     }
90
91     /**
92      * @return the target
93      */

94     public ExchangeTarget getTarget() {
95         return target;
96     }
97
98     /**
99      * @param target the target to set
100      */

101     public void setTarget(ExchangeTarget target) {
102         this.target = target;
103     }
104     
105     /* (non-Javadoc)
106      * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
107      */

108     protected void processSync(MessageExchange exchange) throws Exception JavaDoc {
109         throw new IllegalStateException JavaDoc();
110     }
111
112     /* (non-Javadoc)
113      * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
114      */

115     protected void processAsync(MessageExchange exchange) throws Exception JavaDoc {
116         throw new IllegalStateException JavaDoc();
117     }
118     
119     /* (non-Javadoc)
120      * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
121      */

122     public void process(MessageExchange exchange) throws Exception JavaDoc {
123         // Skip DONE
124
if (exchange.getStatus() == ExchangeStatus.DONE) {
125             return;
126         // Skip ERROR
127
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
128             return;
129         // Handle an ACTIVE exchange as a PROVIDER
130
} else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
131             if (exchange instanceof InOnly == false &&
132                 exchange instanceof RobustInOnly == false) {
133                 fail(exchange, new UnsupportedOperationException JavaDoc("Use an InOnly or RobustInOnly MEP"));
134             } else {
135                 NormalizedMessage in = MessageUtil.copyIn(exchange);
136                 final String JavaDoc correlationId = getCorrelationID(exchange, in);
137                 if (correlationId == null || correlationId.length() == 0) {
138                     throw new IllegalArgumentException JavaDoc("Could not retrieve correlation id for incoming exchange");
139                 }
140                 // Load existing aggregation
141
Lock lock = getLockManager().getLock(correlationId);
142                 lock.lock();
143                 try {
144                     Object JavaDoc aggregation = store.load(correlationId);
145                     Date JavaDoc timeout = null;
146                     // Create a new aggregate
147
if (aggregation == null) {
148                         if (isAggregationClosed(correlationId)) {
149                             // TODO: should we return an error here ?
150
} else {
151                             aggregation = createAggregation(correlationId);
152                             timeout = getTimeout(aggregation);
153                         }
154                     } else if (isRescheduleTimeouts()) {
155                         timeout = getTimeout(aggregation);
156                     }
157                     // If the aggregation is not closed
158
if (aggregation != null) {
159                         if (addMessage(aggregation, in, exchange)) {
160                             sendAggregate(correlationId, aggregation, false);
161                         } else {
162                             store.store(correlationId, aggregation);
163                             if (timeout != null) {
164                                 if (log.isDebugEnabled()) {
165                                     log.debug("Scheduling timeout at " + timeout + " for aggregate " + correlationId);
166                                 }
167                                 getTimerManager().schedule(new TimerListener() {
168                                     public void timerExpired(Timer timer) {
169                                         AbstractAggregator.this.onTimeout(correlationId);
170                                     }
171                                 }, timeout);
172                             }
173                         }
174                     }
175                     done(exchange);
176                 } finally {
177                     lock.unlock();
178                 }
179             }
180         // Handle an ACTIVE exchange as a CONSUMER
181
} else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
182             done(exchange);
183         }
184     }
185     
186     protected void sendAggregate(String JavaDoc correlationId,
187                                  Object JavaDoc aggregation,
188                                  boolean timeout) throws Exception JavaDoc {
189         InOnly me = exchangeFactory.createInOnlyExchange();
190         target.configureTarget(me, getContext());
191         NormalizedMessage nm = me.createMessage();
192         me.setInMessage(nm);
193         buildAggregate(aggregation, nm, me, timeout);
194         closeAggregation(correlationId);
195         if (isSynchronous()) {
196             sendSync(me);
197         } else {
198             send(me);
199         }
200     }
201
202     protected void onTimeout(String JavaDoc correlationId) {
203         if (log.isDebugEnabled()) {
204             log.debug("Timeout expired for aggregate " + correlationId);
205         }
206         Lock lock = getLockManager().getLock(correlationId);
207         lock.lock();
208         try {
209             Object JavaDoc aggregation = store.load(correlationId);
210             if (aggregation != null) {
211                 sendAggregate(correlationId, aggregation, true);
212             } else if (!isAggregationClosed(correlationId)) {
213                 throw new IllegalStateException JavaDoc("Aggregation is not closed, but can not be retrieved from the store");
214             } else {
215                 if (log.isDebugEnabled()) {
216                     log.debug("Aggregate " + correlationId + " is closed");
217                 }
218             }
219         } catch (Exception JavaDoc e) {
220             log.info("Caught exception while processing timeout aggregation", e);
221         } finally {
222             lock.unlock();
223         }
224     }
225     
226     /**
227      * Check if the aggregation with the given correlation id is closed or not.
228      * Called when the aggregation has not been found in the store.
229      *
230      * @param correlationId
231      * @return
232      */

233     protected boolean isAggregationClosed(String JavaDoc correlationId) {
234         // TODO: implement this using a persistent / cached behavior
235
return closedAggregates.containsKey(correlationId);
236     }
237     
238     /**
239      * Mark an aggregation as closed
240      * @param correlationId
241      */

242     protected void closeAggregation(String JavaDoc correlationId) {
243         // TODO: implement this using a persistent / cached behavior
244
closedAggregates.put(correlationId, Boolean.TRUE);
245     }
246     
247     /**
248      * Retrieve the correlation ID of the given exchange
249      * @param exchange
250      * @param message
251      * @return the correlationID
252      * @throws Exception
253      */

254     protected abstract String JavaDoc getCorrelationID(MessageExchange exchange, NormalizedMessage message) throws Exception JavaDoc;
255     
256     /**
257      * Creates a new empty aggregation.
258      * @param correlationID
259      * @return a newly created aggregation
260      */

261     protected abstract Object JavaDoc createAggregation(String JavaDoc correlationID) throws Exception JavaDoc;
262
263     /**
264      * Returns the date when the onTimeout method should be called if the aggregation is not completed yet,
265      * or null if the aggregation has no timeout.
266      *
267      * @param aggregate
268      * @return
269      */

270     protected abstract Date JavaDoc getTimeout(Object JavaDoc aggregate);
271
272     /**
273      * Add a newly received message to this aggregation
274      *
275      * @param aggregate
276      * @param message
277      * @param exchange
278      * @return <code>true</code> if the aggregate id complete
279      */

280     protected abstract boolean addMessage(Object JavaDoc aggregate,
281                                           NormalizedMessage message,
282                                           MessageExchange exchange) throws Exception JavaDoc;
283     
284     /**
285      * Fill the given JBI message with the aggregation result.
286      *
287      * @param aggregate
288      * @param message
289      * @param exchange
290      * @param timeout <code>false</code> if the aggregation has completed or <code>true</code> if this aggregation has timed out
291      */

292     protected abstract void buildAggregate(Object JavaDoc aggregate,
293                                            NormalizedMessage message,
294                                            MessageExchange exchange,
295                                            boolean timeout) throws Exception JavaDoc;
296 }
297
Popular Tags