1 16 17 package org.springframework.jms.listener; 18 19 import java.util.HashSet ; 20 import java.util.Iterator ; 21 import java.util.Set ; 22 23 import javax.jms.Destination ; 24 import javax.jms.JMSException ; 25 import javax.jms.Message ; 26 import javax.jms.MessageConsumer ; 27 import javax.jms.MessageListener ; 28 import javax.jms.Session ; 29 import javax.jms.Topic ; 30 31 import org.springframework.core.task.TaskExecutor; 32 import org.springframework.jms.support.JmsUtils; 33 import org.springframework.util.Assert; 34 35 67 public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer { 68 69 private boolean pubSubNoLocal = false; 70 71 private int concurrentConsumers = 1; 72 73 private TaskExecutor taskExecutor; 74 75 private Set sessions; 76 77 private Set consumers; 78 79 80 85 public void setPubSubNoLocal(boolean pubSubNoLocal) { 86 this.pubSubNoLocal = pubSubNoLocal; 87 } 88 89 92 protected boolean isPubSubNoLocal() { 93 return this.pubSubNoLocal; 94 } 95 96 106 public void setConcurrentConsumers(int concurrentConsumers) { 107 Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)"); 108 this.concurrentConsumers = concurrentConsumers; 109 } 110 111 135 public void setTaskExecutor(TaskExecutor taskExecutor) { 136 this.taskExecutor = taskExecutor; 137 } 138 139 protected void validateConfiguration() { 140 super.validateConfiguration(); 141 if (isSubscriptionDurable() && this.concurrentConsumers != 1) { 142 throw new IllegalArgumentException ("Only 1 concurrent consumer supported for durable subscription"); 143 } 144 } 145 146 147 151 154 protected final boolean sharedConnectionEnabled() { 155 return true; 156 } 157 158 163 protected void doInitialize() throws JMSException { 164 this.sessions = new HashSet (this.concurrentConsumers); 165 this.consumers = new HashSet (this.concurrentConsumers); 166 for (int i = 0; i < this.concurrentConsumers; i++) { 167 Session session = createSession(getSharedConnection()); 168 MessageConsumer consumer = createListenerConsumer(session); 169 this.sessions.add(session); 170 this.consumers.add(consumer); 171 } 172 } 173 174 182 protected MessageConsumer createListenerConsumer(final Session session) throws JMSException { 183 Destination destination = getDestination(); 184 if (destination == null) { 185 destination = resolveDestinationName(session, getDestinationName()); 186 } 187 MessageConsumer consumer = createConsumer(session, destination); 188 if (this.taskExecutor != null) { 189 consumer.setMessageListener(new MessageListener () { 190 public void onMessage(final Message message) { 191 taskExecutor.execute(new Runnable () { 192 public void run() { 193 executeListener(session, message); 194 } 195 }); 196 } 197 }); 198 } 199 else { 200 consumer.setMessageListener(new MessageListener () { 201 public void onMessage(Message message) { 202 executeListener(session, message); 203 } 204 }); 205 } 206 return consumer; 207 } 208 209 212 protected void doShutdown() throws JMSException { 213 logger.debug("Closing JMS MessageConsumers"); 214 for (Iterator it = this.consumers.iterator(); it.hasNext();) { 215 MessageConsumer consumer = (MessageConsumer ) it.next(); 216 JmsUtils.closeMessageConsumer(consumer); 217 } 218 logger.debug("Closing JMS Sessions"); 219 for (Iterator it = this.sessions.iterator(); it.hasNext();) { 220 Session session = (Session ) it.next(); 221 JmsUtils.closeSession(session); 222 } 223 } 224 225 226 230 238 protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException { 239 if (isPubSubDomain()) { 243 if (isSubscriptionDurable() && destination instanceof Topic ) { 244 return session.createDurableSubscriber( 245 (Topic ) destination, getDurableSubscriptionName(), getMessageSelector(), isPubSubNoLocal()); 246 } 247 else { 248 return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal()); 249 } 250 } 251 else { 252 return session.createConsumer(destination, getMessageSelector()); 253 } 254 } 255 256 } 257 | Popular Tags |