1 10 11 package org.mule.providers.jms; 12 13 import javax.jms.Destination ; 14 import javax.jms.JMSException ; 15 import javax.jms.Message ; 16 import javax.jms.MessageConsumer ; 17 import javax.jms.MessageListener ; 18 import javax.jms.Session ; 19 import javax.jms.Topic ; 20 21 import org.apache.commons.collections.MapUtils; 22 import org.mule.impl.MuleMessage; 23 import org.mule.providers.AbstractMessageReceiver; 24 import org.mule.providers.ConnectException; 25 import org.mule.providers.jms.filters.JmsSelectorFilter; 26 import org.mule.umo.UMOComponent; 27 import org.mule.umo.UMOException; 28 import org.mule.umo.endpoint.UMOEndpoint; 29 import org.mule.umo.lifecycle.InitialisationException; 30 import org.mule.umo.lifecycle.LifecycleException; 31 import org.mule.umo.provider.UMOConnector; 32 import org.mule.umo.provider.UMOMessageAdapter; 33 34 37 public class SingleJmsMessageReceiver extends AbstractMessageReceiver implements MessageListener 38 { 39 40 protected JmsConnector connector; 41 protected RedeliveryHandler redeliveryHandler; 42 protected MessageConsumer consumer; 43 protected Session session; 44 protected boolean startOnConnect = false; 45 46 public SingleJmsMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint) 47 throws InitialisationException 48 { 49 super(connector, component, endpoint); 50 this.connector = (JmsConnector)connector; 51 52 try 53 { 54 redeliveryHandler = this.connector.createRedeliveryHandler(); 55 redeliveryHandler.setConnector(this.connector); 56 } 57 catch (Exception e) 58 { 59 throw new InitialisationException(e, this); 60 } 61 } 62 63 public void doConnect() throws Exception 64 { 65 createConsumer(); 66 if (startOnConnect) 67 { 68 doStart(); 69 } 70 } 71 72 public void doDisconnect() throws Exception 73 { 74 closeConsumer(); 75 } 76 77 public void onMessage(Message message) 78 { 79 try 80 { 81 if (logger.isDebugEnabled()) 82 { 83 logger.debug("Message received it is of type: " + message.getClass().getName()); 84 if (message.getJMSDestination() != null) 85 { 86 logger.debug("Message received on " + message.getJMSDestination() + " (" 87 + message.getJMSDestination().getClass().getName() + ")"); 88 } 89 else 90 { 91 logger.debug("Message received on unknown destination"); 92 } 93 logger.debug("Message CorrelationId is: " + message.getJMSCorrelationID()); 94 logger.debug("Jms Message Id is: " + message.getJMSMessageID()); 95 } 96 97 if (message.getJMSRedelivered()) 98 { 99 if (logger.isDebugEnabled()) 100 { 101 logger.debug("Message with correlationId: " + message.getJMSCorrelationID() 102 + " is redelivered. handing off to Exception Handler"); 103 } 104 redeliveryHandler.handleRedelivery(message); 105 } 106 107 UMOMessageAdapter adapter = connector.getMessageAdapter(message); 108 routeMessage(new MuleMessage(adapter)); 109 } 110 catch (Exception e) 111 { 112 handleException(e); 113 } 114 } 115 116 public void doStart() throws UMOException 117 { 118 try 119 { 120 if (consumer == null) 127 { 128 startOnConnect = true; 129 } 130 else 131 { 132 startOnConnect = false; 133 consumer.setMessageListener(this); 134 } 135 } 136 catch (JMSException e) 137 { 138 throw new LifecycleException(e, this); 139 } 140 } 141 142 public void doStop() throws UMOException 143 { 144 try 145 { 146 if (consumer != null) 147 { 148 consumer.setMessageListener(null); 149 } 150 } 151 catch (JMSException e) 152 { 153 throw new LifecycleException(e, this); 154 } 155 } 156 157 protected void closeConsumer() 158 { 159 connector.closeQuietly(consumer); 160 consumer = null; 161 connector.closeQuietly(session); 162 session = null; 163 } 164 165 170 protected void createConsumer() throws Exception 171 { 172 try 173 { 174 JmsSupport jmsSupport = this.connector.getJmsSupport(); 175 if (session == null) 177 { 178 session = this.connector.getSession(endpoint); 179 } 180 181 String resourceInfo = endpoint.getEndpointURI().getResourceInfo(); 183 boolean topic = (resourceInfo != null && JmsConstants.TOPIC_PROPERTY.equalsIgnoreCase(resourceInfo)); 184 185 if (!topic) 187 { 188 topic = MapUtils.getBooleanValue(endpoint.getProperties(), JmsConstants.TOPIC_PROPERTY, false); 189 } 190 191 Destination dest = jmsSupport.createDestination(session, endpoint.getEndpointURI().getAddress(), 192 topic); 193 194 String selector = null; 196 if (endpoint.getFilter() != null && endpoint.getFilter() instanceof JmsSelectorFilter) 197 { 198 selector = ((JmsSelectorFilter)endpoint.getFilter()).getExpression(); 199 } 200 else if (endpoint.getProperties() != null) 201 { 202 selector = (String )endpoint.getProperties().get(JmsConstants.JMS_SELECTOR_PROPERTY); 205 } 206 String tempDurable = (String )endpoint.getProperties().get(JmsConstants.DURABLE_PROPERTY); 207 boolean durable = connector.isDurable(); 208 if (tempDurable != null) 209 { 210 durable = Boolean.valueOf(tempDurable).booleanValue(); 211 } 212 213 String durableName = (String )endpoint.getProperties().get(JmsConstants.DURABLE_NAME_PROPERTY); 215 if (durableName == null && durable && dest instanceof Topic ) 216 { 217 durableName = "mule." + connector.getName() + "." + endpoint.getEndpointURI().getAddress(); 218 logger.debug("Jms Connector for this receiver is durable but no durable name has been specified. Defaulting to: " 219 + durableName); 220 } 221 222 consumer = jmsSupport.createConsumer(session, dest, selector, connector.isNoLocal(), durableName, 224 topic); 225 } 226 catch (JMSException e) 227 { 228 throw new ConnectException(e, this); 229 } 230 } 231 } 232 | Popular Tags |