KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > springframework > jms > listener > DefaultMessageListenerContainer


1 /*
2  * Copyright 2002-2007 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.springframework.jms.listener;
18
19 import java.util.HashSet JavaDoc;
20 import java.util.Iterator JavaDoc;
21 import java.util.Set JavaDoc;
22
23 import javax.jms.Connection JavaDoc;
24 import javax.jms.JMSException JavaDoc;
25 import javax.jms.Message JavaDoc;
26 import javax.jms.MessageConsumer JavaDoc;
27 import javax.jms.Session JavaDoc;
28
29 import org.springframework.core.Constants;
30 import org.springframework.core.task.SimpleAsyncTaskExecutor;
31 import org.springframework.core.task.TaskExecutor;
32 import org.springframework.jms.support.JmsUtils;
33 import org.springframework.jms.support.destination.CachingDestinationResolver;
34 import org.springframework.jms.support.destination.DestinationResolver;
35 import org.springframework.scheduling.SchedulingAwareRunnable;
36 import org.springframework.scheduling.SchedulingTaskExecutor;
37 import org.springframework.util.Assert;
38 import org.springframework.util.ClassUtils;
39
40 /**
41  * Message listener container variant that uses plain JMS client API, specifically
42  * a loop of <code>MessageConsumer.receive()</code> calls that also allow for
43  * transactional reception of messages (registering them with XA transactions).
44  * Designed to work in a native JMS environment as well as in a J2EE environment,
45  * with only minimal differences in configuration.
46  *
47  * <p>This is a simple but nevertheless powerful form of message listener container.
48  * On startup, it obtains a fixed number of JMS Sessions to invoke the listener,
49  * and optionally allows for dynamic adaptation at runtime (up until a maximum number).
50  * Like {@link SimpleMessageListenerContainer}, its main advantage is its low level
51  * of runtime complexity, in particular the minimal requirements on the JMS provider:
52  * Not even the JMS ServerSessionPool facility is required. Beyond that, it is
53  * fully self-recovering in case of the broker being temporarily unavailable,
54  * and allows for stops/restarts as well as runtime changes to its configuration.
55  *
56  * <p>Actual MessageListener execution happens in asynchronous work units which are
57  * created through Spring's {@link org.springframework.core.task.TaskExecutor}
58  * abstraction. By default, the specified number of invoker tasks will be created
59  * on startup, according to the {@link #setConcurrentConsumers "concurrentConsumers"}
60  * setting. Specify an alternative TaskExecutor to integrate with an existing
61  * thread pool facility (such as a J2EE server's), for example using a
62  * {@link org.springframework.scheduling.commonj.WorkManagerTaskExecutor CommonJ WorkManager}.
63  * With a native JMS setup, each of those listener threads is going to use a
64  * cached JMS Session and MessageConsumer (only refreshed in case of failure),
65  * using the JMS provider's resources as efficiently as possible.
66  *
67  * <p>Message reception and listener execution can automatically be wrapped
68  * in transactions through passing a Spring
69  * {@link org.springframework.transaction.PlatformTransactionManager} into the
70  * {@link #setTransactionManager "transactionManager"} property. This will usually
71  * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
72  * J2EE enviroment, in combination with a JTA-aware JMS ConnectionFactory obtained
73  * from JNDI (check your J2EE server's documentation). Note that this listener
74  * container will automatically reobtain all JMS handles for each transaction
75  * in case of an external transaction manager specified, for compatibility with
76  * all J2EE servers (in particular JBoss). This non-caching behavior can be
77  * overridden through the {@link #setCacheLevel "cacheLevel"} /
78  * {@link #setCacheLevelName "cacheLevelName"} property, enforcing caching
79  * of the Connection (or also Session and MessageConsumer) even in case of
80  * an external transaction manager being involved.
81  *
82  * <p>Dynamic scaling of the number of concurrent invokers can be activated
83  * through specifying a {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"}
84  * value that is higher than the {@link #setConcurrentConsumers "concurrentConsumers"}
85  * value. Since the latter's default is 1, you can also simply specify a
86  * "maxConcurrentConsumers" of e.g. 5, which will lead to dynamic scaling up to
87  * 5 concurrent consumers in case of increasing message load, as well as dynamic
88  * shrinking back to the standard number of consumers once the load decreases.
89  * Consider adapting the {@link #setIdleTaskExecutionLimit "idleTaskExecutionLimit"}
90  * setting to control the lifespan of each new task, to avoid frequent scaling up
91  * and down, in particular if the ConnectionFactory does not pool JMS Sessions
92  * and/or the TaskExecutor does not pool threads (check your configuration!).
93  * Note that dynamic scaling only really makes sense for a queue in the first
94  * place; for a topic, you will typically stick with the default number of 1
95  * consumer, else you'd receive the same message multiple times on the same node.
96  *
97  * <p><b>It is strongly recommended to either set {@link #setSessionTransacted
98  * "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
99  * "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
100  * javadoc for details on acknowledge modes and native transaction options,
101  * as well as the {@link AbstractPollingMessageListenerContainer} javadoc
102  * for details on configuring an external transaction manager.
103  *
104  * <p>This class requires a JMS 1.1+ provider, because it builds on the
105  * domain-independent API. <b>Use the {@link DefaultMessageListenerContainer102}
106  * subclass for JMS 1.0.2 providers.</b>
107  *
108  * @author Juergen Hoeller
109  * @since 2.0
110  * @see #setTransactionManager
111  * @see #setCacheLevel
112  * @see javax.jms.MessageConsumer#receive(long)
113  * @see SimpleMessageListenerContainer
114  * @see org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer
115  * @see DefaultMessageListenerContainer102
116  */

