1 17 package org.apache.servicemix.eip; 18 19 import javax.jbi.messaging.InOnly; 20 import javax.jbi.messaging.NormalizedMessage; 21 import javax.xml.namespace.QName ; 22 23 import org.apache.servicemix.eip.patterns.SplitAggregator; 24 import org.apache.servicemix.eip.support.AbstractSplitter; 25 import org.apache.servicemix.tck.ReceiverComponent; 26 27 public class SplitAggregatorTxTest extends AbstractEIPTransactionalTest { 28 29 private SplitAggregator aggregator; 30 31 protected void setUp() throws Exception { 32 super.setUp(); 33 34 aggregator = new SplitAggregator(); 35 aggregator.setTarget(createServiceExchangeTarget(new QName ("target"))); 36 configurePattern(aggregator); 37 activateComponent(aggregator, "aggregator"); 38 } 39 40 protected NormalizedMessage testRun(boolean[] msgs, boolean async) throws Exception { 41 ReceiverComponent rec = activateReceiver("target"); 42 43 int nbMessages = 3; 44 String corrId = Long.toString(System.currentTimeMillis()); 45 for (int i = 0; i < 3; i++) { 46 if (msgs == null || msgs[i]) { 47 InOnly me = client.createInOnlyExchange(); 48 me.setService(new QName ("aggregator")); 49 me.getInMessage().setContent(createSource("<hello id='" + i + "' />")); 50 me.getInMessage().setProperty(AbstractSplitter.SPLITTER_COUNT, new Integer (nbMessages)); 51 me.getInMessage().setProperty(AbstractSplitter.SPLITTER_INDEX, new Integer (i)); 52 me.getInMessage().setProperty(AbstractSplitter.SPLITTER_CORRID, corrId); 53 tm.begin(); 54 if (async) { 55 client.send(me); 56 } else { 57 client.sendSync(me); 58 } 59 tm.commit(); 60 } 61 } 62 63 rec.getMessageList().assertMessagesReceived(1); 64 return (NormalizedMessage) rec.getMessageList().flushMessages().get(0); 65 } 66 67 public void testAsync() throws Exception { 68 testRun(null, true); 69 } 70 71 public void testSync() throws Exception { 72 testRun(null, false); 73 } 74 75 } 76 | Popular Tags |