1 17 package org.apache.servicemix.eip.support; 18 19 import javax.jbi.management.DeploymentException; 20 import javax.jbi.messaging.ExchangeStatus; 21 import javax.jbi.messaging.Fault; 22 import javax.jbi.messaging.MessageExchange; 23 import javax.jbi.messaging.NormalizedMessage; 24 25 import org.apache.servicemix.JbiConstants; 26 import org.apache.servicemix.eip.EIPEndpoint; 27 import org.apache.servicemix.jbi.util.MessageUtil; 28 import org.apache.servicemix.store.Store; 29 30 39 public abstract class AbstractContentBasedRouter extends EIPEndpoint { 40 41 44 private String correlation; 45 46 49 public void validate() throws DeploymentException { 50 super.validate(); 51 correlation = "AbstractContentBasedRouter.Correlation." + getService() + "." + getEndpoint(); 53 } 54 55 58 protected void processSync(MessageExchange exchange) throws Exception { 59 MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern()); 61 NormalizedMessage in = MessageUtil.copyIn(exchange); 65 MessageUtil.transferToIn(in, tme); 66 ExchangeTarget target = getDestination(tme); 68 target.configureTarget(tme, getContext()); 69 sendSync(tme); 71 if (tme.getStatus() == ExchangeStatus.DONE) { 73 done(exchange); 74 } else if (tme.getStatus() == ExchangeStatus.ERROR) { 75 fail(exchange, tme.getError()); 76 } else if (tme.getFault() != null) { 77 Fault fault = MessageUtil.copyFault(tme); 78 done(tme); 79 MessageUtil.transferToFault(fault, exchange); 80 sendSync(exchange); 81 } else if (tme.getMessage("out") != null) { 82 NormalizedMessage out = MessageUtil.copyOut(tme); 83 done(tme); 84 MessageUtil.transferToOut(out, exchange); 85 sendSync(exchange); 86 } else { 87 done(tme); 88 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message"); 89 } 90 } 91 92 95 protected void processAsync(MessageExchange exchange) throws Exception { 96 if (exchange.getRole() == MessageExchange.Role.PROVIDER && 97 exchange.getProperty(correlation) == null) { 98 MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern()); 100 if (store.hasFeature(Store.CLUSTERED)) { 101 exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE); 102 tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE); 103 } 104 tme.setProperty(correlation, exchange.getExchangeId()); 106 exchange.setProperty(correlation, tme.getExchangeId()); 107 store.store(exchange.getExchangeId(), exchange); 109 NormalizedMessage in = MessageUtil.copyIn(exchange); 113 MessageUtil.transferToIn(in, tme); 114 ExchangeTarget target = getDestination(tme); 116 target.configureTarget(tme, getContext()); 117 send(tme); 119 } else { 121 String id = (String ) exchange.getProperty(correlation); 122 if (id == null) { 123 throw new IllegalStateException (correlation + " property not found"); 124 } 125 MessageExchange org = (MessageExchange) store.load(id); 126 if (org == null) { 127 throw new IllegalStateException ("Could not load original exchange with id " + id); 128 } 129 if (exchange.getStatus() == ExchangeStatus.DONE) { 131 done(org); 132 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 134 fail(org, exchange.getError()); 135 } else if (exchange.getFault() != null) { 137 store.store(exchange.getExchangeId(), exchange); 138 MessageUtil.transferTo(exchange, org, "fault"); 139 send(org); 140 } else if (exchange.getMessage("out") != null) { 142 store.store(exchange.getExchangeId(), exchange); 143 MessageUtil.transferTo(exchange, org, "out"); 144 send(org); 145 } else { 146 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message"); 147 } 148 } 149 } 150 151 157 protected abstract ExchangeTarget getDestination(MessageExchange exchange) throws Exception ; 158 159 } 160 | Popular Tags |