1 17 package org.apache.servicemix.eip.support; 18 19 import java.util.Date ; 20 21 import javax.jbi.messaging.ExchangeStatus; 22 import javax.jbi.messaging.InOnly; 23 import javax.jbi.messaging.MessageExchange; 24 import javax.jbi.messaging.NormalizedMessage; 25 import javax.jbi.messaging.RobustInOnly; 26 27 import org.apache.commons.logging.Log; 28 import org.apache.commons.logging.LogFactory; 29 import org.apache.servicemix.eip.EIPEndpoint; 30 import org.apache.servicemix.jbi.util.MessageUtil; 31 import org.apache.servicemix.timers.Timer; 32 import org.apache.servicemix.timers.TimerListener; 33 34 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 35 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap; 36 import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock; 37 38 51 public abstract class AbstractAggregator extends EIPEndpoint { 52 53 private static final Log log = LogFactory.getLog(AbstractAggregator.class); 54 55 private ExchangeTarget target; 56 57 private boolean rescheduleTimeouts; 58 59 private boolean synchronous; 60 61 private ConcurrentMap closedAggregates = new ConcurrentHashMap(); 62 63 66 public boolean isSynchronous() { 67 return synchronous; 68 } 69 70 73 public void setSynchronous(boolean synchronous) { 74 this.synchronous = synchronous; 75 } 76 77 80 public boolean isRescheduleTimeouts() { 81 return rescheduleTimeouts; 82 } 83 84 87 public void setRescheduleTimeouts(boolean rescheduleTimeouts) { 88 this.rescheduleTimeouts = rescheduleTimeouts; 89 } 90 91 94 public ExchangeTarget getTarget() { 95 return target; 96 } 97 98 101 public void setTarget(ExchangeTarget target) { 102 this.target = target; 103 } 104 105 108 protected void processSync(MessageExchange exchange) throws Exception { 109 throw new IllegalStateException (); 110 } 111 112 115 protected void processAsync(MessageExchange exchange) throws Exception { 116 throw new IllegalStateException (); 117 } 118 119 122 public void process(MessageExchange exchange) throws Exception { 123 if (exchange.getStatus() == ExchangeStatus.DONE) { 125 return; 126 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 128 return; 129 } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) { 131 if (exchange instanceof InOnly == false && 132 exchange instanceof RobustInOnly == false) { 133 fail(exchange, new UnsupportedOperationException ("Use an InOnly or RobustInOnly MEP")); 134 } else { 135 NormalizedMessage in = MessageUtil.copyIn(exchange); 136 final String correlationId = getCorrelationID(exchange, in); 137 if (correlationId == null || correlationId.length() == 0) { 138 throw new IllegalArgumentException ("Could not retrieve correlation id for incoming exchange"); 139 } 140 Lock lock = getLockManager().getLock(correlationId); 142 lock.lock(); 143 try { 144 Object aggregation = store.load(correlationId); 145 Date timeout = null; 146 if (aggregation == null) { 148 if (isAggregationClosed(correlationId)) { 149 } else { 151 aggregation = createAggregation(correlationId); 152 timeout = getTimeout(aggregation); 153 } 154 } else if (isRescheduleTimeouts()) { 155 timeout = getTimeout(aggregation); 156 } 157 if (aggregation != null) { 159 if (addMessage(aggregation, in, exchange)) { 160 sendAggregate(correlationId, aggregation, false); 161 } else { 162 store.store(correlationId, aggregation); 163 if (timeout != null) { 164 if (log.isDebugEnabled()) { 165 log.debug("Scheduling timeout at " + timeout + " for aggregate " + correlationId); 166 } 167 getTimerManager().schedule(new TimerListener() { 168 public void timerExpired(Timer timer) { 169 AbstractAggregator.this.onTimeout(correlationId); 170 } 171 }, timeout); 172 } 173 } 174 } 175 done(exchange); 176 } finally { 177 lock.unlock(); 178 } 179 } 180 } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) { 182 done(exchange); 183 } 184 } 185 186 protected void sendAggregate(String correlationId, 187 Object aggregation, 188 boolean timeout) throws Exception { 189 InOnly me = exchangeFactory.createInOnlyExchange(); 190 target.configureTarget(me, getContext()); 191 NormalizedMessage nm = me.createMessage(); 192 me.setInMessage(nm); 193 buildAggregate(aggregation, nm, me, timeout); 194 closeAggregation(correlationId); 195 if (isSynchronous()) { 196 sendSync(me); 197 } else { 198 send(me); 199 } 200 } 201 202 protected void onTimeout(String correlationId) { 203 if (log.isDebugEnabled()) { 204 log.debug("Timeout expired for aggregate " + correlationId); 205 } 206 Lock lock = getLockManager().getLock(correlationId); 207 lock.lock(); 208 try { 209 Object aggregation = store.load(correlationId); 210 if (aggregation != null) { 211 sendAggregate(correlationId, aggregation, true); 212 } else if (!isAggregationClosed(correlationId)) { 213 throw new IllegalStateException ("Aggregation is not closed, but can not be retrieved from the store"); 214 } else { 215 if (log.isDebugEnabled()) { 216 log.debug("Aggregate " + correlationId + " is closed"); 217 } 218 } 219 } catch (Exception e) { 220 log.info("Caught exception while processing timeout aggregation", e); 221 } finally { 222 lock.unlock(); 223 } 224 } 225 226 233 protected boolean isAggregationClosed(String correlationId) { 234 return closedAggregates.containsKey(correlationId); 236 } 237 238 242 protected void closeAggregation(String correlationId) { 243 closedAggregates.put(correlationId, Boolean.TRUE); 245 } 246 247 254 protected abstract String getCorrelationID(MessageExchange exchange, NormalizedMessage message) throws Exception ; 255 256 261 protected abstract Object createAggregation(String correlationID) throws Exception ; 262 263 270 protected abstract Date getTimeout(Object aggregate); 271 272 280 protected abstract boolean addMessage(Object aggregate, 281 NormalizedMessage message, 282 MessageExchange exchange) throws Exception ; 283 284 292 protected abstract void buildAggregate(Object aggregate, 293 NormalizedMessage message, 294 MessageExchange exchange, 295 boolean timeout) throws Exception ; 296 } 297 | Popular Tags |