1 10 11 package org.mule.routing.outbound; 12 13 import org.mule.config.MuleProperties; 14 import org.mule.umo.UMOException; 15 import org.mule.umo.UMOMessage; 16 import org.mule.umo.UMOSession; 17 import org.mule.umo.endpoint.UMOEndpoint; 18 import org.mule.umo.routing.CouldNotRouteOutboundMessageException; 19 import org.mule.umo.routing.RoutingException; 20 21 import java.util.Iterator ; 22 import java.util.List ; 23 24 34 35 public abstract class AbstractMessageSplitter extends FilteringOutboundRouter 36 { 37 protected boolean multimatch = true; 40 41 protected boolean honorSynchronicity = false; 44 45 public UMOMessage route(UMOMessage message, UMOSession session, boolean synchronous) 46 throws RoutingException 47 { 48 String correlationId = (String )propertyExtractor.getProperty( 49 MuleProperties.MULE_CORRELATION_ID_PROPERTY, message); 50 initialise(message); 51 52 UMOEndpoint endpoint; 53 UMOMessage result = null; 54 List list = getEndpoints(); 55 int correlationSequence = 1; 56 for (Iterator iterator = list.iterator(); iterator.hasNext();) 57 { 58 endpoint = (UMOEndpoint)iterator.next(); 59 message = getMessagePart(message, endpoint); 60 if (message == null) 61 { 62 logger.warn("Message part is null for endpoint: " + endpoint.getEndpointURI().toString()); 64 } 65 while (message != null) 69 { 70 if (honorSynchronicity) 71 { 72 synchronous = endpoint.isSynchronous(); 73 } 74 try 75 { 76 if (enableCorrelation != ENABLE_CORRELATION_NEVER) 77 { 78 boolean correlationSet = message.getCorrelationId() != null; 79 if (!correlationSet && (enableCorrelation == ENABLE_CORRELATION_IF_NOT_SET)) 80 { 81 message.setCorrelationId(correlationId); 82 } 83 84 final int groupSize = message.getCorrelationGroupSize(); 88 message.setCorrelationGroupSize(groupSize); 89 message.setCorrelationSequence(correlationSequence++); 90 } 91 if (honorSynchronicity) 92 { 93 message.setBooleanProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY, 94 endpoint.isRemoteSync()); 95 } 96 if (synchronous) 97 { 98 result = send(session, message, endpoint); 99 } 100 else 101 { 102 dispatch(session, message, endpoint); 103 } 104 } 105 catch (UMOException e) 106 { 107 throw new CouldNotRouteOutboundMessageException(message, endpoint, e); 108 } 109 if (!multimatch) 110 { 111 break; 112 } 113 message = getMessagePart(message, endpoint); 114 } 115 } 116 return result; 117 } 118 119 125 protected void initialise(UMOMessage message) 126 { 127 } 129 130 public boolean isHonorSynchronicity() 131 { 132 return honorSynchronicity; 133 } 134 135 140 public void setHonorSynchronicity(boolean honorSynchronicity) 141 { 142 this.honorSynchronicity = honorSynchronicity; 143 } 144 145 156 protected abstract UMOMessage getMessagePart(UMOMessage message, UMOEndpoint endpoint); 157 } 158 | Popular Tags |