KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > springframework > jms > listener > AbstractJmsListeningContainer


1 /*
2  * Copyright 2002-2007 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.springframework.jms.listener;
18
19 import java.util.Iterator JavaDoc;
20 import java.util.LinkedList JavaDoc;
21 import java.util.List JavaDoc;
22
23 import javax.jms.Connection JavaDoc;
24 import javax.jms.JMSException JavaDoc;
25
26 import org.springframework.beans.factory.BeanNameAware;
27 import org.springframework.beans.factory.DisposableBean;
28 import org.springframework.context.Lifecycle;
29 import org.springframework.jms.JmsException;
30 import org.springframework.jms.connection.ConnectionFactoryUtils;
31 import org.springframework.jms.support.JmsUtils;
32 import org.springframework.jms.support.destination.JmsDestinationAccessor;
33 import org.springframework.util.Assert;
34 import org.springframework.util.ClassUtils;
35
36 /**
37  * Common base class for all containers which need to implement listening
38  * based on a JMS Connection (either shared or freshly obtained for each attempt).
39  * Inherits basic Connection and Session configuration handling from the
40  * {@link org.springframework.jms.support.JmsAccessor} base class.
41  *
42  * <p>This class provides basic lifecycle management, in particular management
43  * of a shared JMS Connection. Subclasses are supposed to plug into this
44  * lifecycle, implementing the {@link #sharedConnectionEnabled()} as well
45  * as the {@link #doInitialize()} and {@link #doShutdown()} template methods.
46  *
47  * <p>This base class does not assume any specific listener programming model
48  * or listener invoker mechanism. It just provides the general runtime
49  * lifecycle management needed for any kind of JMS-based listening mechanism
50  * that operates on a JMS Connection/Session.
51  *
52  * <p>For a concrete listener programming model, check out the
53  * {@link AbstractMessageListenerContainer} subclass. For a concrete listener
54  * invoker mechanism, check out the {@link DefaultMessageListenerContainer} class.
55  *
56  * @author Juergen Hoeller
57  * @since 2.0.3
58  * @see #sharedConnectionEnabled()
59  * @see #doInitialize()
60  * @see #doShutdown()
61  */

62 public abstract class AbstractJmsListeningContainer extends JmsDestinationAccessor
63         implements Lifecycle, BeanNameAware, DisposableBean {
64
65     private String JavaDoc clientId;
66
67     private boolean autoStartup = true;
68
69     private String JavaDoc beanName;
70
71     private Connection JavaDoc sharedConnection;
72
73     private final Object JavaDoc sharedConnectionMonitor = new Object JavaDoc();
74
75     private boolean active = false;
76
77     private boolean running = false;
78
79     private final List JavaDoc pausedTasks = new LinkedList JavaDoc();
80
81     private final Object JavaDoc lifecycleMonitor = new Object JavaDoc();
82
83
84     /**
85      * Specify the JMS client id for a shared Connection created and used
86      * by this container.
87      * <p>Note that client ids need to be unique among all active Connections
88      * of the underlying JMS provider. Furthermore, a client id can only be
89      * assigned if the original ConnectionFactory hasn't already assigned one.
90      * @see javax.jms.Connection#setClientID
91      * @see #setConnectionFactory
92      */

93     public void setClientId(String JavaDoc clientId) {
94         this.clientId = clientId;
95     }
96
97     /**
98      * Return the JMS client ID for the shared Connection created and used
99      * by this container, if any.
100      */

101     protected String JavaDoc getClientId() {
102         return this.clientId;
103     }
104
105     /**
106      * Set whether to automatically start the container after initialization.
107      * <p>Default is "true"; set this to "false" to allow for manual startup
108      * through the {@link #start()} method.
109      */

110     public void setAutoStartup(boolean autoStartup) {
111         this.autoStartup = autoStartup;
112     }
113
114     public void setBeanName(String JavaDoc beanName) {
115         this.beanName = beanName;
116     }
117
118     /**
119      * Return the bean name that this listener container has been assigned
120      * in its containing bean factory, if any.
121      */

122     protected final String JavaDoc getBeanName() {
123         return this.beanName;
124     }
125
126
127     /**
128      * Delegates to {@link #validateConfiguration()} and {@link #initialize()}.
129      */

130     public void afterPropertiesSet() {
131         super.afterPropertiesSet();
132         validateConfiguration();
133         initialize();
134     }
135
136     /**
137      * Validate the configuration of this container.
138      * <p>The default implementation is empty. To be overridden in subclasses.
139      */

140     protected void validateConfiguration() {
141     }
142
143
144     /**
145      * Initialize this container.
146      * <p>Creates a JMS Connection, starts the {@link javax.jms.Connection}
147      * (if {@link #setAutoStartup(boolean) "autoStartup"} hasn't been turned off),
148      * and calls {@link #doInitialize()}.
149      * @throws org.springframework.jms.JmsException if startup failed
150      */

151     public void initialize() throws JmsException {
152         try {
153             synchronized (this.lifecycleMonitor) {
154                 this.active = true;
155                 this.lifecycleMonitor.notifyAll();
156             }
157
158             if (sharedConnectionEnabled()) {
159                 establishSharedConnection();
160             }
161
162             if (this.autoStartup) {
163                 doStart();
164             }
165
166             doInitialize();
167         }
168         catch (JMSException JavaDoc ex) {
169             synchronized (this.sharedConnectionMonitor) {
170                 ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup);
171             }
172             throw convertJmsAccessException(ex);
173         }
174     }
175
176     /**
177      * Establish a shared Connection for this container.
178      * <p>The default implementation delegates to <code>refreshSharedConnection</code>,
179      * which does one immediate attempt and throws an exception if it fails.
180      * Can be overridden to have a recovery proces in place, retrying
181      * until a Connection can be successfully established.
182      * @throws JMSException if thrown by JMS API methods
183      * @see #refreshSharedConnection()
184      */

