1 10 11 package org.mule.providers.jms; 12 13 import javax.jms.DeliveryMode ; 14 import javax.jms.Destination ; 15 import javax.jms.Message ; 16 import javax.jms.MessageProducer ; 17 import javax.jms.Queue ; 18 import javax.jms.Session ; 19 import javax.jms.Topic ; 20 21 import org.apache.commons.lang.ObjectUtils; 22 import org.mule.impl.model.AbstractComponent; 23 import org.mule.providers.DefaultReplyToHandler; 24 import org.mule.umo.UMOEvent; 25 import org.mule.umo.UMOException; 26 import org.mule.umo.UMOMessage; 27 import org.mule.umo.provider.DispatchException; 28 import org.mule.umo.transformer.UMOTransformer; 29 import org.mule.util.StringMessageUtils; 30 31 35 public class JmsReplyToHandler extends DefaultReplyToHandler 36 { 37 private JmsConnector connector; 38 39 public JmsReplyToHandler(JmsConnector connector, UMOTransformer transformer) 40 { 41 super(transformer); 42 this.connector = connector; 43 } 44 45 public void processReplyTo(UMOEvent event, UMOMessage returnMessage, Object replyTo) throws UMOException 46 { 47 Destination replyToDestination = null; 48 MessageProducer replyToProducer = null; 49 Session session = null; 50 try 51 { 52 if (replyTo instanceof Destination ) 54 { 55 replyToDestination = (Destination )replyTo; 56 } 57 if (replyToDestination == null) 58 { 59 super.processReplyTo(event, returnMessage, replyTo); 60 return; 61 } 62 Object payload = returnMessage.getPayload(); 63 if (getTransformer() != null) 64 { 65 getTransformer().setEndpoint(getEndpoint(event, "jms://temporary")); 66 if (getTransformer().isSourceTypeSupported(payload.getClass())) 67 { 68 payload = getTransformer().transform(payload); 69 } 70 else if (logger.isDebugEnabled()) 71 { 72 logger.debug("transformer for replyTo Handler: " + getTransformer().toString() 73 + " does not support source type: " + payload.getClass() 74 + ". Not doing a transform"); 75 } 76 } 77 78 if (replyToDestination instanceof Topic && replyToDestination instanceof Queue 79 && connector.getJmsSupport() instanceof Jms102bSupport) 80 { 81 logger.error(StringMessageUtils.getBoilerPlate("ReplyTo destination implements both Queue and Topic " 82 + "while complying with JMS 1.0.2b specification. " 83 + "Please report your application server or JMS vendor name and version " 84 + "to dev<_at_>mule.codehaus.org or http://mule.mulesource.org/jira")); 85 } 86 boolean topic = replyToDestination instanceof Topic ; 87 session = connector.getSession(false, topic); 88 Message replyToMessage = JmsMessageUtils.toMessage(payload, session); 89 90 replyToMessage.setJMSReplyTo(null); 91 if (logger.isDebugEnabled()) 92 { 93 logger.debug("Sending jms reply to: " + replyToDestination + "(" 94 + replyToDestination.getClass().getName() + ")"); 95 } 96 replyToProducer = connector.getJmsSupport().createProducer(session, replyToDestination, topic); 97 98 UMOMessage eventMsg = event.getMessage(); 100 String ttlString = (String )eventMsg.removeProperty(JmsConstants.TIME_TO_LIVE_PROPERTY); 101 String priorityString = (String )eventMsg.removeProperty(JmsConstants.PRIORITY_PROPERTY); 102 String persistentDeliveryString = (String )eventMsg.removeProperty(JmsConstants.PERSISTENT_DELIVERY_PROPERTY); 103 104 if (ttlString == null && priorityString == null && persistentDeliveryString == null) 105 { 106 connector.getJmsSupport().send(replyToProducer, replyToMessage, topic); 107 } 108 else 109 { 110 long ttl = Message.DEFAULT_TIME_TO_LIVE; 111 int priority = Message.DEFAULT_PRIORITY; 112 boolean persistent = Message.DEFAULT_DELIVERY_MODE == DeliveryMode.PERSISTENT; 113 114 if (ttlString != null) 115 { 116 ttl = Long.parseLong(ttlString); 117 } 118 if (priorityString != null) 119 { 120 priority = Integer.parseInt(priorityString); 121 } 122 if (persistentDeliveryString != null) 123 { 124 persistent = Boolean.valueOf(persistentDeliveryString).booleanValue(); 125 } 126 127 connector.getJmsSupport().send(replyToProducer, replyToMessage, persistent, priority, ttl, 128 topic); 129 } 130 131 logger.info("Reply Message sent to: " + replyToDestination); 134 ((AbstractComponent)event.getComponent()).getStatistics().incSentReplyToEvent(); 135 } 136 catch (Exception e) 137 { 138 throw new DispatchException(new org.mule.config.i18n.Message("jms", 8, ObjectUtils.toString( 139 replyToDestination, "null")), returnMessage, null, e); 140 } 141 finally 142 { 143 connector.closeQuietly(replyToProducer); 144 connector.closeQuietly(session); 145 } 146 } 147 } 148 | Popular Tags |