KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > celtix > jbi > transport > JBIServerTransport


1 package org.objectweb.celtix.jbi.transport;
2
3
4
5 import java.io.ByteArrayInputStream JavaDoc;
6 import java.io.ByteArrayOutputStream JavaDoc;
7 import java.io.IOException JavaDoc;
8 import java.io.InputStream JavaDoc;
9 import java.util.logging.Level JavaDoc;
10 import java.util.logging.Logger JavaDoc;
11
12 import javax.jbi.messaging.DeliveryChannel;
13 import javax.jbi.messaging.MessageExchange;
14 import javax.jbi.messaging.NormalizedMessage;
15 import javax.jbi.servicedesc.ServiceEndpoint;
16 import javax.xml.namespace.QName JavaDoc;
17 import javax.xml.parsers.DocumentBuilder JavaDoc;
18 import javax.xml.parsers.DocumentBuilderFactory JavaDoc;
19 import javax.xml.transform.dom.DOMSource JavaDoc;
20 import javax.xml.ws.handler.MessageContext;
21
22 import org.w3c.dom.Document JavaDoc;
23
24 import org.objectweb.celtix.context.ObjectMessageContext;
25 import org.objectweb.celtix.context.ObjectMessageContextImpl;
26 import org.objectweb.celtix.context.OutputStreamMessageContext;
27 import org.objectweb.celtix.jbi.se.CeltixServiceUnit;
28 import org.objectweb.celtix.jbi.se.CeltixServiceUnitManager;
29 import org.objectweb.celtix.transports.ServerTransport;
30 import org.objectweb.celtix.transports.ServerTransportCallback;
31 import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
32
33 /**
34  * Connects Celtix clients to the NormalizedMessageRouter. Celtix
35  * messages are wrapped in a NormalizedMessage before being sent to
36  * the NMR and are unwrapped when being received from it.
37  */

38 public class JBIServerTransport implements ServerTransport {
39     
40     private static final Logger JavaDoc LOG = Logger.getLogger(JBIServerTransport.class.getName());
41     
42     private static final String JavaDoc MESSAGE_EXCHANGE_PROPERTY = "celtix.jbi.message.exchange";
43     private final CeltixServiceUnitManager suManager;
44     private final DeliveryChannel channel;
45     private ServerTransportCallback callback;
46     private volatile boolean running;
47     private JBIDispatcher dispatcher;
48     private final DocumentBuilderFactory JavaDoc docBuilderFactory = DocumentBuilderFactory.newInstance();
49     
50     
51     public JBIServerTransport(CeltixServiceUnitManager sum, DeliveryChannel dc) {
52         suManager = sum;
53         channel = dc;
54         docBuilderFactory.setNamespaceAware(true);
55     }
56     
57     public void shutdown() {
58         running = false;
59     }
60     
61     public OutputStreamMessageContext createOutputStreamContext(MessageContext context)
62         throws IOException JavaDoc {
63         
64         return new JBIOutputStreamMessageContext(context);
65     }
66     
67     
68     public void finalPrepareOutputStreamContext(OutputStreamMessageContext context)
69         throws IOException JavaDoc {
70     }
71     
72     public void activate(ServerTransportCallback cb) throws IOException JavaDoc {
73         // activate endpoints here
74
LOG.info("activating JBI server transport");
75         callback = cb;
76         dispatcher = new JBIDispatcher();
77         new Thread JavaDoc(dispatcher).start();
78     }
79     
80     
81     public void deactivate() throws IOException JavaDoc {
82         running = false;
83     }
84     
85     public void postDispatch(MessageContext ctx, OutputStreamMessageContext msgContext) {
86         
87         try {
88             JBIOutputStreamMessageContext jbiCtx = (JBIOutputStreamMessageContext)msgContext;
89             ByteArrayOutputStream JavaDoc baos = (ByteArrayOutputStream JavaDoc)jbiCtx.getOutputStream();
90             ByteArrayInputStream JavaDoc bais = new ByteArrayInputStream JavaDoc(baos.toByteArray());
91             LOG.finest("building document from bytes");
92             DocumentBuilder JavaDoc builder = docBuilderFactory.newDocumentBuilder();
93             Document JavaDoc doc = builder.parse(bais);
94             
95             MessageExchange xchng = (MessageExchange)ctx.get(MESSAGE_EXCHANGE_PROPERTY);
96             LOG.fine("creating NormalizedMessage");
97             NormalizedMessage msg = xchng.createMessage();
98             msg.setContent(new DOMSource JavaDoc(doc));
99             xchng.setMessage(msg, "out");
100             LOG.fine("postDispatch sending out message to NWR");
101             channel.send(xchng);
102         } catch (Exception JavaDoc ex) {
103             LOG.log(Level.SEVERE, "error sending Out message", ex);
104         }
105     }
106     
107     public OutputStreamMessageContext rebase(MessageContext context,
108                        EndpointReferenceType decoupledResponseEndpoint)
109         throws IOException JavaDoc {
110         // TODO Auto-generated method stub
111
return null;
112     }
113     
114     private void dispatch(MessageExchange exchange, ServerTransportCallback cb)
115         throws IOException JavaDoc {
116         
117         try {
118             QName JavaDoc opName = exchange.getOperation();
119             LOG.fine("dispatch: " + opName);
120             
121             NormalizedMessage nm = exchange.getMessage("in");
122             final InputStream JavaDoc in = JBIMessageHelper.convertMessageToInputStream(nm.getContent());
123             // dispatch through callback
124

125             ObjectMessageContext ctx = new ObjectMessageContextImpl();
126             LOG.finest("dispatching message on callback: " + cb);
127             ctx.put(MESSAGE_EXCHANGE_PROPERTY, exchange);
128             cb.dispatch(new JBIInputStreamMessageContext(ctx, in), this);
129         } catch (Exception JavaDoc ex) {
130             LOG.log(Level.SEVERE, "error preparing message", ex);
131             throw new IOException JavaDoc(ex.getMessage());
132         }
133     }
134
135    
136     private class JBIDispatcher implements Runnable JavaDoc {
137         
138         public final void run() {
139             
140             try {
141                 running = true;
142                 LOG.fine("JBIServerTransport message receiving thread started");
143                 do {
144                     MessageExchange exchange = channel.accept();
145                     if (exchange != null) {
146                         // REVISIT: serialized message handling not such a
147
// good idea.
148
// REVISIT: can there be more than one ep?
149
ServiceEndpoint ep = exchange.getEndpoint();
150                         CeltixServiceUnit csu = suManager.getServiceUnitForEndpoint(ep);
151                         ClassLoader JavaDoc oldLoader = Thread.currentThread().getContextClassLoader();
152                         
153                         try {
154                             Thread.currentThread().setContextClassLoader(csu.getClassLoader());
155                             if (csu != null) {
156                                 LOG.finest("dispatching to Celtix service unit");
157                                 dispatch(exchange, callback);
158                             } else {
159                                 LOG.info("no CeltixServiceUnit found");
160                             }
161                         } finally {
162                             Thread.currentThread().setContextClassLoader(oldLoader);
163                         }
164                     }
165                 } while(running);
166             } catch (Exception JavaDoc ex) {
167                 LOG.log(Level.SEVERE, "error running dispatch thread", ex);
168             }
169             LOG.fine("JBIServerTransport message processing thread exitting");
170         }
171     }
172 }
173
Popular Tags