117 public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
118
119     /**
120      * Default thread name prefix: "DefaultMessageListenerContainer-".
121      */

122     public static final String JavaDoc DEFAULT_THREAD_NAME_PREFIX =
123             ClassUtils.getShortName(DefaultMessageListenerContainer.class) + "-";
124
125     /**
126      * The default recovery interval: 5000 ms = 5 seconds.
127      */

128     public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
129
130
131     /**
132      * Constant that indicates to cache no JMS resources at all.
133      * @see #setCacheLevel
134      */

135     public static final int CACHE_NONE = 0;
136
137     /**
138      * Constant that indicates to cache a shared JMS Connection.
139      * @see #setCacheLevel
140      */

141     public static final int CACHE_CONNECTION = 1;
142
143     /**
144      * Constant that indicates to cache a shared JMS Connection
145      * and a JMS Session for each listener thread.
146      * @see #setCacheLevel
147      */

148     public static final int CACHE_SESSION = 2;
149
150     /**
151      * Constant that indicates to cache a shared JMS Connection
152      * and a JMS Session for each listener thread, as well as
153      * a JMS MessageConsumer for each listener thread.
154      * @see #setCacheLevel
155      */

156     public static final int CACHE_CONSUMER = 3;
157
158
159     private static final Constants constants = new Constants(DefaultMessageListenerContainer.class);
160
161
162     private TaskExecutor taskExecutor;
163
164     private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
165
166     private Integer JavaDoc cacheLevel;
167
168     private int concurrentConsumers = 1;
169
170     private int maxConcurrentConsumers = 1;
171
172     private int maxMessagesPerTask = Integer.MIN_VALUE;
173
174     private int idleTaskExecutionLimit = 1;
175
176     private final Set JavaDoc scheduledInvokers = new HashSet JavaDoc();
177
178     private int activeInvokerCount = 0;
179
180     private final Object JavaDoc activeInvokerMonitor = new Object JavaDoc();
181
182     private Object JavaDoc currentRecoveryMarker = new Object JavaDoc();
183
184     private final Object JavaDoc recoveryMonitor = new Object JavaDoc();
185
186
187     /**
188      * Set the Spring TaskExecutor to use for running the listener threads.
189      * <p>Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor},
190      * starting up a number of new threads, according to the specified number
191      * of concurrent consumers.
192      * <p>Specify an alternative TaskExecutor for integration with an existing
193      * thread pool. Note that this really only adds value if the threads are
194      * managed in a specific fashion, for example within a J2EE environment.
195      * A plain thread pool does not add much value, as this listener container
196      * will occupy a number of threads for its entire lifetime.
197      * @see #setConcurrentConsumers
198      * @see org.springframework.core.task.SimpleAsyncTaskExecutor
199      * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
200      */

201     public void setTaskExecutor(TaskExecutor taskExecutor) {
202         this.taskExecutor = taskExecutor;
203     }
204
205     /**
206      * Specify the interval between recovery attempts, in <b>milliseconds</b>.
207      * The default is 5000 ms, that is, 5 seconds.
208      * @see #handleListenerSetupFailure
209      */

