KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > routing > response > AbstractResponseAggregator


1 /*
2  * $Id: AbstractResponseAggregator.java 3798 2006-11-04 04:07:14Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.routing.response;
12
13 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
14 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap;
15 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
16
17 import org.mule.config.i18n.Message;
18 import org.mule.config.i18n.Messages;
19 import org.mule.routing.inbound.EventGroup;
20 import org.mule.umo.UMOEvent;
21 import org.mule.umo.UMOMessage;
22 import org.mule.umo.routing.ResponseTimeoutException;
23 import org.mule.umo.routing.RoutingException;
24 import org.mule.util.MapUtils;
25 import org.mule.util.concurrent.Latch;
26
27 /**
28  * <code>AbstractResponseAggregator</code> provides a base class for implementing
29  * response aggregator routers. This provides a thread-safe implemenetation and
30  * allows developers to customise how and when events are grouped and collated.
31  * Response Agrregators are used to collect responses that are usually sent to
32  * replyTo endpoints set on outbound routers. When an event is sent out via an
33  * outbound router, the response router will block the response flow on an
34  * UMOComponent until the Response Router resolves a reply or times out.
35  */

36 public abstract class AbstractResponseAggregator extends AbstractResponseRouter
37 {
38     /**
39      * The collection of messages that are ready to be returned to the callee. Keyed
40      * by Message ID
41      */

42     protected ConcurrentMap responseEvents = new ConcurrentHashMap();
43
44     /**
45      * A map of locks used to synchronize operations on an eventGroup or response
46      * message for a given Message ID.
47      */

48     private ConcurrentMap locks = new ConcurrentHashMap();
49
50     /**
51      * Map of EventGroup objects. These represent one or mre messages to be
52      * agregated. These are keyed on Message ID. There will be one responseEvent for
53      * every EventGroup.
54      */

55     protected final ConcurrentMap eventGroups = new ConcurrentHashMap();
56
57     public void process(UMOEvent event) throws RoutingException
58     {
59         // add new event to an event group (it will create a new group if one does
60
// not exist for the event correlation ID)
61
EventGroup eg = addEvent(event);
62
63         // check to see if the event group is ready to be aggregated
64
if (shouldAggregate(eg))
65         {
66             // create the response message
67
UMOMessage returnMessage = aggregateEvents(eg);
68             Object JavaDoc id = eg.getGroupId();
69
70             // remove the eventGroup as no further message will be received for this
71
// group once we aggregate
72
removeEventGroup(id);
73
74             // add the new response message so that it can be collected by the
75
// response Thread
76
UMOMessage previousResult = (UMOMessage)responseEvents.putIfAbsent(id, returnMessage);
77             if (previousResult != null)
78             {
79                 // this would indicate that we need a better way to prevent continued
80
// aggregation for a group that is currently being processed. Can
81
// this actually happen?
82
throw new IllegalStateException JavaDoc("Detected duplicate aggregation result message with id: "
83                                                 + id);
84             }
85
86             // will get/create a latch for the response Message ID and release it,
87
// notifying other threads that the response message is available
88
Latch l = (Latch)locks.get(id);
89             if (l == null)
90             {
91                 if (logger.isDebugEnabled())
92                 {
93                     logger.debug("Creating latch for " + id + " in " + this);
94                 }
95
96                 l = new Latch();
97                 Latch previous = (Latch)locks.putIfAbsent(id, l);
98                 if (previous != null)
99                 {
100                     l = previous;
101                 }
102             }
103
104             l.countDown();
105         }
106     }
107
108     /**
109      * Adds the event to an event group. Groups are defined by the correlationId on
110      * the message. If no 'correlation Id' is returned from calling
111      * <code>getReplyAggregateIdentifier()</code> a routing exception will be
112      * thrown
113      *
114      * @param event the reply event received by the response router
115      * @return The event group for the current event or a new group if the current
116      * event doesn't belong to an existing group
117      */

118     protected EventGroup addEvent(UMOEvent event) throws RoutingException
119     {
120         Object JavaDoc cId = getReplyAggregateIdentifier(event.getMessage());
121
122         if (cId == null || cId.equals("-1"))
123         {
124             throw new RoutingException(new Message(Messages.NO_CORRELATION_ID), event.getMessage(),
125                 event.getEndpoint());
126         }
127
128         if (logger.isDebugEnabled())
129         {
130             logger.debug("Adding event to response aggregator group: " + cId);
131         }
132
133         EventGroup eg = (EventGroup)eventGroups.get(cId);
134         if (eg == null)
135         {
136             eg = createEventGroup(cId, event);
137             EventGroup previous = (EventGroup)eventGroups.putIfAbsent(eg.getGroupId(), eg);
138             if (previous != null)
139             {
140                 eg = previous;
141             }
142         }
143
144         eg.addEvent(event);
145
146         return eg;
147     }
148
149     /**
150      * Creates a new event group with the given Id and can use other properties on
151      * the event Custom implementations can even overload the eventGroup object here
152      *
153      * @param id The Event group Id for the new Group
154      * @param event the current event
155      * @return a New event group for the incoming event
156      */

157     protected EventGroup createEventGroup(Object JavaDoc id, UMOEvent event)
158     {
159         if (logger.isDebugEnabled())
160         {
161             logger.debug("Creating new event group: " + id + " in " + this);
162         }
163         return new EventGroup(id);
164     }
165
166     protected void removeEventGroup(Object JavaDoc id)
167     {
168         eventGroups.remove(id);
169     }
170
171     /**
172      * This method is called by the responding callee thread and should return the
173      * aggregated response message
174      *
175      * @param message
176      * @return
177      * @throws RoutingException
178      */

179     public UMOMessage getResponse(UMOMessage message) throws RoutingException
180     {
181         Object JavaDoc responseId = getCallResponseAggregateIdentifier(message);
182
183         if (logger.isDebugEnabled())
184         {
185             logger.debug("Waiting for response for message id: " + responseId + " in " + this);
186         }
187
188         Latch l = (Latch)locks.get(responseId);
189         if (l == null)
190         {
191             if (logger.isDebugEnabled())
192             {
193                 logger.debug("Got response but no one is waiting for it yet. Creating latch for "
194                              + responseId + " in " + this);
195             }
196
197             l = new Latch();
198             Latch previous = (Latch)locks.putIfAbsent(responseId, l);
199             if (previous != null)
200             {
201                 l = previous;
202             }
203         }
204
205         if (logger.isDebugEnabled())
206         {
207             logger.debug("Got latch for message: " + responseId);
208         }
209
210         // the final result message
211
UMOMessage result;
212
213         // indicates whether waiting for the result timed out
214
boolean b = false;
215
216         try
217         {
218             if (logger.isDebugEnabled())
219             {
220                 logger.debug("Waiting for response to message: " + responseId);
221             }
222
223             // how long should we wait for the lock?
224
if (getTimeout() <= 0)
225             {
226                 l.await();
227                 b = true;
228             }
229             else
230             {
231                 b = l.await(this.getTimeout(), TimeUnit.MILLISECONDS);
232             }
233         }
234         catch (InterruptedException JavaDoc e)
235         {
236             logger.error(e.getMessage(), e);
237         }
238         finally
239         {
240             locks.remove(responseId);
241             result = (UMOMessage)responseEvents.remove(responseId);
242         }
243
244         if (!b)
245         {
246             if (logger.isTraceEnabled())
247             {
248                 logger.trace("Current responses are: \n" + MapUtils.toString(responseEvents, true));
249             }
250
251             throw new ResponseTimeoutException(new Message(Messages.RESPONSE_TIMED_OUT_X_WAITING_FOR_ID_X,
252                 String.valueOf(getTimeout()), responseId), message, null);
253         }
254
255         if (result == null)
256         {
257             // this should never happen, just using it as a safe guard for now
258
throw new IllegalStateException JavaDoc("Response Message is null");
259         }
260
261         if (logger.isDebugEnabled())
262         {
263             logger.debug("remaining locks : " + locks.keySet());
264             logger.debug("remaining results: " + responseEvents.keySet());
265         }
266
267         return result;
268     }
269
270     /**
271      * Determines if the event group is ready to be aggregated. if the group is ready
272      * to be aggregated (this is entirely up to the application. it could be
273      * determined by volume, last modified time or some oher criteria based on the
274      * last event received)
275      *
276      * @param events
277      * @return true if the event gorep is ready for aggregation
278      */

279     protected abstract boolean shouldAggregate(EventGroup events);
280
281     /**
282      * This method is invoked if the shouldAggregate method is called and returns
283      * true. Once this method returns an aggregated message the event group is
284      * removed from the router
285      *
286      * @param events the event group for this request
287      * @return an aggregated message
288      * @throws RoutingException if the aggregation fails. in this scenario the whole
289      * event group is removed and passed to the exception handler for
290      * this componenet
291      */

292     protected abstract UMOMessage aggregateEvents(EventGroup events) throws RoutingException;
293
294 }
295
Popular Tags