KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > springframework > jms > listener > AbstractPollingMessageListenerContainer


1 /*
2  * Copyright 2002-2007 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.springframework.jms.listener;
18
19 import javax.jms.Connection JavaDoc;
20 import javax.jms.Destination JavaDoc;
21 import javax.jms.JMSException JavaDoc;
22 import javax.jms.Message JavaDoc;
23 import javax.jms.MessageConsumer JavaDoc;
24 import javax.jms.Session JavaDoc;
25 import javax.jms.Topic JavaDoc;
26
27 import org.springframework.beans.factory.BeanNameAware;
28 import org.springframework.jms.connection.ConnectionFactoryUtils;
29 import org.springframework.jms.connection.JmsResourceHolder;
30 import org.springframework.jms.support.JmsUtils;
31 import org.springframework.transaction.PlatformTransactionManager;
32 import org.springframework.transaction.TransactionStatus;
33 import org.springframework.transaction.support.DefaultTransactionDefinition;
34 import org.springframework.transaction.support.ResourceTransactionManager;
35
36 /**
37  * Base class for listener container implementations which are based on polling.
38  * Provides support for listener handling based on {@link javax.jms.MessageConsumer},
39  * optionally participating in externally managed transactions.
40  *
41  * <p>This listener container variant is built for repeated polling attempts,
42  * each invoking the {@link #receiveAndExecute} method. The MessageConsumer used
43  * may be reobtained for each attempt or cached in between attempts; this is up
44  * to the concrete implementation. The receive timeout for each attempt can be
45  * configured through the {@link #setReceiveTimeout "receiveTimeout"} property.
46  *
47  * <p>The underlying mechanism is based on standard JMS MessageConsumer handling,
48  * which is perfectly compatible with both native JMS and JMS in a J2EE environment.
49  * Neither the JMS <code>MessageConsumer.setMessageListener</code> facility
50  * nor the JMS ServerSessionPool facility is required. A further advantage
51  * of this approach is full control over the listening process, allowing for
52  * custom scaling and throttling of concurrent message processing
53  * (which is up to concrete subclasses).
54  *
55  * <p>Message reception and listener execution can automatically be wrapped
56  * in transactions through passing a Spring
57  * {@link org.springframework.transaction.PlatformTransactionManager} into the
58  * {@link #setTransactionManager "transactionManager"} property. This will usually
59  * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
60  * J2EE environment, in combination with a JTA-aware JMS ConnectionFactory obtained
61  * from JNDI (check your J2EE server's documentation).
62  *
63  * <p>This base class does not assume any specific mechanism for asynchronous
64  * execution of polling invokers. Check out {@link DefaultMessageListenerContainer}
65  * for a concrete implementation which is based on Spring's
66  * {@link org.springframework.core.task.TaskExecutor} abstraction,
67  * including dynamic scaling of concurrent consumers and automatic self recovery.
68  *
69  * @author Juergen Hoeller
70  * @since 2.0.3
71  * @see #createListenerConsumer(javax.jms.Session)
72  * @see #receiveAndExecute(javax.jms.Session, javax.jms.MessageConsumer)
73  * @see #setTransactionManager
74  */

