package org.mule.providers.jms;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;

import java.lang.reflect.InvocationTargetException;
import java.util.Hashtable;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.XAConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.commons.lang.UnhandledException;
import org.mule.MuleManager;
import org.mule.MuleRuntimeException;
import org.mule.config.ExceptionHelper;
import org.mule.config.i18n.Message;
import org.mule.config.i18n.Messages;
import org.mule.impl.internal.notifications.ConnectionNotification;
import org.mule.impl.internal.notifications.ConnectionNotificationListener;
import org.mule.impl.internal.notifications.NotificationException;
import org.mule.providers.AbstractServiceEnabledConnector;
import org.mule.providers.ConnectException;
import org.mule.providers.ConnectionStrategy;
import org.mule.providers.FatalConnectException;
import org.mule.providers.ReplyToHandler;
import org.mule.providers.jms.xa.ConnectionFactoryWrapper;
import org.mule.transaction.TransactionCoordination;
import org.mule.umo.MessagingException;
import org.mule.umo.TransactionException;
import org.mule.umo.UMOComponent;
import org.mule.umo.UMOException;
import org.mule.umo.UMOTransaction;
import org.mule.umo.endpoint.UMOEndpoint;
import org.mule.umo.endpoint.UMOImmutableEndpoint;
import org.mule.umo.lifecycle.InitialisationException;
import org.mule.umo.lifecycle.LifecycleException;
import org.mule.umo.manager.UMOServerNotification;
import org.mule.umo.provider.UMOMessageAdapter;
import org.mule.util.BeanUtils;
import org.mule.util.ClassUtils;

/**
 * JMS connector. Holds the JMS {@link ConnectionFactory} and {@link Connection},
 * an optional JNDI {@link Context} used to look the factory up, and the
 * {@link JmsSupport} strategy that performs the specification-dependent calls
 * (1.0.2b or 1.1, selected by {@link #setSpecification(String)}).
 * <p>
 * The connector registers itself as a connection notification listener and
 * disposes its dispatchers when a connection is reported as disconnected or
 * failed.
 */
public class JmsConnector extends AbstractServiceEnabledConnector implements ConnectionNotificationListener
{

    static
    {
        // Register the JMS-specific exception reader once, when the class is loaded.
        ExceptionHelper.registerExceptionReader(new JmsExceptionReader());
    }

    /** JNDI name under which the connection factory is looked up. */
    private String connectionFactoryJndiName;

    /** Connection factory; injected, instantiated from a class name, or looked up in JNDI. */
    private ConnectionFactory connectionFactory;

    /** Optional class name of a connection factory to instantiate with no arguments. */
    private String connectionFactoryClass;

    /** Value for {@link Context#INITIAL_CONTEXT_FACTORY} when building the JNDI context. */
    private String jndiInitialFactory;

    /** Value for {@link Context#PROVIDER_URL} when building the JNDI context. */
    private String jndiProviderUrl;

    /** Acknowledgement mode passed to {@link JmsSupport} when creating sessions. */
    private int acknowledgementMode = Session.AUTO_ACKNOWLEDGE;

    /** Client id set on newly created connections when not null. */
    private String clientId;

    private boolean durable;

    /** Passed to {@link JmsSupport} when creating sessions. */
    private boolean noLocal;

    private boolean persistentDelivery;

    /** Extra JNDI environment entries; applied last, so they override the dedicated properties. */
    private Map jndiProviderProperties;

    /** Bean properties applied to the connection factory during connect. */
    private Map connectionFactoryProperties;

    /** The current JMS connection, or null when disconnected or disposed. */
    private Connection connection;

    private String specification = JmsConstants.JMS_SPECIFICATION_102B;

    private JmsSupport jmsSupport;

    private Context jndiContext;

    private boolean jndiDestinations = false;

    private boolean forceJndiDestinations = false;

    /**
     * User name used when creating the connection. Kept public for backward
     * compatibility; prefer {@link #getUsername()} and {@link #setUsername(String)}.
     */
    public String username = null;

