KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jsr181 > Jsr181ExchangeProcessor


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jsr181;
18
19 import java.io.ByteArrayOutputStream JavaDoc;
20 import java.util.Iterator JavaDoc;
21
22 import javax.activation.DataHandler JavaDoc;
23 import javax.jbi.messaging.DeliveryChannel;
24 import javax.jbi.messaging.ExchangeStatus;
25 import javax.jbi.messaging.Fault;
26 import javax.jbi.messaging.InOptionalOut;
27 import javax.jbi.messaging.InOut;
28 import javax.jbi.messaging.MessageExchange;
29 import javax.jbi.messaging.NormalizedMessage;
30 import javax.xml.stream.XMLStreamException;
31 import javax.xml.stream.XMLStreamReader;
32 import javax.xml.transform.Source JavaDoc;
33 import javax.xml.transform.TransformerException JavaDoc;
34
35 import org.apache.servicemix.common.ExchangeProcessor;
36 import org.apache.servicemix.common.xbean.XBeanServiceUnit;
37 import org.apache.servicemix.jbi.jaxp.StAXSourceTransformer;
38 import org.apache.servicemix.jbi.jaxp.StringSource;
39 import org.apache.servicemix.jsr181.xfire.JbiTransport;
40 import org.codehaus.xfire.MessageContext;
41 import org.codehaus.xfire.XFire;
42 import org.codehaus.xfire.attachments.JavaMailAttachments;
43 import org.codehaus.xfire.attachments.SimpleAttachment;
44 import org.codehaus.xfire.exchange.InMessage;
45 import org.codehaus.xfire.service.OperationInfo;
46 import org.codehaus.xfire.service.Service;
47 import org.codehaus.xfire.transport.Channel;
48 import org.codehaus.xfire.transport.Transport;
49
50 public class Jsr181ExchangeProcessor implements ExchangeProcessor {
51
52     protected DeliveryChannel channel;
53     protected Jsr181Endpoint endpoint;
54     protected StAXSourceTransformer transformer;
55     
56     public Jsr181ExchangeProcessor(Jsr181Endpoint endpoint) {
57         this.endpoint = endpoint;
58         this.transformer = new StAXSourceTransformer();
59     }
60
61     public void process(MessageExchange exchange) throws Exception JavaDoc {
62         ClassLoader JavaDoc oldCl = Thread.currentThread().getContextClassLoader();
63         try {
64             ClassLoader JavaDoc classLoader = ((XBeanServiceUnit) endpoint.getServiceUnit()).getConfigurationClassLoader();
65             Thread.currentThread().setContextClassLoader(classLoader);
66             doProcess(exchange);
67         } finally {
68             Thread.currentThread().setContextClassLoader(oldCl);
69         }
70     }
71         
72     protected void doProcess(MessageExchange exchange) throws Exception JavaDoc {
73         if (exchange.getStatus() == ExchangeStatus.DONE) {
74             return;
75         } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
76             return;
77         }
78
79         // TODO: fault should not be serialized as soap
80
// TODO: clean this code
81
XFire xfire = endpoint.getXFire();
82         Service service = endpoint.getXFireService();
83         Transport t = xfire.getTransportManager().getTransport(JbiTransport.JBI_BINDING);
84         ByteArrayOutputStream JavaDoc out = new ByteArrayOutputStream JavaDoc();
85         Channel c = t.createChannel();
86         MessageContext ctx = new MessageContext();
87         ctx.setXFire(xfire);
88         ctx.setService(service);
89         ctx.setProperty(Channel.BACKCHANNEL_URI, out);
90         ctx.setExchange(new org.codehaus.xfire.exchange.MessageExchange(ctx));
91         InMessage msg = new InMessage();
92         ctx.getExchange().setInMessage(msg);
93         if (exchange.getOperation() != null) {
94             OperationInfo op = service.getServiceInfo().getOperation(exchange.getOperation().getLocalPart());
95             if (op != null) {
96                 ctx.getExchange().setOperation(op);
97             }
98         }
99         ctx.setCurrentMessage(msg);
100         NormalizedMessage in = exchange.getMessage("in");
101         msg.setXMLStreamReader(getXMLStreamReader(in.getContent()));
102         if (in.getAttachmentNames() != null && in.getAttachmentNames().size() > 0) {
103             JavaMailAttachments attachments = new JavaMailAttachments();
104             for (Iterator JavaDoc it = in.getAttachmentNames().iterator(); it.hasNext();) {
105                 String JavaDoc name = (String JavaDoc) it.next();
106                 DataHandler JavaDoc dh = in.getAttachment(name);
107                 attachments.addPart(new SimpleAttachment(name, dh));
108             }
109             msg.setAttachments(attachments);
110         }
111         c.receive(ctx, msg);
112         c.close();
113         
114         // Set response or DONE status
115
if (isInAndOut(exchange)) {
116             if (ctx.getExchange().hasFaultMessage() && ctx.getExchange().getFaultMessage().getBody() != null) {
117                 Fault fault = exchange.createFault();
118                 fault.setContent(new StringSource(out.toString()));
119                 exchange.setFault(fault);
120             } else {
121                 NormalizedMessage outMsg = exchange.createMessage();
122                 outMsg.setContent(new StringSource(out.toString()));
123                 exchange.setMessage(outMsg, "out");
124             }
125         } else {
126             exchange.setStatus(ExchangeStatus.DONE);
127         }
128         channel.send(exchange);
129     }
130
131     public void start() throws Exception JavaDoc {
132         channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel();
133     }
134
135     public void stop() throws Exception JavaDoc {
136     }
137
138     protected XMLStreamReader getXMLStreamReader(Source JavaDoc source) throws TransformerException JavaDoc, XMLStreamException {
139         return transformer.toXMLStreamReader(source);
140     }
141     
142     protected boolean isInAndOut(MessageExchange exchange) {
143         return exchange instanceof InOut || exchange instanceof InOptionalOut;
144     }
145
146 }
147
Popular Tags