1 10 11 package org.mule.providers.jms; 12 13 import javax.jms.Destination ; 14 import javax.jms.JMSException ; 15 import javax.jms.Message ; 16 import javax.jms.MessageConsumer ; 17 import javax.jms.MessageListener ; 18 import javax.jms.Session ; 19 import javax.jms.Topic ; 20 import javax.resource.spi.work.Work ; 21 22 import org.apache.commons.collections.MapUtils; 23 import org.mule.impl.MuleMessage; 24 import org.mule.providers.AbstractMessageReceiver; 25 import org.mule.providers.ConnectException; 26 import org.mule.providers.jms.filters.JmsSelectorFilter; 27 import org.mule.transaction.TransactionCallback; 28 import org.mule.transaction.TransactionCoordination; 29 import org.mule.transaction.TransactionTemplate; 30 import org.mule.umo.UMOComponent; 31 import org.mule.umo.UMOException; 32 import org.mule.umo.UMOTransaction; 33 import org.mule.umo.endpoint.UMOEndpoint; 34 import org.mule.umo.lifecycle.InitialisationException; 35 import org.mule.umo.lifecycle.LifecycleException; 36 import org.mule.umo.provider.UMOConnector; 37 import org.mule.umo.provider.UMOMessageAdapter; 38 39 public class TransactedSingleResourceJmsMessageReceiver extends AbstractMessageReceiver 40 implements MessageListener 41 { 42 protected JmsConnector connector; 43 protected RedeliveryHandler redeliveryHandler; 44 protected MessageConsumer consumer; 45 protected Session session; 46 protected boolean startOnConnect = false; 47 48 49 protected boolean receiveMessagesInTransaction = true; 50 51 52 protected boolean useMultipleReceivers = true; 53 54 60 public TransactedSingleResourceJmsMessageReceiver(UMOConnector connector, 61 UMOComponent component, 62 UMOEndpoint endpoint) throws InitialisationException 63 { 64 super(connector, component, endpoint); 65 this.connector = (JmsConnector)connector; 66 67 70 try 71 { 72 redeliveryHandler = this.connector.createRedeliveryHandler(); 73 redeliveryHandler.setConnector(this.connector); 74 } 75 catch (Exception e) 76 { 77 throw new InitialisationException(e, this); 78 } 79 } 80 81 public void doConnect() throws Exception 82 { 83 try 84 { 85 JmsSupport jmsSupport = this.connector.getJmsSupport(); 86 if (session == null) 88 { 89 session = this.connector.getSession(endpoint); 90 } 91 92 String resourceInfo = endpoint.getEndpointURI().getResourceInfo(); 94 boolean topic = (resourceInfo != null && JmsConstants.TOPIC_PROPERTY.equalsIgnoreCase(resourceInfo)); 95 96 if (!topic) 98 { 99 topic = MapUtils.getBooleanValue(endpoint.getProperties(), JmsConstants.TOPIC_PROPERTY, false); 100 } 101 102 Destination dest = jmsSupport.createDestination(session, endpoint.getEndpointURI().getAddress(), 103 topic); 104 105 String selector = null; 107 if (endpoint.getFilter() != null && endpoint.getFilter() instanceof JmsSelectorFilter) 108 { 109 selector = ((JmsSelectorFilter)endpoint.getFilter()).getExpression(); 110 } 111 else if (endpoint.getProperties() != null) 112 { 113 selector = (String )endpoint.getProperties().get(JmsConstants.JMS_SELECTOR_PROPERTY); 116 } 117 String tempDurable = (String )endpoint.getProperties().get(JmsConstants.DURABLE_PROPERTY); 118 boolean durable = connector.isDurable(); 119 if (tempDurable != null) 120 { 121 durable = Boolean.valueOf(tempDurable).booleanValue(); 122 } 123 124 String durableName = (String )endpoint.getProperties().get(JmsConstants.DURABLE_NAME_PROPERTY); 126 if (durableName == null && durable && dest instanceof Topic ) 127 { 128 durableName = "mule." + connector.getName() + "." + endpoint.getEndpointURI().getAddress(); 129 logger.debug("Jms Connector for this receiver is durable but no durable name has been specified. Defaulting to: " 130 + durableName); 131 } 132 133 consumer = jmsSupport.createConsumer(session, dest, selector, connector.isNoLocal(), durableName, 135 topic); 136 } 137 catch (JMSException e) 138 { 139 throw new ConnectException(e, this); 140 } 141 } 142 143 public void onMessage(Message message) 144 { 145 try 146 { 147 getWorkManager().scheduleWork(new MessageReceiverWorker(message)); 148 } 149 catch (Exception e) 150 { 151 handleException(e); 152 } 153 } 154 155 public void doStart() throws UMOException 156 { 157 try 158 { 159 if (consumer == null) 166 { 167 startOnConnect = true; 168 } 169 else 170 { 171 startOnConnect = false; 172 173 this.consumer.setMessageListener(this); 174 } 175 } 176 catch (JMSException e) 177 { 178 throw new LifecycleException(e, this); 179 } 180 } 181 182 public void doStop() throws UMOException 183 { 184 try 185 { 186 if (consumer != null) 187 { 188 consumer.setMessageListener(null); 189 } 190 } 191 catch (JMSException e) 192 { 193 throw new LifecycleException(e, this); 194 } 195 } 196 197 public void doDisconnect() throws Exception 198 { 199 closeConsumer(); 200 } 201 202 protected void closeConsumer() 203 { 204 connector.closeQuietly(consumer); 205 consumer = null; 206 connector.closeQuietly(session); 207 session = null; 208 } 209 210 protected class MessageReceiverWorker implements Work 211 { 212 Message message; 213 214 public MessageReceiverWorker(Message message) 215 { 216 this.message = message; 217 } 218 219 public void run() 220 { 221 try 222 { 223 TransactionTemplate tt = new TransactionTemplate(endpoint.getTransactionConfig(), 224 connector.getExceptionListener()); 225 226 if (receiveMessagesInTransaction) 227 { 228 TransactionCallback cb = new MessageTransactionCallback(message) 229 { 230 231 public Object doInTransaction() throws Exception 232 { 233 UMOTransaction tx = TransactionCoordination.getInstance().getTransaction(); 235 if (tx != null) 236 { 237 tx.bindResource(connector.getConnection(), session); 238 } 239 if (tx instanceof JmsClientAcknowledgeTransaction) 240 { 241 tx.bindResource(message, message); 242 } 243 244 if (logger.isDebugEnabled()) 245 { 246 logger.debug("Message received it is of type: " 247 + message.getClass().getName()); 248 if (message.getJMSDestination() != null) 249 { 250 logger.debug("Message received on " + message.getJMSDestination() + " (" 251 + message.getJMSDestination().getClass().getName() + ")"); 252 } 253 else 254 { 255 logger.debug("Message received on unknown destination"); 256 } 257 logger.debug("Message CorrelationId is: " + message.getJMSCorrelationID()); 258 logger.debug("Jms Message Id is: " + message.getJMSMessageID()); 259 } 260 261 if (message.getJMSRedelivered()) 262 { 263 if (logger.isDebugEnabled()) 264 { 265 logger.debug("Message with correlationId: " 266 + message.getJMSCorrelationID() 267 + " is redelivered. handing off to Exception Handler"); 268 } 269 redeliveryHandler.handleRedelivery(message); 270 } 271 272 UMOMessageAdapter adapter = connector.getMessageAdapter(message); 273 routeMessage(new MuleMessage(adapter)); 274 return null; 275 } 276 }; 277 tt.execute(cb); 278 } 279 else 280 { 281 UMOMessageAdapter adapter = connector.getMessageAdapter(message); 282 routeMessage(new MuleMessage(adapter)); 283 } 284 285 } 286 catch (Exception e) 287 { 288 getConnector().handleException(e); 289 } 290 291 } 292 293 public void release() 294 { 295 } 297 298 } 299 300 } 301 | Popular Tags |