KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > springframework > jms > listener > SimpleMessageListenerContainer


1 /*
2  * Copyright 2002-2007 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.springframework.jms.listener;
18
19 import java.util.HashSet JavaDoc;
20 import java.util.Iterator JavaDoc;
21 import java.util.Set JavaDoc;
22
23 import javax.jms.Destination JavaDoc;
24 import javax.jms.JMSException JavaDoc;
25 import javax.jms.Message JavaDoc;
26 import javax.jms.MessageConsumer JavaDoc;
27 import javax.jms.MessageListener JavaDoc;
28 import javax.jms.Session JavaDoc;
29 import javax.jms.Topic JavaDoc;
30
31 import org.springframework.core.task.TaskExecutor;
32 import org.springframework.jms.support.JmsUtils;
33 import org.springframework.util.Assert;
34
35 /**
36  * Message listener container that uses the plain JMS client API's
37  * <code>MessageConsumer.setMessageListener()</code> method to
38  * create concurrent MessageConsumers for the specified listeners.
39  *
40  * <p>This is the simplest form of a message listener container.
41  * It creates a fixed number of JMS Sessions to invoke the listener,
42  * not allowing for dynamic adaptation to runtime demands. Its main
43  * advantage is its low level of complexity and the minimum requirements
44  * on the JMS provider: Not even the ServerSessionPool facility is required.
45  *
46  * <p>See the {@link AbstractMessageListenerContainer} javadoc for details
47  * on acknowledge modes and transaction options.
48  *
49  * <p>For a different style of MessageListener handling, through looped
50  * <code>MessageConsumer.receive()</code> calls that also allow for
51  * transactional reception of messages (registering them with XA transactions),
52  * see {@link DefaultMessageListenerContainer}. For dynamic adaptation of the
53  * active number of Sessions, consider using
54  * {@link org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer}.
55  *
56  * <p>This class requires a JMS 1.1+ provider, because it builds on the
57  * domain-independent API. <b>Use the {@link SimpleMessageListenerContainer102}
58  * subclass for JMS 1.0.2 providers.</b>
59  *
60  * @author Juergen Hoeller
61  * @since 2.0
62  * @see javax.jms.MessageConsumer#setMessageListener
63  * @see DefaultMessageListenerContainer
64  * @see org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer
65  * @see SimpleMessageListenerContainer102
66  */

67 public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
68
69     private boolean pubSubNoLocal = false;
70
71     private int concurrentConsumers = 1;
72
73     private TaskExecutor taskExecutor;
74
75     private Set JavaDoc sessions;
76
77     private Set JavaDoc consumers;
78
79
80     /**
81      * Set whether to inhibit the delivery of messages published by its own connection.
82      * Default is "false".
83      * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, String, boolean)
84      */

85     public void setPubSubNoLocal(boolean pubSubNoLocal) {
86         this.pubSubNoLocal = pubSubNoLocal;
87     }
88
89     /**
90      * Return whether to inhibit the delivery of messages published by its own connection.
91      */

92     protected boolean isPubSubNoLocal() {
93         return this.pubSubNoLocal;
94     }
95
96     /**
97      * Specify the number of concurrent consumers to create. Default is 1.
98      * <p>Raising the number of concurrent consumers is recommendable in order
99      * to scale the consumption of messages coming in from a queue. However,
100      * note that any ordering guarantees are lost once multiple consumers are
101      * registered. In general, stick with 1 consumer for low-volume queues.
102      * <p><b>Do not raise the number of concurrent consumers for a topic.</b>
103      * This would lead to concurrent consumption of the same message,
104      * which is hardly ever desirable.
105      */

106     public void setConcurrentConsumers(int concurrentConsumers) {
107         Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
108         this.concurrentConsumers = concurrentConsumers;
109     }
110
111     /**
112      * Set the Spring TaskExecutor to use for executing the listener once
113      * a message has been received by the provider.
114      * <p>Default is none, that is, to run in the JMS provider's own receive thread,
115      * blocking the provider's receive endpoint while executing the listener.
116      * <p>Specify a TaskExecutor for executing the listener in a different thread,
117      * rather than blocking the JMS provider, usually integrating with an existing
118      * thread pool. This allows to keep the number of concurrent consumers low (1)
119      * while still processing messages concurrently (decoupled from receiving!).
120      * <p><b>NOTE: Specifying a TaskExecutor for listener execution affects
121      * acknowledgement semantics.</b> Messages will then always get acknowledged
122      * before listener execution, with the underlying Session immediately reused
123      * for receiving the next message. Using this in combination with a transacted
124      * session or with client acknowledgement will lead to unspecified results!
125      * <p><b>NOTE: Concurrent listener execution via a TaskExecutor will lead
126      * to concurrent processing of messages that have been received by the same
127      * underlying Session.</b> As a consequence, it is not recommended to use
128      * this setting with a {@link SessionAwareMessageListener}, at least not
129      * if the latter performs actual work on the given Session. A standard
130      * {@link javax.jms.MessageListener} will work fine, in general.
131      * @see #setConcurrentConsumers
132      * @see org.springframework.core.task.SimpleAsyncTaskExecutor
133      * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
134      */