185     protected void establishSharedConnection() throws JMSException JavaDoc {
186         refreshSharedConnection();
187     }
188
189     /**
190      * Refresh the shared Connection that this container holds.
191      * <p>Called on startup and also after an infrastructure exception
192      * that occured during invoker setup and/or execution.
193      * @throws JMSException if thrown by JMS API methods
194      */

195     protected final void refreshSharedConnection() throws JMSException JavaDoc {
196         boolean running = isRunning();
197         synchronized (this.sharedConnectionMonitor) {
198             ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), running);
199             this.sharedConnection = null;
200             Connection JavaDoc con = createConnection();
201             try {
202                 prepareSharedConnection(con);
203             }
204             catch (JMSException JavaDoc ex) {
205                 JmsUtils.closeConnection(con);
206                 throw ex;
207             }
208             this.sharedConnection = con;
209         }
210     }
211
212     /**
213      * Prepare the given Connection, which is about to be registered
214      * as shared Connection for this container.
215      * <p>The default implementation sets the specified client id, if any.
216      * Subclasses can override this to apply further settings.
217      * @param connection the Connection to prepare
218      * @throws JMSException if the preparation efforts failed
219      * @see #getClientId()
220      */

221     protected void prepareSharedConnection(Connection JavaDoc connection) throws JMSException JavaDoc {
222         String JavaDoc clientId = getClientId();
223         if (clientId != null) {
224             connection.setClientID(clientId);
225         }
226     }
227
228     /**
229      * Return the shared JMS Connection maintained by this container.
230      * Available after initialization.
231      * @return the shared Connection (never <code>null</code>)
232      * @throws IllegalStateException if this container does not maintain a
233      * shared Connection, or if the Connection hasn't been initialized yet
234      * @see #sharedConnectionEnabled()
235      */

236     protected final Connection JavaDoc getSharedConnection() {
237         if (!sharedConnectionEnabled()) {
238             throw new IllegalStateException JavaDoc(
239                     "This listener container does not maintain a shared Connection");
240         }
241         synchronized (this.sharedConnectionMonitor) {
242             if (this.sharedConnection == null) {
243                 throw new SharedConnectionNotInitializedException(
244                         "This listener container's shared Connection has not been initialized yet");
245             }
246             return this.sharedConnection;
247         }
248     }
249
250
251     /**
252      * Calls {@link #shutdown()} when the BeanFactory destroys the container instance.
253      * @see #shutdown()
254      */

