package org.springframework.jms.listener;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.springframework.core.Constants;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.destination.CachingDestinationResolver;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
 * Polling message listener container that runs its receive loop in
 * {@link AsyncMessageListenerInvoker} tasks handed to a {@link TaskExecutor}.
 *
 * <p>The number of scheduled invokers starts at {@code concurrentConsumers}
 * and may grow up to {@code maxConcurrentConsumers} whenever a message is
 * received while no scheduled invoker is idle. The configured cache level
 * decides whether a shared Connection is used and whether each invoker keeps
 * its own Session and MessageConsumer between receive attempts.
 *
 * <p>The fields {@code concurrentConsumers}, {@code maxConcurrentConsumers},
 * {@code maxMessagesPerTask}, {@code idleTaskExecutionLimit},
 * {@code scheduledInvokers} and {@code activeInvokerCount} are written under
 * {@code activeInvokerMonitor}; the recovery marker is guarded by
 * {@code recoveryMonitor}.
 */
public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {

    /**
     * Default thread name prefix: the short class name of this container followed by "-".
     */
    public static final String DEFAULT_THREAD_NAME_PREFIX =
            ClassUtils.getShortName(DefaultMessageListenerContainer.class) + "-";

    /**
     * Default recovery interval: 5000 ms.
     */
    public static final long DEFAULT_RECOVERY_INTERVAL = 5000;


    /**
     * Cache level below {@link #CACHE_CONNECTION}: no shared Connection,
     * and invokers hold no Session or MessageConsumer of their own.
     */
    public static final int CACHE_NONE = 0;

    /**
     * Cache level at which a shared Connection is enabled
     * (see {@link #sharedConnectionEnabled()}), without per-invoker Session caching.
     */
    public static final int CACHE_CONNECTION = 1;

    /**
     * Cache level at which each invoker additionally keeps a Session
     * created from the shared Connection.
     */
    public static final int CACHE_SESSION = 2;

    /**
     * Cache level at which each invoker additionally keeps a MessageConsumer
     * created on its cached Session.
     */
    public static final int CACHE_CONSUMER = 3;


    /** Lookup helper used to resolve {@code CACHE_*} constant names to their values. */
    private static final Constants constants = new Constants(DefaultMessageListenerContainer.class);


    private TaskExecutor taskExecutor;

    private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

    /** {@code null} until set explicitly or defaulted in {@link #initialize()}. */
    private Integer cacheLevel;

    private int concurrentConsumers = 1;

    private int maxConcurrentConsumers = 1;

    /** {@code Integer.MIN_VALUE} marks "not configured"; any negative value means unlimited. */
    private int maxMessagesPerTask = Integer.MIN_VALUE;

    private int idleTaskExecutionLimit = 1;

    /** Invokers currently scheduled (running or handed to the executor). */
    private final Set scheduledInvokers = new HashSet();

    /** Number of invokers currently inside their {@code run()} method. */
    private int activeInvokerCount = 0;

    private final Object activeInvokerMonitor = new Object();

    /** Identity token that is replaced each time a recovery has been performed. */
    private Object currentRecoveryMarker = new Object();

    private final Object recoveryMonitor = new Object();


    /**
     * Set the TaskExecutor that runs the listener invoker tasks.
     * <p>If none is set, {@link #initialize()} installs the executor returned
     * by {@link #createDefaultTaskExecutor()}.
     * @param taskExecutor the executor to use
     */
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * Set the interval, in milliseconds, to sleep between recovery attempts.
     * <p>A value of zero or less disables the sleep
     * (see {@link #sleepInbetweenRecoveryAttempts()}).
     * @param recoveryInterval the interval in milliseconds
     */
    public void setRecoveryInterval(long recoveryInterval) {
        this.recoveryInterval = recoveryInterval;
    }

    /**
     * Set the cache level by the name of one of this class's {@code CACHE_*} constants.
     * @param constantName the constant name, e.g. "CACHE_CONSUMER"
     * @throws IllegalArgumentException if the name is {@code null} or does not
     * start with "CACHE_"
     * @see #setCacheLevel
     */
    public void setCacheLevelName(String constantName) throws IllegalArgumentException {
        if (constantName == null || !constantName.startsWith("CACHE_")) {
            throw new IllegalArgumentException("Only cache constants allowed");
        }
        setCacheLevel(constants.asNumber(constantName).intValue());
    }

    /**
     * Set the cache level as one of the {@code CACHE_*} constants.
     * <p>If never called, {@link #initialize()} chooses {@link #CACHE_NONE}
     * when a transaction manager is configured and {@link #CACHE_CONSUMER} otherwise.
     * @param cacheLevel the cache level value
     */
    public void setCacheLevel(int cacheLevel) {
        // Boxed so that "not configured" can be told apart from CACHE_NONE.
        this.cacheLevel = new Integer(cacheLevel);
    }

    /**
     * Return the configured cache level, or {@link #CACHE_NONE} if none has been set yet.
     */
    public int getCacheLevel() {
        return (this.cacheLevel != null ? this.cacheLevel.intValue() : CACHE_NONE);
    }


    /**
     * Set the number of invokers scheduled at startup.
     * <p>Also raises {@code maxConcurrentConsumers} to this value if it is
     * currently lower, so that the maximum never falls below the base count.
     * @param concurrentConsumers the number of consumers; must be at least 1
     * @throws IllegalArgumentException if the value is less than 1
     */
    public void setConcurrentConsumers(int concurrentConsumers) {
        Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
        synchronized (this.activeInvokerMonitor) {
            this.concurrentConsumers = concurrentConsumers;
            if (this.maxConcurrentConsumers < concurrentConsumers) {
                this.maxConcurrentConsumers = concurrentConsumers;
            }
        }
    }

    /**
     * Return the configured {@code concurrentConsumers} value.
     */
    public final int getConcurrentConsumers() {
        synchronized (this.activeInvokerMonitor) {
            return this.concurrentConsumers;
        }
    }

    /**
     * Set the upper bound for the number of scheduled invokers.
     * <p>A value below the current {@code concurrentConsumers} is replaced
     * by {@code concurrentConsumers}.
     * @param maxConcurrentConsumers the maximum number of consumers; must be at least 1
     * @throws IllegalArgumentException if the value is less than 1
     */
    public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
        Assert.isTrue(maxConcurrentConsumers > 0, "'maxConcurrentConsumers' value must be at least 1 (one)");
        synchronized (this.activeInvokerMonitor) {
            this.maxConcurrentConsumers =
                    (maxConcurrentConsumers > this.concurrentConsumers ?
                    maxConcurrentConsumers : this.concurrentConsumers);
        }
    }

    /**
     * Return the configured {@code maxConcurrentConsumers} value.
     */
    public final int getMaxConcurrentConsumers() {
        synchronized (this.activeInvokerMonitor) {
            return this.maxConcurrentConsumers;
        }
    }

    /**
     * Set the maximum number of receive attempts a single invoker task
     * performs before it returns and is considered for rescheduling.
     * <p>A negative value makes each invoker loop for as long as the
     * container is active.
     * @param maxMessagesPerTask the limit; must not be 0
     * @throws IllegalArgumentException if the value is 0
     */
    public void setMaxMessagesPerTask(int maxMessagesPerTask) {
        Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0");
        synchronized (this.activeInvokerMonitor) {
            this.maxMessagesPerTask = maxMessagesPerTask;
        }
    }

    /**
     * Return the configured {@code maxMessagesPerTask} value.
     */
    public int getMaxMessagesPerTask() {
        synchronized (this.activeInvokerMonitor) {
            return this.maxMessagesPerTask;
        }
    }

    /**
     * Set how many consecutive task executions without a received message
     * an invoker may have before it counts as idle for rescheduling purposes
     * (see {@code shouldRescheduleInvoker}).
     * @param idleTaskExecutionLimit the limit; must be at least 1
     * @throws IllegalArgumentException if the value is less than 1
     */
    public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
        Assert.isTrue(idleTaskExecutionLimit > 0, "'idleTaskExecutionLimit' must be 1 or higher");
        synchronized (this.activeInvokerMonitor) {
            this.idleTaskExecutionLimit = idleTaskExecutionLimit;
        }
    }

    /**
     * Return the configured {@code idleTaskExecutionLimit} value.
     */
    public int getIdleTaskExecutionLimit() {
        synchronized (this.activeInvokerMonitor) {
            return this.idleTaskExecutionLimit;
        }
    }

    /**
     * Run the superclass validation, then reject a durable subscription
     * combined with a {@code concurrentConsumers} value other than 1.
     * @throws IllegalArgumentException if that combination is configured
     */
    protected void validateConfiguration() {
        super.validateConfiguration();
        synchronized (this.activeInvokerMonitor) {
            if (isSubscriptionDurable() && this.concurrentConsumers != 1) {
                throw new IllegalArgumentException("Only 1 concurrent consumer supported for durable subscription");
            }
        }
    }


    /**
     * Apply defaults for the cache level and the task executor, then delegate
     * to the superclass initialization.
     */
    public void initialize() {
        // Default cache level: none when a transaction manager is set, consumer-level otherwise.
        if (getTransactionManager() != null) {
            if (this.cacheLevel == null) {
                this.cacheLevel = new Integer(CACHE_NONE);
            }
        }
        else {
            if (this.cacheLevel == null) {
                this.cacheLevel = new Integer(CACHE_CONSUMER);
            }
        }

        synchronized (this.activeInvokerMonitor) {
            if (this.taskExecutor == null) {
                this.taskExecutor = createDefaultTaskExecutor();
            }
            else if (this.taskExecutor instanceof SchedulingTaskExecutor &&
                    ((SchedulingTaskExecutor) this.taskExecutor).prefersShortLivedTasks() &&
                    this.maxMessagesPerTask == Integer.MIN_VALUE) {
                // The executor asks for short-lived tasks and no explicit limit was
                // configured: make each task perform a single receive attempt.
                this.maxMessagesPerTask = 1;
            }
        }

        super.initialize();
    }

    /**
     * Create the executor used when none was configured: a
     * {@link SimpleAsyncTaskExecutor} whose thread name prefix is the bean name
     * plus "-", or {@link #DEFAULT_THREAD_NAME_PREFIX} if there is no bean name.
     * @return the default TaskExecutor
     */
    protected TaskExecutor createDefaultTaskExecutor() {
        String beanName = getBeanName();
        String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
        return new SimpleAsyncTaskExecutor(threadNamePrefix);
    }

    /**
     * Report whether a shared Connection is used, i.e. whether the cache
     * level is {@link #CACHE_CONNECTION} or higher.
     */
    protected final boolean sharedConnectionEnabled() {
        return (getCacheLevel() >= CACHE_CONNECTION);
    }

    /**
     * Schedule the initial {@code concurrentConsumers} invokers.
     * @throws JMSException declared for the superclass contract
     */
    protected void doInitialize() throws JMSException {
        synchronized (this.activeInvokerMonitor) {
            for (int i = 0; i < this.concurrentConsumers; i++) {
                scheduleNewInvoker();
            }
        }
    }

    /**
     * Re-execute the given task on the configured TaskExecutor.
     * @param task the task to run again; must be a {@link Runnable}
     */
    protected void doRescheduleTask(Object task) {
        this.taskExecutor.execute((Runnable) task);
    }

    /**
     * Callback for a received message: tries to scale up the number of invokers.
     * @param message the received message (not inspected here)
     * @param session the Session it was received on (not inspected here)
     */
    protected void messageReceived(Message message, Session session) {
        scheduleNewInvokerIfAppropriate();
    }

    /**
     * Schedule one more invoker if the container is running, the number of
     * scheduled invokers is below {@code maxConcurrentConsumers}, and none of
     * the scheduled invokers is currently idle.
     */
    protected void scheduleNewInvokerIfAppropriate() {
        if (isRunning()) {
            synchronized (this.activeInvokerMonitor) {
                if (this.scheduledInvokers.size() < this.maxConcurrentConsumers && !hasIdleInvokers()) {
                    scheduleNewInvoker();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Raised scheduled invoker count: " + scheduledInvokers.size());
                    }
                }
            }
        }
    }

    /**
     * Create a new invoker, hand it to the executor and register it.
     * <p>Callers must hold {@code activeInvokerMonitor}: this method calls
     * {@code notifyAll()} on it.
     */
    private void scheduleNewInvoker() {
        AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
        this.taskExecutor.execute(invoker);
        this.scheduledInvokers.add(invoker);
        this.activeInvokerMonitor.notifyAll();
    }

    /**
     * Report whether at least one scheduled invoker is flagged idle.
     * <p>Iterates {@code scheduledInvokers} without locking; the visible caller
     * holds {@code activeInvokerMonitor}.
     */
    private boolean hasIdleInvokers() {
        for (Iterator it = this.scheduledInvokers.iterator(); it.hasNext();) {
            AsyncMessageListenerInvoker invoker = (AsyncMessageListenerInvoker) it.next();
            if (invoker.isIdle()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Decide whether an invoker that just finished a task execution should run again.
     * <p>An invoker whose idle execution count has reached
     * {@code idleTaskExecutionLimit} is kept only while the scheduled count
     * does not exceed {@code concurrentConsumers}; otherwise the bound is
     * {@code maxConcurrentConsumers}.
     * @param idleTaskExecutionCount consecutive executions without a received message
     * @return {@code true} if the invoker should be rescheduled
     */
    private boolean shouldRescheduleInvoker(int idleTaskExecutionCount) {
        synchronized (this.activeInvokerMonitor) {
            boolean idle = (idleTaskExecutionCount >= this.idleTaskExecutionLimit);
            return (this.scheduledInvokers.size() <= (idle ? this.concurrentConsumers : this.maxConcurrentConsumers));
        }
    }

    /**
     * Return the number of currently scheduled invokers.
     */
    public final int getScheduledConsumerCount() {
        synchronized (this.activeInvokerMonitor) {
            return this.scheduledInvokers.size();
        }
    }

    /**
     * Return the number of invokers currently executing their {@code run()} method.
     */
    public final int getActiveConsumerCount() {
        synchronized (this.activeInvokerMonitor) {
            return this.activeInvokerCount;
        }
    }


    /**
     * Try to refresh the shared Connection; a JMSException is only logged at
     * debug level so that the invokers can establish the Connection later.
     */
    protected void establishSharedConnection() {
        try {
            refreshSharedConnection();
        }
        catch (JMSException ex) {
            logger.debug("Could not establish shared JMS Connection - " +
                    "leaving it up to asynchronous invokers to establish a Connection as soon as possible", ex);
        }
    }

    /**
     * Start the shared Connection via the superclass; a JMSException is only
     * logged at debug level.
     */
    protected void startSharedConnection() {
        try {
            super.startSharedConnection();
        }
        catch (JMSException ex) {
            logger.debug("Connection start failed - relying on listeners to perform recovery", ex);
        }
    }

    /**
     * Stop the shared Connection via the superclass; a JMSException is only
     * logged at debug level.
     */
    protected void stopSharedConnection() {
        try {
            super.stopSharedConnection();
        }
        catch (JMSException ex) {
            logger.debug("Connection stop failed - relying on listeners to perform recovery after restart", ex);
        }
    }

    /**
     * Report a failure that occurred in an invoker's receive loop.
     * <p>A JMSException is first passed to {@code invokeExceptionListener}.
     * A {@code SharedConnectionNotInitializedException} is logged at debug level
     * (and only if not already recovered); any other failure is logged at
     * error level, or at debug level if another invoker already recovered.
     * @param ex the failure
     * @param alreadyRecovered whether another invoker performed the recovery already
     */
    protected void handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered) {
        if (ex instanceof JMSException) {
            invokeExceptionListener((JMSException) ex);
        }
        if (ex instanceof SharedConnectionNotInitializedException) {
            if (!alreadyRecovered) {
                logger.debug("JMS message listener invoker needs to establish shared Connection");
            }
        }
        else {
            if (alreadyRecovered) {
                logger.debug("Setup of JMS message listener invoker failed - already recovered by other invoker", ex);
            }
            else {
                logger.error("Setup of JMS message listener invoker failed - trying to recover", ex);
            }
        }
    }

    /**
     * Recover after an invoker failure: refresh the Connection until that
     * succeeds, then refresh the destination.
     */
    protected void recoverAfterListenerSetupFailure() {
        refreshConnectionUntilSuccessful();
        refreshDestination();
    }

    /**
     * Retry obtaining a working Connection for as long as the container is active.
     * <p>With a shared Connection, it is refreshed (and started if the
     * container is running); otherwise a throwaway Connection is created and
     * closed as a connectivity check. After a failed attempt the method sleeps
     * via {@link #sleepInbetweenRecoveryAttempts()} before retrying.
     */
    protected void refreshConnectionUntilSuccessful() {
        while (isActive()) {
            try {
                if (sharedConnectionEnabled()) {
                    refreshSharedConnection();
                    if (isRunning()) {
                        startSharedConnection();
                    }
                }
                else {
                    Connection con = createConnection();
                    JmsUtils.closeConnection(con);
                }
                logger.info("Successfully refreshed JMS Connection");
                break;
            }
            catch (Exception ex) {
                if (logger.isInfoEnabled()) {
                    logger.info("Could not refresh JMS Connection - retrying in " + this.recoveryInterval + " ms", ex);
                }
            }
            sleepInbetweenRecoveryAttempts();
        }
    }

    /**
     * If a destination name is configured and the DestinationResolver is a
     * {@link CachingDestinationResolver}, remove that name from its cache.
     */
    protected void refreshDestination() {
        String destName = getDestinationName();
        if (destName != null) {
            DestinationResolver destResolver = getDestinationResolver();
            if (destResolver instanceof CachingDestinationResolver) {
                ((CachingDestinationResolver) destResolver).removeFromCache(destName);
            }
        }
    }

    /**
     * Sleep for the recovery interval if it is greater than zero.
     * <p>On interruption the interrupt flag is restored and the method returns.
     */
    protected void sleepInbetweenRecoveryAttempts() {
        if (this.recoveryInterval > 0) {
            try {
                Thread.sleep(this.recoveryInterval);
            }
            catch (InterruptedException interEx) {
                // Re-interrupt current thread, to allow other threads to react.
                Thread.currentThread().interrupt();
            }
        }
    }


    /**
     * Block until no invoker is active any more, or until the calling thread
     * is interrupted.
     * <p>The interrupt handling sits outside the wait loop on purpose: once the
     * interrupt flag has been restored, any further {@code wait()} call would
     * throw immediately, so looping again would spin instead of wait.
     * @throws JMSException declared for the superclass contract
     */
    protected void doShutdown() throws JMSException {
        logger.debug("Waiting for shutdown of message listener invokers");
        try {
            synchronized (this.activeInvokerMonitor) {
                while (this.activeInvokerCount > 0) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
                                " message listener invokers");
                    }
                    this.activeInvokerMonitor.wait();
                }
            }
        }
        catch (InterruptedException interEx) {
            // Re-interrupt current thread, to allow other threads to react.
            Thread.currentThread().interrupt();
        }
    }


    /**
     * Runnable that performs the receive loop for one consumer.
     * <p>Depending on the cache level it keeps its own Session and
     * MessageConsumer between receive attempts.
     */
    private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {

        private Session session;

        private MessageConsumer consumer;

        /** Recovery marker seen when this invoker last set up its resources. */
        private Object lastRecoveryMarker;

        private boolean lastMessageSucceeded;

        private int idleTaskExecutionCount = 0;

        /** {@code true} until a receive attempt has returned a message. */
        private volatile boolean idle = true;

        /**
         * Execute one task run: receive messages until the configured limit
         * is reached or the container stops, recover on failure, then either
         * reschedule this invoker or deregister it and release its resources.
         */
        public void run() {
            synchronized (activeInvokerMonitor) {
                activeInvokerCount++;
                activeInvokerMonitor.notifyAll();
            }
            boolean messageReceived = false;
            try {
                try {
                    if (maxMessagesPerTask < 0) {
                        // Unlimited: keep looping for the lifetime of the container.
                        while (isActive()) {
                            waitWhileNotRunning();
                            if (isActive()) {
                                messageReceived = invokeListener();
                            }
                        }
                    }
                    else {
                        int messageCount = 0;
                        while (isRunning() && messageCount < maxMessagesPerTask) {
                            messageReceived = (invokeListener() || messageReceived);
                            messageCount++;
                        }
                    }
                }
                catch (Throwable ex) {
                    clearResources();
                    if (!this.lastMessageSucceeded) {
                        // The previous attempt failed as well: back off before
                        // trying to recover again.
                        sleepInbetweenRecoveryAttempts();
                    }
                    this.lastMessageSucceeded = false;
                    boolean alreadyRecovered = false;
                    synchronized (recoveryMonitor) {
                        if (this.lastRecoveryMarker == currentRecoveryMarker) {
                            // Nobody has recovered since this invoker set up its
                            // resources: perform the recovery here and publish a
                            // fresh marker so that other failing invokers skip it.
                            handleListenerSetupFailure(ex, false);
                            recoverAfterListenerSetupFailure();
                            currentRecoveryMarker = new Object();
                        }
                        else {
                            alreadyRecovered = true;
                        }
                    }
                    if (alreadyRecovered) {
                        handleListenerSetupFailure(ex, true);
                    }
                }
            }
            finally {
                // Always decrement, even if the recovery code above threw:
                // otherwise doShutdown() would wait on a count that never
                // reaches zero.
                synchronized (activeInvokerMonitor) {
                    activeInvokerCount--;
                    activeInvokerMonitor.notifyAll();
                }
            }
            if (!messageReceived) {
                this.idleTaskExecutionCount++;
            }
            else {
                this.idleTaskExecutionCount = 0;
            }
            if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
                // This invoker will not run again: deregister it and release its resources.
                synchronized (activeInvokerMonitor) {
                    scheduledInvokers.remove(this);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
                    }
                    activeInvokerMonitor.notifyAll();
                }
                clearResources();
            }
        }

        /**
         * Perform a single receive attempt with this invoker's resources and
         * update the success and idle flags from its outcome.
         * @return whether a message was received
         * @throws JMSException if resource setup or the receive attempt fails
         */
        private boolean invokeListener() throws JMSException {
            initResourcesIfNecessary();
            boolean messageReceived = receiveAndExecute(this.session, this.consumer);
            this.lastMessageSucceeded = true;
            this.idle = !messageReceived;
            return messageReceived;
        }

        /**
         * Create the cached Session and MessageConsumer as required by the
         * cache level, recording the current recovery marker beforehand.
         * <p>At {@code CACHE_CONNECTION} or below nothing is cached here and
         * only the marker is recorded.
         * @throws JMSException if Session or consumer creation fails
         */
        private void initResourcesIfNecessary() throws JMSException {
            if (getCacheLevel() <= CACHE_CONNECTION) {
                updateRecoveryMarker();
            }
            else {
                if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
                    updateRecoveryMarker();
                    this.session = createSession(getSharedConnection());
                }
                if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
                    this.consumer = createListenerConsumer(this.session);
                }
            }
        }

        /**
         * Remember the container's current recovery marker for this invoker.
         */
        private void updateRecoveryMarker() {
            synchronized (recoveryMonitor) {
                this.lastRecoveryMarker = currentRecoveryMarker;
            }
        }

        /**
         * Close the cached consumer and Session (consumer first) and drop the references.
         */
        private void clearResources() {
            JmsUtils.closeMessageConsumer(this.consumer);
            JmsUtils.closeSession(this.session);
            this.consumer = null;
            this.session = null;
        }

        /**
         * Report this task as long-lived when no per-task message limit applies,
         * i.e. when {@code maxMessagesPerTask} is negative.
         */
        public boolean isLongLived() {
            return (maxMessagesPerTask < 0);
        }

        /**
         * Return whether the most recent receive attempt yielded no message
         * ({@code true} as well before the first attempt).
         */
        public boolean isIdle() {
            return this.idle;
        }
    }

}