KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > jms > TransactedSingleResourceJmsMessageReceiver


1 /*
2  * $Id: TransactedSingleResourceJmsMessageReceiver.java 3798 2006-11-04 04:07:14Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MPL style
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.jms;
12
13 import javax.jms.Destination JavaDoc;
14 import javax.jms.JMSException JavaDoc;
15 import javax.jms.Message JavaDoc;
16 import javax.jms.MessageConsumer JavaDoc;
17 import javax.jms.MessageListener JavaDoc;
18 import javax.jms.Session JavaDoc;
19 import javax.jms.Topic JavaDoc;
20 import javax.resource.spi.work.Work JavaDoc;
21
22 import org.apache.commons.collections.MapUtils;
23 import org.mule.impl.MuleMessage;
24 import org.mule.providers.AbstractMessageReceiver;
25 import org.mule.providers.ConnectException;
26 import org.mule.providers.jms.filters.JmsSelectorFilter;
27 import org.mule.transaction.TransactionCallback;
28 import org.mule.transaction.TransactionCoordination;
29 import org.mule.transaction.TransactionTemplate;
30 import org.mule.umo.UMOComponent;
31 import org.mule.umo.UMOException;
32 import org.mule.umo.UMOTransaction;
33 import org.mule.umo.endpoint.UMOEndpoint;
34 import org.mule.umo.lifecycle.InitialisationException;
35 import org.mule.umo.lifecycle.LifecycleException;
36 import org.mule.umo.provider.UMOConnector;
37 import org.mule.umo.provider.UMOMessageAdapter;
38
39 public class TransactedSingleResourceJmsMessageReceiver extends AbstractMessageReceiver
40     implements MessageListener JavaDoc
41 {
42     protected JmsConnector connector;
43     protected RedeliveryHandler redeliveryHandler;
44     protected MessageConsumer JavaDoc consumer;
45     protected Session JavaDoc session;
46     protected boolean startOnConnect = false;
47
48     /** determines whether messages will be received in a transaction template */
49     protected boolean receiveMessagesInTransaction = true;
50
51     /** determines whether Multiple receivers are created to improve throughput */
52     protected boolean useMultipleReceivers = true;
53
54     /**
55      * @param connector
56      * @param component
57      * @param endpoint
58      * @throws InitialisationException
59      */