255     public void destroy() {
256         shutdown();
257     }
258
259     /**
260      * Stop the shared Connection, call {@link #doShutdown()},
261      * and close this container.
262      * @throws JmsException if shutdown failed
263      */

264     public void shutdown() throws JmsException {
265         logger.debug("Shutting down JMS listener container");
266         boolean wasRunning = false;
267         synchronized (this.lifecycleMonitor) {
268             wasRunning = this.running;
269             this.running = false;
270             this.active = false;
271             this.lifecycleMonitor.notifyAll();
272         }
273
274         // Stop shared Connection early, if necessary.
275
if (wasRunning && sharedConnectionEnabled()) {
276             try {
277                 stopSharedConnection();
278             }
279             catch (Throwable JavaDoc ex) {
280                 logger.debug("Could not stop JMS Connection on shutdown", ex);
281             }
282         }
283
284         // Shut down the invokers.
285
try {
286             doShutdown();
287         }
288         catch (JMSException JavaDoc ex) {
289             throw convertJmsAccessException(ex);
290         }
291         finally {
292             if (sharedConnectionEnabled()) {
293                 synchronized (this.sharedConnectionMonitor) {
294                     ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), false);
295                 }
296             }
297         }
298     }
299
300     /**
301      * Return whether this container is currently active,
302      * that is, whether it has been set up but not shut down yet.
303      */

304     public final boolean isActive() {
305         synchronized (this.lifecycleMonitor) {
306             return this.active;
307         }
308     }
309
310
311     //-------------------------------------------------------------------------
312
// Lifecycle methods for dynamically starting and stopping the container
313
//-------------------------------------------------------------------------
314

315     /**
316      * Start this container.
317      * @throws JmsException if starting failed
318      * @see #doStart
319      */

320     public void start() throws JmsException {
321         try {
322             doStart();
323         }
324         catch (JMSException JavaDoc ex) {
325             throw convertJmsAccessException(ex);
326         }
327     }
328
329     /**
330      * Start the shared Connection, if any, and notify all invoker tasks.
331      * @throws JMSException if thrown by JMS API methods
332      * @see #startSharedConnection
333      */

334     protected void doStart() throws JMSException JavaDoc {
335         synchronized (this.lifecycleMonitor) {
336             this.running = true;
337             this.lifecycleMonitor.notifyAll();
338             for (Iterator JavaDoc it = this.pausedTasks.iterator(); it.hasNext();) {
339                 doRescheduleTask(it.next());
340                 it.remove();
341             }
342         }
343
344         if (sharedConnectionEnabled()) {
345             startSharedConnection();
346         }
347     }
348
349     /**
350      * Start the shared Connection.
351      * @throws JMSException if thrown by JMS API methods
352      * @see javax.jms.Connection#start()
353      */

354     protected void startSharedConnection() throws JMSException JavaDoc {
355         synchronized (this.sharedConnectionMonitor) {
356             if (this.sharedConnection != null) {
357                 try {
358                     this.sharedConnection.start();
359                 }
360                 catch (javax.jms.IllegalStateException JavaDoc ex) {
361                     logger.debug("Ignoring Connection start exception - assuming already started", ex);
362                 }
363             }
364         }
365     }
366
367     /**
368      * Stop this container.
369      * @throws JmsException if stopping failed
370      * @see #doStop
371      */

372     public void stop() throws JmsException {
373         try {
374             doStop();
375         }
376         catch (JMSException JavaDoc ex) {
377             throw convertJmsAccessException(ex);
378         }
379     }
380
381     /**
382      * Notify all invoker tasks and stop the shared Connection, if any.
383      * @throws JMSException if thrown by JMS API methods
384      * @see #stopSharedConnection
385      */

386     protected void doStop() throws JMSException JavaDoc {
387         synchronized (this.lifecycleMonitor) {
388             this.running = false;
389             this.lifecycleMonitor.notifyAll();
390         }
391
392         if (sharedConnectionEnabled()) {
393             stopSharedConnection();
394         }
395     }
396
397     /**
398      * Stop the shared Connection.
399      * @throws JMSException if thrown by JMS API methods
400      * @see javax.jms.Connection#start()
401      */