75 public abstract class AbstractPollingMessageListenerContainer extends AbstractMessageListenerContainer
76         implements BeanNameAware {
77
78     /**
79      * The default receive timeout: 1000 ms = 1 second.
80      */

81     public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
82
83
84     private final MessageListenerContainerResourceFactory transactionalResourceFactory =
85             new MessageListenerContainerResourceFactory();
86
87     private boolean sessionTransactedCalled = false;
88
89     private boolean pubSubNoLocal = false;
90
91     private PlatformTransactionManager transactionManager;
92
93     private DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
94
95     private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
96
97
98     public void setSessionTransacted(boolean sessionTransacted) {
99         super.setSessionTransacted(sessionTransacted);
100         this.sessionTransactedCalled = true;
101     }
102
103     /**
104      * Set whether to inhibit the delivery of messages published by its own connection.
105      * Default is "false".
106      * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
107      */

108     public void setPubSubNoLocal(boolean pubSubNoLocal) {
109         this.pubSubNoLocal = pubSubNoLocal;
110     }
111
112     /**
113      * Return whether to inhibit the delivery of messages published by its own connection.
114      */

115     protected boolean isPubSubNoLocal() {
116         return this.pubSubNoLocal;
117     }
118
119     /**
120      * Specify the Spring {@link org.springframework.transaction.PlatformTransactionManager}
121      * to use for transactional wrapping of message reception plus listener execution.
122      * <p>Default is none, not performing any transactional wrapping.
123      * If specified, this will usually be a Spring
124      * {@link org.springframework.transaction.jta.JtaTransactionManager} or one
125      * of its subclasses, in combination with a JTA-aware ConnectionFactory that
126      * this message listener container obtains its Connections from.
127      * <p><b>Note: Consider the use of local JMS transactions instead.</b>
128      * Simply switch the {@link #setSessionTransacted "sessionTransacted"} flag
129      * to "true" in order to use a locally transacted JMS Session for the entire
130      * receive processing, including any Session operations performed by a
131      * {@link SessionAwareMessageListener} (e.g. sending a response message).
132      * Alternatively, a {@link org.springframework.jms.connection.JmsTransactionManager}
133      * may be used for fully synchronized Spring transactions based on local JMS
134      * transactions. Check {@link AbstractMessageListenerContainer}'s javadoc for
135      * a discussion of transaction choices and message redelivery scenarios.
136      * @see org.springframework.transaction.jta.JtaTransactionManager
137      * @see org.springframework.jms.connection.JmsTransactionManager
138      */

139     public void setTransactionManager(PlatformTransactionManager transactionManager) {
140         this.transactionManager = transactionManager;
141     }
142
143     /**
144      * Return the Spring PlatformTransactionManager to use for transactional
145      * wrapping of message reception plus listener execution.
146      */

147     protected final PlatformTransactionManager getTransactionManager() {
148         return this.transactionManager;
149     }
150
151     /**
152      * Specify the transaction name to use for transactional wrapping.
153      * Default is the bean name of this listener container, if any.
154      * @see org.springframework.transaction.TransactionDefinition#getName()
155      */

156     public void setTransactionName(String JavaDoc transactionName) {
157         this.transactionDefinition.setName(transactionName);
158     }
159
160     /**
161      * Specify the transaction timeout to use for transactional wrapping, in <b>seconds</b>.
162      * Default is none, using the transaction manager's default timeout.
163      * @see org.springframework.transaction.TransactionDefinition#getTimeout()
164      * @see #setReceiveTimeout
165      */

166     public void setTransactionTimeout(int transactionTimeout) {
167         this.transactionDefinition.setTimeout(transactionTimeout);
168     }
169
170     /**
171      * Set the timeout to use for receive calls, in <b>milliseconds</b>.
172      * The default is 1000 ms, that is, 1 second.
173      * <p><b>NOTE:</b> This value needs to be smaller than the transaction
174      * timeout used by the transaction manager (in the appropriate unit,
175      * of course). -1 indicates no timeout at all; however, this is only
176      * feasible if not running within a transaction manager.
177      * @see javax.jms.MessageConsumer#receive(long)
178      * @see javax.jms.MessageConsumer#receive()
179      * @see #setTransactionTimeout
180      */

181     public void setReceiveTimeout(long receiveTimeout) {
182         this.receiveTimeout = receiveTimeout;
183     }
184
185
186     public void initialize() {
187         // Set sessionTransacted=true in case of a non-JTA transaction manager.
188
if (!this.sessionTransactedCalled && this.transactionManager instanceof ResourceTransactionManager &&
189                 ((ResourceTransactionManager) this.transactionManager).getResourceFactory() != getConnectionFactory()) {
190             super.setSessionTransacted(true);
191         }
192
193         // Use bean name as default transaction name.
194
if (this.transactionDefinition.getName() == null) {
195             this.transactionDefinition.setName(getBeanName());
196         }
197
198         // Proceed with superclass initialization.
199
super.initialize();
200     }
201
202
203     /**
204      * Create a MessageConsumer for the given JMS Session,
205      * registering a MessageListener for the specified listener.
206      * @param session the JMS Session to work on
207      * @return the MessageConsumer
208      * @throws javax.jms.JMSException if thrown by JMS methods
209      * @see #receiveAndExecute
210      */

211     protected MessageConsumer JavaDoc createListenerConsumer(Session JavaDoc session) throws JMSException JavaDoc {
212         Destination JavaDoc destination = getDestination();
213         if (destination == null) {
214             destination = resolveDestinationName(session, getDestinationName());
215         }
216         return createConsumer(session, destination);
217     }
218
219     /**
220      * Execute the listener for a message received from the given consumer,
221      * wrapping the entire operation in an external transaction if demanded.
222      * @param session the JMS Session to work on
223      * @param consumer the MessageConsumer to work on
224      * @return whether a message has been received
225      * @throws JMSException if thrown by JMS methods
226      * @see #doReceiveAndExecute
227      */

228     protected boolean receiveAndExecute(Session JavaDoc session, MessageConsumer JavaDoc consumer) throws JMSException JavaDoc {
229         if (this.transactionManager != null) {
230             // Execute receive within transaction.
231
TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
232             boolean messageReceived = true;
233             try {
234                 messageReceived = doReceiveAndExecute(session, consumer, status);
235             }
236             catch (JMSException JavaDoc ex) {
237                 rollbackOnException(status, ex);
238                 throw ex;
239             }
240             catch (RuntimeException JavaDoc ex) {
241                 rollbackOnException(status, ex);
242                 throw ex;
243             }
244             catch (Error JavaDoc err) {
245                 rollbackOnException(status, err);
246                 throw err;
247             }
248             this.transactionManager.commit(status);
249             return messageReceived;
250         }
251
252         else {
253             // Execute receive outside of transaction.
254
return doReceiveAndExecute(session, consumer, null);
255         }
256     }
257
258     /**
259      * Actually execute the listener for a message received from the given consumer,
260      * fetching all requires resources and invoking the listener.
261      * @param session the JMS Session to work on
262      * @param consumer the MessageConsumer to work on
263      * @param status the TransactionStatus (may be <code>null</code>)
264      * @return whether a message has been received
265      * @throws JMSException if thrown by JMS methods
266      * @see #doExecuteListener(javax.jms.Session, javax.jms.Message)
267      */

268     protected boolean doReceiveAndExecute(Session JavaDoc session, MessageConsumer JavaDoc consumer, TransactionStatus status)
269             throws JMSException JavaDoc {
270
271         Connection JavaDoc conToClose = null;
272         Session JavaDoc sessionToClose = null;
273         MessageConsumer JavaDoc consumerToClose = null;
274         try {
275             Session JavaDoc sessionToUse = session;
276             boolean transactional = false;
277             if (sessionToUse == null) {
278                 sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
279                         getConnectionFactory(), this.transactionalResourceFactory);
280                 transactional = (sessionToUse != null);
281             }
282             if (sessionToUse == null) {
283                 Connection JavaDoc conToUse = null;
284                 if (sharedConnectionEnabled()) {
285                     conToUse = getSharedConnection();
286                 }
287                 else {
288                     conToUse = createConnection();
289                     conToClose = conToUse;
290                     conToUse.start();
291                 }
292                 sessionToUse = createSession(conToUse);
293                 sessionToClose = sessionToUse;
294             }
295             MessageConsumer JavaDoc consumerToUse = consumer;
296             if (consumerToUse == null) {
297                 consumerToUse = createListenerConsumer(sessionToUse);
298                 consumerToClose = consumerToUse;
299             }
300             Message JavaDoc message = receiveMessage(consumerToUse);
301             if (message != null) {
302                 if (logger.isDebugEnabled()) {
303                     logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
304                             consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
305                             sessionToUse + "]");
306                 }
307                 messageReceived(message, session);
308                 try {
309                     doExecuteListener(sessionToUse, message);
310                 }
311                 catch (Throwable JavaDoc ex) {
312                     if (status != null) {
313                         if (logger.isDebugEnabled()) {
314                             logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
315                         }
316                         status.setRollbackOnly();
317                     }
318                     handleListenerException(ex);
319                 }
320                 return true;
321             }
322             else {
323                 if (logger.isDebugEnabled()) {
324                     logger.debug("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") +
325                             "session [" + sessionToUse + "] did not receive a message");
326                 }
327                 return false;
328             }
329         }
330         finally {
331             JmsUtils.closeMessageConsumer(consumerToClose);
332             JmsUtils.closeSession(sessionToClose);
333             ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
334         }
335     }
336
337     /**
338      * This implementation checks whether the Session is externally synchronized.
339      * In this case, the Session is not locally transacted, despite the listener
340      * container's "sessionTransacted" flag being set to "true".
341      * @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional
342      */