60     public TransactedSingleResourceJmsMessageReceiver(UMOConnector connector,
61                                                       UMOComponent component,
62                                                       UMOEndpoint endpoint) throws InitialisationException
63     {
64         super(connector, component, endpoint);
65         this.connector = (JmsConnector)connector;
66
67         // TODO check which properties being set in the TransecteJmsMessage receiver
68
// are needed...
69

70         try
71         {
72             redeliveryHandler = this.connector.createRedeliveryHandler();
73             redeliveryHandler.setConnector(this.connector);
74         }
75         catch (Exception JavaDoc e)
76         {
77             throw new InitialisationException(e, this);
78         }
79     }
80
81     public void doConnect() throws Exception JavaDoc
82     {
83         try
84         {
85             JmsSupport jmsSupport = this.connector.getJmsSupport();
86             // Create session if none exists
87
if (session == null)
88             {
89                 session = this.connector.getSession(endpoint);
90             }
91
92             // Create destination
93
String JavaDoc resourceInfo = endpoint.getEndpointURI().getResourceInfo();
94             boolean topic = (resourceInfo != null && JmsConstants.TOPIC_PROPERTY.equalsIgnoreCase(resourceInfo));
95
96             // todo MULE20 remove resource Info support
97
if (!topic)
98             {
99                 topic = MapUtils.getBooleanValue(endpoint.getProperties(), JmsConstants.TOPIC_PROPERTY, false);
100             }
101
102             Destination JavaDoc dest = jmsSupport.createDestination(session, endpoint.getEndpointURI().getAddress(),
103                 topic);
104
105             // Extract jms selector
106
String JavaDoc selector = null;
107             if (endpoint.getFilter() != null && endpoint.getFilter() instanceof JmsSelectorFilter)
108             {
109                 selector = ((JmsSelectorFilter)endpoint.getFilter()).getExpression();
110             }
111             else if (endpoint.getProperties() != null)
112             {
113                 // still allow the selector to be set as a property on the endpoint
114
// to be backward compatable
115
selector = (String JavaDoc)endpoint.getProperties().get(JmsConstants.JMS_SELECTOR_PROPERTY);
116             }
117             String JavaDoc tempDurable = (String JavaDoc)endpoint.getProperties().get(JmsConstants.DURABLE_PROPERTY);
118             boolean durable = connector.isDurable();
119             if (tempDurable != null)
120             {
121                 durable = Boolean.valueOf(tempDurable).booleanValue();
122             }
123
124             // Get the durable subscriber name if there is one
125
String JavaDoc durableName = (String JavaDoc)endpoint.getProperties().get(JmsConstants.DURABLE_NAME_PROPERTY);
126             if (durableName == null && durable && dest instanceof Topic JavaDoc)
127             {
128                 durableName = "mule." + connector.getName() + "." + endpoint.getEndpointURI().getAddress();
129                 logger.debug("Jms Connector for this receiver is durable but no durable name has been specified. Defaulting to: "
130                              + durableName);
131             }
132
133             // Create consumer
134
consumer = jmsSupport.createConsumer(session, dest, selector, connector.isNoLocal(), durableName,
135                 topic);
136         }
137         catch (JMSException JavaDoc e)
138         {
139             throw new ConnectException(e, this);
140         }
141     }
142
143     public void onMessage(Message JavaDoc message)
144     {
145         try
146         {
147             getWorkManager().scheduleWork(new MessageReceiverWorker(message));
148         }
149         catch (Exception JavaDoc e)
150         {
151             handleException(e);
152         }
153     }
154
155     public void doStart() throws UMOException
156     {
157         try
158         {
159             // We ned to register the listener when start is called in order to only
160
// start receiving messages after
161
// start/
162
// If the consumer is null it means that the connection strategy is being
163
// run in a separate thread
164
// And hasn't managed to connect yet.
165
if (consumer == null)
166             {
167                 startOnConnect = true;
168             }
169             else
170             {
171                 startOnConnect = false;
172
173                 this.consumer.setMessageListener(this);
174             }
175         }
176         catch (JMSException JavaDoc e)
177         {
178             throw new LifecycleException(e, this);
179         }
180     }
181
182     public void doStop() throws UMOException
183     {
184         try
185         {
186             if (consumer != null)
187             {
188                 consumer.setMessageListener(null);
189             }
190         }
191         catch (JMSException JavaDoc e)
192         {
193             throw new LifecycleException(e, this);
194         }
195     }
196
197     public void doDisconnect() throws Exception JavaDoc
198     {
199         closeConsumer();
200     }
201
202     protected void closeConsumer()
203     {
204         connector.closeQuietly(consumer);
205         consumer = null;
206         connector.closeQuietly(session);
207         session = null;
208     }
209
210     protected class MessageReceiverWorker implements Work JavaDoc
211     {
212         Message JavaDoc message;
213
214         public MessageReceiverWorker(Message JavaDoc message)
215         {
216             this.message = message;
217         }
218
219         public void run()
220         {
221             try
222             {
223                 TransactionTemplate tt = new TransactionTemplate(endpoint.getTransactionConfig(),
224                     connector.getExceptionListener());
225
226                 if (receiveMessagesInTransaction)
227                 {
228                     TransactionCallback cb = new MessageTransactionCallback(message)
229                     {
230
231                         public Object JavaDoc doInTransaction() throws Exception JavaDoc
232                         {
233                             // Get Transaction & Bind Session
234
UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
235                             if (tx != null)
236                             {
237                                 tx.bindResource(connector.getConnection(), session);
238                             }
239                             if (tx instanceof JmsClientAcknowledgeTransaction)
240                             {
241                                 tx.bindResource(message, message);
242                             }
243
244                             if (logger.isDebugEnabled())
245                             {
246                                 logger.debug("Message received it is of type: "
247                                              + message.getClass().getName());
248                                 if (message.getJMSDestination() != null)
249                                 {
250                                     logger.debug("Message received on " + message.getJMSDestination() + " ("
251                                                  + message.getJMSDestination().getClass().getName() + ")");
252                                 }
253                                 else
254                                 {
255                                     logger.debug("Message received on unknown destination");
256                                 }
257                                 logger.debug("Message CorrelationId is: " + message.getJMSCorrelationID());
258                                 logger.debug("Jms Message Id is: " + message.getJMSMessageID());
259                             }
260
261                             if (message.getJMSRedelivered())
262                             {
263                                 if (logger.isDebugEnabled())
264                                 {
265                                     logger.debug("Message with correlationId: "
266                                                  + message.getJMSCorrelationID()
267                                                  + " is redelivered. handing off to Exception Handler");
268                                 }
269                                 redeliveryHandler.handleRedelivery(message);
270                             }
271
272                             UMOMessageAdapter adapter = connector.getMessageAdapter(message);
273                             routeMessage(new MuleMessage(adapter));
274                             return null;
275                         }
276                     };
277                     tt.execute(cb);
278                 }
279                 else
280                 {
281                     UMOMessageAdapter adapter = connector.getMessageAdapter(message);
282                     routeMessage(new MuleMessage(adapter));
283                 }
284
285             }
286             catch (Exception JavaDoc e)
287             {
288                 getConnector().handleException(e);
289             }
290
291         }
292
293         public void release()
294         {
295             // Nothing to release.
296
}
297
298     }
299
300 }
301
Popular Tags