210     public void setRecoveryInterval(long recoveryInterval) {
211         this.recoveryInterval = recoveryInterval;
212     }
213
214     /**
215      * Specify the level of caching that this listener container is allowed to apply,
216      * in the form of the name of the corresponding constant: e.g. "CACHE_CONNECTION".
217      * @see #setCacheLevel
218      */

219     public void setCacheLevelName(String JavaDoc constantName) throws IllegalArgumentException JavaDoc {
220         if (constantName == null || !constantName.startsWith("CACHE_")) {
221             throw new IllegalArgumentException JavaDoc("Only cache constants allowed");
222         }
223         setCacheLevel(constants.asNumber(constantName).intValue());
224     }
225
226     /**
227      * Specify the level of caching that this listener container is allowed to apply.
228      * <p>Default is CACHE_NONE if an external transaction manager has been specified
229      * (to reobtain all resources freshly within the scope of the external transaction),
230      * and CACHE_CONSUMER else (operating with local JMS resources).
231      * <p>Some J2EE servers only register their JMS resources with an ongoing XA
232      * transaction in case of a freshly obtained JMS Connection and Session,
233      * which is why this listener container does by default not cache any of those.
234      * However, if you want to optimize for a specific server, consider switching
235      * this setting to at least CACHE_CONNECTION or CACHE_SESSION even in
236      * conjunction with an external transaction manager.
237      * <p>Currently known servers that absolutely require CACHE_NONE for XA
238      * transaction processing: JBoss 4. For any others, consider raising the
239      * cache level.
240      * @see #CACHE_NONE
241      * @see #CACHE_CONNECTION
242      * @see #CACHE_SESSION
243      * @see #CACHE_CONSUMER
244      * @see #setCacheLevelName
245      * @see #setTransactionManager
246      */

247     public void setCacheLevel(int cacheLevel) {
248         this.cacheLevel = new Integer JavaDoc(cacheLevel);
249     }
250
251     /**
252      * Return the level of caching that this listener container is allowed to apply.
253      */

254     public int getCacheLevel() {
255         return (this.cacheLevel != null ? this.cacheLevel.intValue() : CACHE_NONE);
256     }
257
258
259     /**
260      * Specify the number of concurrent consumers to create. Default is 1.
261      * <p>Specifying a higher value for this setting will increase the standard
262      * level of scheduled concurrent consumers at runtime: This is effectively
263      * the minimum number of concurrent consumers which will be scheduled
264      * at any given time. This is a static setting; for dynamic scaling,
265      * consider specifying the "maxConcurrentConsumers" setting instead.
266      * <p>Raising the number of concurrent consumers is recommendable in order
267      * to scale the consumption of messages coming in from a queue. However,
268      * note that any ordering guarantees are lost once multiple consumers are
269      * registered. In general, stick with 1 consumer for low-volume queues.
270      * <p><b>Do not raise the number of concurrent consumers for a topic.</b>
271      * This would lead to concurrent consumption of the same message,
272      * which is hardly ever desirable.
273      * <p><b>This setting can be modified at runtime, for example through JMX.</b>
274      * @see #setMaxConcurrentConsumers
275      */

276     public void setConcurrentConsumers(int concurrentConsumers) {
277         Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
278         synchronized (this.activeInvokerMonitor) {
279             this.concurrentConsumers = concurrentConsumers;
280             if (this.maxConcurrentConsumers < concurrentConsumers) {
281                 this.maxConcurrentConsumers = concurrentConsumers;
282             }
283         }
284     }
285
286     /**
287      * Return the "concurrentConsumer" setting.
288      * <p>This returns the currently configured "concurrentConsumers" value;
289      * the number of currently scheduled/active consumers might differ.
290      * @see #getScheduledConsumerCount()
291      * @see #getActiveConsumerCount()
292      */

293     public final int getConcurrentConsumers() {
294         synchronized (this.activeInvokerMonitor) {
295             return this.concurrentConsumers;
296         }
297     }
298
299     /**
300      * Specify the maximum number of concurrent consumers to create. Default is 1.
301      * <p>If this setting is higher than "concurrentConsumers", the listener container
302      * will dynamically schedule new consumers at runtime, provided that enough
303      * incoming messages are encountered. Once the load goes down again, the number of
304      * consumers will be reduced to the standard level ("concurrentConsumers") again.
305      * <p>Raising the number of concurrent consumers is recommendable in order
306      * to scale the consumption of messages coming in from a queue. However,
307      * note that any ordering guarantees are lost once multiple consumers are
308      * registered. In general, stick with 1 consumer for low-volume queues.
309      * <p><b>Do not raise the number of concurrent consumers for a topic.</b>
310      * This would lead to concurrent consumption of the same message,
311      * which is hardly ever desirable.
312      * <p><b>This setting can be modified at runtime, for example through JMX.</b>
313      * @see #setConcurrentConsumers
314      */