    /**
     * Password used when creating the connection. Kept public for backward
     * compatibility; prefer {@link #getPassword()} and {@link #setPassword(String)}.
     */
    public String password = null;

    private int maxRedelivery = 0;

    /** Class name of the redelivery handler created by {@link #createRedeliveryHandler()}. */
    private String redeliveryHandler = DefaultRedeliveryHandler.class.getName();

    private boolean cacheJmsSessions = false;

    /** When true, an exception listener that tries to reconnect is installed on new connections. */
    private boolean recoverJmsConnections = true;

    /**
     * Creates the connector with a concurrent receiver map.
     */
    public JmsConnector()
    {
        receivers = new ConcurrentHashMap();
    }

    /**
     * Ensures the dispatcher and receiver maps exist, delegates to the
     * superclass and registers this connector as a notification listener
     * under its own name.
     *
     * @throws InitialisationException if the superclass fails or the listener
     *             cannot be registered
     */
    public void doInitialise() throws InitialisationException
    {
        if (dispatchers == null)
        {
            dispatchers = new ConcurrentHashMap();
        }
        if (receivers == null)
        {
            receivers = new ConcurrentHashMap();
        }

        super.doInitialise();
        try
        {
            MuleManager.getInstance().registerListener(this, getName());
        }
        catch (NotificationException nex)
        {
            throw new InitialisationException(nex, this);
        }
    }

    /**
     * Creates the JNDI initial context if none has been set or created yet.
     * The initial context factory must be supplied either through
     * {@link #setJndiInitialFactory(String)} or as an entry of the JNDI
     * provider properties.
     *
     * @throws NamingException if the initial context cannot be created
     * @throws InitialisationException if no initial context factory is configured
     */
    protected void initJndiContext() throws NamingException, InitialisationException
    {
        if (jndiContext != null)
        {
            return;
        }

        Hashtable props = new Hashtable();

        if (jndiInitialFactory != null)
        {
            props.put(Context.INITIAL_CONTEXT_FACTORY, jndiInitialFactory);
        }
        else if (jndiProviderProperties == null
                 || !jndiProviderProperties.containsKey(Context.INITIAL_CONTEXT_FACTORY))
        {
            throw new InitialisationException(new Message(Messages.X_IS_NULL, "jndiInitialFactory"), this);
        }

        if (jndiProviderUrl != null)
        {
            props.put(Context.PROVIDER_URL, jndiProviderUrl);
        }

        // Applied last so explicit provider properties win over the dedicated setters.
        if (jndiProviderProperties != null)
        {
            props.putAll(jndiProviderProperties);
        }
        jndiContext = new InitialContext(props);
    }

    /**
     * Replaces the connection held by this connector.
     *
     * @param connection the connection to use, may be null
     */
    protected void setConnection(Connection connection)
    {
        this.connection = connection;
    }

    /**
     * Looks the connection factory up in the JNDI context under
     * {@link #getConnectionFactoryJndiName()}.
     *
     * @return the connection factory bound in JNDI
     * @throws InitialisationException if the JNDI context or the JNDI name is
     *             not set, or if the bound object is not a {@link ConnectionFactory}
     * @throws NamingException if the lookup fails
     */
    protected ConnectionFactory createConnectionFactory() throws InitialisationException, NamingException
    {
        // Report a configuration problem explicitly rather than failing with a bare NPE.
        if (jndiContext == null)
        {
            throw new InitialisationException(new Message(Messages.X_IS_NULL, "jndiContext"), this);
        }
        if (connectionFactoryJndiName == null)
        {
            throw new InitialisationException(new Message(Messages.X_IS_NULL,
                "connectionFactoryJndiName"), this);
        }

        Object temp = jndiContext.lookup(connectionFactoryJndiName);

        if (temp instanceof ConnectionFactory)
        {
            return (ConnectionFactory)temp;
        }

        throw new InitialisationException(new Message(Messages.JNDI_RESOURCE_X_NOT_FOUND,
            connectionFactoryJndiName), this);
    }

