KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > jms > TransactedJmsMessageReceiver


1 /*
2  * $Id: TransactedJmsMessageReceiver.java 4259 2006-12-14 03:12:07Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.jms;
12
13 import java.util.List JavaDoc;
14 import javax.jms.Destination JavaDoc;
15 import javax.jms.JMSException JavaDoc;
16 import javax.jms.Message JavaDoc;
17 import javax.jms.MessageConsumer JavaDoc;
18 import javax.jms.Session JavaDoc;
19 import javax.jms.Topic JavaDoc;
20
21 import org.apache.commons.collections.MapUtils;
22
23 import org.mule.impl.MuleMessage;
24 import org.mule.providers.ConnectException;
25 import org.mule.providers.SingleAttemptConnectionStrategy;
26 import org.mule.providers.TransactedPollingMessageReceiver;
27 import org.mule.providers.jms.filters.JmsSelectorFilter;
28 import org.mule.transaction.TransactionCoordination;
29 import org.mule.umo.UMOComponent;
30 import org.mule.umo.UMOTransaction;
31 import org.mule.umo.endpoint.UMOEndpoint;
32 import org.mule.umo.lifecycle.InitialisationException;
33 import org.mule.umo.provider.UMOConnector;
34 import org.mule.umo.provider.UMOMessageAdapter;
35
36 public class TransactedJmsMessageReceiver extends TransactedPollingMessageReceiver
37 {
38     protected final JmsConnector connector;
39     protected boolean reuseConsumer;
40     protected boolean reuseSession;
41     protected final ThreadContextLocal context = new ThreadContextLocal();
42     protected final long timeout;
43     protected final RedeliveryHandler redeliveryHandler;
44
45     /**
46      * Holder receiving the session and consumer for this thread.
47      */

48     protected static class JmsThreadContext
49     {
50         public Session JavaDoc session;
51         public MessageConsumer JavaDoc consumer;
52     }
53
54     /**
55      * Strongly typed ThreadLocal for ThreadContext.
56      */

57     protected static class ThreadContextLocal extends ThreadLocal JavaDoc
58     {
59         public JmsThreadContext getContext()
60         {
61             return (JmsThreadContext)get();
62         }
63
64         protected Object JavaDoc initialValue()
65         {
66             return new JmsThreadContext();
67         }
68     }
69
70     public TransactedJmsMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint)
71         throws InitialisationException
72     {
73         super(connector, component, endpoint, new Long JavaDoc(0));
74         this.connector = (JmsConnector)connector;
75         this.timeout = endpoint.getTransactionConfig().getTimeout();
76
77         // If reconnection is set, default reuse strategy to false
78
// as some jms brokers will not detect lost connections if the
79
// same consumer / session is used
80
if (this.connectionStrategy instanceof SingleAttemptConnectionStrategy)
81         {
82             this.reuseConsumer = true;
83             this.reuseSession = true;
84         }
85         // User may override reuse strategy if necessary
86
this.reuseConsumer = MapUtils.getBooleanValue(endpoint.getProperties(), "reuseConsumer",
87             this.reuseConsumer);
88         this.reuseSession = MapUtils.getBooleanValue(endpoint.getProperties(), "reuseSession",
89             this.reuseSession);
90
91         // Check if the destination is a queue and
92
// if we are in transactional mode.
93
// If true, set receiveMessagesInTransaction to true.
94
// It will start multiple threads, depending on the threading profile.
95
String JavaDoc resourceInfo = endpoint.getEndpointURI().getResourceInfo();
96         boolean topic = (resourceInfo != null && "topic".equalsIgnoreCase(resourceInfo));
97
98         // If we're using topics We dont want to use multiple receivers as we'll get
99
// the same message
100
// multiple times
101
useMultipleReceivers = !topic;
102
103         try
104         {
105             redeliveryHandler = this.connector.createRedeliveryHandler();
106             redeliveryHandler.setConnector(this.connector);
107         }
108         catch (Exception JavaDoc e)
109         {
110             throw new InitialisationException(e, this);
111         }
112
113     }
114
115     public void doConnect() throws Exception JavaDoc
116     {
117         if (connector.isConnected())
118         {
119             // TODO Fix Bug
120

121             // creating this consumer now would prevent from the actual worker
122
// consumer
123
// to receive the message!
124
//Antoine Borg 08 Dec 2006 - Uncommented for MULE-1150
125
createConsumer();
126             // if we comment this line, if one tries to restart the service through
127
// JMX,
128
// this will fail...
129
}
130     }
131
132     public void doDisconnect() throws Exception JavaDoc
133     {
134         if (connector.isConnected())
135         {
136             closeConsumer(true);
137         }
138     }
139
140     /**
141      * The poll method is overrident from the
142      */

143     public void poll() throws Exception JavaDoc
144     {
145         try
146         {
147             JmsThreadContext ctx = context.getContext();
148             // Create consumer if necessary
149
if (ctx.consumer == null)
150             {
151                 createConsumer();
152             }
153             // Do polling
154
super.poll();
155         }
156         catch (Exception JavaDoc e)
157         {
158             // Force consumer to close
159
closeConsumer(true);
160             throw e;
161         }
162         finally
163         {
164             // Close consumer if necessary
165
closeConsumer(false);
166         }
167     }
168
169     /*
170      * (non-Javadoc)
171      *
172      * @see org.mule.providers.TransactionEnabledPollingMessageReceiver#getMessages()
173      */

