1 17 package org.apache.servicemix.bpe; 18 19 import java.util.Map ; 20 21 import javax.jbi.component.ComponentContext; 22 import javax.jbi.messaging.DeliveryChannel; 23 import javax.jbi.messaging.ExchangeStatus; 24 import javax.jbi.messaging.Fault; 25 import javax.jbi.messaging.InOnly; 26 import javax.jbi.messaging.InOptionalOut; 27 import javax.jbi.messaging.InOut; 28 import javax.jbi.messaging.MessageExchange; 29 import javax.jbi.messaging.NormalizedMessage; 30 import javax.jbi.messaging.RobustInOnly; 31 import javax.jbi.messaging.MessageExchange.Role; 32 import javax.jbi.servicedesc.ServiceEndpoint; 33 import javax.wsdl.Operation; 34 import javax.wsdl.PortType; 35 import javax.xml.transform.dom.DOMSource ; 36 37 import org.apache.ode.bpe.bped.EventDirector; 38 import org.apache.ode.bpe.client.IFormattableValue; 39 import org.apache.ode.bpe.event.BPELStaticKey; 40 import org.apache.ode.bpe.event.IResponseMessage; 41 import org.apache.ode.bpe.event.SimpleRequestMessageEvent; 42 import org.apache.ode.bpe.interaction.IInteraction; 43 import org.apache.ode.bpe.interaction.InteractionException; 44 import org.apache.ode.bpe.interaction.InvocationFactory; 45 import org.apache.ode.bpe.interaction.XMLInteractionObject; 46 import org.apache.ode.bpe.scope.service.BPRuntimeException; 47 import org.apache.servicemix.common.Endpoint; 48 import org.apache.servicemix.common.ExchangeProcessor; 49 import org.apache.servicemix.jbi.jaxp.SourceTransformer; 50 import org.w3c.dom.Document ; 51 52 public class BPEEndpoint extends Endpoint implements ExchangeProcessor { 53 54 protected ServiceEndpoint activated; 55 protected DeliveryChannel channel; 56 protected SourceTransformer transformer = new SourceTransformer(); 57 58 private static final ThreadLocal ENDPOINT = new ThreadLocal (); 59 60 public static BPEEndpoint getCurrent() { 61 return (BPEEndpoint) ENDPOINT.get(); 62 } 63 64 public static void setCurrent(BPEEndpoint endpoint) { 65 ENDPOINT.set(endpoint); 66 } 67 68 public Role getRole() { 69 return Role.PROVIDER; 70 } 71 72 public void activate() throws Exception { 73 logger = this.serviceUnit.getComponent().getLogger(); 74 ComponentContext ctx = this.serviceUnit.getComponent().getComponentContext(); 75 activated = ctx.activateEndpoint(service, endpoint); 76 channel = ctx.getDeliveryChannel(); 77 } 78 79 public void deactivate() throws Exception { 80 ServiceEndpoint ep = activated; 81 activated = null; 82 ComponentContext ctx = this.serviceUnit.getComponent().getComponentContext(); 83 ctx.deactivateEndpoint(ep); 84 } 85 86 public ExchangeProcessor getProcessor() { 87 return this; 88 } 89 90 public void process(MessageExchange exchange) throws Exception { 91 if (exchange.getStatus() == ExchangeStatus.DONE) { 92 return; 93 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 94 return; 95 } 96 97 String inputPartName = BPEComponent.PART_PAYLOAD; 98 String outputPartName = BPEComponent.PART_PAYLOAD; 99 if (exchange.getOperation() != null) { 100 PortType pt = getDefinition().getPortType(getInterfaceName()); 101 Operation oper = pt.getOperation(exchange.getOperation().getLocalPart(), null, null); 102 if (oper.getInput() != null && oper.getInput().getMessage() != null) { 103 Map parts = oper.getInput().getMessage().getParts(); 104 inputPartName = (String ) parts.keySet().iterator().next(); 105 } 106 if (oper.getOutput() != null && oper.getOutput().getMessage() != null) { 107 Map parts = oper.getOutput().getMessage().getParts(); 108 outputPartName = (String ) parts.keySet().iterator().next(); 109 } 110 } 111 112 113 BPELStaticKey bsk = new BPELStaticKey(); 114 bsk.setTargetNamespace(getInterfaceName().getNamespaceURI()); 115 bsk.setPortType(getInterfaceName().getLocalPart()); 116 if (exchange.getOperation() != null) { 117 bsk.setOperation(exchange.getOperation().getLocalPart()); 118 } 119 SimpleRequestMessageEvent msg = new SimpleRequestMessageEvent(); 120 msg.setStaticKey(bsk); 121 XMLInteractionObject interaction = new XMLInteractionObject(); 122 interaction.setDocument(transformer.toDOMDocument(exchange.getMessage("in"))); 123 msg.setPart(inputPartName, interaction); 124 125 EventDirector ed = ((BPEComponent) getServiceUnit().getComponent()).getEventDirector(); 126 try { 127 IResponseMessage response; 128 try { 129 BPEEndpoint.setCurrent(this); 130 response = ed.sendEvent(msg, true); 131 } finally { 132 BPEEndpoint.setCurrent(null); 133 } 134 IInteraction payload = response.getPart(outputPartName); 135 if (response.getFault() != null) { 136 Exception e = response.getFault().getFaultException(); 137 if (e != null) { 138 throw e; 139 } 140 throw new BPRuntimeException(response.getFault().getFaultString(), ""); 142 } else if (exchange instanceof InOnly || exchange instanceof RobustInOnly) { 143 if (payload != null) { 144 throw new UnsupportedOperationException ("Did not expect return value for in-only or robust-in-only"); 145 } 146 exchange.setStatus(ExchangeStatus.DONE); 147 channel.send(exchange); 148 } else if (exchange instanceof InOptionalOut) { 149 if (payload == null) { 150 exchange.setStatus(ExchangeStatus.DONE); 151 channel.send(exchange); 152 } else { 153 NormalizedMessage out = exchange.createMessage(); 154 out.setContent(new DOMSource (getDocument(payload))); 155 exchange.setMessage(out, "out"); 156 channel.send(exchange); 157 } 158 } else if (exchange instanceof InOut) { 159 if (payload == null) { 160 throw new UnsupportedOperationException ("Expected return data for in-out"); 161 } 162 NormalizedMessage out = exchange.createMessage(); 163 out.setContent(new DOMSource (getDocument(payload))); 164 exchange.setMessage(out, "out"); 165 channel.send(exchange); 166 } else { 167 throw new UnsupportedOperationException ("Unhandled mep: " + exchange.getPattern()); 168 } 169 } catch (BPRuntimeException e) { 170 if (logger.isDebugEnabled()) { 171 logger.debug("Exception caught", e); 172 } 173 Object payload = e.getPartMessage(BPEComponent.PART_PAYLOAD); 174 if (payload instanceof IInteraction) { 175 Fault fault = exchange.createFault(); 176 fault.setContent(new DOMSource (getDocument((IInteraction) payload))); 177 exchange.setFault(fault); 178 } else if (payload instanceof IFormattableValue) { 179 Fault fault = exchange.createFault(); 180 Document doc = (Document ) ((IFormattableValue) payload).getValueAs(Document .class); 181 fault.setContent(new DOMSource (doc)); 182 exchange.setFault(fault); 183 } else { 184 exchange.setError(e); 185 } 186 channel.send(exchange); 187 } 188 } 189 190 public void start() throws Exception { 191 } 192 193 public void stop() throws Exception { 194 } 195 196 protected Document getDocument(IInteraction interaction) throws InteractionException { 197 Object obj = interaction.invoke(InvocationFactory.newInstance().createGetObjectInvocation()); 198 if (obj instanceof Document ) { 199 return (Document ) obj; 200 } else if (obj instanceof IFormattableValue) { 201 return (Document ) ((IFormattableValue) obj).getValueAs(Document .class); 202 } else { 203 throw new IllegalStateException ("Unable to handle object of type: " + obj.getClass().getName()); 204 } 205 } 206 207 208 } 209 | Popular Tags |