KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > jms > JmsMessageDispatcher


1 /*
2  * $Id: JmsMessageDispatcher.java 3982 2006-11-22 14:28:01Z lajos $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.jms;
12
13 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
14
15 import javax.jms.DeliveryMode JavaDoc;
16 import javax.jms.Destination JavaDoc;
17 import javax.jms.Message JavaDoc;
18 import javax.jms.MessageConsumer JavaDoc;
19 import javax.jms.MessageListener JavaDoc;
20 import javax.jms.MessageProducer JavaDoc;
21 import javax.jms.Session JavaDoc;
22 import javax.jms.TemporaryQueue JavaDoc;
23 import javax.jms.TemporaryTopic JavaDoc;
24
25 import org.apache.commons.collections.MapUtils;
26 import org.mule.MuleException;
27 import org.mule.config.i18n.Messages;
28 import org.mule.impl.MuleMessage;
29 import org.mule.providers.AbstractMessageDispatcher;
30 import org.mule.transaction.IllegalTransactionStateException;
31 import org.mule.umo.UMOEvent;
32 import org.mule.umo.UMOException;
33 import org.mule.umo.UMOMessage;
34 import org.mule.umo.endpoint.UMOEndpointURI;
35 import org.mule.umo.endpoint.UMOImmutableEndpoint;
36 import org.mule.umo.provider.DispatchException;
37 import org.mule.umo.provider.UMOConnector;
38 import org.mule.umo.provider.UMOMessageAdapter;
39 import org.mule.util.concurrent.Latch;
40 import org.mule.util.concurrent.WaitableBoolean;
41
42 /**
43  * <code>JmsMessageDispatcher</code> is responsible for dispatching messages to JMS
44  * destinations. All JMS semantics apply and settings such as replyTo and QoS
45  * properties are read from the event properties or defaults are used (according to
46  * the JMS specification)
47  */

