1 16 17 package org.springframework.jms.listener; 18 19 import javax.jms.Connection ; 20 import javax.jms.Destination ; 21 import javax.jms.JMSException ; 22 import javax.jms.Message ; 23 import javax.jms.MessageConsumer ; 24 import javax.jms.Session ; 25 import javax.jms.Topic ; 26 27 import org.springframework.beans.factory.BeanNameAware; 28 import org.springframework.jms.connection.ConnectionFactoryUtils; 29 import org.springframework.jms.connection.JmsResourceHolder; 30 import org.springframework.jms.support.JmsUtils; 31 import org.springframework.transaction.PlatformTransactionManager; 32 import org.springframework.transaction.TransactionStatus; 33 import org.springframework.transaction.support.DefaultTransactionDefinition; 34 import org.springframework.transaction.support.ResourceTransactionManager; 35 36 75 public abstract class AbstractPollingMessageListenerContainer extends AbstractMessageListenerContainer 76 implements BeanNameAware { 77 78 81 public static final long DEFAULT_RECEIVE_TIMEOUT = 1000; 82 83 84 private final MessageListenerContainerResourceFactory transactionalResourceFactory = 85 new MessageListenerContainerResourceFactory(); 86 87 private boolean sessionTransactedCalled = false; 88 89 private boolean pubSubNoLocal = false; 90 91 private PlatformTransactionManager transactionManager; 92 93 private DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition(); 94 95 private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT; 96 97 98 public void setSessionTransacted(boolean sessionTransacted) { 99 super.setSessionTransacted(sessionTransacted); 100 this.sessionTransactedCalled = true; 101 } 102 103 108 public void setPubSubNoLocal(boolean pubSubNoLocal) { 109 this.pubSubNoLocal = pubSubNoLocal; 110 } 111 112 115 protected boolean isPubSubNoLocal() { 116 return this.pubSubNoLocal; 117 } 118 119 139 public void setTransactionManager(PlatformTransactionManager transactionManager) { 140 this.transactionManager = transactionManager; 141 } 142 143 147 protected final PlatformTransactionManager getTransactionManager() { 148 return this.transactionManager; 149 } 150 151 156 public void setTransactionName(String transactionName) { 157 this.transactionDefinition.setName(transactionName); 158 } 159 160 166 public void setTransactionTimeout(int transactionTimeout) { 167 this.transactionDefinition.setTimeout(transactionTimeout); 168 } 169 170 181 public void setReceiveTimeout(long receiveTimeout) { 182 this.receiveTimeout = receiveTimeout; 183 } 184 185 186 public void initialize() { 187 if (!this.sessionTransactedCalled && this.transactionManager instanceof ResourceTransactionManager && 189 ((ResourceTransactionManager) this.transactionManager).getResourceFactory() != getConnectionFactory()) { 190 super.setSessionTransacted(true); 191 } 192 193 if (this.transactionDefinition.getName() == null) { 195 this.transactionDefinition.setName(getBeanName()); 196 } 197 198 super.initialize(); 200 } 201 202 203 211 protected MessageConsumer createListenerConsumer(Session session) throws JMSException { 212 Destination destination = getDestination(); 213 if (destination == null) { 214 destination = resolveDestinationName(session, getDestinationName()); 215 } 216 return createConsumer(session, destination); 217 } 218 219 228 protected boolean receiveAndExecute(Session session, MessageConsumer consumer) throws JMSException { 229 if (this.transactionManager != null) { 230 TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition); 232 boolean messageReceived = true; 233 try { 234 messageReceived = doReceiveAndExecute(session, consumer, status); 235 } 236 catch (JMSException ex) { 237 rollbackOnException(status, ex); 238 throw ex; 239 } 240 catch (RuntimeException ex) { 241 rollbackOnException(status, ex); 242 throw ex; 243 } 244 catch (Error err) { 245 rollbackOnException(status, err); 246 throw err; 247 } 248 this.transactionManager.commit(status); 249 return messageReceived; 250 } 251 252 else { 253 return doReceiveAndExecute(session, consumer, null); 255 } 256 } 257 258 268 protected boolean doReceiveAndExecute(Session session, MessageConsumer consumer, TransactionStatus status) 269 throws JMSException { 270 271 Connection conToClose = null; 272 Session sessionToClose = null; 273 MessageConsumer consumerToClose = null; 274 try { 275 Session sessionToUse = session; 276 boolean transactional = false; 277 if (sessionToUse == null) { 278 sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( 279 getConnectionFactory(), this.transactionalResourceFactory); 280 transactional = (sessionToUse != null); 281 } 282 if (sessionToUse == null) { 283 Connection conToUse = null; 284 if (sharedConnectionEnabled()) { 285 conToUse = getSharedConnection(); 286 } 287 else { 288 conToUse = createConnection(); 289 conToClose = conToUse; 290 conToUse.start(); 291 } 292 sessionToUse = createSession(conToUse); 293 sessionToClose = sessionToUse; 294 } 295 MessageConsumer consumerToUse = consumer; 296 if (consumerToUse == null) { 297 consumerToUse = createListenerConsumer(sessionToUse); 298 consumerToClose = consumerToUse; 299 } 300 Message message = receiveMessage(consumerToUse); 301 if (message != null) { 302 if (logger.isDebugEnabled()) { 303 logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + 304 consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" + 305 sessionToUse + "]"); 306 } 307 messageReceived(message, session); 308 try { 309 doExecuteListener(sessionToUse, message); 310 } 311 catch (Throwable ex) { 312 if (status != null) { 313 if (logger.isDebugEnabled()) { 314 logger.debug("Rolling back transaction because of listener exception thrown: " + ex); 315 } 316 status.setRollbackOnly(); 317 } 318 handleListenerException(ex); 319 } 320 return true; 321 } 322 else { 323 if (logger.isDebugEnabled()) { 324 logger.debug("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") + 325 "session [" + sessionToUse + "] did not receive a message"); 326 } 327 return false; 328 } 329 } 330 finally { 331 JmsUtils.closeMessageConsumer(consumerToClose); 332 JmsUtils.closeSession(sessionToClose); 333 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true); 334 } 335 } 336 337 343 protected boolean isSessionLocallyTransacted(Session session) { 344 return super.isSessionLocallyTransacted(session) && 345 !ConnectionFactoryUtils.isSessionTransactional(session, getConnectionFactory()); 346 } 347 348 353 private void rollbackOnException(TransactionStatus status, Throwable ex) { 354 logger.debug("Initiating transaction rollback on application exception", ex); 355 try { 356 this.transactionManager.rollback(status); 357 } 358 catch (RuntimeException ex2) { 359 logger.error("Application exception overridden by rollback exception", ex); 360 throw ex2; 361 } 362 catch (Error err) { 363 logger.error("Application exception overridden by rollback error", ex); 364 throw err; 365 } 366 } 367 368 374 protected Message receiveMessage(MessageConsumer consumer) throws JMSException { 375 return (this.receiveTimeout < 0 ? consumer.receive() : consumer.receive(this.receiveTimeout)); 376 } 377 378 385 protected void messageReceived(Message message, Session session) { 386 } 387 388 389 393 400 protected Connection getConnection(JmsResourceHolder holder) { 401 return holder.getConnection(); 402 } 403 404 411 protected Session getSession(JmsResourceHolder holder) { 412 return holder.getSession(); 413 } 414 415 423 protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException { 424 if (isPubSubDomain()) { 428 if (isSubscriptionDurable() && destination instanceof Topic ) { 429 return session.createDurableSubscriber( 430 (Topic ) destination, getDurableSubscriptionName(), getMessageSelector(), isPubSubNoLocal()); 431 } 432 else { 433 return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal()); 434 } 435 } 436 else { 437 return session.createConsumer(destination, getMessageSelector()); 438 } 439 } 440 441 442 445 private class MessageListenerContainerResourceFactory implements ConnectionFactoryUtils.ResourceFactory { 446 447 public Connection getConnection(JmsResourceHolder holder) { 448 return AbstractPollingMessageListenerContainer.this.getConnection(holder); 449 } 450 451 public Session getSession(JmsResourceHolder holder) { 452 return AbstractPollingMessageListenerContainer.this.getSession(holder); 453 } 454 455 public Connection createConnection() throws JMSException { 456 if (AbstractPollingMessageListenerContainer.this.sharedConnectionEnabled()) { 457 return AbstractPollingMessageListenerContainer.this.getSharedConnection(); 458 } 459 else { 460 return AbstractPollingMessageListenerContainer.this.createConnection(); 461 } 462 } 463 464 public Session createSession(Connection con) throws JMSException { 465 return AbstractPollingMessageListenerContainer.this.createSession(con); 466 } 467 468 public boolean isSynchedLocalTransactionAllowed() { 469 return AbstractPollingMessageListenerContainer.this.isSessionTransacted(); 470 } 471 } 472 473 } 474 | Popular Tags |