174     protected List JavaDoc getMessages() throws Exception JavaDoc
175     {
176         // As the session is created outside the transaction, it is not
177
// bound to it yet
178
JmsThreadContext ctx = context.getContext();
179
180         UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
181         if (tx != null)
182         {
183             tx.bindResource(connector.getConnection(), ctx.session);
184         }
185
186         // Retrieve message
187
Message JavaDoc message = null;
188         try
189         {
190             message = ctx.consumer.receive(timeout);
191         }
192         catch (JMSException JavaDoc e)
193         {
194             // If we're being disconnected, ignore the exception
195
if (!this.isConnected())
196             {
197                 // ignore
198
}
199             else
200             {
201                 throw e;
202             }
203         }
204         if (message == null)
205         {
206             if (tx != null)
207             {
208                 tx.setRollbackOnly();
209             }
210             return null;
211         }
212         message = connector.preProcessMessage(message, ctx.session);
213
214         // Process message
215
if (logger.isDebugEnabled())
216         {
217             logger.debug("Message received it is of type: " + message.getClass().getName());
218             if (message.getJMSDestination() != null)
219             {
220                 logger.debug("Message received on " + message.getJMSDestination() + " ("
221                              + message.getJMSDestination().getClass().getName() + ")");
222             }
223             else
224             {
225                 logger.debug("Message received on unknown destination");
226             }
227             logger.debug("Message CorrelationId is: " + message.getJMSCorrelationID());
228             logger.debug("Jms Message Id is: " + message.getJMSMessageID());
229         }
230
231         if (message.getJMSRedelivered())
232         {
233             if (logger.isDebugEnabled())
234             {
235                 logger.debug("Message with correlationId: " + message.getJMSCorrelationID()
236                              + " is redelivered. handing off to Exception Handler");
237             }
238             redeliveryHandler.handleRedelivery(message);
239         }
240
241         if (tx instanceof JmsClientAcknowledgeTransaction)
242         {
243             tx.bindResource(message, null);
244         }
245
246         UMOMessageAdapter adapter = connector.getMessageAdapter(message);
247         routeMessage(new MuleMessage(adapter));
248         return null;
249     }
250
251     /*
252      * (non-Javadoc)
253      *
254      * @see org.mule.providers.TransactionEnabledPollingMessageReceiver#processMessage(java.lang.Object)
255      */

256     protected void processMessage(Object JavaDoc msg) throws Exception JavaDoc
257     {
258         // This method is never called as the
259
// message is processed when received
260
}
261
262     protected void closeConsumer(boolean force)
263     {
264         JmsThreadContext ctx = context.getContext();
265         if (ctx == null)
266         {
267             return;
268         }
269         // Close consumer
270
if (force || !reuseSession || !reuseConsumer)
271         {
272             connector.closeQuietly(ctx.consumer);
273             ctx.consumer = null;
274         }
275         // Do not close session if a transaction is in progress
276
// the session will be close by the transaction
277
if (force || !reuseSession)
278         {
279             connector.closeQuietly(ctx.session);
280             ctx.session = null;
281         }
282     }
283
284     /**
285      * Create a consumer for the jms destination
286      *
287      * @throws Exception
288      */

289     protected void createConsumer() throws Exception JavaDoc
290     {
291         try
292         {
293             JmsSupport jmsSupport = this.connector.getJmsSupport();
294             JmsThreadContext ctx = context.getContext();
295             // Create session if none exists
296
if (ctx.session == null)
297             {
298                 ctx.session = this.connector.getSession(endpoint);
299             }
300
301             // Create destination
302
String JavaDoc resourceInfo = endpoint.getEndpointURI().getResourceInfo();
303             boolean topic = (resourceInfo != null && "topic".equalsIgnoreCase(resourceInfo));
304             Destination JavaDoc dest = jmsSupport.createDestination(ctx.session, endpoint.getEndpointURI()
305                 .getAddress(), topic);
306
307             // Extract jms selector
308
String JavaDoc selector = null;
309             if (endpoint.getFilter() != null && endpoint.getFilter() instanceof JmsSelectorFilter)
310             {
311                 selector = ((JmsSelectorFilter)endpoint.getFilter()).getExpression();
312             }
313             else if (endpoint.getProperties() != null)
314             {
315                 // still allow the selector to be set as a property on the endpoint
316
// to be backward compatable
317
selector = (String JavaDoc)endpoint.getProperties().get(JmsConstants.JMS_SELECTOR_PROPERTY);
318             }
319             String JavaDoc tempDurable = (String JavaDoc)endpoint.getProperties().get("durable");
320             boolean durable = connector.isDurable();
321             if (tempDurable != null)
322             {
323                 durable = Boolean.valueOf(tempDurable).booleanValue();
324             }
325
326             // Get the durable subscriber name if there is one
327
String JavaDoc durableName = (String JavaDoc)endpoint.getProperties().get("durableName");
328             if (durableName == null && durable && dest instanceof Topic JavaDoc)
329             {
330                 durableName = "mule." + connector.getName() + "." + endpoint.getEndpointURI().getAddress();
331                 logger.debug("Jms Connector for this receiver is durable but no durable name has been specified. Defaulting to: "
332                              + durableName);
333             }
334
335             // Create consumer
336
ctx.consumer = jmsSupport.createConsumer(ctx.session, dest, selector, connector.isNoLocal(),
337                 durableName, topic);
338         }
339         catch (JMSException JavaDoc e)
340         {
341             throw new ConnectException(e, this);
342         }
343     }
344 }
345
Popular Tags