1 22 package org.jboss.ejb.plugins.jms; 23 24 import java.lang.reflect.Method ; 25 import java.security.AccessController ; 26 import java.security.Principal ; 27 import java.security.PrivilegedAction ; 28 import java.util.Collection ; 29 30 import javax.ejb.EJBMetaData ; 31 import javax.jms.Connection ; 32 import javax.jms.ConnectionConsumer ; 33 import javax.jms.Destination ; 34 import javax.jms.ExceptionListener ; 35 import javax.jms.JMSException ; 36 import javax.jms.Message ; 37 import javax.jms.MessageListener ; 38 import javax.jms.Queue ; 39 import javax.jms.QueueConnection ; 40 import javax.jms.ServerSessionPool ; 41 import javax.jms.Topic ; 42 import javax.jms.TopicConnection ; 43 import javax.management.MBeanServer ; 44 import javax.management.Notification ; 45 import javax.management.ObjectName ; 46 import javax.naming.Context ; 47 import javax.naming.InitialContext ; 48 import javax.naming.NamingException ; 49 import javax.transaction.Transaction ; 50 import javax.transaction.TransactionManager ; 51 52 import org.jboss.deployment.DeploymentException; 53 import org.jboss.ejb.Container; 54 import org.jboss.ejb.EJBProxyFactory; 55 import org.jboss.invocation.Invocation; 56 import org.jboss.invocation.InvocationType; 57 import org.jboss.jms.ConnectionFactoryHelper; 58 import org.jboss.jms.asf.ServerSessionPoolFactory; 59 import org.jboss.jms.asf.StdServerSessionPool; 60 import org.jboss.jms.jndi.JMSProviderAdapter; 61 import org.jboss.logging.Logger; 62 import org.jboss.metadata.ActivationConfigPropertyMetaData; 63 import org.jboss.metadata.InvokerProxyBindingMetaData; 64 import org.jboss.metadata.MessageDestinationMetaData; 65 import org.jboss.metadata.MessageDrivenMetaData; 66 import org.jboss.metadata.MetaData; 67 import org.jboss.system.ServiceMBeanSupport; 68 import org.w3c.dom.Element ; 69 70 81 public class JMSContainerInvoker extends ServiceMBeanSupport 82 implements EJBProxyFactory, JMSContainerInvokerMBean 83 { 84 85 private static final Logger log = Logger.getLogger(JMSContainerInvoker.class); 86 87 88 private static final String CONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTING"; 89 90 91 private static final String CONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTED"; 92 93 94 private static final String DISCONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTING"; 95 96 97 private static final String DISCONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTED"; 98 99 100 private static final String FAILURE_NOTIFICATION = "org.jboss.ejb.plugins.jms.FAILURE"; 101 102 103 protected static Method ON_MESSAGE; 104 105 110 protected final static String DEFAULT_DESTINATION_TYPE = "javax.jms.Topic"; 111 112 115 static 116 { 117 try 118 { 119 final Class type = MessageListener .class; 120 final Class arg = Message .class; 121 ON_MESSAGE = type.getMethod("onMessage", new Class []{arg}); 122 } 123 catch (Exception e) 124 { 125 throw new ExceptionInInitializerError (e); 126 } 127 } 128 129 protected boolean optimize; 130 131 132 protected int maxMessagesNr = 1; 133 134 135 protected int minPoolSize = 1; 136 137 138 protected long keepAlive = 30 * 1000; 139 140 141 protected int maxPoolSize = 15; 142 143 144 protected long reconnectInterval = 10000; 145 146 147 protected boolean useDLQ = false; 148 149 154 protected String providerAdapterJNDI; 155 156 161 protected String serverSessionPoolFactoryJNDI; 162 163 164 protected int acknowledgeMode; 165 166 protected boolean isContainerManagedTx; 167 protected boolean isNotSupportedTx; 168 169 170 protected Container container; 171 172 173 protected Connection connection; 174 175 176 protected ConnectionConsumer connectionConsumer; 177 178 protected TransactionManager tm; 179 protected ServerSessionPool pool; 180 protected ExceptionListenerImpl exListener; 181 182 183 protected DLQHandler dlqHandler; 184 185 186 protected Element dlqConfig; 187 188 protected InvokerProxyBindingMetaData invokerMetaData; 189 protected String invokerBinding; 190 191 protected boolean deliveryActive = true; 192 193 protected boolean createJBossMQDestination = true; 194 195 199 public void setInvokerMetaData(InvokerProxyBindingMetaData imd) 200 { 201 invokerMetaData = imd; 202 } 203 204 207 public void setInvokerBinding(String binding) 208 { 209 invokerBinding = binding; 210 } 211 212 217 public void setContainer(final Container container) 218 { 219 this.container = container; 220 } 221 222 public int getMinPoolSize() 223 { 224 return minPoolSize; 225 } 226 227 public void setMinPoolSize(int minPoolSize) 228 { 229 this.minPoolSize = minPoolSize; 230 } 231 232 public int getMaxPoolSize() 233 { 234 return maxPoolSize; 235 } 236 237 public void setMaxPoolSize(int maxPoolSize) 238 { 239 this.maxPoolSize = maxPoolSize; 240 } 241 242 public long getKeepAliveMillis() 243 { 244 return keepAlive; 245 } 246 247 public void setKeepAliveMillis(long keepAlive) 248 { 249 this.keepAlive = keepAlive; 250 } 251 252 public int getMaxMessages() 253 { 254 return maxMessagesNr; 255 } 256 257 public void setMaxMessages(int maxMessages) 258 { 259 this.maxMessagesNr = maxMessages; 260 } 261 262 public MessageDrivenMetaData getMetaData() 263 { 264 MessageDrivenMetaData config = 265 (MessageDrivenMetaData) container.getBeanMetaData(); 266 return config; 267 } 268 269 public boolean getDeliveryActive() 270 { 271 return deliveryActive; 272 } 273 274 public boolean getCreateJBossMQDestination() 275 { 276 return createJBossMQDestination; 277 } 278 279 public void startDelivery() 280 throws Exception 281 { 282 if (getState() != STARTED) 283 throw new IllegalStateException ("The MDB is not started"); 284 if (deliveryActive) 285 return; 286 deliveryActive = true; 287 startService(); 288 } 289 290 public void stopDelivery() 291 throws Exception 292 { 293 if (getState() != STARTED) 294 throw new IllegalStateException ("The MDB is not started"); 295 if (deliveryActive == false) 296 return; 297 deliveryActive = false; 298 stopService(); 299 } 300 301 306 public void setOptimized(final boolean optimize) 307 { 308 this.optimize = optimize; 309 } 310 311 public boolean isIdentical(Container container, Invocation mi) 312 { 313 throw new Error ("Not valid for MessageDriven beans"); 314 } 315 316 public Object getEJBHome() 317 { 318 throw new Error ("Not valid for MessageDriven beans"); 319 } 320 321 public EJBMetaData getEJBMetaData() 322 { 323 throw new Error ("Not valid for MessageDriven beans"); 324 } 325 326 public Collection getEntityCollection(Collection ids) 327 { 328 throw new Error ("Not valid for MessageDriven beans"); 329 } 330 331 public Object getEntityEJBObject(Object id) 332 { 333 throw new Error ("Not valid for MessageDriven beans"); 334 } 335 336 public Object getStatefulSessionEJBObject(Object id) 337 { 338 throw new Error ("Not valid for MessageDriven beans"); 339 } 340 341 public Object getStatelessSessionEJBObject() 342 { 343 throw new Error ("Not valid for MessageDriven beans"); 344 } 345 346 public boolean isOptimized() 347 { 348 return optimize; 349 } 350 351 360 public void importXml(final Element element) throws Exception 361 { 362 try 363 { 364 if ("false".equalsIgnoreCase(MetaData.getElementContent(MetaData.getUniqueChild(element, "CreateJBossMQDestination")))) 365 { 366 createJBossMQDestination = false; 367 } 368 } 369 catch (Exception ignore) 370 { 371 } 372 373 try 374 { 375 String maxMessages = MetaData.getElementContent 376 (MetaData.getUniqueChild(element, "MaxMessages")); 377 maxMessagesNr = Integer.parseInt(maxMessages); 378 } 379 catch (Exception ignore) 380 { 381 } 382 383 try 384 { 385 String minSize = MetaData.getElementContent 386 (MetaData.getUniqueChild(element, "MinimumSize")); 387 minPoolSize = Integer.parseInt(minSize); 388 } 389 catch (Exception ignore) 390 { 391 } 392 393 try 394 { 395 String maxSize = MetaData.getElementContent 396 (MetaData.getUniqueChild(element, "MaximumSize")); 397 maxPoolSize = Integer.parseInt(maxSize); 398 } 399 catch (Exception ignore) 400 { 401 } 402 403 try 404 { 405 String keepAliveMillis = MetaData.getElementContent 406 (MetaData.getUniqueChild(element, "KeepAliveMillis")); 407 keepAlive = Integer.parseInt(keepAliveMillis); 408 } 409 catch (Exception ignore) 410 { 411 } 412 413 Element mdbConfig = MetaData.getUniqueChild(element, "MDBConfig"); 414 415 try 416 { 417 String reconnect = MetaData.getElementContent 418 (MetaData.getUniqueChild(mdbConfig, "ReconnectIntervalSec")); 419 reconnectInterval = Long.parseLong(reconnect) * 1000; 420 } 421 catch (Exception ignore) 422 { 423 } 424 425 try 426 { 427 if ("false".equalsIgnoreCase(MetaData.getElementContent(MetaData.getUniqueChild(mdbConfig, "DeliveryActive")))) 428 { 429 deliveryActive = false; 430 } 431 } 432 catch (Exception ignore) 433 { 434 } 435 436 Element dlqEl = MetaData.getOptionalChild(mdbConfig, "DLQConfig"); 438 if (dlqEl != null) 439 { 440 dlqConfig = (Element ) dlqEl.cloneNode(true); 441 useDLQ = true; 442 } 443 else 444 { 445 useDLQ = false; 446 } 447 448 providerAdapterJNDI = MetaData.getElementContent 450 (MetaData.getUniqueChild(element, "JMSProviderAdapterJNDI")); 451 452 serverSessionPoolFactoryJNDI = MetaData.getElementContent 453 (MetaData.getUniqueChild(element, "ServerSessionPoolFactoryJNDI")); 454 455 if (!providerAdapterJNDI.startsWith("java:/")) 457 { 458 providerAdapterJNDI = "java:/" + providerAdapterJNDI; 459 } 460 461 if (!serverSessionPoolFactoryJNDI.startsWith("java:/")) 462 { 463 serverSessionPoolFactoryJNDI = "java:/" + serverSessionPoolFactoryJNDI; 464 } 465 } 466 467 476 protected void createService() throws Exception 477 { 478 importXml(invokerMetaData.getProxyFactoryConfig()); 479 480 exListener = new ExceptionListenerImpl(this); 481 } 482 483 489 protected void innerStartDelivery() throws Exception 490 { 491 if (deliveryActive == false) 492 { 493 log.debug("Delivery is disabled"); 494 return; 495 } 496 497 sendNotification(CONNECTING_NOTIFICATION, null); 498 499 log.debug("Initializing"); 500 501 JMSProviderAdapter adapter = getJMSProviderAdapter(); 503 log.debug("Provider adapter: " + adapter); 504 505 if (useDLQ) 507 { 508 dlqHandler = new DLQHandler(adapter, this); 509 dlqHandler.importXml(dlqConfig); 510 dlqHandler.create(); 511 } 512 513 tm = container.getTransactionManager(); 515 516 MessageDrivenMetaData config = getMetaData(); 518 519 String messageSelector = config.getMessageSelector(); 521 String activationConfig = getActivationConfigProperty("messageSelector"); 522 if (activationConfig != null) 523 messageSelector = activationConfig; 524 525 String destinationType = config.getDestinationType(); 527 activationConfig = getActivationConfigProperty("destinationType"); 528 if (activationConfig != null) 529 destinationType = activationConfig; 530 531 isContainerManagedTx = config.isContainerManagedTx(); 533 acknowledgeMode = config.getAcknowledgeMode(); 534 activationConfig = getActivationConfigProperty("acknowledgeMode"); 535 if (activationConfig != null) 536 { 537 if (activationConfig.equals("DUPS_OK_ACKNOWLEDGE")) 538 acknowledgeMode = MessageDrivenMetaData.DUPS_OK_ACKNOWLEDGE_MODE; 539 else 540 acknowledgeMode = MessageDrivenMetaData.AUTO_ACKNOWLEDGE_MODE; 541 } 542 543 byte txType = config.getMethodTransactionType("onMessage", 544 new Class []{Message .class}, 545 InvocationType.LOCAL); 546 isNotSupportedTx = txType == MetaData.TX_NOT_SUPPORTED; 547 548 String destinationJNDI = config.getDestinationJndiName(); 550 activationConfig = getActivationConfigProperty("destination"); 551 if (activationConfig != null) 552 destinationJNDI = activationConfig; 553 if (destinationJNDI == null) 555 { 556 String link = config.getDestinationLink(); 557 if (link != null) 558 { 559 link = link.trim(); 560 if (link.length() > 0) 561 { 562 MessageDestinationMetaData destinationMetaData = container.getMessageDestination(link); 563 if (destinationMetaData == null) 564 log.warn("Unresolved message-destination-link '" + link + "' no message-destination in ejb-jar.xml"); 565 else 566 { 567 String jndiName = destinationMetaData.getJNDIName(); 568 if (jndiName == null) 569 log.warn("The message-destination '" + link + "' has no jndi-name in jboss.xml"); 570 else 571 destinationJNDI = jndiName; 572 } 573 } 574 } 575 } 576 577 String user = config.getUser(); 578 String password = config.getPasswd(); 579 580 Context context = adapter.getInitialContext(); 582 log.debug("context: " + context); 583 584 if (context == null) 586 { 587 throw new RuntimeException ("Failed to get the root context"); 588 } 589 590 String jndiSuffix = parseJndiSuffix(destinationJNDI, config.getEjbName()); 592 log.debug("jndiSuffix: " + jndiSuffix); 593 594 if (destinationType == null) 598 { 599 log.warn("No message-driven-destination given; using; guessing type"); 600 destinationType = getDestinationType(context, destinationJNDI); 601 } 602 603 if ("javax.jms.Topic".equals(destinationType)) 604 { 605 log.debug("Got destination type Topic for " + config.getEjbName()); 606 607 Object factory = context.lookup(adapter.getTopicFactoryRef()); 609 TopicConnection tConnection = null; 610 try 611 { 612 tConnection = ConnectionFactoryHelper.createTopicConnection(factory, user, password); 613 connection = tConnection; 614 } 615 catch (ClassCastException e) 616 { 617 throw new DeploymentException("Expected a TopicConnection check your provider adaptor: " 618 + adapter.getTopicFactoryRef()); 619 } 620 621 try 622 { 623 626 String clientId = config.getClientId(); 627 activationConfig = getActivationConfigProperty("clientID"); 628 if (activationConfig != null) 629 clientId = activationConfig; 630 631 log.debug("Using client id: " + clientId); 632 if (clientId != null && clientId.length() > 0) 633 connection.setClientID(clientId); 634 635 Topic topic = null; 637 try 638 { 639 if (destinationJNDI != null) 641 topic = (Topic ) context.lookup(destinationJNDI); 642 else if (createJBossMQDestination == false) 643 throw new DeploymentException("Unable to determine destination for '" + container.getBeanMetaData().getEjbName() 644 + "' use destination-jndi-name in jboss.xml, an activation config property or a message-destination-link"); 645 } 646 catch (NamingException e) 647 { 648 if (createJBossMQDestination == false) 649 throw new DeploymentException("Could not find the topic destination-jndi-name=" + destinationJNDI, e); 650 log.warn("Could not find the topic destination-jndi-name=" + destinationJNDI, e); 651 } 652 catch (ClassCastException e) 653 { 654 throw new DeploymentException("Expected a Topic destination-jndi-name=" + destinationJNDI, e); 655 } 656 657 if (topic == null) 659 topic = (Topic ) createDestination(Topic .class, 660 context, 661 "topic/" + jndiSuffix, 662 jndiSuffix); 663 664 pool = createSessionPool( 666 topic, 667 tConnection, 668 minPoolSize, 669 maxPoolSize, 670 keepAlive, 671 true, acknowledgeMode, 673 new MessageListenerImpl(this)); 674 675 int subscriptionDurablity = config.getSubscriptionDurability(); 676 activationConfig = getActivationConfigProperty("subscriptionDurability"); 677 if (activationConfig != null) 678 { 679 if (activationConfig.equals("Durable")) 680 subscriptionDurablity = MessageDrivenMetaData.DURABLE_SUBSCRIPTION; 681 else 682 subscriptionDurablity = MessageDrivenMetaData.NON_DURABLE_SUBSCRIPTION; 683 } 684 if (subscriptionDurablity != MessageDrivenMetaData.DURABLE_SUBSCRIPTION) 686 { 687 connectionConsumer = 689 tConnection.createConnectionConsumer(topic, 690 messageSelector, 691 pool, 692 maxMessagesNr); 693 } 694 else 695 { 696 String durableName = config.getSubscriptionId(); 698 activationConfig = getActivationConfigProperty("subscriptionName"); 699 if (activationConfig != null) 700 durableName = activationConfig; 701 702 connectionConsumer = 703 tConnection.createDurableConnectionConsumer(topic, 704 durableName, 705 messageSelector, 706 pool, 707 maxMessagesNr); 708 } 709 log.debug("Topic connectionConsumer set up"); 710 } 711 catch (Throwable t) 712 { 713 try 714 { 715 tConnection.close(); 716 } 717 catch (Throwable ignored) 718 { 719 } 720 DeploymentException.rethrowAsDeploymentException("Error during topic setup", t); 721 } 722 } 723 else if ("javax.jms.Queue".equals(destinationType)) 724 { 725 log.debug("Got destination type Queue for " + config.getEjbName()); 726 727 Object qFactory = context.lookup(adapter.getQueueFactoryRef()); 729 QueueConnection qConnection = null; 730 try 731 { 732 qConnection = ConnectionFactoryHelper.createQueueConnection(qFactory, user, password); 733 connection = qConnection; 734 } 735 catch (ClassCastException e) 736 { 737 throw new DeploymentException("Expected a QueueConnection check your provider adaptor: " 738 + adapter.getQueueFactoryRef()); 739 } 740 741 try 742 { 743 String clientId = config.getClientId(); 745 activationConfig = getActivationConfigProperty("clientID"); 746 if (activationConfig != null) 747 clientId = activationConfig; 748 749 log.debug("Using client id: " + clientId); 750 if (clientId != null && clientId.length() > 0) 751 connection.setClientID(clientId); 752 753 Queue queue = null; 755 try 756 { 757 if (destinationJNDI != null) 759 queue = (Queue ) context.lookup(destinationJNDI); 760 else if (createJBossMQDestination == false) 761 throw new DeploymentException("Unable to determine destination for '" + container.getBeanMetaData().getEjbName() 762 + "' use destination-jndi-name in jboss.xml, an activation config property or a message-destination-link"); 763 } 764 catch (NamingException e) 765 { 766 if (createJBossMQDestination == false) 767 throw new DeploymentException("Could not find the queue destination-jndi-name=" + destinationJNDI, e); 768 log.warn("Could not find the queue destination-jndi-name=" + destinationJNDI); 769 } 770 catch (ClassCastException e) 771 { 772 throw new DeploymentException("Expected a Queue destination-jndi-name=" + destinationJNDI); 773 } 774 775 if (queue == null) 777 queue = (Queue ) createDestination(Queue .class, 778 context, 779 "queue/" + jndiSuffix, 780 jndiSuffix); 781 782 pool = createSessionPool( 784 queue, 785 qConnection, 786 minPoolSize, 787 maxPoolSize, 788 keepAlive, 789 true, acknowledgeMode, 791 new MessageListenerImpl(this)); 792 log.debug("Server session pool: " + pool); 793 794 connectionConsumer = 796 qConnection.createConnectionConsumer(queue, 797 messageSelector, 798 pool, 799 maxMessagesNr); 800 log.debug("Connection consumer: " + connectionConsumer); 801 } 802 catch (Throwable t) 803 { 804 try 805 { 806 qConnection.close(); 807 } 808 catch (Throwable ignored) 809 { 810 } 811 DeploymentException.rethrowAsDeploymentException("Error during queue setup", t); 812 } 813 } 814 else 815 throw new DeploymentException("Unknown destination-type " + destinationType); 816 817 log.debug("Initialized with config " + toString()); 818 819 context.close(); 820 821 if (dlqHandler != null) 822 { 823 dlqHandler.start(); 824 } 825 826 if (connection != null) 827 { 828 connection.setExceptionListener(exListener); 829 connection.start(); 830 } 831 832 sendNotification(CONNECTED_NOTIFICATION, null); 833 } 834 835 protected void startService() throws Exception 836 { 837 try 838 { 839 innerStartDelivery(); 840 } 841 catch (final Throwable t) 842 { 843 exListener.handleFailure(t); 846 return; 847 } 848 finally 849 { 850 SecurityActions.clear(); 852 } 853 } 854 855 protected void stopService() throws Exception 856 { 857 if (exListener != null) 859 { 860 exListener.stop(); 861 } 862 863 innerStopDelivery(); 864 } 865 866 870 protected void innerStopDelivery() 871 { 872 log.debug("innerStop"); 873 874 sendNotification(DISCONNECTING_NOTIFICATION, null); 875 876 try 877 { 878 if (connection != null) 879 { 880 connection.setExceptionListener(null); 881 log.debug("unset exception listener"); 882 } 883 } 884 catch (Throwable t) 885 { 886 log.trace("Could not set ExceptionListener to null", t); 887 } 888 889 try 891 { 892 if (connection != null) 893 { 894 connection.stop(); 895 log.debug("connection stopped"); 896 } 897 } 898 catch (Throwable t) 899 { 900 log.trace("Could not stop JMS connection", t); 901 } 902 903 try 904 { 905 if (dlqHandler != null) 906 dlqHandler.stop(); 907 } 908 catch (Throwable t) 909 { 910 log.trace("Failed to stop the dlq handler", t); 911 } 912 913 try 915 { 916 if (connectionConsumer != null) 917 connectionConsumer.close(); 918 } 919 catch (Throwable t) 920 { 921 log.trace("Failed to close connection consumer", t); 922 } 923 connectionConsumer = null; 924 925 try 927 { 928 if (pool instanceof StdServerSessionPool) 929 { 930 StdServerSessionPool p = (StdServerSessionPool) pool; 931 p.clear(); 932 } 933 } 934 catch (Throwable t) 935 { 936 log.trace("Failed to clear session pool", t); 937 } 938 939 if (connection != null) 941 { 942 try 943 { 944 connection.close(); 945 } 946 catch (Throwable t) 947 { 948 log.trace("Failed to close connection", t); 949 } 950 } 951 connection = null; 952 953 try 955 { 956 if (dlqHandler != null) 957 { 958 dlqHandler.destroy(); 959 } 960 } 961 catch (Throwable t) 962 { 963 log.trace("Failed to close the dlq handler", t); 964 } 965 dlqHandler = null; 966 967 sendNotification(DISCONNECTED_NOTIFICATION, null); 968 } 969 970 public Object invoke(Object id, 971 Method m, 972 Object [] args, 973 Transaction tx, 974 Principal identity, 975 Object credential) 976 throws Exception 977 { 978 979 Invocation invocation = new Invocation(id, m, args, tx, identity, credential); 980 invocation.setType(InvocationType.LOCAL); 981 982 ClassLoader oldCL = TCLAction.UTIL.getContextClassLoader(); 984 TCLAction.UTIL.setContextClassLoader(container.getClassLoader()); 985 try 986 { 987 return container.invoke(invocation); 988 } 989 finally 990 { 991 TCLAction.UTIL.setContextClassLoader(oldCL); 992 } 993 } 994 995 1005 protected String getDestinationType(Context ctx, String destinationJNDI) 1006 { 1007 String destType = null; 1008 1009 if (destinationJNDI != null) 1010 { 1011 try 1012 { 1013 Destination dest = (Destination ) ctx.lookup(destinationJNDI); 1014 if (dest instanceof javax.jms.Topic ) 1015 { 1016 destType = "javax.jms.Topic"; 1017 } 1018 else if (dest instanceof javax.jms.Queue ) 1019 { 1020 destType = "javax.jms.Queue"; 1021 } 1022 } 1023 catch (NamingException ex) 1024 { 1025 log.debug("Could not do heristic lookup of destination ", ex); 1026 } 1027 1028 } 1029 if (destType == null) 1030 { 1031 log.warn("Could not determine destination type, defaults to: " + 1032 DEFAULT_DESTINATION_TYPE); 1033 1034 destType = DEFAULT_DESTINATION_TYPE; 1035 } 1036 1037 return destType; 1038 } 1039 1040 1045 protected JMSProviderAdapter getJMSProviderAdapter() throws NamingException 1046 { 1047 Context context = new InitialContext (); 1048 try 1049 { 1050 log.debug("Looking up provider adapter: " + providerAdapterJNDI); 1051 return (JMSProviderAdapter) context.lookup(providerAdapterJNDI); 1052 } 1053 finally 1054 { 1055 context.close(); 1056 } 1057 } 1058 1059 1070 protected Destination createDestination(final Class type, 1071 final Context ctx, 1072 final String jndiName, 1073 final String jndiSuffix) 1074 throws Exception 1075 { 1076 try 1077 { 1078 return (Destination ) ctx.lookup(jndiName); 1080 } 1081 catch (NamingException e) 1082 { 1083 log.warn("destination not found: " + jndiName + " reason: " + e); 1085 log.warn("creating a new temporary destination: " + jndiName); 1086 1087 1094 MBeanServer server = org.jboss.mx.util.MBeanServerLocator.locateJBoss(); 1095 1096 String methodName; 1097 if (type == Topic .class) 1098 { 1099 methodName = "createTopic"; 1100 } 1101 else if (type == Queue .class) 1102 { 1103 methodName = "createQueue"; 1104 } 1105 else 1106 { 1107 throw new IllegalArgumentException 1109 ("Expected javax.jms.Queue or javax.jms.Topic: " + type); 1110 } 1111 1112 server.invoke(new ObjectName ("jboss.mq:service=DestinationManager"), 1114 methodName, 1115 new Object []{jndiSuffix}, 1116 new String []{"java.lang.String"}); 1117 1118 return (Destination ) ctx.lookup(jndiName); 1120 } 1121 } 1122 1123 protected String getActivationConfigProperty(String property) 1124 { 1125 MessageDrivenMetaData mdmd = getMetaData(); 1126 ActivationConfigPropertyMetaData acpmd = mdmd.getActivationConfigProperty(property); 1127 if (acpmd != null) 1128 return acpmd.getValue(); 1129 else 1130 return null; 1131 } 1132 1133 1148 protected ServerSessionPool createSessionPool( 1149 final Destination destination, 1150 final Connection connection, 1151 final int minSession, 1152 final int maxSession, 1153 final long keepAlive, 1154 final boolean isTransacted, 1155 final int ack, 1156 final MessageListener listener) 1157 throws NamingException , JMSException 1158 { 1159 ServerSessionPool pool; 1160 Context context = new InitialContext (); 1161 1162 try 1163 { 1164 log.debug("looking up session pool factory: " + 1166 serverSessionPoolFactoryJNDI); 1167 ServerSessionPoolFactory factory = (ServerSessionPoolFactory) 1168 context.lookup(serverSessionPoolFactoryJNDI); 1169 1170 pool = factory.getServerSessionPool(destination, connection, minSession, maxSession, keepAlive, isTransacted, ack, !isContainerManagedTx || isNotSupportedTx, listener); 1172 } 1173 finally 1174 { 1175 context.close(); 1176 } 1177 1178 return pool; 1179 } 1180 1181 1187 protected void sendNotification(String event, Object userData) 1188 { 1189 Notification notif = new Notification (event, getServiceName(), getNextNotificationSequenceNumber()); 1190 notif.setUserData(userData); 1191 sendNotification(notif); 1192 } 1193 1194 1201 protected String parseJndiSuffix(final String jndiname, 1202 final String defautSuffix) 1203 { 1204 String jndiSuffix = ""; 1208 1209 if (jndiname != null) 1210 { 1211 int indexOfSlash = jndiname.indexOf("/"); 1212 if (indexOfSlash != -1) 1213 { 1214 jndiSuffix = jndiname.substring(indexOfSlash + 1); 1215 } 1216 else 1217 { 1218 jndiSuffix = jndiname; 1219 } 1220 } 1221 else 1222 { 1223 jndiSuffix = defautSuffix; 1225 } 1226 1227 return jndiSuffix; 1228 } 1229 1230 1234 class MessageListenerImpl implements MessageListener 1235 { 1236 1237 JMSContainerInvoker invoker; 1238 1239 1244 MessageListenerImpl(final JMSContainerInvoker invoker) 1245 { 1246 this.invoker = invoker; 1247 } 1248 1249 1254 public void onMessage(final Message message) 1255 { 1256 if (log.isTraceEnabled()) 1257 { 1258 log.trace("processing message: " + message); 1259 } 1260 1261 Object id; 1262 try 1263 { 1264 id = message.getJMSMessageID(); 1265 } 1266 catch (JMSException e) 1267 { 1268 id = "JMSContainerInvoker"; 1270 } 1271 1272 try 1274 { 1275 Transaction tx = tm.getTransaction(); 1276 1277 if (useDLQ && message.getJMSRedelivered() && dlqHandler.handleRedeliveredMessage(message, tx)) { 1282 return; 1285 } 1286 1287 invoker.invoke(id, ON_MESSAGE, new Object []{message}, tx, null, null); 1294 } 1295 catch (Exception e) 1296 { 1297 log.error("Exception in JMSCI message listener", e); 1298 } 1299 } 1300 } 1301 1302 1303 class ExceptionListenerImpl implements ExceptionListener 1304 { 1305 Object lock = new Object (); 1306 JMSContainerInvoker invoker; 1307 Thread currentThread; 1308 boolean notStopped = true; 1309 1310 1315 ExceptionListenerImpl(final JMSContainerInvoker invoker) 1316 { 1317 this.invoker = invoker; 1318 } 1319 1320 1325 public void onException(JMSException ex) 1326 { 1327 handleFailure(ex); 1328 } 1329 1330 1335 public void handleFailure(Throwable t) 1336 { 1337 MessageDrivenMetaData metaData = invoker.getMetaData(); 1338 log.warn("JMS provider failure detected for " + metaData.getEjbName(), t); 1339 1340 if (t instanceof JMSException ) 1342 { 1343 Exception le = ((JMSException )t).getLinkedException(); 1344 if (le != null) 1345 log.debug("Linked exception: " + le + ", cause: " + le.getCause()); 1346 } 1347 1348 String name = "JMSContainerInvoker("+metaData.getEjbName()+") Reconnect"; 1350 synchronized (lock) 1351 { 1352 if (currentThread != null) 1353 { 1354 log.debug("Already a reconnect thread: " + currentThread + " for " + metaData.getEjbName()); 1355 return; 1356 } 1357 Runnable runnable = new ExceptionListenerRunnable(t); 1358 currentThread = new Thread (runnable, name); 1359 try 1360 { 1361 currentThread.setDaemon(true); 1362 currentThread.start(); 1363 } 1364 catch (RuntimeException rethrow) 1365 { 1366 currentThread = null; 1367 throw rethrow; 1368 } 1369 catch (Error rethrow) 1370 { 1371 currentThread = null; 1372 throw rethrow; 1373 } 1374 } 1375 } 1376 1377 class ExceptionListenerRunnable implements Runnable 1378 { 1379 Throwable failure; 1380 1381 1386 public ExceptionListenerRunnable(Throwable failure) 1387 { 1388 this.failure = failure; 1389 } 1390 1391 1394 public void run() 1395 { 1396 MessageDrivenMetaData metaData = invoker.getMetaData(); 1397 try 1398 { 1399 boolean tryIt = true; 1400 while (tryIt && notStopped) 1401 { 1402 try 1403 { 1404 invoker.innerStopDelivery(); 1405 } 1406 catch (Throwable t) 1407 { 1408 log.error("Unhandled error stopping connection for " + metaData.getEjbName(), t); 1409 } 1410 1411 sendNotification(FAILURE_NOTIFICATION, failure); 1412 1413 try 1414 { 1415 log.info("Waiting for reconnect internal " + reconnectInterval + "ms for " + metaData.getEjbName()); 1416 try 1417 { 1418 Thread.sleep(reconnectInterval); 1419 } 1420 catch (InterruptedException ie) 1421 { 1422 tryIt = false; 1423 return; 1424 } 1425 1426 log.info("Trying to reconnect to JMS provider for " + metaData.getEjbName()); 1428 invoker.innerStartDelivery(); 1429 tryIt = false; 1430 1431 log.info("Reconnected to JMS provider for " + metaData.getEjbName()); 1432 } 1433 catch (Throwable t) 1434 { 1435 log.error("Reconnect failed: JMS provider failure detected for " + metaData.getEjbName(), t); 1436 } 1437 } 1438 } 1439 finally 1440 { 1441 synchronized (lock) 1442 { 1443 currentThread = null; 1444 } 1445 } 1446 } 1447 } 1448 1449 void stop() 1450 { 1451 synchronized (lock) 1452 { 1453 log.debug("Stop requested for recovery thread: " + currentThread); 1454 notStopped = false; 1455 if (currentThread != null) 1456 { 1457 currentThread.interrupt(); 1458 log.debug("Recovery thread interrupted: " + currentThread); 1459 } 1460 } 1461 } 1462 } 1463 1464 1467 public String toString() 1468 { 1469 MessageDrivenMetaData metaData = getMetaData(); 1470 String destinationJNDI = metaData.getDestinationJndiName(); 1471 return super.toString() + 1472 "{ maxMessagesNr=" + maxMessagesNr + 1473 ", maxPoolSize=" + maxPoolSize + 1474 ", reconnectInterval=" + reconnectInterval + 1475 ", providerAdapterJNDI=" + providerAdapterJNDI + 1476 ", serverSessionPoolFactoryJNDI=" + serverSessionPoolFactoryJNDI + 1477 ", acknowledgeMode=" + acknowledgeMode + 1478 ", isContainerManagedTx=" + isContainerManagedTx + 1479 ", isNotSupportedTx=" + isNotSupportedTx + 1480 ", useDLQ=" + useDLQ + 1481 ", dlqHandler=" + dlqHandler + 1482 ", destinationJNDI=" + destinationJNDI + 1483 " }"; 1484 } 1485 1486 interface TCLAction 1487 { 1488 class UTIL 1489 { 1490 static TCLAction getTCLAction() 1491 { 1492 return System.getSecurityManager() == null ? NON_PRIVILEGED : PRIVILEGED; 1493 } 1494 1495 static ClassLoader getContextClassLoader() 1496 { 1497 return getTCLAction().getContextClassLoader(); 1498 } 1499 1500 static ClassLoader getContextClassLoader(Thread thread) 1501 { 1502 return getTCLAction().getContextClassLoader(thread); 1503 } 1504 1505 static void setContextClassLoader(ClassLoader cl) 1506 { 1507 getTCLAction().setContextClassLoader(cl); 1508 } 1509 1510 static void setContextClassLoader(Thread thread, ClassLoader cl) 1511 { 1512 getTCLAction().setContextClassLoader(thread, cl); 1513 } 1514 } 1515 1516 TCLAction NON_PRIVILEGED = new TCLAction() 1517 { 1518 public ClassLoader getContextClassLoader() 1519 { 1520 return Thread.currentThread().getContextClassLoader(); 1521 } 1522 1523 public ClassLoader getContextClassLoader(Thread thread) 1524 { 1525 return thread.getContextClassLoader(); 1526 } 1527 1528 public void setContextClassLoader(ClassLoader cl) 1529 { 1530 Thread.currentThread().setContextClassLoader(cl); 1531 } 1532 1533 public void setContextClassLoader(Thread thread, ClassLoader cl) 1534 { 1535 thread.setContextClassLoader(cl); 1536 } 1537 }; 1538 1539 TCLAction PRIVILEGED = new TCLAction() 1540 { 1541 private final PrivilegedAction getTCLPrivilegedAction = new PrivilegedAction () 1542 { 1543 public Object run() 1544 { 1545 return Thread.currentThread().getContextClassLoader(); 1546 } 1547 }; 1548 1549 public ClassLoader getContextClassLoader() 1550 { 1551 return (ClassLoader ) AccessController.doPrivileged(getTCLPrivilegedAction); 1552 } 1553 1554 public ClassLoader getContextClassLoader(final Thread thread) 1555 { 1556 return (ClassLoader ) AccessController.doPrivileged(new PrivilegedAction () 1557 { 1558 public Object run() 1559 { 1560 return thread.getContextClassLoader(); 1561 } 1562 }); 1563 } 1564 1565 public void setContextClassLoader(final ClassLoader cl) 1566 { 1567 AccessController.doPrivileged(new PrivilegedAction () 1568 { 1569 public Object run() 1570 { 1571 Thread.currentThread().setContextClassLoader(cl); 1572 return null; 1573 } 1574 }); 1575 } 1576 1577 public void setContextClassLoader(final Thread thread, final ClassLoader cl) 1578 { 1579 AccessController.doPrivileged(new PrivilegedAction () 1580 { 1581 public Object run() 1582 { 1583 thread.setContextClassLoader(cl); 1584 return null; 1585 } 1586 }); 1587 } 1588 }; 1589 1590 ClassLoader getContextClassLoader(); 1591 1592 ClassLoader getContextClassLoader(Thread thread); 1593 1594 void setContextClassLoader(ClassLoader cl); 1595 1596 void setContextClassLoader(Thread thread, ClassLoader cl); 1597 } 1598} 1599 | Popular Tags |