    /**
     * Creates a new JMS connection from the connection factory, looking the
     * factory up first if necessary. An XA connection factory is wrapped in a
     * {@link ConnectionFactoryWrapper} when a transaction manager is available.
     * The client id is applied when set and, when recovery is enabled and a
     * connection strategy exists, an exception listener that attempts to
     * reconnect is installed.
     *
     * @return the new connection
     * @throws NamingException if the connection factory lookup fails
     * @throws JMSException if the connection cannot be created or configured
     * @throws InitialisationException if the connection factory cannot be resolved
     */
    protected Connection createConnection() throws NamingException, JMSException, InitialisationException
    {
        Connection connection;
        if (connectionFactory == null)
        {
            connectionFactory = createConnectionFactory();
        }

        // instanceof is false for null, so no separate null check is needed.
        if (connectionFactory instanceof XAConnectionFactory)
        {
            if (MuleManager.getInstance().getTransactionManager() != null)
            {
                connectionFactory = new ConnectionFactoryWrapper(connectionFactory, MuleManager.getInstance()
                    .getTransactionManager());
            }
        }

        if (username != null)
        {
            connection = jmsSupport.createConnection(connectionFactory, username, password);
        }
        else
        {
            connection = jmsSupport.createConnection(connectionFactory);
        }

        if (clientId != null)
        {
            connection.setClientID(getClientId());
        }

        final ConnectionStrategy connectionStrategy = getConnectionStrategy();
        if (recoverJmsConnections && connectionStrategy != null && connection != null)
        {
            connection.setExceptionListener(new ExceptionListener()
            {
                public void onException(JMSException jmsException)
                {
                    // Keep the triggering exception in the log so the cause of the recycle is visible.
                    logger.debug("About to recycle myself due to remote JMS connection shutdown.",
                        jmsException);
                    final JmsConnector jmsConnector = JmsConnector.this;
                    try
                    {
                        jmsConnector.stopConnector();
                        jmsConnector.initialised.set(false);
                    }
                    catch (UMOException e)
                    {
                        // Best effort: a failed stop must not prevent the reconnect attempt below.
                        logger.warn(e.getMessage(), e);
                    }

                    try
                    {
                        connectionStrategy.connect(jmsConnector);
                        jmsConnector.initialise();
                        jmsConnector.startConnector();
                    }
                    catch (FatalConnectException fcex)
                    {
                        logger.fatal("Failed to reconnect to JMS server. I'm giving up.", fcex);
                    }
                    catch (UMOException umoex)
                    {
                        throw new UnhandledException("Failed to recover a connector.", umoex);
                    }
                }
            });
        }

        return connection;
    }

    /**
     * Resolves the connection factory and JMS support, then creates the
     * connection. The connection is started immediately when the connector is
     * already started.
     *
     * @throws ConnectException if the factory cannot be set up or the
     *             connection cannot be created or started
     */
    public void doConnect() throws ConnectException
    {
        try
        {
            if (connectionFactoryClass != null)
            {
                connectionFactory = (ConnectionFactory)ClassUtils.instanciateClass(connectionFactoryClass,
                    ClassUtils.NO_ARGS);
            }

            if (connectionFactory == null || jndiInitialFactory != null)
            {
                initJndiContext();
            }
            else
            {
                // No JNDI context is created on this path, so JNDI destinations are switched off.
                jndiDestinations = false;
                forceJndiDestinations = false;
            }

            if (jmsSupport == null)
            {
                if (JmsConstants.JMS_SPECIFICATION_102B.equals(specification))
                {
                    jmsSupport = new Jms102bSupport(this, jndiContext, jndiDestinations,
                        forceJndiDestinations);
                }
                else
                {
                    jmsSupport = new Jms11Support(this, jndiContext, jndiDestinations, forceJndiDestinations);
                }
            }
            if (connectionFactory == null)
            {
                connectionFactory = createConnectionFactory();
            }
            if (connectionFactoryProperties != null && !connectionFactoryProperties.isEmpty())
            {
                BeanUtils.populateWithoutFail(connectionFactory, connectionFactoryProperties, true);
            }
        }
        catch (Exception e)
        {
            throw new ConnectException(new Message(Messages.FAILED_TO_CREATE_X, "Jms Connector"), e, this);
        }

        try
        {
            connection = createConnection();
            if (started.get())
            {
                connection.start();
            }
        }
        catch (Exception e)
        {
            throw new ConnectException(e, this);
        }
    }