402     protected void stopSharedConnection() throws JMSException JavaDoc {
403         synchronized (this.sharedConnectionMonitor) {
404             if (this.sharedConnection != null) {
405                 try {
406                     this.sharedConnection.stop();
407                 }
408                 catch (javax.jms.IllegalStateException JavaDoc ex) {
409                     logger.debug("Ignoring Connection stop exception - assuming already stopped", ex);
410                 }
411             }
412         }
413     }
414
415     /**
416      * Return whether this container is currently running,
417      * that is, whether it has been started and not stopped yet.
418      */

419     public final boolean isRunning() {
420         synchronized (this.lifecycleMonitor) {
421             return this.running;
422         }
423     }
424
425     /**
426      * Wait while this container is not running.
427      * <p>To be called by asynchronous tasks that want to block
428      * while the container is in stopped state.
429      */

430     protected final void waitWhileNotRunning() {
431         synchronized (this.lifecycleMonitor) {
432             while (this.active && !this.running) {
433                 try {
434                     this.lifecycleMonitor.wait();
435                 }
436                 catch (InterruptedException JavaDoc ex) {
437                     // Re-interrupt current thread, to allow other threads to react.
438
Thread.currentThread().interrupt();
439                 }
440             }
441         }
442     }
443
444     /**
445      * Take the given task object and reschedule it, either immediately if
446      * this container is currently running, or later once this container
447      * has been restarted.
448      * <p>If this container has already been shut down, the task will not
449      * get rescheduled at all.
450      * @param task the task object to reschedule
451      * @return whether the task has been rescheduled
452      * (either immediately or for a restart of this container)
453      * @see #doRescheduleTask
454      */

455     protected final boolean rescheduleTaskIfNecessary(Object JavaDoc task) {
456         Assert.notNull(task, "Task object must not be null");
457         synchronized (this.lifecycleMonitor) {
458             if (this.running) {
459                 doRescheduleTask(task);
460                 return true;
461             }
462             else if (this.active) {
463                 this.pausedTasks.add(task);
464                 return true;
465             }
466             else {
467                 return false;
468             }
469         }
470     }
471
472     /**
473      * Reschedule the given task object immediately.
474      * <p>To be implemented by subclasses if they ever call
475      * <code>rescheduleTaskIfNecessary</code>.
476      * This implementation throws an UnsupportedOperationException.
477      * @param task the task object to reschedule
478      * @see #rescheduleTaskIfNecessary
479      */

480     protected void doRescheduleTask(Object JavaDoc task) {
481         throw new UnsupportedOperationException JavaDoc(
482                 ClassUtils.getShortName(getClass()) + " does not support rescheduling of tasks");
483     }
484
485
486     //-------------------------------------------------------------------------
487
// Template methods to be implemented by subclasses
488
//-------------------------------------------------------------------------
489

490     /**
491      * Return whether a shared JMS Connection should be maintained
492      * by this container base class.
493      * @see #getSharedConnection()
494      */

495     protected abstract boolean sharedConnectionEnabled();
496
497     /**
498      * Register any invokers within this container.
499      * <p>Subclasses need to implement this method for their specific
500      * invoker management process.
501      * <p>A shared JMS Connection, if any, will already have been
502      * started at this point.
503      * @throws JMSException if registration failed
504      * @see #getSharedConnection()
505      */

506     protected abstract void doInitialize() throws JMSException JavaDoc;
507
508     /**
509      * Close the registered invokers.
510      * <p>Subclasses need to implement this method for their specific
511      * invoker management process.
512      * <p>A shared JMS Connection, if any, will automatically be closed
513      * <i>afterwards</i>.
514      * @throws JMSException if shutdown failed
515      * @see #shutdown()
516      */

517     protected abstract void doShutdown() throws JMSException JavaDoc;
518
519
520     /**
521      * Exception that indicates that the initial setup of this container's
522      * shared JMS Connection failed. This is indicating to invokers that they need
523      * to establish the shared Connection themselves on first access.
524      */

525     public static class SharedConnectionNotInitializedException extends RuntimeException JavaDoc {
526
527         /**
528          * Create a new SharedConnectionNotInitializedException.
529          * @param msg the detail message
530          */

531         protected SharedConnectionNotInitializedException(String JavaDoc msg) {
532             super(msg);
533         }
534     }
535
536 }
537
Popular Tags