KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > celtix > bus > transports > jms > JMSServerTransport


1 package org.objectweb.celtix.bus.transports.jms;
2
3 import java.io.ByteArrayInputStream JavaDoc;
4 import java.io.ByteArrayOutputStream JavaDoc;
5 import java.io.IOException JavaDoc;
6
7 import java.util.Calendar JavaDoc;
8 import java.util.GregorianCalendar JavaDoc;
9 import java.util.SimpleTimeZone JavaDoc;
10 import java.util.TimeZone JavaDoc;
11 import java.util.concurrent.Executor JavaDoc;
12 import java.util.concurrent.RejectedExecutionException JavaDoc;
13 import java.util.logging.Level JavaDoc;
14 import java.util.logging.Logger JavaDoc;
15
16 import javax.jms.JMSException JavaDoc;
17 import javax.jms.Message JavaDoc;
18 import javax.jms.Queue JavaDoc;
19 import javax.jms.QueueSender JavaDoc;
20 import javax.jms.TextMessage JavaDoc;
21 import javax.naming.NamingException JavaDoc;
22 import javax.wsdl.WSDLException;
23 import javax.xml.ws.handler.MessageContext;
24
25 import org.objectweb.celtix.Bus;
26 import org.objectweb.celtix.BusEvent;
27 import org.objectweb.celtix.BusEventListener;
28 import org.objectweb.celtix.BusException;
29 import org.objectweb.celtix.bus.busimpl.ComponentCreatedEvent;
30 import org.objectweb.celtix.bus.busimpl.ComponentRemovedEvent;
31 import org.objectweb.celtix.bus.configuration.ConfigurationEvent;
32 import org.objectweb.celtix.bus.management.counters.TransportServerCounters;
33 import org.objectweb.celtix.common.logging.LogUtils;
34 import org.objectweb.celtix.configuration.Configuration;
35 import org.objectweb.celtix.context.OutputStreamMessageContext;
36 import org.objectweb.celtix.transports.ServerTransport;
37 import org.objectweb.celtix.transports.ServerTransportCallback;
38 import org.objectweb.celtix.transports.jms.JMSServerBehaviorPolicyType;
39 import org.objectweb.celtix.transports.jms.context.JMSMessageHeadersType;
40 import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
41
42
43 public class JMSServerTransport extends JMSTransportBase
44     implements ServerTransport, BusEventListener {
45     static final Logger JavaDoc LOG = LogUtils.getL7dLogger(JMSServerTransport.class);
46     private static final String JavaDoc JMS_SERVER_TRANSPORT_MESSAGE =
47         JMSServerTransport.class.getName() + ".IncomingMessage";
48
49     ServerTransportCallback callback;
50     TransportServerCounters counters;
51     private PooledSession listenerSession;
52     private Thread JavaDoc listenerThread;
53     private JMSServerBehaviorPolicyType serverBehaviourPolicy;
54     
55
56
57     public JMSServerTransport(Bus b, EndpointReferenceType address)
58         throws WSDLException {
59         super(b, address, true);
60         serverBehaviourPolicy = getServerPolicy(configuration);
61         counters = new TransportServerCounters("JMSServerTranpsort");
62         entry("JMSServerTransport Constructor");
63         bus.sendEvent(new ComponentCreatedEvent(this));
64     }
65     
66     private JMSServerBehaviorPolicyType getServerPolicy(Configuration conf) {
67         JMSServerBehaviorPolicyType pol = conf.getObject(JMSServerBehaviorPolicyType.class, "jmsServer");
68         if (pol == null) {
69             pol = new JMSServerBehaviorPolicyType();
70         }
71         return pol;
72     }
73     
74     public JMSServerBehaviorPolicyType getJMSServerBehaviourPolicy() {
75         return serverBehaviourPolicy;
76     }
77
78     public void activate(ServerTransportCallback transportCB) throws IOException JavaDoc {
79         entry("JMSServerTransport activate().... ");
80         callback = transportCB;
81
82         try {
83             LOG.log(Level.FINE, "establishing JMS connection");
84             JMSProviderHub.connect(this);
85
86             //Get a non-pooled session.
87
listenerSession = sessionFactory.get(targetDestination);
88             listenerThread = new JMSListenerThread(listenerSession, this);
89             listenerThread.start();
90         } catch (JMSException JavaDoc ex) {
91             LOG.log(Level.FINE, "JMS connect failed with JMSException : ", ex);
92             throw new IOException JavaDoc(ex.getMessage());
93         } catch (NamingException JavaDoc nex) {
94             LOG.log(Level.FINE, "JMS connect failed with NamingException : ", nex);
95             throw new IOException JavaDoc(nex.getMessage());
96         }
97     }
98     
99     public OutputStreamMessageContext rebase(MessageContext context,
100                                              EndpointReferenceType decoupledResponseEndpoint)
101         throws IOException JavaDoc {
102         OutputStreamMessageContext octx = new JMSOutputStreamContext(context);
103        
104         String JavaDoc replyTo = decoupledResponseEndpoint.getAddress().getValue();
105         replyTo = replyTo.substring(replyTo.indexOf('#') + 1);
106         octx.put(JMSConstants.JMS_REBASED_REPLY_TO, replyTo);
107         return octx;
108     }
109
110     public OutputStreamMessageContext createOutputStreamContext(MessageContext context) throws IOException JavaDoc {
111         return new JMSOutputStreamContext(context);
112     }
113
114     public void finalPrepareOutputStreamContext(OutputStreamMessageContext context) throws IOException JavaDoc {
115     }
116
117     public void deactivate() throws IOException JavaDoc {
118         try {
119             listenerSession.consumer().close();
120             if (listenerThread != null) {
121                 listenerThread.join();
122             }
123             sessionFactory.shutdown();
124         } catch (InterruptedException JavaDoc e) {
125             //Don't do anything...
126
} catch (JMSException JavaDoc ex) {
127             //
128
}
129     }
130
131     public void shutdown() {
132         entry("JMSServerTransport shutdown()");
133         try {
134             this.deactivate();
135         } catch (IOException JavaDoc ex) {
136             // Ignore for now.
137
}
138         bus.sendEvent(new ComponentRemovedEvent(this));
139     }
140
141     public void postDispatch(MessageContext bindingContext, OutputStreamMessageContext context)
142         throws IOException JavaDoc {
143
144         Message JavaDoc message = (Message JavaDoc)bindingContext.get(JMS_SERVER_TRANSPORT_MESSAGE);
145         PooledSession replySession = null;
146          // ensure non-oneways in point-to-point domain
147
counters.getRequestTotal().increase();
148         
149         if (!context.isOneWay()) {
150             if (queueDestinationStyle) {
151                 try {
152 // send reply
153
Queue JavaDoc replyTo = getReplyToDestination(context, message);
154                     replySession = sessionFactory.get(false);
155
156                     Message JavaDoc reply = marshalResponse(message, context, replySession);
157                     setReplyCorrelationID(message, reply);
158
159                     QueueSender JavaDoc sender = (QueueSender JavaDoc)replySession.producer();
160
161                     sendResponse(context, message, reply, sender, replyTo);
162                     
163                 } catch (JMSException JavaDoc ex) {
164                     LOG.log(Level.WARNING, "Failed in post dispatch ...", ex);
165                     counters.getTotalError().increase();
166                     throw new IOException JavaDoc(ex.getMessage());
167                 } catch (NamingException JavaDoc nex) {
168                     LOG.log(Level.WARNING, "Failed in post dispatch ...", nex);
169                     counters.getTotalError().increase();
170                     throw new IOException JavaDoc(nex.getMessage());
171                 } finally {
172                     // house-keeping
173
if (replySession != null) {
174                         sessionFactory.recycle(replySession);
175                     }
176                 }
177             } else {
178                 // we will never receive a non-oneway invocation in pub-sub
179
// domain from Celtix client - however a mis-behaving pure JMS
180
// client could conceivably make suce an invocation, in which
181
// case we silently discard the reply
182
LOG.log(Level.WARNING,
183                                              "discarding reply for non-oneway invocation ",
184                                               "with 'topic' destinationStyle");
185                 counters.getTotalError().increase();
186             }
187         } else {
188             // counter for oneway request
189
counters.getRequestOneWay().increase();
190         }
191     }
192     
193     public Queue JavaDoc getReplyToDestination(OutputStreamMessageContext context, Message JavaDoc message)
194         throws JMSException JavaDoc, NamingException JavaDoc {
195         Queue JavaDoc replyTo;
196         // If WS-Addressing had set the replyTo header.
197
if (context.get(JMSConstants.JMS_REBASED_REPLY_TO) != null) {
198             replyTo = sessionFactory.getQueueFromInitialContext(
199                                   (String JavaDoc) context.get(JMSConstants.JMS_REBASED_REPLY_TO));
200         } else {
201             replyTo = (null != message.getJMSReplyTo())
202                 ? (Queue JavaDoc)message.getJMSReplyTo() : (Queue JavaDoc)replyDestination;
203         }
204         
205         return replyTo;
206     }
207     
208     public Message JavaDoc marshalResponse(Message JavaDoc message,
209                                 OutputStreamMessageContext context,
210                                 PooledSession replySession) throws JMSException JavaDoc {
211         
212         Message JavaDoc reply;
213         boolean textPayload = message instanceof TextMessage JavaDoc
214             ? true : false;
215         if (textPayload) {
216             reply = marshal(context.getOutputStream().toString(),
217                                 replySession.session(),
218                                 null,
219                                 JMSConstants.TEXT_MESSAGE_TYPE);
220         } else {
221             reply = marshal(((ByteArrayOutputStream JavaDoc) context.getOutputStream()).toByteArray(),
222                                replySession.session(),
223                                null,
224                               JMSConstants.BINARY_MESSAGE_TYPE);
225         }
226          
227         return reply;
228     }
229     
230     public void setReplyCorrelationID(Message JavaDoc message, Message JavaDoc reply)
231         throws JMSException JavaDoc {
232         String JavaDoc correlationID = message.getJMSCorrelationID();
233
234         if (correlationID == null
235             || "".equals(correlationID)
236             && serverBehaviourPolicy.isUseMessageIDAsCorrelationID()) {
237             correlationID = message.getJMSMessageID();
238         }
239         
240         if (correlationID != null && !"".equals(correlationID)) {
241             reply.setJMSCorrelationID(correlationID);
242         }
243     }
244     
245     
246     public void sendResponse(OutputStreamMessageContext context,
247                              Message JavaDoc request,
248                              Message JavaDoc reply,
249                              QueueSender JavaDoc sender,
250                              Queue JavaDoc replyTo)
251         throws JMSException JavaDoc {
252         JMSMessageHeadersType headers =
253             (JMSMessageHeadersType) context.get(JMSConstants.JMS_SERVER_HEADERS);
254
255         int deliveryMode = getJMSDeliveryMode(headers);
256         int priority = getJMSPriority(headers);
257         long ttl = getTimeToLive(headers);
258
259         setMessageProperties(headers, reply);
260
261         LOG.log(Level.FINE, "server sending reply: ", reply);
262
263         long timeToLive = 0;
264         if (request.getJMSExpiration() > 0) {
265             TimeZone JavaDoc tz = new SimpleTimeZone JavaDoc(0, "GMT");
266             Calendar JavaDoc cal = new GregorianCalendar JavaDoc(tz);
267             timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
268         }
269         
270         if (timeToLive >= 0) {
271             ttl = ttl > 0 ? ttl : timeToLive;
272             sender.send(replyTo, reply, deliveryMode, priority, ttl);
273         } else {
274             LOG.log(Level.INFO, "Message time to live is already expired skipping response.");
275         }
276     }
277
278
279     /**
280      * Helper method to process incoming message.
281      *
282      * @param message the incoming message
283      */

284     protected void incoming(Message JavaDoc message) throws IOException JavaDoc {
285         try {
286             LOG.log(Level.FINE, "server received request: ", message);
287            
288
289             String JavaDoc msgType = message instanceof TextMessage JavaDoc
290                     ? JMSConstants.TEXT_MESSAGE_TYPE : JMSConstants.BINARY_MESSAGE_TYPE;
291             Object JavaDoc request = unmarshal(message, msgType);
292
293             byte[] bytes = null;
294
295             if (JMSConstants.TEXT_MESSAGE_TYPE.equals(msgType)) {
296                 String JavaDoc requestString = (String JavaDoc)request;
297                 LOG.log(Level.FINE, "server received request: ", requestString);
298                 bytes = requestString.getBytes();
299             } else {
300                 bytes = (byte[])request;
301             }
302
303             JMSInputStreamContext context = new JMSInputStreamContext(new ByteArrayInputStream JavaDoc(bytes));
304             populateIncomingContext(message, context, JMSConstants.JMS_SERVER_HEADERS);
305
306
307             context.put(JMS_SERVER_TRANSPORT_MESSAGE, message);
308             callback.dispatch(context, this);
309
310         } catch (JMSException JavaDoc jmsex) {
311             //TODO: need to revisit for which exception should we throw.
312
throw new IOException JavaDoc(jmsex.getMessage());
313         }
314     }
315
316     class JMSListenerThread extends Thread JavaDoc {
317         final JMSServerTransport theTransport;
318         private final PooledSession listenSession;
319
320         public JMSListenerThread(PooledSession session,
321                                  JMSServerTransport transport) {
322             listenSession = session;
323             theTransport = transport;
324         }
325
326         public void run() {
327             try {
328                 while (true) {
329                     Message JavaDoc message = listenSession.consumer().receive();
330                     if (message == null) {
331                         LOG.log(Level.WARNING,
332                                 "Null message received from message consumer.",
333                                 " Exiting ListenerThread::run().");
334                         return;
335                     }
336                     while (message != null) {
337                         Executor JavaDoc executor = theTransport.callback.getExecutor();
338                         if (executor == null) {
339                             executor = theTransport.bus
340                                 .getWorkQueueManager().getAutomaticWorkQueue();
341                         }
342                         if (executor != null) {
343                             try {
344                                 executor.execute(new JMSExecutor(theTransport, message));
345                                 message = null;
346                             } catch (RejectedExecutionException JavaDoc ree) {
347                                 //FIXME - no room left on workqueue, what to do
348
//for now, loop until it WILL fit on the queue,
349
//although we could just dispatch on this thread.
350
}
351                         } else {
352                             //shouldn't ever get here....
353
try {
354                                 theTransport.incoming(message);
355                             } catch (IOException JavaDoc ex) {
356                                 LOG.log(Level.WARNING, "Failed to process incoming message : ", ex);
357                             }
358                             message = null;
359                         }
360                     }
361                 }
362             } catch (JMSException JavaDoc jmsex) {
363                 jmsex.printStackTrace();
364                 LOG.log(Level.SEVERE, "Exiting ListenerThread::run(): ", jmsex.getMessage());
365             } catch (Throwable JavaDoc jmsex) {
366                 jmsex.printStackTrace();
367                 LOG.log(Level.SEVERE, "Exiting ListenerThread::run(): ", jmsex.getMessage());
368             }
369         }
370     }
371     
372     static class JMSExecutor implements Runnable JavaDoc {
373         Message JavaDoc message;
374         JMSServerTransport transport;
375         
376         JMSExecutor(JMSServerTransport t, Message JavaDoc m) {
377             message = m;
378             transport = t;
379         }
380
381         public void run() {
382             try {
383                 transport.incoming(message);
384             } catch (IOException JavaDoc ex) {
385                 //TODO: Decide what to do if we receive the exception.
386
LOG.log(Level.WARNING,
387                         "Failed to process incoming message : ", ex);
388             }
389         }
390         
391     }
392
393     public void processEvent(BusEvent e) throws BusException {
394         if (e.getID().equals(ConfigurationEvent.RECONFIGURED)) {
395             String JavaDoc configName = (String JavaDoc)e.getSource();
396             reConfigure(configName);
397         }
398     }
399
400     private void reConfigure(String JavaDoc configName) {
401         if ("servicesMonitoring".equals(configName)) {
402             if (bus.getConfiguration().getBoolean("servicesMonitoring")) {
403                 counters.resetCounters();
404             } else {
405                 counters.stopCounters();
406             }
407         }
408     }
409 }
410
Popular Tags