1 17 package org.apache.servicemix.jsr181; 18 19 import java.io.ByteArrayOutputStream ; 20 import java.util.Iterator ; 21 22 import javax.activation.DataHandler ; 23 import javax.jbi.messaging.DeliveryChannel; 24 import javax.jbi.messaging.ExchangeStatus; 25 import javax.jbi.messaging.Fault; 26 import javax.jbi.messaging.InOptionalOut; 27 import javax.jbi.messaging.InOut; 28 import javax.jbi.messaging.MessageExchange; 29 import javax.jbi.messaging.NormalizedMessage; 30 import javax.xml.stream.XMLStreamException; 31 import javax.xml.stream.XMLStreamReader; 32 import javax.xml.transform.Source ; 33 import javax.xml.transform.TransformerException ; 34 35 import org.apache.servicemix.common.ExchangeProcessor; 36 import org.apache.servicemix.common.xbean.XBeanServiceUnit; 37 import org.apache.servicemix.jbi.jaxp.StAXSourceTransformer; 38 import org.apache.servicemix.jbi.jaxp.StringSource; 39 import org.apache.servicemix.jsr181.xfire.JbiTransport; 40 import org.codehaus.xfire.MessageContext; 41 import org.codehaus.xfire.XFire; 42 import org.codehaus.xfire.attachments.JavaMailAttachments; 43 import org.codehaus.xfire.attachments.SimpleAttachment; 44 import org.codehaus.xfire.exchange.InMessage; 45 import org.codehaus.xfire.service.OperationInfo; 46 import org.codehaus.xfire.service.Service; 47 import org.codehaus.xfire.transport.Channel; 48 import org.codehaus.xfire.transport.Transport; 49 50 public class Jsr181ExchangeProcessor implements ExchangeProcessor { 51 52 protected DeliveryChannel channel; 53 protected Jsr181Endpoint endpoint; 54 protected StAXSourceTransformer transformer; 55 56 public Jsr181ExchangeProcessor(Jsr181Endpoint endpoint) { 57 this.endpoint = endpoint; 58 this.transformer = new StAXSourceTransformer(); 59 } 60 61 public void process(MessageExchange exchange) throws Exception { 62 ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); 63 try { 64 ClassLoader classLoader = ((XBeanServiceUnit) endpoint.getServiceUnit()).getConfigurationClassLoader(); 65 Thread.currentThread().setContextClassLoader(classLoader); 66 doProcess(exchange); 67 } finally { 68 Thread.currentThread().setContextClassLoader(oldCl); 69 } 70 } 71 72 protected void doProcess(MessageExchange exchange) throws Exception { 73 if (exchange.getStatus() == ExchangeStatus.DONE) { 74 return; 75 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 76 return; 77 } 78 79 XFire xfire = endpoint.getXFire(); 82 Service service = endpoint.getXFireService(); 83 Transport t = xfire.getTransportManager().getTransport(JbiTransport.JBI_BINDING); 84 ByteArrayOutputStream out = new ByteArrayOutputStream (); 85 Channel c = t.createChannel(); 86 MessageContext ctx = new MessageContext(); 87 ctx.setXFire(xfire); 88 ctx.setService(service); 89 ctx.setProperty(Channel.BACKCHANNEL_URI, out); 90 ctx.setExchange(new org.codehaus.xfire.exchange.MessageExchange(ctx)); 91 InMessage msg = new InMessage(); 92 ctx.getExchange().setInMessage(msg); 93 if (exchange.getOperation() != null) { 94 OperationInfo op = service.getServiceInfo().getOperation(exchange.getOperation().getLocalPart()); 95 if (op != null) { 96 ctx.getExchange().setOperation(op); 97 } 98 } 99 ctx.setCurrentMessage(msg); 100 NormalizedMessage in = exchange.getMessage("in"); 101 msg.setXMLStreamReader(getXMLStreamReader(in.getContent())); 102 if (in.getAttachmentNames() != null && in.getAttachmentNames().size() > 0) { 103 JavaMailAttachments attachments = new JavaMailAttachments(); 104 for (Iterator it = in.getAttachmentNames().iterator(); it.hasNext();) { 105 String name = (String ) it.next(); 106 DataHandler dh = in.getAttachment(name); 107 attachments.addPart(new SimpleAttachment(name, dh)); 108 } 109 msg.setAttachments(attachments); 110 } 111 c.receive(ctx, msg); 112 c.close(); 113 114 if (isInAndOut(exchange)) { 116 if (ctx.getExchange().hasFaultMessage() && ctx.getExchange().getFaultMessage().getBody() != null) { 117 Fault fault = exchange.createFault(); 118 fault.setContent(new StringSource(out.toString())); 119 exchange.setFault(fault); 120 } else { 121 NormalizedMessage outMsg = exchange.createMessage(); 122 outMsg.setContent(new StringSource(out.toString())); 123 exchange.setMessage(outMsg, "out"); 124 } 125 } else { 126 exchange.setStatus(ExchangeStatus.DONE); 127 } 128 channel.send(exchange); 129 } 130 131 public void start() throws Exception { 132 channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel(); 133 } 134 135 public void stop() throws Exception { 136 } 137 138 protected XMLStreamReader getXMLStreamReader(Source source) throws TransformerException , XMLStreamException { 139 return transformer.toXMLStreamReader(source); 140 } 141 142 protected boolean isInAndOut(MessageExchange exchange) { 143 return exchange instanceof InOut || exchange instanceof InOptionalOut; 144 } 145 146 } 147 | Popular Tags |