315     public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
316         Assert.isTrue(maxConcurrentConsumers > 0, "'maxConcurrentConsumers' value must be at least 1 (one)");
317         synchronized (this.activeInvokerMonitor) {
318             this.maxConcurrentConsumers =
319                     (maxConcurrentConsumers > this.concurrentConsumers ? maxConcurrentConsumers : this.concurrentConsumers);
320         }
321     }
322
323     /**
324      * Return the "maxConcurrentConsumer" setting.
325      * <p>This returns the currently configured "maxConcurrentConsumers" value;
326      * the number of currently scheduled/active consumers might differ.
327      * @see #getScheduledConsumerCount()
328      * @see #getActiveConsumerCount()
329      */

330     public final int getMaxConcurrentConsumers() {
331         synchronized (this.activeInvokerMonitor) {
332             return this.maxConcurrentConsumers;
333         }
334     }
335
336     /**
337      * Specify the maximum number of messages to process in one task.
338      * More concretely, this limits the number of message reception attempts per
339      * task, which includes receive iterations that did not actually pick up a
340      * message until they hit their timeout (see "receiveTimeout" property).
341      * <p>Default is unlimited (-1) in case of a standard TaskExecutor,
342      * and 1 in case of a SchedulingTaskExecutor that indicates a preference for
343      * short-lived tasks. Specify a number of 10 to 100 messages to balance
344      * between extremely long-lived and extremely short-lived tasks here.
345      * <p>Long-lived tasks avoid frequent thread context switches through
346      * sticking with the same thread all the way through, while short-lived
347      * tasks allow thread pools to control the scheduling. Hence, thread
348      * pools will usually prefer short-lived tasks.
349      * <p><b>This setting can be modified at runtime, for example through JMX.</b>
350      * @see #setTaskExecutor
351      * @see #setReceiveTimeout
352      * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
353      */

354     public void setMaxMessagesPerTask(int maxMessagesPerTask) {
355         Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0");
356         synchronized (this.activeInvokerMonitor) {
357             this.maxMessagesPerTask = maxMessagesPerTask;
358         }
359     }
360
361     /**
362      * Return the maximum number of messages to process in one task.
363      */

364     public int getMaxMessagesPerTask() {
365         synchronized (this.activeInvokerMonitor) {
366             return this.maxMessagesPerTask;
367         }
368     }
369
370     /**
371      * Specify the limit for idle executions of a receive task, not having
372      * received any message within its execution. If this limit is reached,
373      * the task will shut down and leave receiving to other executing tasks
374      * (in case of dynamic scheduling; see the "maxConcurrentConsumers" setting).
375      * Default is 1.
376      * <p>Within each task execution, a number of message reception attempts
377      * (according to the "maxMessagesPerTask" setting) will each wait for an incoming
378      * message (according to the "receiveTimeout" setting). If all of those receive
379      * attempts in a given task return without a message, the task is considered
380      * idle with respect to received messages. Such a task may still be rescheduled;
381      * however, once it reached the specified "idleTaskExecutionLimit", it will
382      * shut down (in case of dynamic scaling).
383      * <p>Raise this limit if you encounter too frequent scaling up and down.
384      * With this limit being higher, an idle consumer will be kept around longer,
385      * avoiding the restart of a consumer once a new load of messages comes in.
386      * Alternatively, specify a higher "maxMessagePerTask" and/or "receiveTimeout" value,
387      * which will also lead to idle consumers being kept around for a longer time
388      * (while also increasing the average execution time of each scheduled task).
389      * <p><b>This setting can be modified at runtime, for example through JMX.</b>
390      * @see #setMaxMessagesPerTask
391      * @see #setReceiveTimeout
392      */

393     public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
394         Assert.isTrue(idleTaskExecutionLimit > 0, "'idleTaskExecutionLimit' must be 1 or higher");
395         synchronized (this.activeInvokerMonitor) {
396             this.idleTaskExecutionLimit = idleTaskExecutionLimit;
397         }
398     }
399
400     /**
401      * Return the limit for idle executions of a receive task.
402      */

