1 10 11 package org.mule.routing.inbound; 12 13 import org.mule.umo.UMOEvent; 14 15 23 public abstract class CorrelationAggregator extends AbstractEventAggregator 24 { 25 26 35 protected boolean shouldAggregate(EventGroup events) 36 { 37 int size = events.expectedSize(); 38 if (size == -1) 39 { 40 logger.warn("Correlation Group Size not set, but CorrelationAggregator is being used. Message is being forwarded"); 41 return true; 42 } 43 if (logger.isDebugEnabled()) 44 { 45 logger.debug("Aggregator: Current Event groups = " + eventGroups.size()); 46 logger.debug("correlation size is " + size + ". current event group size is " + events.size() 47 + " for correlation " + events.getGroupId()); 48 } 49 return size == events.size(); 50 } 51 52 61 protected EventGroup addEvent(UMOEvent event) 62 { 63 String cId = event.getMessage().getCorrelationId(); 64 int groupSize = event.getMessage().getCorrelationGroupSize(); 65 if (cId == null) 66 { 67 cId = NO_CORRELATION_ID; 68 } 69 EventGroup eg = (EventGroup)eventGroups.get(cId); 70 if (eg == null) 71 { 72 eg = new EventGroup(cId, groupSize); 73 eg.addEvent(event); 74 eventGroups.put(eg.getGroupId(), eg); 75 } 76 else 77 { 78 eg.addEvent(event); 79 } 80 return eg; 81 } 82 } 83 | Popular Tags |