1 package org.objectweb.celtix.jbi.transport; 2 3 4 5 import java.io.ByteArrayInputStream ; 6 import java.io.ByteArrayOutputStream ; 7 import java.io.IOException ; 8 import java.io.InputStream ; 9 import java.util.logging.Level ; 10 import java.util.logging.Logger ; 11 12 import javax.jbi.messaging.DeliveryChannel; 13 import javax.jbi.messaging.MessageExchange; 14 import javax.jbi.messaging.NormalizedMessage; 15 import javax.jbi.servicedesc.ServiceEndpoint; 16 import javax.xml.namespace.QName ; 17 import javax.xml.parsers.DocumentBuilder ; 18 import javax.xml.parsers.DocumentBuilderFactory ; 19 import javax.xml.transform.dom.DOMSource ; 20 import javax.xml.ws.handler.MessageContext; 21 22 import org.w3c.dom.Document ; 23 24 import org.objectweb.celtix.context.ObjectMessageContext; 25 import org.objectweb.celtix.context.ObjectMessageContextImpl; 26 import org.objectweb.celtix.context.OutputStreamMessageContext; 27 import org.objectweb.celtix.jbi.se.CeltixServiceUnit; 28 import org.objectweb.celtix.jbi.se.CeltixServiceUnitManager; 29 import org.objectweb.celtix.transports.ServerTransport; 30 import org.objectweb.celtix.transports.ServerTransportCallback; 31 import org.objectweb.celtix.ws.addressing.EndpointReferenceType; 32 33 38 public class JBIServerTransport implements ServerTransport { 39 40 private static final Logger LOG = Logger.getLogger(JBIServerTransport.class.getName()); 41 42 private static final String MESSAGE_EXCHANGE_PROPERTY = "celtix.jbi.message.exchange"; 43 private final CeltixServiceUnitManager suManager; 44 private final DeliveryChannel channel; 45 private ServerTransportCallback callback; 46 private volatile boolean running; 47 private JBIDispatcher dispatcher; 48 private final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); 49 50 51 public JBIServerTransport(CeltixServiceUnitManager sum, DeliveryChannel dc) { 52 suManager = sum; 53 channel = dc; 54 docBuilderFactory.setNamespaceAware(true); 55 } 56 57 public void shutdown() { 58 running = false; 59 } 60 61 public OutputStreamMessageContext createOutputStreamContext(MessageContext context) 62 throws IOException { 63 64 return new JBIOutputStreamMessageContext(context); 65 } 66 67 68 public void finalPrepareOutputStreamContext(OutputStreamMessageContext context) 69 throws IOException { 70 } 71 72 public void activate(ServerTransportCallback cb) throws IOException { 73 LOG.info("activating JBI server transport"); 75 callback = cb; 76 dispatcher = new JBIDispatcher(); 77 new Thread (dispatcher).start(); 78 } 79 80 81 public void deactivate() throws IOException { 82 running = false; 83 } 84 85 public void postDispatch(MessageContext ctx, OutputStreamMessageContext msgContext) { 86 87 try { 88 JBIOutputStreamMessageContext jbiCtx = (JBIOutputStreamMessageContext)msgContext; 89 ByteArrayOutputStream baos = (ByteArrayOutputStream )jbiCtx.getOutputStream(); 90 ByteArrayInputStream bais = new ByteArrayInputStream (baos.toByteArray()); 91 LOG.finest("building document from bytes"); 92 DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); 93 Document doc = builder.parse(bais); 94 95 MessageExchange xchng = (MessageExchange)ctx.get(MESSAGE_EXCHANGE_PROPERTY); 96 LOG.fine("creating NormalizedMessage"); 97 NormalizedMessage msg = xchng.createMessage(); 98 msg.setContent(new DOMSource (doc)); 99 xchng.setMessage(msg, "out"); 100 LOG.fine("postDispatch sending out message to NWR"); 101 channel.send(xchng); 102 } catch (Exception ex) { 103 LOG.log(Level.SEVERE, "error sending Out message", ex); 104 } 105 } 106 107 public OutputStreamMessageContext rebase(MessageContext context, 108 EndpointReferenceType decoupledResponseEndpoint) 109 throws IOException { 110 return null; 112 } 113 114 private void dispatch(MessageExchange exchange, ServerTransportCallback cb) 115 throws IOException { 116 117 try { 118 QName opName = exchange.getOperation(); 119 LOG.fine("dispatch: " + opName); 120 121 NormalizedMessage nm = exchange.getMessage("in"); 122 final InputStream in = JBIMessageHelper.convertMessageToInputStream(nm.getContent()); 123 125 ObjectMessageContext ctx = new ObjectMessageContextImpl(); 126 LOG.finest("dispatching message on callback: " + cb); 127 ctx.put(MESSAGE_EXCHANGE_PROPERTY, exchange); 128 cb.dispatch(new JBIInputStreamMessageContext(ctx, in), this); 129 } catch (Exception ex) { 130 LOG.log(Level.SEVERE, "error preparing message", ex); 131 throw new IOException (ex.getMessage()); 132 } 133 } 134 135 136 private class JBIDispatcher implements Runnable { 137 138 public final void run() { 139 140 try { 141 running = true; 142 LOG.fine("JBIServerTransport message receiving thread started"); 143 do { 144 MessageExchange exchange = channel.accept(); 145 if (exchange != null) { 146 ServiceEndpoint ep = exchange.getEndpoint(); 150 CeltixServiceUnit csu = suManager.getServiceUnitForEndpoint(ep); 151 ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); 152 153 try { 154 Thread.currentThread().setContextClassLoader(csu.getClassLoader()); 155 if (csu != null) { 156 LOG.finest("dispatching to Celtix service unit"); 157 dispatch(exchange, callback); 158 } else { 159 LOG.info("no CeltixServiceUnit found"); 160 } 161 } finally { 162 Thread.currentThread().setContextClassLoader(oldLoader); 163 } 164 } 165 } while(running); 166 } catch (Exception ex) { 167 LOG.log(Level.SEVERE, "error running dispatch thread", ex); 168 } 169 LOG.fine("JBIServerTransport message processing thread exitting"); 170 } 171 } 172 } 173 | Popular Tags |