1 10 11 package org.mule.routing.outbound; 12 13 import org.dom4j.Document; 14 import org.mule.config.MuleProperties; 15 import org.mule.impl.MuleMessage; 16 import org.mule.umo.UMOException; 17 import org.mule.umo.UMOMessage; 18 import org.mule.umo.UMOSession; 19 import org.mule.umo.endpoint.UMOEndpoint; 20 import org.mule.umo.routing.CouldNotRouteOutboundMessageException; 21 import org.mule.umo.routing.RoutingException; 22 23 import java.util.HashMap ; 24 import java.util.Iterator ; 25 import java.util.List ; 26 import java.util.Map ; 27 28 32 public class RoundRobinXmlSplitter extends FilteringXmlMessageSplitter 33 { 34 protected boolean enableEndpointFiltering = false; 38 39 public UMOMessage route(UMOMessage message, UMOSession session, boolean synchronous) 40 throws RoutingException 41 { 42 try 43 { 44 String correlationId = (String )propertyExtractor.getProperty( 45 MuleProperties.MULE_CORRELATION_ID_PROPERTY, message); 46 initialise(message); 47 48 UMOEndpoint endpoint; 49 UMOMessage result = null; 50 Document part; 51 List parts = (List )nodes.get(); 52 if (parts == null) 53 { 54 logger.error("There are no parts for current message. No events were routed: " + message); 55 return null; 56 } 57 int correlationSequence = 1; 58 int epCounter = 0; 59 for (Iterator iterator = parts.iterator(); iterator.hasNext(); epCounter++) 60 { 61 part = (Document)iterator.next(); 62 if (epCounter == endpoints.size()) 63 { 64 epCounter = 0; 65 } 66 Map theProperties = (Map )properties.get(); 68 message = new MuleMessage(part, new HashMap (theProperties)); 69 70 if (enableEndpointFiltering) 71 { 72 endpoint = getEndpointForMessage(message); 73 } 74 else 75 { 76 endpoint = (UMOEndpoint)getEndpoints().get(epCounter); 77 } 78 79 if (endpoint == null) 80 { 81 logger.error("There was no matching endpoint for message part: " + part.asXML()); 82 } 83 else 84 { 85 try 86 { 87 if (enableCorrelation != ENABLE_CORRELATION_NEVER) 88 { 89 boolean correlationSet = message.getCorrelationId() != null; 90 if (!correlationSet && (enableCorrelation == ENABLE_CORRELATION_IF_NOT_SET)) 91 { 92 message.setCorrelationId(correlationId); 93 } 94 95 final int groupSize = message.getCorrelationGroupSize(); 99 message.setCorrelationGroupSize(groupSize); 100 message.setCorrelationSequence(correlationSequence++); 101 } 102 if (synchronous) 103 { 104 result = send(session, message, endpoint); 105 } 106 else 107 { 108 dispatch(session, message, endpoint); 109 } 110 } 111 catch (UMOException e) 112 { 113 throw new CouldNotRouteOutboundMessageException(message, endpoint, e); 114 } 115 } 116 } 117 return result; 118 } 119 finally 120 { 121 nodes.set(null); 122 properties.set(null); 123 } 124 } 125 126 133 protected UMOEndpoint getEndpointForMessage(UMOMessage message) 134 { 135 for (int i = 0; i < endpoints.size(); i++) 136 { 137 UMOEndpoint endpoint = (UMOEndpoint)endpoints.get(i); 138 139 try 140 { 141 if (endpoint.getFilter() == null || endpoint.getFilter().accept(message)) 142 { 143 if (logger.isDebugEnabled()) 144 { 145 logger.debug("Endpoint filter matched for node " + i + ". Routing message over: " 146 + endpoint.getEndpointURI().toString()); 147 } 148 return endpoint; 149 } 150 else 151 { 152 if (logger.isDebugEnabled()) 153 { 154 logger.debug("Endpoint filter did not match"); 155 } 156 } 157 } 158 catch (Exception e) 159 { 160 logger.error("Unable to create message for node at position " + i, e); 161 return null; 162 } 163 } 164 165 return null; 166 } 167 168 public void addEndpoint(UMOEndpoint endpoint) 169 { 170 if (endpoint.getFilter() != null && !enableEndpointFiltering) 171 { 172 throw new IllegalStateException ( 173 "Endpoints on the RoundRobin splitter router cannot have filters associated with them"); 174 } 175 super.addEndpoint(endpoint); 176 } 177 178 public boolean isEnableEndpointFiltering() 179 { 180 return enableEndpointFiltering; 181 } 182 183 public void setEnableEndpointFiltering(boolean enableEndpointFiltering) 184 { 185 this.enableEndpointFiltering = enableEndpointFiltering; 186 } 187 } 188 | Popular Tags |