343     protected boolean isSessionLocallyTransacted(Session JavaDoc session) {
344         return super.isSessionLocallyTransacted(session) &&
345                 !ConnectionFactoryUtils.isSessionTransactional(session, getConnectionFactory());
346     }
347
348     /**
349      * Perform a rollback, handling rollback exceptions properly.
350      * @param status object representing the transaction
351      * @param ex the thrown application exception or error
352      */

353     private void rollbackOnException(TransactionStatus status, Throwable JavaDoc ex) {
354         logger.debug("Initiating transaction rollback on application exception", ex);
355         try {
356             this.transactionManager.rollback(status);
357         }
358         catch (RuntimeException JavaDoc ex2) {
359             logger.error("Application exception overridden by rollback exception", ex);
360             throw ex2;
361         }
362         catch (Error JavaDoc err) {
363             logger.error("Application exception overridden by rollback error", ex);
364             throw err;
365         }
366     }
367
368     /**
369      * Receive a message from the given consumer.
370      * @param consumer the MessageConsumer to use
371      * @return the Message, or <code>null</code> if none
372      * @throws JMSException if thrown by JMS methods
373      */

374     protected Message JavaDoc receiveMessage(MessageConsumer JavaDoc consumer) throws JMSException JavaDoc {
375         return (this.receiveTimeout < 0 ? consumer.receive() : consumer.receive(this.receiveTimeout));
376     }
377
378     /**
379      * Template method that gets called right when a new message has been received,
380      * before attempting to process it. Allows subclasses to react to the event
381      * of an actual incoming message, for example adapting their consumer count.
382      * @param message the received message
383      * @param session the receiving JMS Session
384      */

