KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > routing > inbound > AbstractEventAggregator


1 /*
2  * $Id: AbstractEventAggregator.java 3798 2006-11-04 04:07:14Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.routing.inbound;
12
13 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
14 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
15 import org.mule.impl.MuleEvent;
16 import org.mule.impl.endpoint.MuleEndpoint;
17 import org.mule.routing.AggregationException;
18 import org.mule.umo.MessagingException;
19 import org.mule.umo.UMOEvent;
20 import org.mule.umo.UMOMessage;
21 import org.mule.umo.endpoint.UMOEndpoint;
22
23 import java.util.Map JavaDoc;
24
25 /**
26  * <code>AbstractEventAggregator</code> will aggregate a set of messages into a
27  * single message.
28  *
29  * @author <a HREF="mailto:ross.mason@symphonysoft.com">Ross Mason</a>
30  * @version $Revision: 3798 $
31  */

32
33 public abstract class AbstractEventAggregator extends SelectiveConsumer
34 {
35     protected static final String JavaDoc NO_CORRELATION_ID = "no-id";
36
37     protected Map eventGroups = new ConcurrentHashMap();
38     private final Object JavaDoc lock = new Object JavaDoc();
39
40     public UMOEvent[] process(UMOEvent event) throws MessagingException
41     {
42         AtomicBoolean doAggregate = new AtomicBoolean(false);
43         EventGroup eg = null;
44         // synchronized (lock)
45
// {
46
if (isMatch(event))
47         {
48             eg = addEvent(event);
49             doAggregate.compareAndSet(false, shouldAggregate(eg));
50         }
51         // }
52
if (doAggregate.get())
53         {
54             synchronized (lock)
55             {
56                 // TODO how should we handle EventGroup if still null?
57
UMOMessage returnMessage = aggregateEvents(eg);
58                 removeGroup(eg.getGroupId());
59                 UMOEndpoint endpoint = new MuleEndpoint(event.getEndpoint());
60                 endpoint.setTransformer(null);
61                 endpoint.setName(getClass().getName());
62                 UMOEvent returnEvent = new MuleEvent(returnMessage, endpoint, event.getComponent(), event);
63                 return new UMOEvent[]{returnEvent};
64             }
65         }
66         return null;
67     }
68
69     /**
70      * Adds the event to an event group. Groups are defined by the correlationId on
71      * the message. If no correlationId is set a default group is created for all
72      * events without a correlationId. If there is no group for the current
73      * correlationId one will be created and added to the router.
74      *
75      * @param event
76      * @return returns either a new EventGroup with the new event added or an
77      * Existing EventGoup with the new event added
78      */

79     protected EventGroup addEvent(UMOEvent event)
80     {
81         String JavaDoc cId = event.getMessage().getCorrelationId();
82         if (cId == null)
83         {
84             cId = NO_CORRELATION_ID;
85         }
86         EventGroup eg = (EventGroup)eventGroups.get(cId);
87         if (eg == null)
88         {
89             eg = new EventGroup(cId);
90             eg.addEvent(event);
91             eventGroups.put(eg.getGroupId(), eg);
92         }
93         else
94         {
95             eg.addEvent(event);
96         }
97         return eg;
98     }
99
100     protected void removeGroup(Object JavaDoc id)
101     {
102         // synchronized (eventGroups)
103
// {
104
eventGroups.remove(id);
105         // }
106
}
107
108     /**
109      * Determines if the event group is ready to be aggregated. if the group is ready
110      * to be aggregated (this is entirely up to the application. it could be
111      * determined by volume, last modified time or some oher criteria based on the
112      * last event received)
113      *
114      * @param events
115      * @return true if the group is ready for aggregation
116      */

117     protected abstract boolean shouldAggregate(EventGroup events);
118
119     /**
120      * This method is invoked if the shouldAggregate method is called and returns
121      * true. Once this method returns an aggregated message the event group is
122      * removed from the router
123      *
124      * @param events the event group for this request
125      * @return an aggregated message
126      * @throws AggregationException if the aggregation fails. in this scenario the
127      * whole event group is removed and passed to the exception handler
128      * for this componenet
129      */

130     protected abstract UMOMessage aggregateEvents(EventGroup events) throws AggregationException;
131 }
132
Popular Tags