KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > soap > xfire > transport > MuleLocalChannel


1 /*
2  * $Id: MuleLocalChannel.java 4268 2006-12-14 17:08:47Z lajos $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.soap.xfire.transport;
12
13 import java.io.IOException JavaDoc;
14 import java.io.InputStream JavaDoc;
15 import java.io.OutputStream JavaDoc;
16 import java.io.PipedInputStream JavaDoc;
17 import java.io.PipedOutputStream JavaDoc;
18 import java.io.Reader JavaDoc;
19 import java.io.StringReader JavaDoc;
20 import java.io.UnsupportedEncodingException JavaDoc;
21
22 import javax.resource.spi.work.Work JavaDoc;
23 import javax.resource.spi.work.WorkException JavaDoc;
24 import javax.xml.stream.XMLStreamReader;
25 import javax.xml.stream.XMLStreamWriter;
26
27 import org.apache.commons.io.output.ByteArrayOutputStream;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.codehaus.xfire.MessageContext;
31 import org.codehaus.xfire.XFire;
32 import org.codehaus.xfire.XFireException;
33 import org.codehaus.xfire.XFireRuntimeException;
34 import org.codehaus.xfire.exchange.AbstractMessage;
35 import org.codehaus.xfire.exchange.InMessage;
36 import org.codehaus.xfire.exchange.OutMessage;
37 import org.codehaus.xfire.service.Service;
38 import org.codehaus.xfire.transport.AbstractChannel;
39 import org.codehaus.xfire.transport.Channel;
40 import org.codehaus.xfire.transport.Session;
41 import org.codehaus.xfire.transport.Transport;
42 import org.codehaus.xfire.util.STAXUtils;
43 import org.mule.MuleException;
44 import org.mule.impl.message.ExceptionPayload;
45 import org.mule.providers.soap.xfire.XFireConnector;
46 import org.mule.umo.UMOEventContext;
47 import org.mule.umo.UMOException;
48 import org.mule.umo.manager.UMOWorkManager;
49 import org.mule.util.StringUtils;
50
51 import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
52
53 /**
54  * TODO document
55  */

