1 17 package org.apache.servicemix.eip.patterns; 18 19 import javax.jbi.management.DeploymentException; 20 import javax.jbi.messaging.ExchangeStatus; 21 import javax.jbi.messaging.Fault; 22 import javax.jbi.messaging.InOut; 23 import javax.jbi.messaging.MessageExchange; 24 25 import org.apache.servicemix.eip.EIPEndpoint; 26 import org.apache.servicemix.eip.support.ExchangeTarget; 27 import org.apache.servicemix.jbi.util.MessageUtil; 28 29 45 public class StaticRoutingSlip extends EIPEndpoint { 46 47 50 private ExchangeTarget[] targets; 51 54 private String correlation; 55 58 private String index; 59 62 private String previous; 63 64 67 public ExchangeTarget[] getTargets() { 68 return targets; 69 } 70 71 74 public void setTargets(ExchangeTarget[] targets) { 75 this.targets = targets; 76 } 77 78 81 public void validate() throws DeploymentException { 82 super.validate(); 83 if (targets == null || targets.length == 0) { 85 throw new IllegalArgumentException ("targets should contain at least one ExchangeTarget"); 86 } 87 correlation = "RoutingSlip.Correlation." + getService() + "." + getEndpoint(); 89 index = "RoutingSlip.Index." + getService() + "." + getEndpoint(); 90 previous = "RoutingSlip.Previous." + getService() + "." + getEndpoint(); 91 } 92 93 96 protected void processSync(MessageExchange exchange) throws Exception { 97 if (exchange instanceof InOut == false) { 98 throw new IllegalStateException ("Use an InOut MEP"); 99 } 100 MessageExchange current = exchange; 101 for (int i = 0; i < targets.length; i++) { 102 InOut me = exchangeFactory.createInOutExchange(); 103 targets[i].configureTarget(me, getContext()); 104 if (i == 0) { 105 MessageUtil.transferInToIn(current, me); 106 } else { 107 MessageUtil.transferOutToIn(current, me); 108 } 109 sendSync(me); 110 if (i != 0) { 111 done(current); 112 } 113 if (me.getStatus() == ExchangeStatus.DONE) { 114 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.DONE); 115 } else if (me.getStatus() == ExchangeStatus.ERROR) { 116 fail(exchange, me.getError()); 117 return; 118 } else if (me.getFault() != null) { 119 Fault fault = MessageUtil.copyFault(me); 120 MessageUtil.transferToFault(fault, exchange); 121 done(me); 122 sendSync(exchange); 123 return; 124 } else if (me.getOutMessage() == null) { 125 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message"); 126 } 127 current = me; 128 } 129 MessageUtil.transferToOut(MessageUtil.copyOut(current), exchange); 130 done(current); 131 sendSync(exchange); 132 } 133 134 137 protected void processAsync(MessageExchange exchange) throws Exception { 138 if (exchange.getRole() == MessageExchange.Role.PROVIDER) { 140 if (exchange.getStatus() == ExchangeStatus.DONE) { 141 String correlationId = (String ) exchange.getProperty(correlation); 142 if (correlationId == null) { 143 throw new IllegalStateException (correlation + " property not found"); 144 } 145 MessageExchange me = (MessageExchange) store.load(correlationId); 147 done(me); 148 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 149 String correlationId = (String ) exchange.getProperty(correlation); 150 if (correlationId == null) { 151 throw new IllegalStateException (correlation + " property not found"); 152 } 153 MessageExchange me = (MessageExchange) store.load(correlationId); 155 done(me); 156 } else if (exchange instanceof InOut == false) { 157 throw new IllegalStateException ("Use an InOut MEP"); 158 } else { 159 MessageExchange me = exchangeFactory.createInOutExchange(); 160 me.setProperty(correlation, exchange.getExchangeId()); 161 me.setProperty(index, new Integer (0)); 162 targets[0].configureTarget(me, getContext()); 163 store.store(exchange.getExchangeId(), exchange); 164 MessageUtil.transferInToIn(exchange, me); 165 send(me); 166 } 167 } else { 169 String correlationId = (String ) exchange.getProperty(correlation); 170 String previousId = (String ) exchange.getProperty(previous); 171 Integer prevIndex = (Integer ) exchange.getProperty(index); 172 if (correlationId == null) { 173 throw new IllegalStateException (correlation + " property not found"); 174 } 175 if (prevIndex == null) { 176 throw new IllegalStateException (previous + " property not found"); 177 } 178 if (exchange.getStatus() == ExchangeStatus.DONE) { 180 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.DONE); 181 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 183 MessageExchange me = (MessageExchange) store.load(correlationId); 184 fail(me, exchange.getError()); 185 if (previousId != null) { 187 me = (MessageExchange) store.load(previousId); 188 done(me); 189 } 190 } else if (exchange.getFault() != null) { 192 MessageExchange me = (MessageExchange) store.load(correlationId); 193 me.setProperty(correlation, exchange.getExchangeId()); 194 store.store(exchange.getExchangeId(), exchange); 195 MessageUtil.transferFaultToFault(exchange, me); 196 send(me); 197 if (previousId != null) { 199 me = (MessageExchange) store.load(previousId); 200 done(me); 201 } 202 } else if (exchange.getMessage("out") != null) { 204 if (prevIndex.intValue() == targets.length - 1) { 206 MessageExchange me = (MessageExchange) store.load(correlationId); 207 me.setProperty(correlation, exchange.getExchangeId()); 208 store.store(exchange.getExchangeId(), exchange); 209 MessageUtil.transferOutToOut(exchange, me); 210 send(me); 211 if (previousId != null) { 212 me = (MessageExchange) store.load(previousId); 213 done(me); 214 } 215 } else { 217 MessageExchange me = exchangeFactory.createInOutExchange(); 218 Integer curIndex = new Integer (prevIndex.intValue() + 1); 219 me.setProperty(correlation, correlationId); 220 me.setProperty(index, curIndex); 221 me.setProperty(previous, exchange.getExchangeId()); 222 targets[curIndex.intValue()].configureTarget(me, getContext()); 223 store.store(exchange.getExchangeId(), exchange); 224 MessageUtil.transferOutToIn(exchange, me); 225 send(me); 226 if (previousId != null) { 227 me = (MessageExchange) store.load(previousId); 228 done(me); 229 } 230 } 231 } else { 233 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message"); 234 } 235 } 236 } 237 238 } 239 | Popular Tags |