48 public class JmsMessageDispatcher extends AbstractMessageDispatcher
49 {
50
51     private JmsConnector connector;
52     private Session JavaDoc delegateSession;
53     private Session JavaDoc cachedSession;
54
55     public JmsMessageDispatcher(UMOImmutableEndpoint endpoint)
56     {
57         super(endpoint);
58         this.connector = (JmsConnector)endpoint.getConnector();
59     }
60
61     /*
62      * (non-Javadoc)
63      *
64      * @see org.mule.providers.UMOConnector#dispatchEvent(org.mule.MuleEvent,
65      * org.mule.providers.MuleEndpoint)
66      */

67     protected void doDispatch(UMOEvent event) throws Exception JavaDoc
68     {
69         dispatchMessage(event);
70     }
71
72     protected void doConnect(UMOImmutableEndpoint endpoint) throws Exception JavaDoc
73     {
74         // template method
75
}
76
77     protected void doDisconnect() throws Exception JavaDoc
78     {
79         // template method
80
}
81
82     private UMOMessage dispatchMessage(UMOEvent event) throws Exception JavaDoc
83     {
84         Session JavaDoc session = null;
85         MessageProducer JavaDoc producer = null;
86         MessageConsumer JavaDoc consumer = null;
87         Destination JavaDoc replyTo = null;
88         boolean transacted = false;
89         boolean cached = false;
90         boolean remoteSync = useRemoteSync(event);
91
92         if (logger.isDebugEnabled())
93         {
94             logger.debug("dispatching on endpoint: " + event.getEndpoint().getEndpointURI()
95                          + ". Event id is: " + event.getId());
96         }
97
98         try
99         {
100             // Retrieve the session from the current transaction.
101
session = connector.getSessionFromTransaction();
102             if (session != null)
103             {
104                 transacted = true;
105
106                 // If a transaction is running, we can not receive any messages
107
// in the same transaction.
108
if (remoteSync)
109                 {
110                     throw new IllegalTransactionStateException(new org.mule.config.i18n.Message("jms", 2));
111                 }
112             }
113             // Should we be caching sessions? Note this is not part of the JMS spec.
114
// and is turned off by default.
115
else if (event.getMessage().getBooleanProperty(JmsConstants.CACHE_JMS_SESSIONS_PROPERTY,
116                 connector.isCacheJmsSessions()))
117             {
118                 cached = true;
119                 if (cachedSession != null)
120                 {
121                     session = cachedSession;
122                 }
123                 else
124                 {
125                     // Retrieve a session from the connector
126
session = connector.getSession(event.getEndpoint());
127                     cachedSession = session;
128                 }
129             }
130             else
131             {
132                 // Retrieve a session from the connector
133
session = connector.getSession(event.getEndpoint());
134                 if (event.getEndpoint().getTransactionConfig().isTransacted())
135                 {
136                     transacted = true;
137                 }
138             }
139
140             // Add a reference to the JMS session used so that an
141
// EventAwareTransformer
142
// can later retrieve it.
143
// TODO Figure out a better way to accomplish this: MULE-1079
144
// event.getMessage().setProperty(MuleProperties.MULE_JMS_SESSION,
145
// session);
146

147             UMOEndpointURI endpointUri = event.getEndpoint().getEndpointURI();
148
149             // determine if endpointUri is a queue or topic
150
// the format is topic:destination
151
boolean topic = false;
152             String JavaDoc resourceInfo = endpointUri.getResourceInfo();
153             topic = (resourceInfo != null && JmsConstants.TOPIC_PROPERTY.equalsIgnoreCase(resourceInfo));
154             // TODO MULE20 remove resource info support
155
if (!topic)
156             {
157                 topic = MapUtils.getBooleanValue(event.getEndpoint().getProperties(),
158                     JmsConstants.TOPIC_PROPERTY, false);
159             }
160
161             Destination JavaDoc dest = connector.getJmsSupport().createDestination(session, endpointUri.getAddress(),
162                 topic);
163             producer = connector.getJmsSupport().createProducer(session, dest, topic);
164
165             Object JavaDoc message = event.getTransformedMessage();
166             if (!(message instanceof Message JavaDoc))
167             {
168                 throw new DispatchException(new org.mule.config.i18n.Message(
169                     Messages.MESSAGE_NOT_X_IT_IS_TYPE_X_CHECK_TRANSFORMER_ON_X, "JMS message",
170                     message.getClass().getName(), connector.getName()), event.getMessage(),
171                     event.getEndpoint());
172             }
173
174             Message JavaDoc msg = (Message JavaDoc)message;
175             if (event.getMessage().getCorrelationId() != null)
176             {
177                 msg.setJMSCorrelationID(event.getMessage().getCorrelationId());
178             }
179
180             UMOMessage eventMsg = event.getMessage();
181
182             // Some JMS implementations might not support the ReplyTo property.
183
if (connector.supportsProperty(JmsConstants.JMS_REPLY_TO))
184             {
185                 Object JavaDoc tempReplyTo = eventMsg.removeProperty(JmsConstants.JMS_REPLY_TO);
186                 if (tempReplyTo != null)
187                 {
188                     if (tempReplyTo instanceof Destination JavaDoc)
189                     {
190                         replyTo = (Destination JavaDoc)tempReplyTo;
191                     }
192                     else
193                     {
194                         boolean replyToTopic = false;
195                         String JavaDoc reply = tempReplyTo.toString();
196                         int i = reply.indexOf(":");
197                         if (i > -1)
198                         {
199                             String JavaDoc qtype = reply.substring(0, i);
200                             replyToTopic = "topic".equalsIgnoreCase(qtype);
201                             reply = reply.substring(i + 1);
202                         }
203                         replyTo = connector.getJmsSupport().createDestination(session, reply, replyToTopic);
204                     }
205                 }
206                 // Are we going to wait for a return event ?
207
if (remoteSync && replyTo == null)
208                 {
209                     replyTo = connector.getJmsSupport().createTemporaryDestination(session, topic);
210                 }
211                 // Set the replyTo property
212
if (replyTo != null)
213                 {
214                     msg.setJMSReplyTo(replyTo);
215                 }
216
217                 // Are we going to wait for a return event ?
218
if (remoteSync)
219                 {
220                     consumer = connector.getJmsSupport().createConsumer(session, replyTo, topic);
221                 }
222             }
223
224             // QoS support
225
String JavaDoc ttlString = (String JavaDoc)eventMsg.removeProperty(JmsConstants.TIME_TO_LIVE_PROPERTY);
226             String JavaDoc priorityString = (String JavaDoc)eventMsg.removeProperty(JmsConstants.PRIORITY_PROPERTY);
227             String JavaDoc persistentDeliveryString = (String JavaDoc)eventMsg.removeProperty(JmsConstants.PERSISTENT_DELIVERY_PROPERTY);
228
229             long ttl = Message.DEFAULT_TIME_TO_LIVE;
230             int priority = Message.DEFAULT_PRIORITY;
231             boolean persistent = Message.DEFAULT_DELIVERY_MODE == DeliveryMode.PERSISTENT;
232
233             if (ttlString != null)
234             {
235                 ttl = Long.parseLong(ttlString);
236             }
237             if (priorityString != null)
238             {
239                 priority = Integer.parseInt(priorityString);
240             }
241             if (persistentDeliveryString != null)
242             {
243                 persistent = Boolean.valueOf(persistentDeliveryString).booleanValue();
244             }
245
246             if (logger.isDebugEnabled())
247             {
248                 logger.debug("Sending message of type " + msg.getClass().getName());
249             }
250
251             if (consumer != null && topic)
252             {
253                 // need to register a listener for a topic
254
Latch l = new Latch();
255                 ReplyToListener listener = new ReplyToListener(l);
256                 consumer.setMessageListener(listener);
257
258                 connector.getJmsSupport().send(producer, msg, persistent, priority, ttl, topic);
259
260                 int timeout = event.getTimeout();
261
262                 if (logger.isDebugEnabled())
263                 {
264                     logger.debug("Waiting for return event for: " + timeout + " ms on " + replyTo);
265                 }
266
267                 l.await(timeout, TimeUnit.MILLISECONDS);
268                 consumer.setMessageListener(null);
269                 listener.release();
270                 Message JavaDoc result = listener.getMessage();
271                 if (result == null)
272                 {
273                     logger.debug("No message was returned via replyTo destination");
274                     return null;
275                 }
276                 else
277                 {
278                     UMOMessageAdapter adapter = connector.getMessageAdapter(result);
279                     return new MuleMessage(JmsMessageUtils.toObject(result, connector.getSpecification()),
280                         adapter);
281                 }
282             }
283             else
284             {
285                 connector.getJmsSupport().send(producer, msg, persistent, priority, ttl, topic);
286                 if (consumer != null)
287                 {
288                     int timeout = event.getTimeout();
289
290                     if (logger.isDebugEnabled())
291                     {
292                         logger.debug("Waiting for return event for: " + timeout + " ms on " + replyTo);
293                     }
294
295                     Message JavaDoc result = consumer.receive(timeout);
296                     if (result == null)
297                     {
298                         logger.debug("No message was returned via replyTo destination");
299                         return null;
300                     }
301                     else
302                     {
303                         UMOMessageAdapter adapter = connector.getMessageAdapter(result);
304                         return new MuleMessage(
305                             JmsMessageUtils.toObject(result, connector.getSpecification()), adapter);
306                     }
307                 }
308             }
309             return null;
310         }
311         finally
312         {
313             connector.closeQuietly(consumer);
314             connector.closeQuietly(producer);
315
316             // TODO I wonder if those temporary destinations also implement BOTH
317
// interfaces...keep it 'simple' for now
318
if (replyTo != null && (replyTo instanceof TemporaryQueue JavaDoc || replyTo instanceof TemporaryTopic JavaDoc))
319             {
320                 if (replyTo instanceof TemporaryQueue JavaDoc)
321                 {
322                     connector.closeQuietly((TemporaryQueue JavaDoc)replyTo);
323                 }
324                 else
325                 {
326                     // hope there are no more non-standard tricks from JMS vendors
327
// here ;)
328
connector.closeQuietly((TemporaryTopic JavaDoc)replyTo);
329                 }
330             }
331
332             // If the session is from the current transaction, it is up to the
333
// transaction to close it.
334
if (session != null && !cached && !transacted)
335             {
336                 connector.closeQuietly(session);
337             }
338         }
339     }
340
341     /*
342      * (non-Javadoc)
343      *
344      * @see org.mule.providers.UMOConnector#sendEvent(org.mule.MuleEvent,
345      * org.mule.providers.MuleEndpoint)
346      */

347     protected UMOMessage doSend(UMOEvent event) throws Exception JavaDoc
348     {
349         UMOMessage message = dispatchMessage(event);
350         return message;
351     }
352
353     /**
354      * Make a specific request to the underlying transport
355      *
356      * @param endpoint the endpoint to use when connecting to the resource
357      * @param timeout the maximum time the operation should block before returning.
358      * The call should return immediately if there is data available. If
359      * no data becomes available before the timeout elapses, null will be
360      * returned
361      * @return the result of the request wrapped in a UMOMessage object. Null will be
362      * returned if no data was avaialable
363      * @throws Exception if the call to the underlying protocal cuases an exception
364      */

365     protected UMOMessage doReceive(UMOImmutableEndpoint endpoint, long timeout) throws Exception JavaDoc
366     {
367
368         Session JavaDoc session = null;
369         Destination JavaDoc dest = null;
370         MessageConsumer JavaDoc consumer = null;
371         try
372         {
373             boolean topic = false;
374             String JavaDoc resourceInfo = endpoint.getEndpointURI().getResourceInfo();
375             topic = (resourceInfo != null && JmsConstants.TOPIC_PROPERTY.equalsIgnoreCase(resourceInfo));
376
377             session = connector.getSession(false, topic);
378             dest = connector.getJmsSupport().createDestination(session,
379                 endpoint.getEndpointURI().getAddress(), topic);
380             consumer = connector.getJmsSupport().createConsumer(session, dest, topic);
381
382             try
383             {
384                 Message JavaDoc message = null;
385                 if (timeout == RECEIVE_NO_WAIT)
386                 {
387                     message = consumer.receiveNoWait();
388                 }
389                 else if (timeout == RECEIVE_WAIT_INDEFINITELY)
390                 {
391                     message = consumer.receive();
392                 }
393                 else
394                 {
395                     message = consumer.receive(timeout);
396                 }
397                 if (message == null)
398                 {
399                     return null;
400                 }
401
402                 message = connector.preProcessMessage(message, session);
403
404                 return new MuleMessage(connector.getMessageAdapter(message));
405             }
406             catch (Exception JavaDoc e)
407             {
408                 connector.handleException(e);
409                 return null;
410             }
411         }
412         finally
413         {
414             connector.closeQuietly(consumer);
415             connector.closeQuietly(session);
416         }
417     }
418
419     /*
420      * (non-Javadoc)
421      *
422      * @see org.mule.umo.provider.UMOMessageDispatcher#getDelegateSession()
423      */

424     public synchronized Object JavaDoc getDelegateSession() throws UMOException
425     {
426         try
427         {
428             // Return the session bound to the current transaction
429
// if possible
430
Session JavaDoc session = connector.getSessionFromTransaction();
431             if (session != null)
432             {
433                 return session;
434             }
435             // Else create a session for this dispatcher and
436
// use it each time
437
if (delegateSession == null)
438             {
439                 delegateSession = connector.getSession(false, false);
440             }
441             return delegateSession;
442         }
443         catch (Exception JavaDoc e)
444         {
445             throw new MuleException(new org.mule.config.i18n.Message("jms", 3), e);
446         }
447     }
448
449     /*
450      * (non-Javadoc)
451      *
452      * @see org.mule.umo.provider.UMOMessageDispatcher#getConnector()
453      */

454     public UMOConnector getConnector()
455     {
456         return connector;
457     }
458
459     protected void doDispose()
460     {
461         // template method
462
}
463
464     private class ReplyToListener implements MessageListener JavaDoc
465     {
466         private final Latch latch;
467         private volatile Message JavaDoc message;
468         private final WaitableBoolean released = new WaitableBoolean(false);
469
470         public ReplyToListener(Latch latch)
471         {
472             this.latch = latch;
473         }
474
475         public Message JavaDoc getMessage()
476         {
477             return message;
478         }
479
480         public void release()
481         {
482             released.set(true);
483         }
484
485         public void onMessage(Message JavaDoc message)
486         {
487             this.message = message;
488             latch.countDown();
489             try
490             {
491                 released.whenTrue(null);
492             }
493             catch (InterruptedException JavaDoc e)
494             {
495                 // ignored
496
}
497         }
498
499     }
500
501 }
502
Popular Tags