56 public class MuleLocalChannel extends AbstractChannel
57 {
58     protected static final String JavaDoc SENDER_URI = "senderUri";
59     protected static final String JavaDoc OLD_CONTEXT = "urn:xfire:transport:local:oldContext";
60     
61     /**
62      * logger used by this class
63      */

64     protected transient Log logger = LogFactory.getLog(getClass());
65
66     private final Session session;
67     protected UMOWorkManager workManager;
68
69     public MuleLocalChannel(String JavaDoc uri, Transport transport, Session session)
70     {
71         this.session = session;
72         setUri(uri);
73         setTransport(transport);
74     }
75
76     public void open()
77     {
78         // template method
79
}
80
81     public void send(final MessageContext context, final OutMessage message) throws XFireException
82     {
83         if (message.getUri().equals(Channel.BACKCHANNEL_URI))
84         {
85             final OutputStream JavaDoc out = (OutputStream JavaDoc)context.getProperty(Channel.BACKCHANNEL_URI);
86             if (out != null)
87             {
88                 final XMLStreamWriter writer = STAXUtils.createXMLStreamWriter(out, message.getEncoding(),
89                     context);
90
91                 message.getSerializer().writeMessage(message, writer, context);
92             }
93             else
94             {
95                 MessageContext oldContext = (MessageContext)context.getProperty(OLD_CONTEXT);
96
97                 sendViaNewChannel(context, oldContext, message, (String JavaDoc)context.getProperty(SENDER_URI));
98             }
99         }
100         else
101         {
102             MessageContext receivingContext = new MessageContext();
103             receivingContext.setXFire(context.getXFire());
104             receivingContext.setService(getService(context.getXFire(), message.getUri()));
105             receivingContext.setProperty(OLD_CONTEXT, context);
106             receivingContext.setProperty(SENDER_URI, getUri());
107             receivingContext.setSession(session);
108
109             sendViaNewChannel(context, receivingContext, message, message.getUri());
110         }
111     }
112
113     protected Service getService(XFire xfire, String JavaDoc uri) throws XFireException
114     {
115         if (null == xfire)
116         {
117             logger.warn("No XFire instance in context, unable to determine service");
118             return null;
119         }
120
121         int i = uri.indexOf("//");
122
123         if (i == -1)
124         {
125             throw new XFireException("Malformed service URI");
126         }
127
128         String JavaDoc name = uri.substring(i + 2);
129         Service service = xfire.getServiceRegistry().getService(name);
130
131         if (null == service)
132         {
133             // TODO this should be an exception...
134
logger.warn("Unable to locate '" + name + "' in ServiceRegistry");
135         }
136
137         return service;
138     }
139
140     private void sendViaNewChannel(final MessageContext context,
141                                    final MessageContext receivingContext,
142                                    final OutMessage message,
143                                    final String JavaDoc uri) throws XFireException
144     {
145         try
146         {
147             Channel channel;
148             PipedInputStream JavaDoc stream = new PipedInputStream JavaDoc();
149             PipedOutputStream JavaDoc outStream = new PipedOutputStream JavaDoc(stream);
150             try
151             {
152                 channel = getTransport().createChannel(uri);
153             }
154             catch (Exception JavaDoc e)
155             {
156                 throw new XFireException("Couldn't create channel.", e);
157             }
158
159             Semaphore s = new Semaphore(2);
160             try
161             {
162                 getWorkManager().scheduleWork(new WriterWorker(outStream, message, context, s));
163                 getWorkManager().scheduleWork(
164                     new ReaderWorker(stream, message, channel, uri, receivingContext, s));
165             }
166             catch (WorkException JavaDoc e)
167             {
168                 throw new XFireException("Couldn't schedule worker threads. " + e.getMessage(), e);
169             }
170
171             try
172             {
173                 s.acquire();
174             }
175             catch (InterruptedException JavaDoc e)
176             {
177                 // ignore is ok
178
}
179         }
180         catch (IOException JavaDoc e)
181         {
182             throw new XFireRuntimeException("Couldn't create stream.", e);
183         }
184     }
185
186     public void close()
187     {
188         // template method
189
}
190
191     public boolean isAsync()
192     {
193         return true;
194     }
195
196     public UMOWorkManager getWorkManager()
197     {
198         return workManager;
199     }
200
201     public void setWorkManager(UMOWorkManager workManager)
202     {
203         this.workManager = workManager;
204     }
205
206     private class ReaderWorker implements Work JavaDoc
207     {
208
209         private InputStream JavaDoc stream;
210         private OutMessage message;
211         private Channel channel;
212         private String JavaDoc uri;
213         private MessageContext context;
214         private Semaphore semaphore;
215
216         public ReaderWorker(InputStream JavaDoc stream,
217                             OutMessage message,
218                             Channel channel,
219                             String JavaDoc uri,
220                             MessageContext context,
221                             Semaphore semaphore)
222         {
223             this.stream = stream;
224             this.message = message;
225             this.channel = channel;
226             this.uri = uri;
227             this.context = context;
228             this.semaphore = semaphore;
229         }
230
231         public void run()
232         {
233             try
234             {
235                 final XMLStreamReader reader = STAXUtils.createXMLStreamReader(stream, message.getEncoding(),
236                     context);
237                 final InMessage inMessage = new InMessage(reader, uri);
238                 inMessage.setEncoding(message.getEncoding());
239
240                 channel.receive(context, inMessage);
241
242                 reader.close();
243                 stream.close();
244             }
245             catch (Exception JavaDoc e)
246             {
247                 throw new XFireRuntimeException("Couldn't read stream.", e);
248             }
249             finally
250             {
251                 semaphore.release();
252             }
253         }
254
255         public void release()
256         {
257             // template method
258
}
259     }
260
261     private class WriterWorker implements Work JavaDoc
262     {
263
264         private OutputStream JavaDoc stream;
265         private OutMessage message;
266         private MessageContext context;
267         private Semaphore semaphore;
268
269         public WriterWorker(OutputStream JavaDoc stream,
270                             OutMessage message,
271                             MessageContext context,
272                             Semaphore semaphore)
273         {
274             this.stream = stream;
275             this.message = message;
276             this.context = context;
277             this.semaphore = semaphore;
278         }
279
280         public void run()
281         {
282             try
283             {
284                 final XMLStreamWriter writer = STAXUtils.createXMLStreamWriter(stream, message.getEncoding(),
285                     context);
286                 message.getSerializer().writeMessage(message, writer, context);
287
288                 writer.close();
289                 stream.close();
290
291             }
292             catch (Exception JavaDoc e)
293             {
294                 throw new XFireRuntimeException("Couldn't write stream.", e);
295             }
296             finally
297             {
298                 semaphore.release();
299             }
300         }
301
302         public void release()
303         {
304             // template method
305
}
306     }
307
308     /**
309      * Get the service that is mapped to the specified request.
310      */

311     protected String JavaDoc getService(UMOEventContext context)
312     {
313         String JavaDoc pathInfo = context.getEndpointURI().getPath();
314
315         if (StringUtils.isEmpty(pathInfo))
316         {
317             return context.getEndpointURI().getHost();
318         }
319
320         String JavaDoc serviceName;
321
322         int i = pathInfo.lastIndexOf("/");
323
324         if (i > -1)
325         {
326             serviceName = pathInfo.substring(i + 1);
327         }
328         else
329         {
330             serviceName = pathInfo;
331         }
332
333         return serviceName;
334     }
335
336     public Object JavaDoc onCall(UMOEventContext ctx) throws UMOException
337     {
338
339         try
340         {
341             MessageContext context = new MessageContext();
342   
343             XFire xfire = (XFire)ctx.getComponentDescriptor().getProperties().get(
344                 XFireConnector.XFIRE_PROPERTY);
345
346             context.setService(xfire.getServiceRegistry().getService(getService(ctx)));
347             context.setXFire(xfire);
348
349             // Channel.BACKCHANNEL_URI
350
ByteArrayOutputStream resultStream = new ByteArrayOutputStream();
351
352             // Return the result to us, not to the sender.
353
context.setProperty(Channel.BACKCHANNEL_URI, resultStream);
354
355             XMLStreamReader reader;
356
357             // TODO isStreaming()?
358
Object JavaDoc payload = ctx.getMessage().getPayload();
359             if (payload instanceof InputStream JavaDoc)
360             {
361                 reader = STAXUtils.createXMLStreamReader((InputStream JavaDoc)payload, ctx.getEncoding(), context);
362             }
363             else if (payload instanceof Reader JavaDoc)
364             {
365                 reader = STAXUtils.createXMLStreamReader((Reader JavaDoc)payload, context);
366             }
367             else
368             {
369                 String JavaDoc text = ctx.getTransformedMessageAsString(ctx.getEncoding());
370                 reader = STAXUtils.createXMLStreamReader(new StringReader JavaDoc(text), context);
371             }
372
373             InMessage in = new InMessage(reader, getUri());
374
375             receive(context, in);
376
377             Object JavaDoc result = null;
378
379             try
380             {
381                 // We need to check if there is a fault message. If that's the case,
382
// we need to send back the fault to the client.
383
// TODO: see MULE-1113 for background about this workaround; I'm not
384
// even sure the fault reading is done correctly? (XFire API is a bit
385
// confusing)
386
AbstractMessage fault = context.getExchange().getFaultMessage();
387                 if (fault != null && fault.getBody() != null)
388                 {
389                     result = resultStream.toString(fault.getEncoding());
390                     ExceptionPayload exceptionPayload = new ExceptionPayload(new Exception JavaDoc(result.toString()));
391                     ctx.getMessage().setExceptionPayload(exceptionPayload);
392                 }
393                 else if (context.getExchange().hasOutMessage())
394                 {
395                     result = resultStream.toString(context.getExchange().getOutMessage().getEncoding());
396                 }
397             }
398             catch (UnsupportedEncodingException JavaDoc e1)
399             {
400                 throw new MuleException(e1);
401             }
402
403             return result;
404
405         }
406         catch (UMOException e)
407         {
408             logger.warn("Could not dispatch message to XFire!", e);
409             throw e;
410         }
411     }
412 }
413
Popular Tags