403     public int getIdleTaskExecutionLimit() {
404         synchronized (this.activeInvokerMonitor) {
405             return this.idleTaskExecutionLimit;
406         }
407     }
408
409     protected void validateConfiguration() {
410         super.validateConfiguration();
411         synchronized (this.activeInvokerMonitor) {
412             if (isSubscriptionDurable() && this.concurrentConsumers != 1) {
413                 throw new IllegalArgumentException JavaDoc("Only 1 concurrent consumer supported for durable subscription");
414             }
415         }
416     }
417
418
419     //-------------------------------------------------------------------------
420
// Implementation of AbstractMessageListenerContainer's template methods
421
//-------------------------------------------------------------------------
422

423     public void initialize() {
424         // Adapt default cache level.
425
if (getTransactionManager() != null) {
426             if (this.cacheLevel == null) {
427                 this.cacheLevel = new Integer JavaDoc(CACHE_NONE);
428             }
429         }
430         else {
431             if (this.cacheLevel == null) {
432                 this.cacheLevel = new Integer JavaDoc(CACHE_CONSUMER);
433             }
434         }
435
436         // Prepare taskExecutor and maxMessagesPerTask.
437
synchronized (this.activeInvokerMonitor) {
438             if (this.taskExecutor == null) {
439                 this.taskExecutor = createDefaultTaskExecutor();
440             }
441             else if (this.taskExecutor instanceof SchedulingTaskExecutor &&
442                     ((SchedulingTaskExecutor) this.taskExecutor).prefersShortLivedTasks() &&
443                     this.maxMessagesPerTask == Integer.MIN_VALUE) {
444                 // TaskExecutor indicated a preference for short-lived tasks. According to
445
// setMaxMessagesPerTask javadoc, we'll use 1 message per task in this case
446
// unless the user specified a custom value.
447
this.maxMessagesPerTask = 1;
448             }
449         }
450
451         // Proceed with actual listener initialization.
452
super.initialize();
453     }
454
455     /**
456      * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
457      * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
458      * with the specified bean name (or the class name, if no bean name specified) as thread name prefix.
459      * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
460      */

461     protected TaskExecutor createDefaultTaskExecutor() {
462         String JavaDoc beanName = getBeanName();
463         String JavaDoc threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
464         return new SimpleAsyncTaskExecutor(threadNamePrefix);
465     }
466
467     /**
468      * Use a shared JMS Connection depending on the "cacheLevel" setting.
469      * @see #setCacheLevel
470      * @see #CACHE_CONNECTION
471      */

472     protected final boolean sharedConnectionEnabled() {
473         return (getCacheLevel() >= CACHE_CONNECTION);
474     }
475
476     /**
477      * Creates the specified number of concurrent consumers,
478      * in the form of a JMS Session plus associated MessageConsumer
479      * running in a separate thread.
480      * @see #scheduleNewInvoker
481      * @see #setTaskExecutor
482      */

483     protected void doInitialize() throws JMSException JavaDoc {
484         synchronized (this.activeInvokerMonitor) {
485             for (int i = 0; i < this.concurrentConsumers; i++) {
486                 scheduleNewInvoker();
487             }
488         }
489     }
490
491     /**
492      * Re-executes the given task via this listener container's TaskExecutor.
493      * @see #setTaskExecutor
494      */

495     protected void doRescheduleTask(Object JavaDoc task) {
496         this.taskExecutor.execute((Runnable JavaDoc) task);
497     }
498
499     protected void messageReceived(Message JavaDoc message, Session JavaDoc session) {
500         scheduleNewInvokerIfAppropriate();
501     }
502
503     /**
504      * Schedule a new invoker, increasing the total number of scheduled
505      * invokers for this listener container, but only if the specified
506      * "maxConcurrentConsumers" limit has not been reached yet, and only
507      * if this listener container does not currently have idle invokers
508      * that are waiting for new messages already.
509      * <p>Called once a message has been received, to scale up while
510      * processing the message in the invoker that originally received it.
511      * @see #setTaskExecutor
512      * @see #getMaxConcurrentConsumers()
513      */

514     protected void scheduleNewInvokerIfAppropriate() {
515         if (isRunning()) {
516             synchronized (this.activeInvokerMonitor) {
517                 if (this.scheduledInvokers.size() < this.maxConcurrentConsumers && !hasIdleInvokers()) {
518                     scheduleNewInvoker();
519                     if (logger.isDebugEnabled()) {
520                         logger.debug("Raised scheduled invoker count: " + scheduledInvokers.size());
521                     }
522                 }
523             }
524         }
525     }
526
527     /**
528      * Schedule a new invoker, increasing the total number of scheduled
529      * invokers for this listener container.
530      */