    /**
     * Closes the JMS connection, if any, and always clears the reference.
     *
     * @throws ConnectException if closing the connection fails
     */
    public void doDisconnect() throws ConnectException
    {
        try
        {
            if (connection != null)
            {
                connection.close();
            }
        }
        catch (Exception e)
        {
            throw new ConnectException(e, this);
        }
        finally
        {
            // Drop the reference even when close() failed.
            connection = null;
        }
    }

    /**
     * Obtains the message adapter from the superclass and sets this
     * connector's JMS specification on it.
     *
     * @param message the message to adapt
     * @return the adapter, which the superclass is expected to return as a
     *         {@link JmsMessageAdapter}
     * @throws MessagingException if the adapter cannot be created
     */
    public UMOMessageAdapter getMessageAdapter(Object message) throws MessagingException
    {
        JmsMessageAdapter adapter = (JmsMessageAdapter)super.getMessageAdapter(message);
        adapter.setSpecification(this.getSpecification());
        return adapter;
    }

    /**
     * Builds the receiver key as {@code <component name>~<endpoint address>}.
     *
     * @param component the component the receiver belongs to
     * @param endpoint the endpoint the receiver listens on
     * @return the receiver key
     */
    protected Object getReceiverKey(UMOComponent component, UMOEndpoint endpoint)
    {
        return component.getDescriptor().getName() + "~" + endpoint.getEndpointURI().getAddress();
    }

    /**
     * Returns the JMS connection as the session factory for the endpoint.
     *
     * @param endpoint the endpoint requesting a session factory
     * @return the current JMS connection
     * @throws MuleRuntimeException if the endpoint's transaction factory is a
     *             {@link JmsClientAcknowledgeTransactionFactory}
     */
    public Object getSessionFactory(UMOEndpoint endpoint)
    {
        if (endpoint.getTransactionConfig() != null
            && endpoint.getTransactionConfig().getFactory() instanceof JmsClientAcknowledgeTransactionFactory)
        {
            throw new MuleRuntimeException(new org.mule.config.i18n.Message("jms", 9));
        }

        return connection;
    }

    /**
     * Returns the session bound to the current transaction for this
     * connector's connection.
     *
     * @return the bound session, or null when there is no current transaction
     *         or it holds no resource for the connection
     */
    public Session getSessionFromTransaction()
    {
        UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
        if (tx != null && tx.hasResource(connection))
        {
            return (Session)tx.getResource(connection);
        }
        return null;
    }

    /**
     * Returns the session bound to the current transaction, or creates a new
     * one. A new session is transacted when requested or when a transaction
     * is in progress, in which case it is also bound to that transaction.
     *
     * @param transacted whether a transacted session is requested
     * @param topic whether the session is for a topic
     * @return the session
     * @throws JMSException if the connector is not connected or the session
     *             cannot be created
     * @throws RuntimeException if the new session cannot be bound to the
     *             current transaction
     */
    public Session getSession(boolean transacted, boolean topic) throws JMSException
    {
        if (!isConnected())
        {
            throw new JMSException("Not connected");
        }
        UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
        Session session = getSessionFromTransaction();
        if (session != null)
        {
            logger.debug("Retrieving jms session from current transaction");
            return session;
        }

        final boolean effectiveTransacted = transacted || tx != null;
        if (logger.isDebugEnabled())
        {
            logger.debug("Retrieving new jms session from connection: topic=" + topic + ", transacted="
                         + effectiveTransacted + ", ack mode=" + acknowledgementMode + ", nolocal="
                         + noLocal);
        }

        session = jmsSupport.createSession(connection, topic, effectiveTransacted, acknowledgementMode,
            noLocal);
        if (tx != null)
        {
            logger.debug("Binding session to current transaction");
            try
            {
                tx.bindResource(connection, session);
            }
            catch (TransactionException e)
            {
                // The session is not handed to anyone on this path; close it so it does not leak.
                closeQuietly(session);
                throw new RuntimeException("Could not bind session to current transaction", e);
            }
        }
        return session;
    }

