1 16 17 package org.springframework.jms.listener; 18 19 import javax.jms.Connection ; 20 import javax.jms.Destination ; 21 import javax.jms.ExceptionListener ; 22 import javax.jms.JMSException ; 23 import javax.jms.Message ; 24 import javax.jms.MessageListener ; 25 import javax.jms.Queue ; 26 import javax.jms.Session ; 27 import javax.jms.Topic ; 28 29 import org.springframework.jms.support.JmsUtils; 30 import org.springframework.util.Assert; 31 32 128 public abstract class AbstractMessageListenerContainer extends AbstractJmsListeningContainer { 129 130 private Object destination; 131 132 private String messageSelector; 133 134 private Object messageListener; 135 136 private boolean subscriptionDurable = false; 137 138 private String durableSubscriptionName; 139 140 private ExceptionListener exceptionListener; 141 142 private boolean exposeListenerSession = true; 143 144 private boolean acceptMessagesWhileStopping = false; 145 146 147 153 public void setDestination(Destination destination) { 154 Assert.notNull(destination, "'destination' must not be null"); 155 this.destination = destination; 156 if (destination instanceof Topic && !(destination instanceof Queue )) { 157 setPubSubDomain(true); 159 } 160 } 161 162 167 protected Destination getDestination() { 168 return (this.destination instanceof Destination ? (Destination ) this.destination : null); 169 } 170 171 179 public void setDestinationName(String destinationName) { 180 Assert.notNull(destinationName, "'destinationName' must not be null"); 181 this.destination = destinationName; 182 } 183 184 190 protected String getDestinationName() { 191 return (this.destination instanceof String ? (String ) this.destination : null); 192 } 193 194 199 public void setMessageSelector(String messageSelector) { 200 this.messageSelector = messageSelector; 201 } 202 203 206 protected String getMessageSelector() { 207 return this.messageSelector; 208 } 209 210 211 220 public void setMessageListener(Object messageListener) { 221 checkMessageListener(messageListener); 222 this.messageListener = messageListener; 223 if (this.durableSubscriptionName == null) { 224 this.durableSubscriptionName = messageListener.getClass().getName(); 226 } 227 } 228 229 240 protected void checkMessageListener(Object messageListener) { 241 if (!(messageListener instanceof MessageListener || 242 messageListener instanceof SessionAwareMessageListener)) { 243 throw new IllegalArgumentException ( 244 "messageListener needs to be of type [" + MessageListener .class.getName() + 245 "] or [" + SessionAwareMessageListener.class.getName() + "]"); 246 } 247 } 248 249 252 protected Object getMessageListener() { 253 return this.messageListener; 254 } 255 256 265 public void setSubscriptionDurable(boolean subscriptionDurable) { 266 this.subscriptionDurable = subscriptionDurable; 267 } 268 269 272 protected boolean isSubscriptionDurable() { 273 return this.subscriptionDurable; 274 } 275 276 287 public void setDurableSubscriptionName(String durableSubscriptionName) { 288 Assert.notNull(durableSubscriptionName, "'durableSubscriptionName' must not be null"); 289 this.durableSubscriptionName = durableSubscriptionName; 290 } 291 292 295 protected String getDurableSubscriptionName() { 296 return this.durableSubscriptionName; 297 } 298 299 303 public void setExceptionListener(ExceptionListener exceptionListener) { 304 this.exceptionListener = exceptionListener; 305 } 306 307 311 protected ExceptionListener getExceptionListener() { 312 return this.exceptionListener; 313 } 314 315 324 public void setExposeListenerSession(boolean exposeListenerSession) { 325 this.exposeListenerSession = exposeListenerSession; 326 } 327 328 332 protected boolean isExposeListenerSession() { 333 return this.exposeListenerSession; 334 } 335 336 350 public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) { 351 this.acceptMessagesWhileStopping = acceptMessagesWhileStopping; 352 } 353 354 358 protected boolean isAcceptMessagesWhileStopping() { 359 return this.acceptMessagesWhileStopping; 360 } 361 362 protected void validateConfiguration() { 363 if (this.destination == null) { 364 throw new IllegalArgumentException ("Property 'destination' or 'destinationName' is required"); 365 } 366 if (this.messageListener == null) { 367 throw new IllegalArgumentException ("Property 'messageListener' is required"); 368 } 369 if (isSubscriptionDurable() && !isPubSubDomain()) { 370 throw new IllegalArgumentException ("A durable subscription requires a topic (pub-sub domain)"); 371 } 372 } 373 374 375 379 389 protected void executeListener(Session session, Message message) { 390 try { 391 doExecuteListener(session, message); 392 } 393 catch (Throwable ex) { 394 handleListenerException(ex); 395 } 396 } 397 398 409 protected void doExecuteListener(Session session, Message message) throws JMSException { 410 if (!isAcceptMessagesWhileStopping() && !isRunning()) { 411 if (logger.isWarnEnabled()) { 412 logger.warn("Rejecting received message because of the listener container " + 413 "having been stopped in the meantime: " + message); 414 } 415 rollbackIfNecessary(session); 416 throw new MessageRejectedWhileStoppingException(); 417 } 418 try { 419 invokeListener(session, message); 420 } 421 catch (JMSException ex) { 422 rollbackOnExceptionIfNecessary(session, ex); 423 throw ex; 424 } 425 catch (RuntimeException ex) { 426 rollbackOnExceptionIfNecessary(session, ex); 427 throw ex; 428 } 429 catch (Error err) { 430 rollbackOnExceptionIfNecessary(session, err); 431 throw err; 432 } 433 commitIfNecessary(session, message); 434 } 435 436 444 protected void invokeListener(Session session, Message message) throws JMSException { 445 if (getMessageListener() instanceof SessionAwareMessageListener) { 446 doInvokeListener((SessionAwareMessageListener) getMessageListener(), session, message); 447 } 448 else if (getMessageListener() instanceof MessageListener ) { 449 doInvokeListener((MessageListener ) getMessageListener(), message); 450 } 451 else { 452 throw new IllegalArgumentException ("Only MessageListener and SessionAwareMessageListener supported"); 453 } 454 } 455 456 467 protected void doInvokeListener(SessionAwareMessageListener listener, Session session, Message message) 468 throws JMSException { 469 470 Connection conToClose = null; 471 Session sessionToClose = null; 472 try { 473 Session sessionToUse = session; 474 if (!isExposeListenerSession()) { 475 conToClose = createConnection(); 477 sessionToClose = createSession(conToClose); 478 sessionToUse = sessionToClose; 479 } 480 if (logger.isDebugEnabled()) { 482 logger.debug("Invoking listener with message of type [" + message.getClass() + 483 "] and session [" + sessionToUse + "]"); 484 } 485 listener.onMessage(message, sessionToUse); 486 if (sessionToUse != session) { 488 if (sessionToUse.getTransacted() && isSessionLocallyTransacted(sessionToUse)) { 489 JmsUtils.commitIfNecessary(sessionToUse); 491 } 492 } 493 } 494 finally { 495 JmsUtils.closeSession(sessionToClose); 496 JmsUtils.closeConnection(conToClose); 497 } 498 } 499 500 509 protected void doInvokeListener(MessageListener listener, Message message) throws JMSException { 510 listener.onMessage(message); 511 } 512 513 519 protected void commitIfNecessary(Session session, Message message) throws JMSException { 520 if (session.getTransacted()) { 522 if (isSessionLocallyTransacted(session)) { 524 JmsUtils.commitIfNecessary(session); 526 } 527 } 528 else if (isClientAcknowledge(session)) { 529 message.acknowledge(); 530 } 531 } 532 533 538 protected void rollbackIfNecessary(Session session) throws JMSException { 539 if (session.getTransacted() && isSessionLocallyTransacted(session)) { 540 JmsUtils.rollbackIfNecessary(session); 542 } 543 } 544 545 551 protected void rollbackOnExceptionIfNecessary(Session session, Throwable ex) throws JMSException { 552 try { 553 if (session.getTransacted() && isSessionLocallyTransacted(session)) { 554 if (logger.isDebugEnabled()) { 556 logger.debug("Initiating transaction rollback on application exception", ex); 557 } 558 JmsUtils.rollbackIfNecessary(session); 559 } 560 } 561 catch (IllegalStateException ex2) { 562 logger.debug("Could not roll back because Session already closed", ex2); 563 } 564 catch (JMSException ex2) { 565 logger.error("Application exception overridden by rollback exception", ex); 566 throw ex2; 567 } 568 catch (RuntimeException ex2) { 569 logger.error("Application exception overridden by rollback exception", ex); 570 throw ex2; 571 } 572 catch (Error err) { 573 logger.error("Application exception overridden by rollback error", ex); 574 throw err; 575 } 576 } 577 578 590 protected boolean isSessionLocallyTransacted(Session session) { 591 return isSessionTransacted(); 592 } 593 594 602 protected void handleListenerException(Throwable ex) { 603 if (ex instanceof MessageRejectedWhileStoppingException) { 604 return; 606 } 607 if (ex instanceof JMSException ) { 608 invokeExceptionListener((JMSException ) ex); 609 } 610 if (isActive()) { 611 logger.warn("Execution of JMS message listener failed", ex); 614 } 615 else { 616 logger.debug("Listener exception after container shutdown", ex); 619 } 620 } 621 622 627 protected void invokeExceptionListener(JMSException ex) { 628 ExceptionListener exceptionListener = getExceptionListener(); 629 if (exceptionListener != null) { 630 exceptionListener.onException(ex); 631 } 632 } 633 634 635 639 private static class MessageRejectedWhileStoppingException extends RuntimeException { 640 641 } 642 643 } 644 | Popular Tags |