531     private void scheduleNewInvoker() {
532         AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
533         this.taskExecutor.execute(invoker);
534         this.scheduledInvokers.add(invoker);
535         this.activeInvokerMonitor.notifyAll();
536     }
537
538     /**
539      * Determine whether this listener container currently has any
540      * idle instances among its scheduled invokers.
541      */

542     private boolean hasIdleInvokers() {
543         for (Iterator JavaDoc it = this.scheduledInvokers.iterator(); it.hasNext();) {
544             AsyncMessageListenerInvoker invoker = (AsyncMessageListenerInvoker) it.next();
545             if (invoker.isIdle()) {
546                 return true;
547             }
548         }
549         return false;
550     }
551
552     /**
553      * Determine whether the current invoker should be rescheduled,
554      * given that it might not have received a message in a while.
555      * @param idleTaskExecutionCount the number of idle executions
556      * that this invoker task has already accumulated (in a row)
557      */

558     private boolean shouldRescheduleInvoker(int idleTaskExecutionCount) {
559         synchronized (this.activeInvokerMonitor) {
560             boolean idle = (idleTaskExecutionCount >= this.idleTaskExecutionLimit);
561             return (this.scheduledInvokers.size() <= (idle ? this.concurrentConsumers : this.maxConcurrentConsumers));
562         }
563     }
564
565     /**
566      * Return the number of currently scheduled consumers.
567      * <p>This number will always be inbetween "concurrentConsumers" and
568      * "maxConcurrentConsumers", but might be higher than "activeConsumerCount"
569      * (in case of some consumers being scheduled but not executed at the moment).
570      * @see #getConcurrentConsumers()
571      * @see #getMaxConcurrentConsumers()
572      * @see #getActiveConsumerCount()
573      */

574     public final int getScheduledConsumerCount() {
575         synchronized (this.activeInvokerMonitor) {
576             return this.scheduledInvokers.size();
577         }
578     }
579
580     /**
581      * Return the number of currently active consumers.
582      * <p>This number will always be inbetween "concurrentConsumers" and
583      * "maxConcurrentConsumers", but might be lower than "scheduledConsumerCount".
584      * (in case of some consumers being scheduled but not executed at the moment).
585      * @see #getConcurrentConsumers()
586      * @see #getMaxConcurrentConsumers()
587      * @see #getActiveConsumerCount()
588      */

589     public final int getActiveConsumerCount() {
590         synchronized (this.activeInvokerMonitor) {
591             return this.activeInvokerCount;
592         }
593     }
594
595
596     /**
597      * Overridden to accept a failure in the initial setup - leaving it up to the
598      * asynchronous invokers to establish the shared Connection on first access.
599      * @see #refreshConnectionUntilSuccessful()
600      */

601     protected void establishSharedConnection() {
602         try {
603             refreshSharedConnection();
604         }
605         catch (JMSException JavaDoc ex) {
606             logger.debug("Could not establish shared JMS Connection - " +
607                     "leaving it up to asynchronous invokers to establish a Connection as soon as possible", ex);
608         }
609     }
610
611     /**
612      * This implementations proceeds even after an exception thrown from
613      * <code>Connection.start()</code>, relying on listeners to perform
614      * appropriate recovery.
615      */

616     protected void startSharedConnection() {
617         try {
618             super.startSharedConnection();
619         }
620         catch (JMSException JavaDoc ex) {
621             logger.debug("Connection start failed - relying on listeners to perform recovery", ex);
622         }
623     }
624
625     /**
626      * This implementations proceeds even after an exception thrown from
627      * <code>Connection.stop()</code>, relying on listeners to perform
628      * appropriate recovery after a restart.
629      */

630     protected void stopSharedConnection() {
631         try {
632             super.stopSharedConnection();
633         }
634         catch (JMSException JavaDoc ex) {
635             logger.debug("Connection stop failed - relying on listeners to perform recovery after restart", ex);
636         }
637     }
638
639     /**
640      * Handle the given exception that arose during setup of a listener.
641      * Called for every such exception in every concurrent listener.
642      * <p>The default implementation logs the exception at error level
643      * if not recovered yet, and at debug level if already recovered.
644      * Can be overridden in subclasses.
645      * @param ex the exception to handle
646      * @param alreadyRecovered whether a previously executing listener
647      * already recovered from the present listener setup failure
648      * (this usually indicates a follow-up failure than be ignored
649      * other than for debug log purposes)
650      * @see #recoverAfterListenerSetupFailure()
651      */

