1 10 11 package org.mule.routing.inbound; 12 13 import org.mule.impl.MuleMessage; 14 import org.mule.routing.AggregationException; 15 import org.mule.umo.UMOEvent; 16 import org.mule.umo.UMOMessage; 17 18 import java.io.ByteArrayOutputStream ; 19 import java.util.Collections ; 20 import java.util.Comparator ; 21 import java.util.Iterator ; 22 import java.util.List ; 23 24 import org.apache.commons.collections.IteratorUtils; 25 import org.apache.commons.io.IOUtils; 26 import org.apache.commons.lang.SerializationException; 27 import org.apache.commons.lang.SerializationUtils; 28 29 35 public class MessageChunkingAggregator extends CorrelationAggregator 36 { 37 38 49 protected UMOMessage aggregateEvents(EventGroup events) throws AggregationException 50 { 51 List eventList = IteratorUtils.toList(events.iterator(), events.size()); 52 UMOEvent firstEvent = (UMOEvent)eventList.get(0); 53 Collections.sort(eventList, SequenceComparator.getInstance()); 54 ByteArrayOutputStream baos = new ByteArrayOutputStream (4096); 55 try 56 { 57 for (Iterator iterator = eventList.iterator(); iterator.hasNext();) 58 { 59 UMOEvent event = (UMOEvent)iterator.next(); 60 baos.write(event.getMessageAsBytes()); 61 } 62 UMOMessage message; 63 try 66 { 67 message = new MuleMessage(SerializationUtils.deserialize(baos.toByteArray()), 68 firstEvent.getMessage()); 69 70 } 71 catch (SerializationException e) 72 { 73 message = new MuleMessage(baos.toByteArray(), firstEvent.getMessage()); 74 } 75 message.setCorrelationGroupSize(-1); 76 message.setCorrelationSequence(-1); 77 return message; 78 } 79 catch (Exception e) 80 { 81 throw new AggregationException(events, firstEvent.getEndpoint(), e); 82 } 83 finally 84 { 85 IOUtils.closeQuietly(baos); 86 } 87 } 88 89 public static class SequenceComparator implements Comparator 90 { 91 private static SequenceComparator _instance = new SequenceComparator(); 92 93 public static SequenceComparator getInstance() 94 { 95 return _instance; 96 } 97 98 private SequenceComparator() 99 { 100 super(); 101 } 102 103 public int compare(Object o1, Object o2) 104 { 105 UMOEvent event1 = (UMOEvent)o1; 106 UMOEvent event2 = (UMOEvent)o2; 107 if (event1.getMessage().getCorrelationSequence() > event2.getMessage() 108 .getCorrelationSequence()) 109 { 110 return 1; 111 } 112 else 113 { 114 return -1; 115 } 116 } 117 } 118 } 119 | Popular Tags |