135     public void setTaskExecutor(TaskExecutor taskExecutor) {
136         this.taskExecutor = taskExecutor;
137     }
138
139     protected void validateConfiguration() {
140         super.validateConfiguration();
141         if (isSubscriptionDurable() && this.concurrentConsumers != 1) {
142             throw new IllegalArgumentException JavaDoc("Only 1 concurrent consumer supported for durable subscription");
143         }
144     }
145
146
147     //-------------------------------------------------------------------------
148
// Implementation of AbstractMessageListenerContainer's template methods
149
//-------------------------------------------------------------------------
150

151     /**
152      * Always use a shared JMS Connection.
153      */

154     protected final boolean sharedConnectionEnabled() {
155         return true;
156     }
157
158     /**
159      * Creates the specified number of concurrent consumers,
160      * in the form of a JMS Session plus associated MessageConsumer.
161      * @see #createListenerConsumer
162      */

163     protected void doInitialize() throws JMSException JavaDoc {
164         this.sessions = new HashSet JavaDoc(this.concurrentConsumers);
165         this.consumers = new HashSet JavaDoc(this.concurrentConsumers);
166         for (int i = 0; i < this.concurrentConsumers; i++) {
167             Session JavaDoc session = createSession(getSharedConnection());
168             MessageConsumer JavaDoc consumer = createListenerConsumer(session);
169             this.sessions.add(session);
170             this.consumers.add(consumer);
171         }
172     }
173
174     /**
175      * Create a MessageConsumer for the given JMS Session,
176      * registering a MessageListener for the specified listener.
177      * @param session the JMS Session to work on
178      * @return the MessageConsumer
179      * @throws JMSException if thrown by JMS methods
180      * @see #executeListener
181      */

182     protected MessageConsumer JavaDoc createListenerConsumer(final Session JavaDoc session) throws JMSException JavaDoc {
183         Destination JavaDoc destination = getDestination();
184         if (destination == null) {
185             destination = resolveDestinationName(session, getDestinationName());
186         }
187         MessageConsumer JavaDoc consumer = createConsumer(session, destination);
188         if (this.taskExecutor != null) {
189             consumer.setMessageListener(new MessageListener JavaDoc() {
190                 public void onMessage(final Message JavaDoc message) {
191                     taskExecutor.execute(new Runnable JavaDoc() {
192                         public void run() {
193                             executeListener(session, message);
194                         }
195                     });
196                 }
197             });
198         }
199         else {
200             consumer.setMessageListener(new MessageListener JavaDoc() {
201                 public void onMessage(Message JavaDoc message) {
202                     executeListener(session, message);
203                 }
204             });
205         }
206         return consumer;
207     }
208
209     /**
210      * Destroy the registered JMS Sessions and associated MessageConsumers.
211      */

212     protected void doShutdown() throws JMSException JavaDoc {
213         logger.debug("Closing JMS MessageConsumers");
214         for (Iterator JavaDoc it = this.consumers.iterator(); it.hasNext();) {
215             MessageConsumer JavaDoc consumer = (MessageConsumer JavaDoc) it.next();
216             JmsUtils.closeMessageConsumer(consumer);
217         }
218         logger.debug("Closing JMS Sessions");
219         for (Iterator JavaDoc it = this.sessions.iterator(); it.hasNext();) {
220             Session JavaDoc session = (Session JavaDoc) it.next();
221             JmsUtils.closeSession(session);
222         }
223     }
224
225
226     //-------------------------------------------------------------------------
227
// JMS 1.1 factory methods, potentially overridden for JMS 1.0.2
228
//-------------------------------------------------------------------------
229

230     /**
231      * Create a JMS MessageConsumer for the given Session and Destination.
232      * <p>This implementation uses JMS 1.1 API.
233      * @param session the JMS Session to create a MessageConsumer for
234      * @param destination the JMS Destination to create a MessageConsumer for
235      * @return the new JMS MessageConsumer
236      * @throws JMSException if thrown by JMS API methods
237      */

238     protected MessageConsumer JavaDoc createConsumer(Session JavaDoc session, Destination JavaDoc destination) throws JMSException JavaDoc {
239         // Only pass in the NoLocal flag in case of a Topic:
240
// Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
241
// in case of the NoLocal flag being specified for a Queue.
242
if (isPubSubDomain()) {
243             if (isSubscriptionDurable() && destination instanceof Topic JavaDoc) {
244                 return session.createDurableSubscriber(
245                         (Topic JavaDoc) destination, getDurableSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
246             }
247             else {
248                 return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
249             }
250         }
251         else {
252             return session.createConsumer(destination, getMessageSelector());
253         }
254     }
255
256 }
257
Popular Tags