652     protected void handleListenerSetupFailure(Throwable JavaDoc ex, boolean alreadyRecovered) {
653         if (ex instanceof JMSException JavaDoc) {
654             invokeExceptionListener((JMSException JavaDoc) ex);
655         }
656         if (ex instanceof SharedConnectionNotInitializedException) {
657             if (!alreadyRecovered) {
658                 logger.debug("JMS message listener invoker needs to establish shared Connection");
659             }
660         }
661         else {
662             if (alreadyRecovered) {
663                 logger.debug("Setup of JMS message listener invoker failed - already recovered by other invoker", ex);
664             }
665             else {
666                 logger.error("Setup of JMS message listener invoker failed - trying to recover", ex);
667             }
668         }
669     }
670
671     /**
672      * Recover this listener container after a listener failed to set itself up,
673      * for example reestablishing the underlying Connection.
674      * <p>The default implementation delegates to DefaultMessageListenerContainer's
675      * recovery-capable {@link #refreshConnectionUntilSuccessful()} method, which will
676      * try to re-establish a Connection to the JMS provider both for the shared
677      * and the non-shared Connection case.
678      * @see #refreshConnectionUntilSuccessful()
679      * @see #refreshDestination()
680      */

681     protected void recoverAfterListenerSetupFailure() {
682         refreshConnectionUntilSuccessful();
683         refreshDestination();
684     }
685
686     /**
687      * Refresh the underlying Connection, not returning before an attempt has been
688      * successful. Called in case of a shared Connection as well as without shared
689      * Connection, so either needs to operate on the shared Connection or on a
690      * temporary Connection that just gets established for validation purposes.
691      * <p>The default implementation retries until it successfully established a
692      * Connection, for as long as this message listener container is active.
693      * Applies the specified recovery interval between retries.
694      * @see #setRecoveryInterval
695      */

696     protected void refreshConnectionUntilSuccessful() {
697         while (isActive()) {
698             try {
699                 if (sharedConnectionEnabled()) {
700                     refreshSharedConnection();
701                     if (isRunning()) {
702                         startSharedConnection();
703                     }
704                 }
705                 else {
706                     Connection JavaDoc con = createConnection();
707                     JmsUtils.closeConnection(con);
708                 }
709                 logger.info("Successfully refreshed JMS Connection");
710                 break;
711             }
712             catch (Exception JavaDoc ex) {
713                 if (logger.isInfoEnabled()) {
714                     logger.info("Could not refresh JMS Connection - retrying in " + this.recoveryInterval + " ms", ex);
715                 }
716             }
717             sleepInbetweenRecoveryAttempts();
718         }
719     }
720
721     /**
722      * Refresh the JMS destination that this listener container operates on.
723      * <p>Called after listener setup failure, assuming that a cached Destination
724      * object might have become invalid (a typical case on WebLogic JMS).
725      * <p>The default implementation removes the destination from a
726      * DestinationResolver's cache, in case of a CachingDestinationResolver.
727      * @see #setDestinationName
728      * @see org.springframework.jms.support.destination.CachingDestinationResolver
729      */

730     protected void refreshDestination() {
731         String JavaDoc destName = getDestinationName();
732         if (destName != null) {
733             DestinationResolver destResolver = getDestinationResolver();
734             if (destResolver instanceof CachingDestinationResolver) {
735                 ((CachingDestinationResolver) destResolver).removeFromCache(destName);
736             }
737         }
738     }
739
740     /**
741      * Sleep according to the specified recovery interval.
742      * Called inbetween recovery attempts.
743      */

744     protected void sleepInbetweenRecoveryAttempts() {
745         if (this.recoveryInterval > 0) {
746             try {
747                 Thread.sleep(this.recoveryInterval);
748             }
749             catch (InterruptedException JavaDoc interEx) {
750                 // Re-interrupt current thread, to allow other threads to react.
751
Thread.currentThread().interrupt();
752             }
753         }
754     }
755
756
757     /**
758      * Destroy the registered JMS Sessions and associated MessageConsumers.
759      */

