1 10 11 package org.mule.providers.soap.xfire.transport; 12 13 import java.io.IOException ; 14 import java.io.InputStream ; 15 import java.io.OutputStream ; 16 import java.io.PipedInputStream ; 17 import java.io.PipedOutputStream ; 18 import java.io.Reader ; 19 import java.io.StringReader ; 20 import java.io.UnsupportedEncodingException ; 21 22 import javax.resource.spi.work.Work ; 23 import javax.resource.spi.work.WorkException ; 24 import javax.xml.stream.XMLStreamReader; 25 import javax.xml.stream.XMLStreamWriter; 26 27 import org.apache.commons.io.output.ByteArrayOutputStream; 28 import org.apache.commons.logging.Log; 29 import org.apache.commons.logging.LogFactory; 30 import org.codehaus.xfire.MessageContext; 31 import org.codehaus.xfire.XFire; 32 import org.codehaus.xfire.XFireException; 33 import org.codehaus.xfire.XFireRuntimeException; 34 import org.codehaus.xfire.exchange.AbstractMessage; 35 import org.codehaus.xfire.exchange.InMessage; 36 import org.codehaus.xfire.exchange.OutMessage; 37 import org.codehaus.xfire.service.Service; 38 import org.codehaus.xfire.transport.AbstractChannel; 39 import org.codehaus.xfire.transport.Channel; 40 import org.codehaus.xfire.transport.Session; 41 import org.codehaus.xfire.transport.Transport; 42 import org.codehaus.xfire.util.STAXUtils; 43 import org.mule.MuleException; 44 import org.mule.impl.message.ExceptionPayload; 45 import org.mule.providers.soap.xfire.XFireConnector; 46 import org.mule.umo.UMOEventContext; 47 import org.mule.umo.UMOException; 48 import org.mule.umo.manager.UMOWorkManager; 49 import org.mule.util.StringUtils; 50 51 import edu.emory.mathcs.backport.java.util.concurrent.Semaphore; 52 53 56 public class MuleLocalChannel extends AbstractChannel 57 { 58 protected static final String SENDER_URI = "senderUri"; 59 protected static final String OLD_CONTEXT = "urn:xfire:transport:local:oldContext"; 60 61 64 protected transient Log logger = LogFactory.getLog(getClass()); 65 66 private final Session session; 67 protected UMOWorkManager workManager; 68 69 public MuleLocalChannel(String uri, Transport transport, Session session) 70 { 71 this.session = session; 72 setUri(uri); 73 setTransport(transport); 74 } 75 76 public void open() 77 { 78 } 80 81 public void send(final MessageContext context, final OutMessage message) throws XFireException 82 { 83 if (message.getUri().equals(Channel.BACKCHANNEL_URI)) 84 { 85 final OutputStream out = (OutputStream )context.getProperty(Channel.BACKCHANNEL_URI); 86 if (out != null) 87 { 88 final XMLStreamWriter writer = STAXUtils.createXMLStreamWriter(out, message.getEncoding(), 89 context); 90 91 message.getSerializer().writeMessage(message, writer, context); 92 } 93 else 94 { 95 MessageContext oldContext = (MessageContext)context.getProperty(OLD_CONTEXT); 96 97 sendViaNewChannel(context, oldContext, message, (String )context.getProperty(SENDER_URI)); 98 } 99 } 100 else 101 { 102 MessageContext receivingContext = new MessageContext(); 103 receivingContext.setXFire(context.getXFire()); 104 receivingContext.setService(getService(context.getXFire(), message.getUri())); 105 receivingContext.setProperty(OLD_CONTEXT, context); 106 receivingContext.setProperty(SENDER_URI, getUri()); 107 receivingContext.setSession(session); 108 109 sendViaNewChannel(context, receivingContext, message, message.getUri()); 110 } 111 } 112 113 protected Service getService(XFire xfire, String uri) throws XFireException 114 { 115 if (null == xfire) 116 { 117 logger.warn("No XFire instance in context, unable to determine service"); 118 return null; 119 } 120 121 int i = uri.indexOf("//"); 122 123 if (i == -1) 124 { 125 throw new XFireException("Malformed service URI"); 126 } 127 128 String name = uri.substring(i + 2); 129 Service service = xfire.getServiceRegistry().getService(name); 130 131 if (null == service) 132 { 133 logger.warn("Unable to locate '" + name + "' in ServiceRegistry"); 135 } 136 137 return service; 138 } 139 140 private void sendViaNewChannel(final MessageContext context, 141 final MessageContext receivingContext, 142 final OutMessage message, 143 final String uri) throws XFireException 144 { 145 try 146 { 147 Channel channel; 148 PipedInputStream stream = new PipedInputStream (); 149 PipedOutputStream outStream = new PipedOutputStream (stream); 150 try 151 { 152 channel = getTransport().createChannel(uri); 153 } 154 catch (Exception e) 155 { 156 throw new XFireException("Couldn't create channel.", e); 157 } 158 159 Semaphore s = new Semaphore(2); 160 try 161 { 162 getWorkManager().scheduleWork(new WriterWorker(outStream, message, context, s)); 163 getWorkManager().scheduleWork( 164 new ReaderWorker(stream, message, channel, uri, receivingContext, s)); 165 } 166 catch (WorkException e) 167 { 168 throw new XFireException("Couldn't schedule worker threads. " + e.getMessage(), e); 169 } 170 171 try 172 { 173 s.acquire(); 174 } 175 catch (InterruptedException e) 176 { 177 } 179 } 180 catch (IOException e) 181 { 182 throw new XFireRuntimeException("Couldn't create stream.", e); 183 } 184 } 185 186 public void close() 187 { 188 } 190 191 public boolean isAsync() 192 { 193 return true; 194 } 195 196 public UMOWorkManager getWorkManager() 197 { 198 return workManager; 199 } 200 201 public void setWorkManager(UMOWorkManager workManager) 202 { 203 this.workManager = workManager; 204 } 205 206 private class ReaderWorker implements Work 207 { 208 209 private InputStream stream; 210 private OutMessage message; 211 private Channel channel; 212 private String uri; 213 private MessageContext context; 214 private Semaphore semaphore; 215 216 public ReaderWorker(InputStream stream, 217 OutMessage message, 218 Channel channel, 219 String uri, 220 MessageContext context, 221 Semaphore semaphore) 222 { 223 this.stream = stream; 224 this.message = message; 225 this.channel = channel; 226 this.uri = uri; 227 this.context = context; 228 this.semaphore = semaphore; 229 } 230 231 public void run() 232 { 233 try 234 { 235 final XMLStreamReader reader = STAXUtils.createXMLStreamReader(stream, message.getEncoding(), 236 context); 237 final InMessage inMessage = new InMessage(reader, uri); 238 inMessage.setEncoding(message.getEncoding()); 239 240 channel.receive(context, inMessage); 241 242 reader.close(); 243 stream.close(); 244 } 245 catch (Exception e) 246 { 247 throw new XFireRuntimeException("Couldn't read stream.", e); 248 } 249 finally 250 { 251 semaphore.release(); 252 } 253 } 254 255 public void release() 256 { 257 } 259 } 260 261 private class WriterWorker implements Work 262 { 263 264 private OutputStream stream; 265 private OutMessage message; 266 private MessageContext context; 267 private Semaphore semaphore; 268 269 public WriterWorker(OutputStream stream, 270 OutMessage message, 271 MessageContext context, 272 Semaphore semaphore) 273 { 274 this.stream = stream; 275 this.message = message; 276 this.context = context; 277 this.semaphore = semaphore; 278 } 279 280 public void run() 281 { 282 try 283 { 284 final XMLStreamWriter writer = STAXUtils.createXMLStreamWriter(stream, message.getEncoding(), 285 context); 286 message.getSerializer().writeMessage(message, writer, context); 287 288 writer.close(); 289 stream.close(); 290 291 } 292 catch (Exception e) 293 { 294 throw new XFireRuntimeException("Couldn't write stream.", e); 295 } 296 finally 297 { 298 semaphore.release(); 299 } 300 } 301 302 public void release() 303 { 304 } 306 } 307 308 311 protected String getService(UMOEventContext context) 312 { 313 String pathInfo = context.getEndpointURI().getPath(); 314 315 if (StringUtils.isEmpty(pathInfo)) 316 { 317 return context.getEndpointURI().getHost(); 318 } 319 320 String serviceName; 321 322 int i = pathInfo.lastIndexOf("/"); 323 324 if (i > -1) 325 { 326 serviceName = pathInfo.substring(i + 1); 327 } 328 else 329 { 330 serviceName = pathInfo; 331 } 332 333 return serviceName; 334 } 335 336 public Object onCall(UMOEventContext ctx) throws UMOException 337 { 338 339 try 340 { 341 MessageContext context = new MessageContext(); 342 343 XFire xfire = (XFire)ctx.getComponentDescriptor().getProperties().get( 344 XFireConnector.XFIRE_PROPERTY); 345 346 context.setService(xfire.getServiceRegistry().getService(getService(ctx))); 347 context.setXFire(xfire); 348 349 ByteArrayOutputStream resultStream = new ByteArrayOutputStream(); 351 352 context.setProperty(Channel.BACKCHANNEL_URI, resultStream); 354 355 XMLStreamReader reader; 356 357 Object payload = ctx.getMessage().getPayload(); 359 if (payload instanceof InputStream ) 360 { 361 reader = STAXUtils.createXMLStreamReader((InputStream )payload, ctx.getEncoding(), context); 362 } 363 else if (payload instanceof Reader ) 364 { 365 reader = STAXUtils.createXMLStreamReader((Reader )payload, context); 366 } 367 else 368 { 369 String text = ctx.getTransformedMessageAsString(ctx.getEncoding()); 370 reader = STAXUtils.createXMLStreamReader(new StringReader (text), context); 371 } 372 373 InMessage in = new InMessage(reader, getUri()); 374 375 receive(context, in); 376 377 Object result = null; 378 379 try 380 { 381 AbstractMessage fault = context.getExchange().getFaultMessage(); 387 if (fault != null && fault.getBody() != null) 388 { 389 result = resultStream.toString(fault.getEncoding()); 390 ExceptionPayload exceptionPayload = new ExceptionPayload(new Exception (result.toString())); 391 ctx.getMessage().setExceptionPayload(exceptionPayload); 392 } 393 else if (context.getExchange().hasOutMessage()) 394 { 395 result = resultStream.toString(context.getExchange().getOutMessage().getEncoding()); 396 } 397 } 398 catch (UnsupportedEncodingException e1) 399 { 400 throw new MuleException(e1); 401 } 402 403 return result; 404 405 } 406 catch (UMOException e) 407 { 408 logger.warn("Could not dispatch message to XFire!", e); 409 throw e; 410 } 411 } 412 } 413 | Popular Tags |