1 17 package org.apache.servicemix.eip.patterns; 18 19 import javax.jbi.management.DeploymentException; 20 import javax.jbi.messaging.ExchangeStatus; 21 import javax.jbi.messaging.InOnly; 22 import javax.jbi.messaging.MessageExchange; 23 import javax.jbi.messaging.NormalizedMessage; 24 25 import org.apache.servicemix.JbiConstants; 26 import org.apache.servicemix.eip.EIPEndpoint; 27 import org.apache.servicemix.eip.support.ExchangeTarget; 28 import org.apache.servicemix.jbi.util.MessageUtil; 29 import org.apache.servicemix.store.Store; 30 31 46 public class WireTap extends EIPEndpoint { 47 48 51 private ExchangeTarget target; 52 55 private ExchangeTarget inListener; 56 59 private ExchangeTarget outListener; 60 63 private ExchangeTarget faultListener; 64 67 private String correlation; 68 69 72 public ExchangeTarget getTarget() { 73 return target; 74 } 75 76 79 public void setTarget(ExchangeTarget target) { 80 this.target = target; 81 this.wsdlExchangeTarget = target; 82 } 83 84 87 public ExchangeTarget getFaultListener() { 88 return faultListener; 89 } 90 91 94 public void setFaultListener(ExchangeTarget faultListener) { 95 this.faultListener = faultListener; 96 } 97 98 101 public ExchangeTarget getInListener() { 102 return inListener; 103 } 104 105 108 public void setInListener(ExchangeTarget inListener) { 109 this.inListener = inListener; 110 } 111 112 115 public ExchangeTarget getOutListener() { 116 return outListener; 117 } 118 119 122 public void setOutListener(ExchangeTarget outListener) { 123 this.outListener = outListener; 124 } 125 126 129 public void validate() throws DeploymentException { 130 super.validate(); 131 if (target == null) { 133 throw new IllegalArgumentException ("target should be set to a valid ExchangeTarget"); 134 } 135 correlation = "WireTap.Correlation." + getService() + "." + getEndpoint(); 137 } 138 139 142 protected void processSync(MessageExchange exchange) throws Exception { 143 MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern()); 145 target.configureTarget(tme, getContext()); 146 sendSyncToListenerAndTarget(exchange, tme, inListener, "in"); 147 if (tme.getStatus() == ExchangeStatus.DONE) { 148 done(exchange); 149 } else if (tme.getStatus() == ExchangeStatus.ERROR) { 150 fail(exchange, tme.getError()); 151 } else if (tme.getFault() != null) { 152 sendSyncToListenerAndTarget(tme, exchange, faultListener, "fault"); 153 done(tme); 154 } else if (tme.getMessage("out") != null) { 155 sendSyncToListenerAndTarget(tme, exchange, outListener, "out"); 156 done(tme); 157 } else { 158 done(tme); 159 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message"); 160 } 161 } 162 163 166 protected void processAsync(MessageExchange exchange) throws Exception { 167 if (exchange.getRole() == MessageExchange.Role.PROVIDER && 168 exchange.getProperty(correlation) == null) { 169 MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern()); 171 if (store.hasFeature(Store.CLUSTERED)) { 172 exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE); 173 tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE); 174 } 175 target.configureTarget(tme, getContext()); 176 exchange.setProperty(correlation, tme.getExchangeId()); 178 tme.setProperty(correlation, exchange.getExchangeId()); 179 store.store(exchange.getExchangeId(), exchange); 181 sendToListenerAndTarget(exchange, tme, inListener, "in"); 183 } else { 185 String id = (String ) exchange.getProperty(correlation); 186 if (id == null) { 187 if (exchange.getRole() == MessageExchange.Role.CONSUMER && 188 exchange.getStatus() != ExchangeStatus.ACTIVE) { 189 return; 191 } 192 throw new IllegalStateException (correlation + " property not found"); 193 } 194 MessageExchange org = (MessageExchange) store.load(id); 195 if (org == null) { 196 throw new IllegalStateException ("Could not load original exchange with id " + id); 197 } 198 if (exchange.getStatus() == ExchangeStatus.DONE) { 200 done(org); 201 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 203 fail(org, exchange.getError()); 204 } else if (exchange.getFault() != null) { 206 store.store(exchange.getExchangeId(), exchange); 207 sendToListenerAndTarget(exchange, org, faultListener, "fault"); 208 } else if (exchange.getMessage("out") != null) { 210 store.store(exchange.getExchangeId(), exchange); 211 sendToListenerAndTarget(exchange, org, outListener, "out"); 212 } else { 213 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message"); 214 } 215 } 216 } 217 218 private void sendToListenerAndTarget(MessageExchange source, 219 MessageExchange dest, 220 ExchangeTarget listener, 221 String message) throws Exception { 222 if (listener != null) { 223 NormalizedMessage msg = MessageUtil.copy(source.getMessage(message)); 224 InOnly lme = exchangeFactory.createInOnlyExchange(); 225 if (store.hasFeature(Store.CLUSTERED)) { 226 lme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE); 227 } 228 listener.configureTarget(lme, getContext()); 229 MessageUtil.transferToIn(msg, lme); 230 send(lme); 231 MessageUtil.transferTo(msg, dest, message); 232 send(dest); 233 } else { 234 MessageUtil.transferTo(source, dest, message); 235 send(dest); 236 } 237 } 238 239 private void sendSyncToListenerAndTarget(MessageExchange source, 240 MessageExchange dest, 241 ExchangeTarget listener, 242 String message) throws Exception { 243 if (listener != null) { 244 NormalizedMessage msg = MessageUtil.copy(source.getMessage(message)); 245 InOnly lme = exchangeFactory.createInOnlyExchange(); 246 if (store.hasFeature(Store.CLUSTERED)) { 247 lme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE); 248 } 249 listener.configureTarget(lme, getContext()); 250 MessageUtil.transferToIn(msg, lme); 251 sendSync(lme); 252 MessageUtil.transferTo(msg, dest, message); 253 sendSync(dest); 254 } else { 255 MessageUtil.transferTo(source, dest, message); 256 sendSync(dest); 257 } 258 } 259 260 } 261 | Popular Tags |