760     protected void doShutdown() throws JMSException JavaDoc {
761         logger.debug("Waiting for shutdown of message listener invokers");
762         synchronized (this.activeInvokerMonitor) {
763             while (this.activeInvokerCount > 0) {
764                 if (logger.isDebugEnabled()) {
765                     logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
766                             " message listener invokers");
767                 }
768                 try {
769                     this.activeInvokerMonitor.wait();
770                 }
771                 catch (InterruptedException JavaDoc interEx) {
772                     // Re-interrupt current thread, to allow other threads to react.
773
Thread.currentThread().interrupt();
774                 }
775             }
776         }
777     }
778
779
780     //-------------------------------------------------------------------------
781
// Inner classes used as internal adapters
782
//-------------------------------------------------------------------------
783

784     /**
785      * Runnable that performs looped <code>MessageConsumer.receive()</code> calls.
786      */

787     private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {
788
789         private Session JavaDoc session;
790
791         private MessageConsumer JavaDoc consumer;
792
793         private Object JavaDoc lastRecoveryMarker;
794
795         private boolean lastMessageSucceeded;
796
797         private int idleTaskExecutionCount = 0;
798
799         private volatile boolean idle = true;
800
801         public void run() {
802             synchronized (activeInvokerMonitor) {
803                 activeInvokerCount++;
804                 activeInvokerMonitor.notifyAll();
805             }
806             boolean messageReceived = false;
807             try {
808                 if (maxMessagesPerTask < 0) {
809                     while (isActive()) {
810                         waitWhileNotRunning();
811                         if (isActive()) {
812                             messageReceived = invokeListener();
813                         }
814                     }
815                 }
816                 else {
817                     int messageCount = 0;
818                     while (isRunning() && messageCount < maxMessagesPerTask) {
819                         messageReceived = (invokeListener() || messageReceived);
820                         messageCount++;
821                     }
822                 }
823             }
824             catch (Throwable JavaDoc ex) {
825                 clearResources();
826                 if (!this.lastMessageSucceeded) {
827                     // We failed more than once in a row - sleep for recovery interval
828
// even before first recovery attempt.
829
sleepInbetweenRecoveryAttempts();
830                 }
831                 this.lastMessageSucceeded = false;
832                 boolean alreadyRecovered = false;
833                 synchronized (recoveryMonitor) {
834                     if (this.lastRecoveryMarker == currentRecoveryMarker) {
835                         handleListenerSetupFailure(ex, false);
836                         recoverAfterListenerSetupFailure();
837                         currentRecoveryMarker = new Object JavaDoc();
838                     }
839                     else {
840                         alreadyRecovered = true;
841                     }
842                 }
843                 if (alreadyRecovered) {
844                     handleListenerSetupFailure(ex, true);
845                 }
846             }
847             synchronized (activeInvokerMonitor) {
848                 activeInvokerCount--;
849                 activeInvokerMonitor.notifyAll();
850             }
851             if (!messageReceived) {
852                 this.idleTaskExecutionCount++;
853             }
854             else {
855                 this.idleTaskExecutionCount = 0;
856             }
857             if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
858                 // We're shutting down completely.
859
synchronized (activeInvokerMonitor) {
860                     scheduledInvokers.remove(this);
861                     if (logger.isDebugEnabled()) {
862                         logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
863                     }
864                     activeInvokerMonitor.notifyAll();
865                 }
866                 clearResources();
867             }
868         }
869
870         private boolean invokeListener() throws JMSException JavaDoc {
871             initResourcesIfNecessary();
872             boolean messageReceived = receiveAndExecute(this.session, this.consumer);
873             this.lastMessageSucceeded = true;
874             this.idle = !messageReceived;
875             return messageReceived;
876         }
877
878         private void initResourcesIfNecessary() throws JMSException JavaDoc {
879             if (getCacheLevel() <= CACHE_CONNECTION) {
880                 updateRecoveryMarker();
881             }
882             else {
883                 if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
884                     updateRecoveryMarker();
885                     this.session = createSession(getSharedConnection());
886                 }
887                 if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
888                     this.consumer = createListenerConsumer(this.session);
889                 }
890             }
891         }
892
893         private void updateRecoveryMarker() {
894             synchronized (recoveryMonitor) {
895                 this.lastRecoveryMarker = currentRecoveryMarker;
896             }
897         }
898
899         private void clearResources() {
900             JmsUtils.closeMessageConsumer(this.consumer);
901             JmsUtils.closeSession(this.session);
902             this.consumer = null;
903             this.session = null;
904         }
905
906         public boolean isLongLived() {
907             return (maxMessagesPerTask < 0);
908         }
909
910         public boolean isIdle() {
911             return this.idle;
912         }
913     }
914
915 }
916
Popular Tags