1 10 11 package org.mule.routing.inbound; 12 13 import org.apache.commons.collections.IteratorUtils; 14 import org.apache.commons.logging.Log; 15 import org.apache.commons.logging.LogFactory; 16 import org.mule.umo.MessagingException; 17 import org.mule.umo.UMOEvent; 18 19 import java.util.Collections ; 20 import java.util.Comparator ; 21 import java.util.HashMap ; 22 import java.util.List ; 23 import java.util.Map ; 24 25 32 33 public abstract class AbstractEventResequencer extends SelectiveConsumer 34 { 35 protected static final String NO_CORRELATION_ID = "no-id"; 36 37 40 protected static Log logger = LogFactory.getLog(AbstractEventResequencer.class); 41 42 private Comparator comparator; 43 private Map eventGroups = new HashMap (); 44 45 public UMOEvent[] process(UMOEvent event) throws MessagingException 46 { 47 if (isMatch(event)) 48 { 49 EventGroup eg = addEvent(event); 50 if (shouldResequence(eg)) 51 { 52 List resequencedEvents = resequenceEvents(eg); 53 removeGroup(eg.getGroupId()); 54 UMOEvent[] returnEvents = new UMOEvent[resequencedEvents.size()]; 55 resequencedEvents.toArray(returnEvents); 56 return returnEvents; 57 } 58 } 59 return null; 60 } 61 62 protected EventGroup addEvent(UMOEvent event) 63 { 64 String cId = event.getMessage().getCorrelationId(); 65 if (cId == null) 66 { 67 cId = NO_CORRELATION_ID; 68 } 69 EventGroup eg = (EventGroup)eventGroups.get(cId); 70 if (eg == null) 71 { 72 eg = new EventGroup(cId); 73 eg.addEvent(event); 74 eventGroups.put(eg.getGroupId(), eg); 75 } 76 else 77 { 78 eg.addEvent(event); 79 } 80 return eg; 81 } 82 83 protected void removeGroup(Object id) 84 { 85 eventGroups.remove(id); 86 } 87 88 protected List resequenceEvents(EventGroup events) 89 { 90 List result = IteratorUtils.toList(events.iterator(), events.size()); 91 if (comparator != null) 92 { 93 Collections.sort(result, comparator); 94 } 95 else 96 { 97 logger.warn("Event comparator is null, events were not reordered"); 98 } 99 return result; 100 } 101 102 public Comparator getComparator() 103 { 104 return comparator; 105 } 106 107 public void setComparator(Comparator comparator) 108 { 109 this.comparator = comparator; 110 } 111 112 protected abstract boolean shouldResequence(EventGroup events); 113 } 114 | Popular Tags |