package org.mule.routing.inbound;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.mule.impl.MuleEvent;
import org.mule.impl.endpoint.MuleEndpoint;
import org.mule.routing.AggregationException;
import org.mule.umo.MessagingException;
import org.mule.umo.UMOEvent;
import org.mule.umo.UMOMessage;
import org.mule.umo.endpoint.UMOEndpoint;

import java.util.Map;

/**
 * Base class for inbound routers that collect matching events into
 * {@link EventGroup}s keyed by correlation id and, once a subclass decides a
 * group is ready, collapse that group into a single message.
 * <p>
 * Subclasses supply the policy through {@link #shouldAggregate(EventGroup)}
 * and the merge logic through {@link #aggregateEvents(EventGroup)}.
 * <p>
 * Thread-safety: adding an event to its group, deciding whether to aggregate,
 * aggregating and removing the group all happen while holding one private
 * lock, so a group is created at most once per correlation id and aggregated
 * by at most one thread. The price is that matched events are processed one
 * at a time per router instance.
 */
public abstract class AbstractEventAggregator extends SelectiveConsumer
{
    /** Group id used for events whose message carries no correlation id. */
    protected static final String NO_CORRELATION_ID = "no-id";

    /** Pending event groups, keyed by group (correlation) id. */
    protected Map eventGroups = new ConcurrentHashMap();

    /** Guards every compound operation on {@link #eventGroups} and its groups. */
    private final Object lock = new Object();

    /**
     * Adds the event to its group if it matches this router and, when the
     * group is ready, aggregates it into a single result event.
     *
     * @param event the incoming event
     * @return a one-element array holding the aggregated event, or
     *         <code>null</code> if the event did not match or its group is
     *         not yet ready to be aggregated
     * @throws MessagingException if aggregating the group fails
     */
    public UMOEvent[] process(UMOEvent event) throws MessagingException
    {
        if (!isMatch(event))
        {
            return null;
        }

        // Add, decide and aggregate as one atomic step. If the lock only
        // covered the aggregation, two threads could both observe a complete
        // group and each emit a result, or one could add an event to a group
        // that another thread is in the middle of removing.
        synchronized (lock)
        {
            EventGroup eg = addEvent(event);
            if (!shouldAggregate(eg))
            {
                return null;
            }

            // Aggregate before removing: if aggregation throws, the group
            // stays registered, exactly as before this change.
            UMOMessage returnMessage = aggregateEvents(eg);
            removeGroup(eg.getGroupId());

            // The result travels on a new endpoint built from the triggering
            // event's endpoint, with no transformer and named after this
            // router class.
            UMOEndpoint endpoint = new MuleEndpoint(event.getEndpoint());
            endpoint.setTransformer(null);
            endpoint.setName(getClass().getName());
            UMOEvent returnEvent = new MuleEvent(returnMessage, endpoint, event.getComponent(), event);
            return new UMOEvent[]{returnEvent};
        }
    }

    /**
     * Adds the event to the group for its correlation id, creating and
     * registering the group first if none exists. Events without a
     * correlation id are all collected under {@link #NO_CORRELATION_ID}.
     * <p>
     * The lookup-create-register sequence runs under the router's lock so
     * that concurrent callers with the same correlation id always end up in
     * the same group. The lock is reentrant, so being called from
     * {@link #process(UMOEvent)} while it is already held is safe.
     *
     * @param event the event to add
     * @return the group the event was added to
     */
    protected EventGroup addEvent(UMOEvent event)
    {
        String cId = event.getMessage().getCorrelationId();
        if (cId == null)
        {
            cId = NO_CORRELATION_ID;
        }

        synchronized (lock)
        {
            EventGroup eg = (EventGroup)eventGroups.get(cId);
            if (eg == null)
            {
                eg = new EventGroup(cId);
                eg.addEvent(event);
                eventGroups.put(eg.getGroupId(), eg);
            }
            else
            {
                eg.addEvent(event);
            }
            return eg;
        }
    }

    /**
     * Removes the group registered under the given id, if any.
     *
     * @param id the group id to remove
     */
    protected void removeGroup(Object id)
    {
        eventGroups.remove(id);
    }

    /**
     * Decides whether the given group is ready to be aggregated. Called from
     * {@link #process(UMOEvent)} after each matching event has been added to
     * its group, while the router's lock is held.
     *
     * @param events the group the latest event was added to
     * @return <code>true</code> if the group should be aggregated now
     */
    protected abstract boolean shouldAggregate(EventGroup events);

    /**
     * Combines the events of the given group into a single message. Called
     * from {@link #process(UMOEvent)} only after
     * {@link #shouldAggregate(EventGroup)} returned <code>true</code> for the
     * group, while the router's lock is held.
     *
     * @param events the group to aggregate
     * @return the message that becomes the payload of the resulting event
     * @throws AggregationException if the implementation cannot aggregate
     *         the group
     */
    protected abstract UMOMessage aggregateEvents(EventGroup events) throws AggregationException;
}