    /**
     * Starts the JMS connection if one exists.
     *
     * @throws UMOException if the connection cannot be started
     */
    public void doStart() throws UMOException
    {
        if (connection == null)
        {
            return;
        }

        try
        {
            connection.start();
        }
        catch (JMSException e)
        {
            throw new LifecycleException(new Message(Messages.FAILED_TO_START_X, "Jms Connection"), e);
        }
    }

    /**
     * @return the protocol name, {@code "jms"}
     */
    public String getProtocol()
    {
        return "jms";
    }

    /**
     * Disposes the superclass, then closes the JMS connection and the JNDI
     * context. Failures while closing are logged, not propagated.
     */
    protected void doDispose()
    {
        super.doDispose();
        if (connection != null)
        {
            try
            {
                connection.close();
            }
            catch (JMSException e)
            {
                logger.error("Jms connector failed to dispose properly: ", e);
            }
            connection = null;
        }
        if (jndiContext != null)
        {
            try
            {
                jndiContext.close();
            }
            catch (NamingException e)
            {
                logger.error("Jms connector failed to dispose properly: ", e);
            }
            jndiContext = null;
        }
    }

    /**
     * @return the acknowledgement mode used when creating sessions
     */
    public int getAcknowledgementMode()
    {
        return acknowledgementMode;
    }

    /**
     * @param acknowledgementMode the acknowledgement mode to use when creating sessions
     */
    public void setAcknowledgementMode(int acknowledgementMode)
    {
        this.acknowledgementMode = acknowledgementMode;
    }

    /**
     * @return the JNDI name of the connection factory
     */
    public String getConnectionFactoryJndiName()
    {
        return connectionFactoryJndiName;
    }

    /**
     * @param connectionFactoryJndiName the JNDI name of the connection factory
     */
    public void setConnectionFactoryJndiName(String connectionFactoryJndiName)
    {
        this.connectionFactoryJndiName = connectionFactoryJndiName;
    }

    /**
     * @return the durable flag
     */
    public boolean isDurable()
    {
        return durable;
    }

    /**
     * @param durable the durable flag
     */
    public void setDurable(boolean durable)
    {
        this.durable = durable;
    }

    /**
     * @return the noLocal flag passed when creating sessions
     */
    public boolean isNoLocal()
    {
        return noLocal;
    }

    /**
     * @param noLocal the noLocal flag to pass when creating sessions
     */
    public void setNoLocal(boolean noLocal)
    {
        this.noLocal = noLocal;
    }

    /**
     * @return the persistent delivery flag
     */
    public boolean isPersistentDelivery()
    {
        return persistentDelivery;
    }

    /**
     * @param persistentDelivery the persistent delivery flag
     */
    public void setPersistentDelivery(boolean persistentDelivery)
    {
        this.persistentDelivery = persistentDelivery;
    }

    /**
     * @return the additional JNDI environment properties, may be null
     */
    public Map getJndiProviderProperties()
    {
        return jndiProviderProperties;
    }

    /**
     * @param jndiProviderProperties additional JNDI environment properties
     *            added when the initial context is created
     */
    public void setJndiProviderProperties(final Map jndiProviderProperties)
    {
        this.jndiProviderProperties = jndiProviderProperties;
    }

    /**
     * @return the properties applied to the connection factory, may be null
     */
    public Map getConnectionFactoryProperties()
    {
        return connectionFactoryProperties;
    }

    /**
     * @param connectionFactoryProperties bean properties applied to the
     *            connection factory during connect
     */
    public void setConnectionFactoryProperties(final Map connectionFactoryProperties)
    {
        this.connectionFactoryProperties = connectionFactoryProperties;
    }

    /**
     * @return the JNDI initial context factory class name
     */
    public String getJndiInitialFactory()
    {
        return jndiInitialFactory;
    }

