1 10 11 package org.mule.routing.outbound; 12 13 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; 14 import org.apache.commons.lang.SystemUtils; 15 import org.apache.commons.logging.Log; 16 import org.apache.commons.logging.LogFactory; 17 import org.mule.MuleManager; 18 import org.mule.util.ClassUtils; 19 import org.mule.config.MuleProperties; 20 import org.mule.util.properties.PropertyExtractor; 21 import org.mule.management.stats.RouterStatistics; 22 import org.mule.routing.CorrelationPropertiesExtractor; 23 import org.mule.umo.UMOException; 24 import org.mule.umo.UMOMessage; 25 import org.mule.umo.UMOSession; 26 import org.mule.umo.UMOTransactionConfig; 27 import org.mule.umo.endpoint.UMOEndpoint; 28 import org.mule.umo.routing.UMOOutboundRouter; 29 import org.mule.util.StringMessageUtils; 30 31 import java.util.Iterator ; 32 import java.util.List ; 33 34 41 public abstract class AbstractOutboundRouter implements UMOOutboundRouter 42 { 43 public static final int ENABLE_CORRELATION_IF_NOT_SET = 0; 44 public static final int ENABLE_CORRELATION_ALWAYS = 1; 45 public static final int ENABLE_CORRELATION_NEVER = 2; 46 49 protected transient Log logger = LogFactory.getLog(getClass()); 50 51 protected List endpoints = new CopyOnWriteArrayList(); 52 53 protected String replyTo = null; 54 55 protected int enableCorrelation = ENABLE_CORRELATION_IF_NOT_SET; 56 57 protected PropertyExtractor propertyExtractor = new CorrelationPropertiesExtractor(); 58 59 protected RouterStatistics routerStatistics; 60 61 protected UMOTransactionConfig transactionConfig; 62 63 public void dispatch(UMOSession session, UMOMessage message, UMOEndpoint endpoint) throws UMOException 64 { 65 setMessageProperties(session, message, endpoint); 66 67 if (logger.isDebugEnabled()) 68 { 69 try 70 { 71 logger.debug("Message being sent to: " + endpoint.getEndpointURI() + " Message payload: \n" 72 + StringMessageUtils.truncate(message.getPayloadAsString(), 100, false)); 73 } 74 catch (Exception e) 75 { 76 logger.debug("Message being sent to: " + endpoint.getEndpointURI() 77 + " Message payload: \n(unable to retrieve payload: " + e.getMessage()); 78 } 79 } 80 81 session.dispatchEvent(message, endpoint); 82 if (routerStatistics != null) 83 { 84 if (routerStatistics.isEnabled()) 85 { 86 routerStatistics.incrementRoutedMessage(endpoint); 87 } 88 } 89 } 90 91 public UMOMessage send(UMOSession session, UMOMessage message, UMOEndpoint endpoint) throws UMOException 92 { 93 94 if (replyTo != null) 95 { 96 logger.debug("event was dispatched synchronously, but there is a ReplyTo endpoint set, so using asynchronous dispatch"); 97 dispatch(session, message, endpoint); 98 return null; 99 } 100 setMessageProperties(session, message, endpoint); 101 102 if (logger.isDebugEnabled()) 103 { 104 logger.debug("Message being sent to: " + endpoint.getEndpointURI()); 105 logger.debug(message); 106 } 107 if (logger.isTraceEnabled()) 108 { 109 try 110 { 111 logger.trace("Message payload: \n" + message.getPayloadAsString()); 112 } 113 catch (Exception e) 114 { 115 } 117 } 118 UMOMessage result = session.sendEvent(message, endpoint); 119 if (routerStatistics != null) 120 { 121 if (routerStatistics.isEnabled()) 122 { 123 routerStatistics.incrementRoutedMessage(endpoint); 124 } 125 } 126 127 if (logger.isDebugEnabled()) 128 { 129 logger.debug("Response message from sending to: " + endpoint.getEndpointURI()); 130 logger.debug(message); 131 } 132 if (logger.isTraceEnabled()) 133 { 134 try 135 { 136 logger.trace("Message payload: \n" + message.getPayloadAsString()); 137 } 138 catch (Exception e) 139 { 140 } 142 } 143 return result; 144 } 145 146 protected void setMessageProperties(UMOSession session, UMOMessage message, UMOEndpoint endpoint) 147 { 148 if (replyTo != null) 149 { 150 message.setReplyTo(replyTo); 153 message.setProperty(MuleProperties.MULE_REPLY_TO_REQUESTOR_PROPERTY, session.getComponent() 154 .getDescriptor() 155 .getName()); 156 if (logger.isDebugEnabled()) 157 { 158 logger.debug("Setting replyTo=" + replyTo + " for outbound endpoint: " 159 + endpoint.getEndpointURI()); 160 } 161 } 162 if (enableCorrelation != ENABLE_CORRELATION_NEVER) 163 { 164 boolean correlationSet = message.getCorrelationId() != null; 165 if (correlationSet && (enableCorrelation == ENABLE_CORRELATION_IF_NOT_SET)) 166 { 167 if (logger.isDebugEnabled()) 168 { 169 logger.debug("CorrelationId is already set to '" + message.getCorrelationId() 170 + "' , not setting it again"); 171 } 172 return; 173 } 174 else if (correlationSet) 175 { 176 if (logger.isDebugEnabled()) 177 { 178 logger.debug("CorrelationId is already set to '" + message.getCorrelationId() 179 + "', but router is configured to overwrite it"); 180 } 181 } 182 else 183 { 184 if (logger.isDebugEnabled()) 185 { 186 logger.debug("No CorrelationId is set on the message, will set a new Id"); 187 } 188 } 189 190 String correlation; 191 Object o = propertyExtractor.getProperty(MuleProperties.MULE_CORRELATION_ID_PROPERTY, message); 192 if (logger.isDebugEnabled()) 193 { 194 logger.debug("Extracted correlation Id as: " + o); 195 } 196 correlation = o.toString(); 197 198 if (logger.isDebugEnabled()) 199 { 200 StringBuffer buf = new StringBuffer (); 201 buf.append("Setting Correlation info on Outbound router for endpoint: ").append( 202 endpoint.getEndpointURI()); 203 buf.append(SystemUtils.LINE_SEPARATOR).append("Id=").append(correlation); 204 logger.debug(buf.toString()); 207 } 208 message.setCorrelationId(correlation); 209 } 212 } 213 214 public List getEndpoints() 215 { 216 return endpoints; 217 } 218 219 public void setEndpoints(List endpoints) 220 { 221 for (Iterator iterator = endpoints.iterator(); iterator.hasNext();) 223 { 224 UMOEndpoint umoEndpoint = (UMOEndpoint)iterator.next(); 225 addEndpoint(umoEndpoint); 226 } 227 } 228 229 public void addEndpoint(UMOEndpoint endpoint) 230 { 231 endpoint.setType(UMOEndpoint.ENDPOINT_TYPE_SENDER); 232 endpoints.add(endpoint); 233 } 234 235 public boolean removeEndpoint(UMOEndpoint endpoint) 236 { 237 return endpoints.remove(endpoint); 238 } 239 240 public String getReplyTo() 241 { 242 return replyTo; 243 } 244 245 public void setReplyTo(String replyTo) 246 { 247 if (replyTo != null) 248 { 249 this.replyTo = MuleManager.getInstance().lookupEndpointIdentifier(replyTo, replyTo); 250 } 251 else 252 { 253 this.replyTo = null; 254 } 255 } 256 257 public RouterStatistics getRouterStatistics() 258 { 259 return routerStatistics; 260 } 261 262 public void setRouterStatistics(RouterStatistics routerStatistics) 263 { 264 this.routerStatistics = routerStatistics; 265 } 266 267 public int getEnableCorrelation() 268 { 269 return enableCorrelation; 270 } 271 272 public void setEnableCorrelation(int enableCorrelation) 273 { 274 this.enableCorrelation = enableCorrelation; 275 } 276 277 public void setEnableCorrelationAsString(String enableCorrelation) 278 { 279 if (enableCorrelation != null) 280 { 281 if (enableCorrelation.equals("ALWAYS")) 282 { 283 this.enableCorrelation = ENABLE_CORRELATION_ALWAYS; 284 } 285 else if (enableCorrelation.equals("NEVER")) 286 { 287 this.enableCorrelation = ENABLE_CORRELATION_NEVER; 288 } 289 else if (enableCorrelation.equals("IF_NOT_SET")) 290 { 291 this.enableCorrelation = ENABLE_CORRELATION_IF_NOT_SET; 292 } 293 else 294 { 295 throw new IllegalArgumentException ("Value for enableCorrelation not recognised: " 296 + enableCorrelation); 297 } 298 } 299 } 300 301 public PropertyExtractor getPropertyExtractor() 302 { 303 return propertyExtractor; 304 } 305 306 public void setPropertyExtractor(PropertyExtractor propertyExtractor) 307 { 308 this.propertyExtractor = propertyExtractor; 309 } 310 311 public void setPropertyExtractorAsString(String className) 312 { 313 try 314 { 315 this.propertyExtractor = (PropertyExtractor)ClassUtils.instanciateClass(className, null, 316 getClass()); 317 } 318 catch (Exception ex) 319 { 320 throw new IllegalArgumentException ("Couldn't instanciate property extractor class " + className); 321 } 322 } 323 324 public UMOTransactionConfig getTransactionConfig() 325 { 326 return transactionConfig; 327 } 328 329 public void setTransactionConfig(UMOTransactionConfig transactionConfig) 330 { 331 this.transactionConfig = transactionConfig; 332 } 333 334 public boolean isDynamicEndpoints() 335 { 336 return false; 337 } 338 } 339 | Popular Tags |