1 16 17 package org.springframework.jms.core; 18 19 import javax.jms.Connection ; 20 import javax.jms.ConnectionFactory ; 21 import javax.jms.DeliveryMode ; 22 import javax.jms.Destination ; 23 import javax.jms.JMSException ; 24 import javax.jms.Message ; 25 import javax.jms.MessageConsumer ; 26 import javax.jms.MessageProducer ; 27 import javax.jms.Session ; 28 29 import org.springframework.jms.JmsException; 30 import org.springframework.jms.connection.ConnectionFactoryUtils; 31 import org.springframework.jms.connection.JmsResourceHolder; 32 import org.springframework.jms.support.JmsUtils; 33 import org.springframework.jms.support.converter.MessageConverter; 34 import org.springframework.jms.support.converter.SimpleMessageConverter; 35 import org.springframework.jms.support.destination.JmsDestinationAccessor; 36 import org.springframework.transaction.support.TransactionSynchronizationManager; 37 import org.springframework.util.Assert; 38 39 76 public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations { 77 78 82 public static final long DEFAULT_RECEIVE_TIMEOUT = -1; 83 84 85 private final JmsTemplateResourceFactory transactionalResourceFactory = 86 new JmsTemplateResourceFactory(); 87 88 89 private Object defaultDestination; 90 91 private MessageConverter messageConverter; 92 93 94 private boolean messageIdEnabled = true; 95 96 private boolean messageTimestampEnabled = true; 97 98 private boolean pubSubNoLocal = false; 99 100 private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT; 101 102 103 private boolean explicitQosEnabled = false; 104 105 private int deliveryMode = Message.DEFAULT_DELIVERY_MODE; 106 107 private int priority = Message.DEFAULT_PRIORITY; 108 109 private long timeToLive = Message.DEFAULT_TIME_TO_LIVE; 110 111 112 119 public JmsTemplate() { 120 initDefaultStrategies(); 121 } 122 123 127 public JmsTemplate(ConnectionFactory connectionFactory) { 128 this(); 129 setConnectionFactory(connectionFactory); 130 afterPropertiesSet(); 131 } 132 133 141 protected void initDefaultStrategies() { 142 setMessageConverter(new SimpleMessageConverter()); 143 } 144 145 146 156 public void setDefaultDestination(Destination destination) { 157 this.defaultDestination = destination; 158 } 159 160 164 public Destination getDefaultDestination() { 165 return (this.defaultDestination instanceof Destination ? (Destination ) this.defaultDestination : null); 166 } 167 168 179 public void setDefaultDestinationName(String destinationName) { 180 this.defaultDestination = destinationName; 181 } 182 183 187 public String getDefaultDestinationName() { 188 return (this.defaultDestination instanceof String ? (String ) this.defaultDestination : null); 189 } 190 191 201 public void setMessageConverter(MessageConverter messageConverter) { 202 this.messageConverter = messageConverter; 203 } 204 205 208 public MessageConverter getMessageConverter() { 209 return this.messageConverter; 210 } 211 212 213 219 public void setMessageIdEnabled(boolean messageIdEnabled) { 220 this.messageIdEnabled = messageIdEnabled; 221 } 222 223 226 public boolean isMessageIdEnabled() { 227 return this.messageIdEnabled; 228 } 229 230 236 public void setMessageTimestampEnabled(boolean messageTimestampEnabled) { 237 this.messageTimestampEnabled = messageTimestampEnabled; 238 } 239 240 243 public boolean isMessageTimestampEnabled() { 244 return this.messageTimestampEnabled; 245 } 246 247 252 public void setPubSubNoLocal(boolean pubSubNoLocal) { 253 this.pubSubNoLocal = pubSubNoLocal; 254 } 255 256 259 public boolean isPubSubNoLocal() { 260 return this.pubSubNoLocal; 261 } 262 263 269 public void setReceiveTimeout(long receiveTimeout) { 270 this.receiveTimeout = receiveTimeout; 271 } 272 273 276 public long getReceiveTimeout() { 277 return this.receiveTimeout; 278 } 279 280 281 288 public void setExplicitQosEnabled(boolean explicitQosEnabled) { 289 this.explicitQosEnabled = explicitQosEnabled; 290 } 291 292 302 public boolean isExplicitQosEnabled() { 303 return this.explicitQosEnabled; 304 } 305 306 315 public void setDeliveryPersistent(boolean deliveryPersistent) { 316 this.deliveryMode = (deliveryPersistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); 317 } 318 319 331 public void setDeliveryMode(int deliveryMode) { 332 this.deliveryMode = deliveryMode; 333 } 334 335 338 public int getDeliveryMode() { 339 return this.deliveryMode; 340 } 341 342 350 public void setPriority(int priority) { 351 this.priority = priority; 352 } 353 354 357 public int getPriority() { 358 return this.priority; 359 } 360 361 370 public void setTimeToLive(long timeToLive) { 371 this.timeToLive = timeToLive; 372 } 373 374 377 public long getTimeToLive() { 378 return this.timeToLive; 379 } 380 381 382 private void checkDefaultDestination() throws IllegalStateException { 383 if (this.defaultDestination == null) { 384 throw new IllegalStateException ( 385 "No defaultDestination or defaultDestinationName specified. Check configuration of JmsTemplate."); 386 } 387 } 388 389 private void checkMessageConverter() throws IllegalStateException { 390 if (getMessageConverter() == null) { 391 throw new IllegalStateException ("No messageConverter registered. Check configuration of JmsTemplate."); 392 } 393 } 394 395 396 409 public Object execute(SessionCallback action, boolean startConnection) throws JmsException { 410 Assert.notNull(action, "Callback object must not be null"); 411 412 Connection conToClose = null; 413 Session sessionToClose = null; 414 try { 415 Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( 416 getConnectionFactory(), this.transactionalResourceFactory); 417 if (sessionToUse == null) { 418 conToClose = createConnection(); 419 sessionToClose = createSession(conToClose); 420 if (startConnection) { 421 conToClose.start(); 422 } 423 sessionToUse = sessionToClose; 424 } 425 if (logger.isDebugEnabled()) { 426 logger.debug("Executing callback on JMS Session [" + sessionToUse + "]"); 427 } 428 return action.doInJms(sessionToUse); 429 } 430 catch (JMSException ex) { 431 throw convertJmsAccessException(ex); 432 } 433 finally { 434 JmsUtils.closeSession(sessionToClose); 435 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection); 436 } 437 } 438 439 public Object execute(SessionCallback action) throws JmsException { 440 return execute(action, false); 441 } 442 443 public Object execute(final ProducerCallback action) throws JmsException { 444 Assert.notNull(action, "Callback object must not be null"); 445 446 return execute(new SessionCallback() { 447 public Object doInJms(Session session) throws JMSException { 448 MessageProducer producer = createProducer(session, null); 449 try { 450 return action.doInJms(session, producer); 451 } 452 finally { 453 JmsUtils.closeMessageProducer(producer); 454 } 455 } 456 }, false); 457 } 458 459 460 464 public void send(MessageCreator messageCreator) throws JmsException { 465 checkDefaultDestination(); 466 if (getDefaultDestination() != null) { 467 send(getDefaultDestination(), messageCreator); 468 } 469 else { 470 send(getDefaultDestinationName(), messageCreator); 471 } 472 } 473 474 public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException { 475 execute(new SessionCallback() { 476 public Object doInJms(Session session) throws JMSException { 477 doSend(session, destination, messageCreator); 478 return null; 479 } 480 }, false); 481 } 482 483 public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException { 484 execute(new SessionCallback() { 485 public Object doInJms(Session session) throws JMSException { 486 Destination destination = resolveDestinationName(session, destinationName); 487 doSend(session, destination, messageCreator); 488 return null; 489 } 490 }, false); 491 } 492 493 500 protected void doSend(Session session, Destination destination, MessageCreator messageCreator) 501 throws JMSException { 502 503 Assert.notNull(messageCreator, "MessageCreator must not be null"); 504 505 MessageProducer producer = createProducer(session, destination); 506 try { 507 Message message = messageCreator.createMessage(session); 508 if (logger.isDebugEnabled()) { 509 logger.debug("Sending created message [" + message + "]"); 510 } 511 doSend(producer, message); 512 if (session.getTransacted() && isSessionLocallyTransacted(session)) { 514 JmsUtils.commitIfNecessary(session); 516 } 517 } 518 finally { 519 JmsUtils.closeMessageProducer(producer); 520 } 521 } 522 523 529 protected void doSend(MessageProducer producer, Message message) throws JMSException { 530 if (isExplicitQosEnabled()) { 531 producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive()); 532 } 533 else { 534 producer.send(message); 535 } 536 } 537 538 539 543 public void convertAndSend(Object message) throws JmsException { 544 checkDefaultDestination(); 545 if (getDefaultDestination() != null) { 546 convertAndSend(getDefaultDestination(), message); 547 } 548 else { 549 convertAndSend(getDefaultDestinationName(), message); 550 } 551 } 552 553 public void convertAndSend(Destination destination, final Object message) throws JmsException { 554 checkMessageConverter(); 555 send(destination, new MessageCreator() { 556 public Message createMessage(Session session) throws JMSException { 557 return getMessageConverter().toMessage(message, session); 558 } 559 }); 560 } 561 562 public void convertAndSend(String destinationName, final Object message) throws JmsException { 563 checkMessageConverter(); 564 send(destinationName, new MessageCreator() { 565 public Message createMessage(Session session) throws JMSException { 566 return getMessageConverter().toMessage(message, session); 567 } 568 }); 569 } 570 571 public void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException { 572 checkDefaultDestination(); 573 if (getDefaultDestination() != null) { 574 convertAndSend(getDefaultDestination(), message, postProcessor); 575 } 576 else { 577 convertAndSend(getDefaultDestinationName(), message, postProcessor); 578 } 579 } 580 581 public void convertAndSend( 582 Destination destination, final Object message, final MessagePostProcessor postProcessor) 583 throws JmsException { 584 585 checkMessageConverter(); 586 send(destination, new MessageCreator() { 587 public Message createMessage(Session session) throws JMSException { 588 Message msg = getMessageConverter().toMessage(message, session); 589 return postProcessor.postProcessMessage(msg); 590 } 591 }); 592 } 593 594 public void convertAndSend( 595 String destinationName, final Object message, final MessagePostProcessor postProcessor) 596 throws JmsException { 597 598 checkMessageConverter(); 599 send(destinationName, new MessageCreator() { 600 public Message createMessage(Session session) throws JMSException { 601 Message msg = getMessageConverter().toMessage(message, session); 602 return postProcessor.postProcessMessage(msg); 603 } 604 }); 605 } 606 607 608 612 public Message receive() throws JmsException { 613 checkDefaultDestination(); 614 if (getDefaultDestination() != null) { 615 return receive(getDefaultDestination()); 616 } 617 else { 618 return receive(getDefaultDestinationName()); 619 } 620 } 621 622 public Message receive(final Destination destination) throws JmsException { 623 return (Message ) execute(new SessionCallback() { 624 public Object doInJms(Session session) throws JMSException { 625 return doReceive(session, destination, null); 626 } 627 }, true); 628 } 629 630 public Message receive(final String destinationName) throws JmsException { 631 return (Message ) execute(new SessionCallback() { 632 public Object doInJms(Session session) throws JMSException { 633 Destination destination = resolveDestinationName(session, destinationName); 634 return doReceive(session, destination, null); 635 } 636 }, true); 637 } 638 639 public Message receiveSelected(String messageSelector) throws JmsException { 640 checkDefaultDestination(); 641 if (getDefaultDestination() != null) { 642 return receiveSelected(getDefaultDestination(), messageSelector); 643 } 644 else { 645 return receiveSelected(getDefaultDestinationName(), messageSelector); 646 } 647 } 648 649 public Message receiveSelected(final Destination destination, final String messageSelector) throws JmsException { 650 return (Message ) execute(new SessionCallback() { 651 public Object doInJms(Session session) throws JMSException { 652 return doReceive(session, destination, messageSelector); 653 } 654 }, true); 655 } 656 657 public Message receiveSelected(final String destinationName, final String messageSelector) throws JmsException { 658 return (Message ) execute(new SessionCallback() { 659 public Object doInJms(Session session) throws JMSException { 660 Destination destination = resolveDestinationName(session, destinationName); 661 return doReceive(session, destination, messageSelector); 662 } 663 }, true); 664 } 665 666 673 protected Message doReceive(Session session, Destination destination, String messageSelector) 674 throws JMSException { 675 676 return doReceive(session, createConsumer(session, destination, messageSelector)); 677 } 678 679 686 protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException { 687 try { 688 long timeout = getReceiveTimeout(); 690 JmsResourceHolder resourceHolder = 691 (JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory()); 692 if (resourceHolder != null && resourceHolder.hasTimeout()) { 693 timeout = resourceHolder.getTimeToLiveInMillis(); 694 } 695 Message message = (timeout >= 0) ? 696 consumer.receive(timeout) : consumer.receive(); 697 if (session.getTransacted()) { 698 if (isSessionLocallyTransacted(session)) { 700 JmsUtils.commitIfNecessary(session); 702 } 703 } 704 else if (isClientAcknowledge(session)) { 705 if (message != null) { 707 message.acknowledge(); 708 } 709 } 710 return message; 711 } 712 finally { 713 JmsUtils.closeMessageConsumer(consumer); 714 } 715 } 716 717 718 722 public Object receiveAndConvert() throws JmsException { 723 checkMessageConverter(); 724 return doConvertFromMessage(receive()); 725 } 726 727 public Object receiveAndConvert(Destination destination) throws JmsException { 728 checkMessageConverter(); 729 return doConvertFromMessage(receive(destination)); 730 } 731 732 public Object receiveAndConvert(String destinationName) throws JmsException { 733 checkMessageConverter(); 734 return doConvertFromMessage(receive(destinationName)); 735 } 736 737 public Object receiveSelectedAndConvert(String messageSelector) throws JmsException { 738 checkMessageConverter(); 739 return doConvertFromMessage(receiveSelected(messageSelector)); 740 } 741 742 public Object receiveSelectedAndConvert(Destination destination, String messageSelector) throws JmsException { 743 checkMessageConverter(); 744 return doConvertFromMessage(receiveSelected(destination, messageSelector)); 745 } 746 747 public Object receiveSelectedAndConvert(String destinationName, String messageSelector) throws JmsException { 748 checkMessageConverter(); 749 return doConvertFromMessage(receiveSelected(destinationName, messageSelector)); 750 } 751 752 757 protected Object doConvertFromMessage(Message message) { 758 if (message != null) { 759 try { 760 return getMessageConverter().fromMessage(message); 761 } 762 catch (JMSException ex) { 763 throw convertJmsAccessException(ex); 764 } 765 } 766 return null; 767 } 768 769 770 774 781 protected Connection getConnection(JmsResourceHolder holder) { 782 return holder.getConnection(); 783 } 784 785 792 protected Session getSession(JmsResourceHolder holder) { 793 return holder.getSession(); 794 } 795 796 808 protected boolean isSessionLocallyTransacted(Session session) { 809 return isSessionTransacted() && 810 !ConnectionFactoryUtils.isSessionTransactional(session, getConnectionFactory()); 811 } 812 813 826 protected MessageProducer createProducer(Session session, Destination destination) throws JMSException { 827 MessageProducer producer = doCreateProducer(session, destination); 828 if (!isMessageIdEnabled()) { 829 producer.setDisableMessageID(true); 830 } 831 if (!isMessageTimestampEnabled()) { 832 producer.setDisableMessageTimestamp(true); 833 } 834 return producer; 835 } 836 837 845 protected MessageProducer doCreateProducer(Session session, Destination destination) throws JMSException { 846 return session.createProducer(destination); 847 } 848 849 858 protected MessageConsumer createConsumer(Session session, Destination destination, String messageSelector) 859 throws JMSException { 860 861 if (isPubSubDomain()) { 865 return session.createConsumer(destination, messageSelector, isPubSubNoLocal()); 866 } 867 else { 868 return session.createConsumer(destination, messageSelector); 869 } 870 } 871 872 873 876 private class JmsTemplateResourceFactory implements ConnectionFactoryUtils.ResourceFactory { 877 878 public Connection getConnection(JmsResourceHolder holder) { 879 return JmsTemplate.this.getConnection(holder); 880 } 881 882 public Session getSession(JmsResourceHolder holder) { 883 return JmsTemplate.this.getSession(holder); 884 } 885 886 public Connection createConnection() throws JMSException { 887 return JmsTemplate.this.createConnection(); 888 } 889 890 public Session createSession(Connection con) throws JMSException { 891 return JmsTemplate.this.createSession(con); 892 } 893 894 public boolean isSynchedLocalTransactionAllowed() { 895 return JmsTemplate.this.isSessionTransacted(); 896 } 897 } 898 899 } 900 | Popular Tags |