    /**
     * @param jndiInitialFactory the JNDI initial context factory class name
     */
    public void setJndiInitialFactory(String jndiInitialFactory)
    {
        this.jndiInitialFactory = jndiInitialFactory;
    }

    /**
     * @return the JNDI provider URL
     */
    public String getJndiProviderUrl()
    {
        return jndiProviderUrl;
    }

    /**
     * @param jndiProviderUrl the JNDI provider URL
     */
    public void setJndiProviderUrl(String jndiProviderUrl)
    {
        this.jndiProviderUrl = jndiProviderUrl;
    }

    /**
     * Returns a session for the endpoint. The session is for a topic when the
     * endpoint URI's resource info equals the topic property, ignoring case.
     * It is requested as transacted when the endpoint has a transaction
     * configuration that reports itself transacted.
     *
     * @param endpoint the endpoint to get a session for
     * @return the session
     * @throws Exception if the session cannot be obtained
     */
    public Session getSession(UMOImmutableEndpoint endpoint) throws Exception
    {
        String resourceInfo = endpoint.getEndpointURI().getResourceInfo();
        // equalsIgnoreCase(null) is false, so a null resource info means "not a topic".
        boolean topic = JmsConstants.TOPIC_PROPERTY.equalsIgnoreCase(resourceInfo);
        // An endpoint without a transaction config is treated as non-transacted,
        // matching the null check in getSessionFactory.
        boolean transacted = endpoint.getTransactionConfig() != null
                             && endpoint.getTransactionConfig().isTransacted();
        return getSession(transacted, topic);
    }

    /**
     * @return the connection factory, may be null before connecting
     */
    public ConnectionFactory getConnectionFactory()
    {
        return connectionFactory;
    }

    /**
     * @param connectionFactory the connection factory to use
     */
    public void setConnectionFactory(ConnectionFactory connectionFactory)
    {
        this.connectionFactory = connectionFactory;
    }

    /**
     * @return the connection factory class name
     */
    public String getConnectionFactoryClass()
    {
        return connectionFactoryClass;
    }

    /**
     * @param connectionFactoryClass class name of a connection factory to
     *            instantiate with no arguments during connect
     */
    public void setConnectionFactoryClass(String connectionFactoryClass)
    {
        this.connectionFactoryClass = connectionFactoryClass;
    }

    /**
     * @return the JMS support strategy, may be null before connecting
     */
    public JmsSupport getJmsSupport()
    {
        return jmsSupport;
    }

    /**
     * @param jmsSupport the JMS support strategy to use instead of the one
     *            chosen from the specification
     */
    public void setJmsSupport(JmsSupport jmsSupport)
    {
        this.jmsSupport = jmsSupport;
    }

    /**
     * @return the JMS specification identifier
     */
    public String getSpecification()
    {
        return specification;
    }

    /**
     * @param specification the JMS specification identifier
     */
    public void setSpecification(String specification)
    {
        this.specification = specification;
    }

    /**
     * @return the jndiDestinations flag
     */
    public boolean isJndiDestinations()
    {
        return jndiDestinations;
    }

    /**
     * @param jndiDestinations the jndiDestinations flag passed to the JMS support
     */
    public void setJndiDestinations(boolean jndiDestinations)
    {
        this.jndiDestinations = jndiDestinations;
    }

    /**
     * @return the forceJndiDestinations flag
     */
    public boolean isForceJndiDestinations()
    {
        return forceJndiDestinations;
    }

    /**
     * @param forceJndiDestinations the forceJndiDestinations flag passed to the JMS support
     */
    public void setForceJndiDestinations(boolean forceJndiDestinations)
    {
        this.forceJndiDestinations = forceJndiDestinations;
    }

    /**
     * @return the JNDI context, may be null
     */
    public Context getJndiContext()
    {
        return jndiContext;
    }

    /**
     * @param jndiContext the JNDI context to use instead of creating one
     */
    public void setJndiContext(Context jndiContext)
    {
        this.jndiContext = jndiContext;
    }