385     protected void messageReceived(Message JavaDoc message, Session JavaDoc session) {
386     }
387
388
389     //-------------------------------------------------------------------------
390
// JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
391
//-------------------------------------------------------------------------
392

393     /**
394      * Fetch an appropriate Connection from the given JmsResourceHolder.
395      * <p>This implementation accepts any JMS 1.1 Connection.
396      * @param holder the JmsResourceHolder
397      * @return an appropriate Connection fetched from the holder,
398      * or <code>null</code> if none found
399      */

400     protected Connection JavaDoc getConnection(JmsResourceHolder holder) {
401         return holder.getConnection();
402     }
403
404     /**
405      * Fetch an appropriate Session from the given JmsResourceHolder.
406      * <p>This implementation accepts any JMS 1.1 Session.
407      * @param holder the JmsResourceHolder
408      * @return an appropriate Session fetched from the holder,
409      * or <code>null</code> if none found
410      */

411     protected Session JavaDoc getSession(JmsResourceHolder holder) {
412         return holder.getSession();
413     }
414
415     /**
416      * Create a JMS MessageConsumer for the given Session and Destination.
417      * <p>This implementation uses JMS 1.1 API.
418      * @param session the JMS Session to create a MessageConsumer for
419      * @param destination the JMS Destination to create a MessageConsumer for
420      * @return the new JMS MessageConsumer
421      * @throws javax.jms.JMSException if thrown by JMS API methods
422      */

423     protected MessageConsumer JavaDoc createConsumer(Session JavaDoc session, Destination JavaDoc destination) throws JMSException JavaDoc {
424         // Only pass in the NoLocal flag in case of a Topic:
425
// Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
426
// in case of the NoLocal flag being specified for a Queue.
427
if (isPubSubDomain()) {
428             if (isSubscriptionDurable() && destination instanceof Topic JavaDoc) {
429                 return session.createDurableSubscriber(
430                         (Topic JavaDoc) destination, getDurableSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
431             }
432             else {
433                 return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
434             }
435         }
436         else {
437             return session.createConsumer(destination, getMessageSelector());
438         }
439     }
440
441
442     /**
443      * ResourceFactory implementation that delegates to this listener container's protected callback methods.
444      */

445     private class MessageListenerContainerResourceFactory implements ConnectionFactoryUtils.ResourceFactory {
446
447         public Connection JavaDoc getConnection(JmsResourceHolder holder) {
448             return AbstractPollingMessageListenerContainer.this.getConnection(holder);
449         }
450
451         public Session JavaDoc getSession(JmsResourceHolder holder) {
452             return AbstractPollingMessageListenerContainer.this.getSession(holder);
453         }
454
455         public Connection JavaDoc createConnection() throws JMSException JavaDoc {
456             if (AbstractPollingMessageListenerContainer.this.sharedConnectionEnabled()) {
457                 return AbstractPollingMessageListenerContainer.this.getSharedConnection();
458             }
459             else {
460                 return AbstractPollingMessageListenerContainer.this.createConnection();
461             }
462         }
463
464         public Session JavaDoc createSession(Connection JavaDoc con) throws JMSException JavaDoc {
465             return AbstractPollingMessageListenerContainer.this.createSession(con);
466         }
467
468         public boolean isSynchedLocalTransactionAllowed() {
469             return AbstractPollingMessageListenerContainer.this.isSessionTransacted();
470         }
471     }
472
473 }
474
Popular Tags