package org.mule.routing.outbound;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.impl.MuleMessage;
import org.mule.impl.endpoint.MuleEndpoint;
import org.mule.impl.endpoint.MuleEndpointURI;
import org.mule.umo.UMOException;
import org.mule.umo.UMOMessage;
import org.mule.umo.UMOSession;
import org.mule.umo.endpoint.UMOEndpoint;
import org.mule.umo.endpoint.UMOEndpointURI;
import org.mule.umo.routing.CouldNotRouteOutboundMessageException;
import org.mule.umo.routing.RoutingException;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap;

/**
 * Outbound router that sends a copy of a message to every recipient in a list
 * supplied by the concrete subclass (see {@link #getRecipients(UMOMessage)}).
 * Each recipient is an endpoint URI string; the endpoint resolved for it is
 * cached per router instance so it is only created once.
 */
public abstract class AbstractRecipientList extends FilteringOutboundRouter
{
    /**
     * Logger for this class and its subclasses.
     */
    protected final Log logger = LogFactory.getLog(getClass());

    /** Maps a recipient URI string to the endpoint already resolved for it. */
    private final ConcurrentMap recipientCache = new ConcurrentHashMap();

    /**
     * Routes a copy of <code>message</code> to each recipient.
     * <p>
     * Unless correlation is disabled, the correlation group size of
     * <code>message</code> is set to the number of recipients before routing
     * (and left alone if it is already set and the router is configured with
     * <code>ENABLE_CORRELATION_IF_NOT_SET</code>).
     *
     * @param message the message to route; also the source of the properties
     *            copied onto each per-recipient request
     * @param session the session used to send or dispatch each request
     * @param synchronous <code>true</code> to send and collect replies,
     *            <code>false</code> to dispatch without collecting anything
     * @return <code>null</code> if no reply payload was collected; a message
     *         wrapping the single payload if exactly one was collected;
     *         otherwise a message wrapping the list of collected payloads
     * @throws RoutingException if a recipient endpoint cannot be resolved, or
     *             (as {@link CouldNotRouteOutboundMessageException}) if a send
     *             or dispatch fails
     */
    public UMOMessage route(UMOMessage message, UMOSession session, boolean synchronous)
        throws RoutingException
    {
        List recipients = getRecipients(message);
        List payloads = new ArrayList();

        if (enableCorrelation != ENABLE_CORRELATION_NEVER)
        {
            boolean correlationSet = message.getCorrelationGroupSize() != -1;
            if (correlationSet && (enableCorrelation == ENABLE_CORRELATION_IF_NOT_SET))
            {
                logger.debug("CorrelationId is already set, not setting Correlation group size");
            }
            else
            {
                message.setCorrelationGroupSize(recipients.size());
            }
        }

        // Most recent reply that was actually returned. It is kept separately
        // from the per-iteration reply so that a later recipient returning null
        // cannot wipe out the message used as the basis for the aggregate.
        UMOMessage lastResult = null;

        for (Iterator iterator = recipients.iterator(); iterator.hasNext();)
        {
            String recipient = (String)iterator.next();
            // Every recipient gets its own message built from the original
            // payload and the original message.
            UMOMessage request = new MuleMessage(message.getPayload(), message);
            UMOEndpoint endpoint = getRecipientEndpoint(request, recipient);

            try
            {
                if (synchronous)
                {
                    UMOMessage reply = send(session, request, endpoint);
                    if (reply != null)
                    {
                        payloads.add(reply.getPayload());
                        lastResult = reply;
                    }
                    else if (logger.isDebugEnabled())
                    {
                        logger.debug("No result was returned for sync call to: "
                                     + endpoint.getEndpointURI());
                    }
                }
                else
                {
                    dispatch(session, request, endpoint);
                }
            }
            catch (UMOException e)
            {
                throw new CouldNotRouteOutboundMessageException(request, endpoint, e);
            }
        }

        if (payloads.size() == 0)
        {
            return null;
        }
        else if (payloads.size() == 1)
        {
            return new MuleMessage(payloads.get(0), lastResult);
        }
        else
        {
            return new MuleMessage(payloads, lastResult);
        }
    }

    /**
     * Returns the endpoint for a recipient URI, creating and caching it on
     * first use.
     *
     * @param message the message being routed; only used when reporting a
     *            failure
     * @param recipient the recipient's endpoint URI as a string
     * @return the cached endpoint for <code>recipient</code>, or a newly
     *         obtained one if none was cached
     * @throws RoutingException if the URI cannot be parsed or no endpoint can
     *             be obtained for it
     */
    protected UMOEndpoint getRecipientEndpoint(UMOMessage message, String recipient) throws RoutingException
    {
        UMOEndpoint endpoint = (UMOEndpoint)recipientCache.get(recipient);

        if (endpoint != null)
        {
            return endpoint;
        }

        try
        {
            UMOEndpointURI endpointUri = new MuleEndpointURI(recipient);
            endpoint = MuleEndpoint.getOrCreateEndpointForUri(endpointUri, UMOEndpoint.ENDPOINT_TYPE_SENDER);
        }
        catch (UMOException e)
        {
            // endpoint is still null here: the only assignment is the
            // statement that failed (or the one after the statement that failed).
            throw new RoutingException(message, endpoint, e);
        }

        // Another thread may have cached an endpoint for the same recipient in
        // the meantime; if so, use that one so all callers share one instance.
        UMOEndpoint existingEndpoint = (UMOEndpoint)recipientCache.putIfAbsent(recipient, endpoint);
        if (existingEndpoint != null)
        {
            endpoint = existingEndpoint;
        }

        return endpoint;
    }

    /**
     * Supplies the recipients for a message.
     *
     * @param message the message about to be routed
     * @return the recipients as a list of endpoint URI strings; must not be
     *         <code>null</code>, and every element must be a
     *         <code>String</code>, because {@link #route} dereferences the
     *         list and casts each element without checking
     */
    protected abstract List getRecipients(UMOMessage message);

    /**
     * @return always <code>true</code>
     */
    public boolean isDynamicEndpoints()
    {
        return true;
    }

}