    /**
     * @param recover whether to install a reconnecting exception listener on new connections
     */
    public void setRecoverJmsConnections(boolean recover)
    {
        this.recoverJmsConnections = recover;
    }

    /**
     * @return whether a reconnecting exception listener is installed on new connections
     */
    public boolean isRecoverJmsConnections()
    {
        return this.recoverJmsConnections;
    }

    /**
     * Instantiates the configured redelivery handler class with no arguments,
     * or a {@link DefaultRedeliveryHandler} when no class name is set.
     *
     * @return a new redelivery handler
     * @throws IllegalAccessException if the handler cannot be instantiated
     * @throws NoSuchMethodException if the handler cannot be instantiated
     * @throws InvocationTargetException if the handler cannot be instantiated
     * @throws InstantiationException if the handler cannot be instantiated
     * @throws ClassNotFoundException if the handler cannot be instantiated
     */
    protected RedeliveryHandler createRedeliveryHandler()
        throws IllegalAccessException, NoSuchMethodException, InvocationTargetException,
        InstantiationException, ClassNotFoundException
    {
        if (redeliveryHandler == null)
        {
            return new DefaultRedeliveryHandler();
        }

        return (RedeliveryHandler)ClassUtils.instanciateClass(redeliveryHandler, ClassUtils.NO_ARGS);
    }

    /**
     * @return a new {@link JmsReplyToHandler} using the default response transformer
     */
    public ReplyToHandler getReplyToHandler()
    {
        return new JmsReplyToHandler(this, defaultResponseTransformer);
    }

    /**
     * @return the user name used when creating connections
     */
    public String getUsername()
    {
        return username;
    }

    /**
     * @param username the user name used when creating connections
     */
    public void setUsername(String username)
    {
        this.username = username;
    }

    /**
     * @return the password used when creating connections
     */
    public String getPassword()
    {
        return password;
    }

    /**
     * @param password the password used when creating connections
     */
    public void setPassword(String password)
    {
        this.password = password;
    }

    /**
     * @return the current JMS connection, or null when not connected
     */
    public Connection getConnection()
    {
        return connection;
    }

    /**
     * @return the client id set on new connections
     */
    public String getClientId()
    {
        return clientId;
    }

    /**
     * @param clientId the client id to set on new connections
     */
    public void setClientId(String clientId)
    {
        this.clientId = clientId;
    }

    /**
     * @return the configured maximum redelivery count
     */
    public int getMaxRedelivery()
    {
        return maxRedelivery;
    }

    /**
     * @param maxRedelivery the maximum redelivery count
     */
    public void setMaxRedelivery(int maxRedelivery)
    {
        this.maxRedelivery = maxRedelivery;
    }

    /**
     * @return the redelivery handler class name
     */
    public String getRedeliveryHandler()
    {
        return redeliveryHandler;
    }

    /**
     * @param redeliveryHandler the redelivery handler class name
     */
    public void setRedeliveryHandler(String redeliveryHandler)
    {
        this.redeliveryHandler = redeliveryHandler;
    }

    /**
     * @return always true
     */
    public boolean isRemoteSyncEnabled()
    {
        return true;
    }

    /**
     * Disposes the dispatchers when a connection is reported as disconnected
     * or failed.
     *
     * @param notification the server notification received
     */
    public void onNotification(UMOServerNotification notification)
    {
        if (notification.getAction() == ConnectionNotification.CONNECTION_DISCONNECTED
            || notification.getAction() == ConnectionNotification.CONNECTION_FAILED)
        {
            disposeDispatchers();
        }
    }

    /**
     * @return the cacheJmsSessions flag
     */
    public boolean isCacheJmsSessions()
    {
        return cacheJmsSessions;
    }

    /**
     * @param cacheJmsSessions the cacheJmsSessions flag
     */
    public void setCacheJmsSessions(boolean cacheJmsSessions)
    {
        this.cacheJmsSessions = cacheJmsSessions;
    }

    /**
     * @param property the property name
     * @return always true
     */
    public boolean supportsProperty(String property)
    {
        return true;
    }

