1 17 package org.apache.servicemix.eip.patterns; 18 19 import java.net.URI ; 20 21 import javax.jbi.management.DeploymentException; 22 import javax.jbi.messaging.ExchangeStatus; 23 import javax.jbi.messaging.Fault; 24 import javax.jbi.messaging.InOnly; 25 import javax.jbi.messaging.InOut; 26 import javax.jbi.messaging.MessageExchange; 27 import javax.jbi.messaging.RobustInOnly; 28 import javax.wsdl.Definition; 29 30 import org.apache.servicemix.eip.EIPEndpoint; 31 import org.apache.servicemix.eip.support.ExchangeTarget; 32 import org.apache.servicemix.jbi.FaultException; 33 import org.apache.servicemix.jbi.jaxp.SourceTransformer; 34 import org.apache.servicemix.jbi.util.MessageUtil; 35 36 50 public class Pipeline extends EIPEndpoint { 51 52 private static final String TRANSFORMER = "Pipeline.Transformer"; 53 54 private static final String CONSUMER_MEP = "Pipeline.ConsumerMEP"; 55 56 59 private ExchangeTarget transformer; 60 61 64 private ExchangeTarget target; 65 66 69 private String correlationConsumer; 70 71 74 private String correlationTransformer; 75 76 79 private String correlationTarget; 80 81 84 public ExchangeTarget getTarget() { 85 return target; 86 } 87 88 91 public void setTarget(ExchangeTarget target) { 92 this.target = target; 93 } 94 95 98 public ExchangeTarget getTransformer() { 99 return transformer; 100 } 101 102 105 public void setTransformer(ExchangeTarget transformer) { 106 this.transformer = transformer; 107 } 108 109 112 public void validate() throws DeploymentException { 113 super.validate(); 114 if (target == null) { 116 throw new IllegalArgumentException ("target should be set to a valid ExchangeTarget"); 117 } 118 if (transformer == null) { 120 throw new IllegalArgumentException ("transformer should be set to a valid ExchangeTarget"); 121 } 122 correlationConsumer = "Pipeline.Consumer." + getService() + "." + getEndpoint(); 124 correlationTransformer = "Pipeline.Transformer." + getService() + "." + getEndpoint(); 125 correlationTarget = "Pipeline.Target." + getService() + "." + getEndpoint(); 126 } 127 128 131 protected void processSync(MessageExchange exchange) throws Exception { 132 if (exchange instanceof InOnly == false && 133 exchange instanceof RobustInOnly == false) { 134 fail(exchange, new UnsupportedOperationException ("Use an InOnly or RobustInOnly MEP")); 135 return; 136 } 137 InOut tme = exchangeFactory.createInOutExchange(); 139 transformer.configureTarget(tme, getContext()); 140 MessageUtil.transferInToIn(exchange, tme); 142 sendSync(tme); 143 if (tme.getStatus() == ExchangeStatus.DONE) { 145 throw new IllegalStateException ("Received a DONE status from the transformer"); 146 } 147 else if (tme.getStatus() == ExchangeStatus.ERROR) { 149 fail(exchange, tme.getError()); 150 } 151 else if (tme.getFault() != null) { 153 if (exchange instanceof InOnly) { 154 String fault = new SourceTransformer().contentToString(tme.getFault()); 157 done(tme); 158 fail(exchange, new FaultException(fault, null, null)); 159 } else { 160 Fault fault = MessageUtil.copyFault(tme); 161 MessageUtil.transferToFault(fault, exchange); 162 done(tme); 163 sendSync(exchange); 164 } 165 } 166 else if (tme.getOutMessage() == null) { 168 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set"); 169 } 170 MessageExchange me = exchangeFactory.createExchange(exchange.getPattern()); 172 target.configureTarget(me, getContext()); 173 MessageUtil.transferOutToIn(tme, me); 174 sendSync(me); 175 done(tme); 176 if (me.getStatus() == ExchangeStatus.DONE) { 177 done(exchange); 178 } else if (me.getStatus() == ExchangeStatus.ERROR) { 179 fail(exchange, me.getError()); 180 } else if (me.getFault() != null) { 181 if (exchange instanceof InOnly) { 182 String fault = new SourceTransformer().contentToString(me.getFault()); 185 done(me); 186 fail(exchange, new FaultException(fault, null, null)); 187 } else { 188 Fault fault = MessageUtil.copyFault(me); 189 MessageUtil.transferToFault(fault, exchange); 190 done(me); 191 sendSync(exchange); 192 } 193 } else { 194 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set"); 195 } 196 } 197 198 201 protected void processAsync(MessageExchange exchange) throws Exception { 202 if (exchange.getRole() == MessageExchange.Role.PROVIDER) { 204 if (exchange.getStatus() == ExchangeStatus.DONE) { 207 String transformerId = (String ) exchange.getProperty(correlationTransformer); 208 String targetId = (String ) exchange.getProperty(correlationTarget); 209 if (transformerId == null && targetId == null) { 210 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set"); 211 } 212 MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId); 214 done(me); 215 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 217 String transformerId = (String ) exchange.getProperty(correlationTransformer); 218 String targetId = (String ) exchange.getProperty(correlationTarget); 219 if (transformerId == null && targetId == null) { 220 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set"); 221 } 222 MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId); 224 fail(me, exchange.getError()); 225 } else if (exchange.getProperty(correlationTransformer) == null) { 227 if (exchange instanceof InOnly == false && exchange instanceof RobustInOnly == false) { 228 fail(exchange, new UnsupportedOperationException ("Use an InOnly or RobustInOnly MEP")); 229 return; 230 } 231 MessageExchange tme = exchangeFactory.createInOutExchange(); 233 transformer.configureTarget(tme, getContext()); 234 exchange.setProperty(correlationTransformer, tme.getExchangeId()); 236 tme.setProperty(correlationConsumer, exchange.getExchangeId()); 237 tme.setProperty(TRANSFORMER, Boolean.TRUE); 238 tme.setProperty(CONSUMER_MEP, exchange.getPattern()); 239 store.store(exchange.getExchangeId(), exchange); 241 MessageUtil.transferInToIn(exchange, tme); 243 send(tme); 244 } else { 245 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set"); 246 } 247 } else if (Boolean.TRUE.equals(exchange.getProperty(TRANSFORMER))) { 249 String consumerId = (String ) exchange.getProperty(correlationConsumer); 251 if (consumerId == null) { 252 throw new IllegalStateException (correlationConsumer + " property not found"); 253 } 254 if (exchange.getStatus() == ExchangeStatus.DONE) { 257 throw new IllegalStateException ("Received a DONE status from the transformer"); 258 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 260 MessageExchange me = (MessageExchange) store.load(consumerId); 261 fail(me, exchange.getError()); 262 } else if (exchange.getFault() != null) { 264 MessageExchange me = (MessageExchange) store.load(consumerId); 265 if (me instanceof InOnly) { 266 String fault = new SourceTransformer().contentToString(exchange.getFault()); 269 fail(me, new FaultException(fault, null, null)); 270 done(exchange); 271 } else { 272 store.store(exchange.getExchangeId(), exchange); 273 MessageUtil.transferFaultToFault(exchange, me); 274 send(me); 275 } 276 } else if (exchange.getMessage("out") != null) { 278 URI mep = (URI ) exchange.getProperty(CONSUMER_MEP); 280 if (mep == null) { 281 throw new IllegalStateException ("Exchange does not carry the consumer MEP"); 282 } 283 MessageExchange me = exchangeFactory.createExchange(mep); 284 target.configureTarget(me, getContext()); 285 me.setProperty(correlationConsumer, consumerId); 286 me.setProperty(correlationTransformer, exchange.getExchangeId()); 287 store.store(exchange.getExchangeId(), exchange); 288 MessageUtil.transferOutToIn(exchange, me); 289 send(me); 290 } else { 292 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message"); 293 } 294 } else { 296 String consumerId = (String ) exchange.getProperty(correlationConsumer); 298 if (consumerId == null) { 299 throw new IllegalStateException (correlationConsumer + " property not found"); 300 } 301 String transformerId = (String ) exchange.getProperty(correlationTransformer); 303 if (transformerId == null) { 304 throw new IllegalStateException (correlationTransformer + " property not found"); 305 } 306 if (exchange.getStatus() == ExchangeStatus.DONE) { 308 MessageExchange tme = (MessageExchange) store.load(transformerId); 310 done(tme); 311 MessageExchange cme = (MessageExchange) store.load(consumerId); 313 done(cme); 314 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 316 MessageExchange tme = (MessageExchange) store.load(transformerId); 318 done(tme); 319 MessageExchange cme = (MessageExchange) store.load(consumerId); 321 fail(cme, exchange.getError()); 322 } else if (exchange.getFault() != null) { 324 MessageExchange tme = (MessageExchange) store.load(transformerId); 326 done(tme); 327 store.store(exchange.getExchangeId(), exchange); 329 MessageExchange cme = (MessageExchange) store.load(consumerId); 330 cme.setProperty(correlationTarget, exchange.getExchangeId()); 331 MessageUtil.transferFaultToFault(exchange, cme); 332 send(cme); 333 } else { 335 throw new IllegalStateException ("Exchange from target has a " + ExchangeStatus.ACTIVE + " status but has no Fault message"); 336 } 337 } 338 } 339 340 protected Definition getDefinitionFromWsdlExchangeTarget() { 341 Definition rc = super.getDefinitionFromWsdlExchangeTarget(); 342 if( rc !=null ) { 343 } 346 return rc; 347 } 348 349 } 350 | Popular Tags |