1 10 11 package org.mule.routing.response; 12 13 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 14 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap; 15 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; 16 17 import org.mule.config.i18n.Message; 18 import org.mule.config.i18n.Messages; 19 import org.mule.routing.inbound.EventGroup; 20 import org.mule.umo.UMOEvent; 21 import org.mule.umo.UMOMessage; 22 import org.mule.umo.routing.ResponseTimeoutException; 23 import org.mule.umo.routing.RoutingException; 24 import org.mule.util.MapUtils; 25 import org.mule.util.concurrent.Latch; 26 27 36 public abstract class AbstractResponseAggregator extends AbstractResponseRouter 37 { 38 42 protected ConcurrentMap responseEvents = new ConcurrentHashMap(); 43 44 48 private ConcurrentMap locks = new ConcurrentHashMap(); 49 50 55 protected final ConcurrentMap eventGroups = new ConcurrentHashMap(); 56 57 public void process(UMOEvent event) throws RoutingException 58 { 59 EventGroup eg = addEvent(event); 62 63 if (shouldAggregate(eg)) 65 { 66 UMOMessage returnMessage = aggregateEvents(eg); 68 Object id = eg.getGroupId(); 69 70 removeEventGroup(id); 73 74 UMOMessage previousResult = (UMOMessage)responseEvents.putIfAbsent(id, returnMessage); 77 if (previousResult != null) 78 { 79 throw new IllegalStateException ("Detected duplicate aggregation result message with id: " 83 + id); 84 } 85 86 Latch l = (Latch)locks.get(id); 89 if (l == null) 90 { 91 if (logger.isDebugEnabled()) 92 { 93 logger.debug("Creating latch for " + id + " in " + this); 94 } 95 96 l = new Latch(); 97 Latch previous = (Latch)locks.putIfAbsent(id, l); 98 if (previous != null) 99 { 100 l = previous; 101 } 102 } 103 104 l.countDown(); 105 } 106 } 107 108 118 protected EventGroup addEvent(UMOEvent event) throws RoutingException 119 { 120 Object cId = getReplyAggregateIdentifier(event.getMessage()); 121 122 if (cId == null || cId.equals("-1")) 123 { 124 throw new RoutingException(new Message(Messages.NO_CORRELATION_ID), event.getMessage(), 125 event.getEndpoint()); 126 } 127 128 if (logger.isDebugEnabled()) 129 { 130 logger.debug("Adding event to response aggregator group: " + cId); 131 } 132 133 EventGroup eg = (EventGroup)eventGroups.get(cId); 134 if (eg == null) 135 { 136 eg = createEventGroup(cId, event); 137 EventGroup previous = (EventGroup)eventGroups.putIfAbsent(eg.getGroupId(), eg); 138 if (previous != null) 139 { 140 eg = previous; 141 } 142 } 143 144 eg.addEvent(event); 145 146 return eg; 147 } 148 149 157 protected EventGroup createEventGroup(Object id, UMOEvent event) 158 { 159 if (logger.isDebugEnabled()) 160 { 161 logger.debug("Creating new event group: " + id + " in " + this); 162 } 163 return new EventGroup(id); 164 } 165 166 protected void removeEventGroup(Object id) 167 { 168 eventGroups.remove(id); 169 } 170 171 179 public UMOMessage getResponse(UMOMessage message) throws RoutingException 180 { 181 Object responseId = getCallResponseAggregateIdentifier(message); 182 183 if (logger.isDebugEnabled()) 184 { 185 logger.debug("Waiting for response for message id: " + responseId + " in " + this); 186 } 187 188 Latch l = (Latch)locks.get(responseId); 189 if (l == null) 190 { 191 if (logger.isDebugEnabled()) 192 { 193 logger.debug("Got response but no one is waiting for it yet. Creating latch for " 194 + responseId + " in " + this); 195 } 196 197 l = new Latch(); 198 Latch previous = (Latch)locks.putIfAbsent(responseId, l); 199 if (previous != null) 200 { 201 l = previous; 202 } 203 } 204 205 if (logger.isDebugEnabled()) 206 { 207 logger.debug("Got latch for message: " + responseId); 208 } 209 210 UMOMessage result; 212 213 boolean b = false; 215 216 try 217 { 218 if (logger.isDebugEnabled()) 219 { 220 logger.debug("Waiting for response to message: " + responseId); 221 } 222 223 if (getTimeout() <= 0) 225 { 226 l.await(); 227 b = true; 228 } 229 else 230 { 231 b = l.await(this.getTimeout(), TimeUnit.MILLISECONDS); 232 } 233 } 234 catch (InterruptedException e) 235 { 236 logger.error(e.getMessage(), e); 237 } 238 finally 239 { 240 locks.remove(responseId); 241 result = (UMOMessage)responseEvents.remove(responseId); 242 } 243 244 if (!b) 245 { 246 if (logger.isTraceEnabled()) 247 { 248 logger.trace("Current responses are: \n" + MapUtils.toString(responseEvents, true)); 249 } 250 251 throw new ResponseTimeoutException(new Message(Messages.RESPONSE_TIMED_OUT_X_WAITING_FOR_ID_X, 252 String.valueOf(getTimeout()), responseId), message, null); 253 } 254 255 if (result == null) 256 { 257 throw new IllegalStateException ("Response Message is null"); 259 } 260 261 if (logger.isDebugEnabled()) 262 { 263 logger.debug("remaining locks : " + locks.keySet()); 264 logger.debug("remaining results: " + responseEvents.keySet()); 265 } 266 267 return result; 268 } 269 270 279 protected abstract boolean shouldAggregate(EventGroup events); 280 281 292 protected abstract UMOMessage aggregateEvents(EventGroup events) throws RoutingException; 293 294 } 295 | Popular Tags |