    /**
     * Extension hook for message pre-processing. This implementation returns
     * the message unchanged.
     *
     * @param message the JMS message
     * @param session the JMS session (unused here)
     * @return the given message
     * @throws Exception declared for overriding implementations
     */
    public javax.jms.Message preProcessMessage(javax.jms.Message message, Session session) throws Exception
    {
        return message;
    }

    /**
     * Closes the producer if it is not null.
     *
     * @param producer the producer to close, may be null
     * @throws JMSException if closing fails
     */
    public void close(MessageProducer producer) throws JMSException
    {
        if (producer != null)
        {
            producer.close();
        }
    }

    /**
     * Closes the producer, logging any failure instead of propagating it.
     *
     * @param producer the producer to close, may be null
     */
    public void closeQuietly(MessageProducer producer)
    {
        try
        {
            close(producer);
        }
        catch (JMSException e)
        {
            logger.error("Failed to close jms message producer", e);
        }
    }

    /**
     * Closes the consumer if it is not null.
     *
     * @param consumer the consumer to close, may be null
     * @throws JMSException if closing fails
     */
    public void close(MessageConsumer consumer) throws JMSException
    {
        if (consumer != null)
        {
            consumer.close();
        }
    }

    /**
     * Closes the consumer, logging any failure instead of propagating it.
     *
     * @param consumer the consumer to close, may be null
     */
    public void closeQuietly(MessageConsumer consumer)
    {
        try
        {
            close(consumer);
        }
        catch (JMSException e)
        {
            logger.error("Failed to close jms message consumer", e);
        }
    }

    /**
     * Closes the session if it is not null.
     *
     * @param session the session to close, may be null
     * @throws JMSException if closing fails
     */
    public void close(Session session) throws JMSException
    {
        if (session != null)
        {
            session.close();
        }
    }

    /**
     * Closes the session, logging any failure instead of propagating it.
     *
     * @param session the session to close, may be null
     */
    public void closeQuietly(Session session)
    {
        try
        {
            close(session);
        }
        catch (JMSException e)
        {
            logger.error("Failed to close jms session", e);
        }
    }

    /**
     * Deletes the temporary queue if it is not null.
     *
     * @param tempQueue the temporary queue to delete, may be null
     * @throws JMSException if deletion fails
     */
    public void close(TemporaryQueue tempQueue) throws JMSException
    {
        if (tempQueue != null)
        {
            tempQueue.delete();
        }
    }

    /**
     * Deletes the temporary queue, logging any failure instead of
     * propagating it.
     *
     * @param tempQueue the temporary queue to delete, may be null
     */
    public void closeQuietly(TemporaryQueue tempQueue)
    {
        try
        {
            close(tempQueue);
        }
        catch (JMSException e)
        {
            if (logger.isErrorEnabled())
            {
                String queueName = "";
                try
                {
                    queueName = tempQueue.getQueueName();
                }
                catch (JMSException ignored)
                {
                    // The name only decorates the log message; the delete failure is logged below.
                }
                logger.error("Failed to delete a temporary queue " + queueName, e);
            }
        }
    }

    /**
     * Deletes the temporary topic if it is not null.
     *
     * @param tempTopic the temporary topic to delete, may be null
     * @throws JMSException if deletion fails
     */
    public void close(TemporaryTopic tempTopic) throws JMSException
    {
        if (tempTopic != null)
        {
            tempTopic.delete();
        }
    }

    /**
     * Deletes the temporary topic, logging any failure instead of
     * propagating it.
     *
     * @param tempTopic the temporary topic to delete, may be null
     */
    public void closeQuietly(TemporaryTopic tempTopic)
    {
        try
        {
            close(tempTopic);
        }
        catch (JMSException e)
        {
            if (logger.isErrorEnabled())
            {
                String topicName = "";
                try
                {
                    topicName = tempTopic.getTopicName();
                }
                catch (JMSException ignored)
                {
                    // The name only decorates the log message; the delete failure is logged below.
                }
                logger.error("Failed to delete a temporary topic " + topicName, e);
            }
        }
    }
}