1 10 11 package org.mule.providers.jms; 12 13 import java.util.List ; 14 import javax.jms.Destination ; 15 import javax.jms.JMSException ; 16 import javax.jms.Message ; 17 import javax.jms.MessageConsumer ; 18 import javax.jms.Session ; 19 import javax.jms.Topic ; 20 21 import org.apache.commons.collections.MapUtils; 22 23 import org.mule.impl.MuleMessage; 24 import org.mule.providers.ConnectException; 25 import org.mule.providers.SingleAttemptConnectionStrategy; 26 import org.mule.providers.TransactedPollingMessageReceiver; 27 import org.mule.providers.jms.filters.JmsSelectorFilter; 28 import org.mule.transaction.TransactionCoordination; 29 import org.mule.umo.UMOComponent; 30 import org.mule.umo.UMOTransaction; 31 import org.mule.umo.endpoint.UMOEndpoint; 32 import org.mule.umo.lifecycle.InitialisationException; 33 import org.mule.umo.provider.UMOConnector; 34 import org.mule.umo.provider.UMOMessageAdapter; 35 36 public class TransactedJmsMessageReceiver extends TransactedPollingMessageReceiver 37 { 38 protected final JmsConnector connector; 39 protected boolean reuseConsumer; 40 protected boolean reuseSession; 41 protected final ThreadContextLocal context = new ThreadContextLocal(); 42 protected final long timeout; 43 protected final RedeliveryHandler redeliveryHandler; 44 45 48 protected static class JmsThreadContext 49 { 50 public Session session; 51 public MessageConsumer consumer; 52 } 53 54 57 protected static class ThreadContextLocal extends ThreadLocal 58 { 59 public JmsThreadContext getContext() 60 { 61 return (JmsThreadContext)get(); 62 } 63 64 protected Object initialValue() 65 { 66 return new JmsThreadContext(); 67 } 68 } 69 70 public TransactedJmsMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint) 71 throws InitialisationException 72 { 73 super(connector, component, endpoint, new Long (0)); 74 this.connector = (JmsConnector)connector; 75 this.timeout = endpoint.getTransactionConfig().getTimeout(); 76 77 if (this.connectionStrategy instanceof SingleAttemptConnectionStrategy) 81 { 82 this.reuseConsumer = true; 83 this.reuseSession = true; 84 } 85 this.reuseConsumer = MapUtils.getBooleanValue(endpoint.getProperties(), "reuseConsumer", 87 this.reuseConsumer); 88 this.reuseSession = MapUtils.getBooleanValue(endpoint.getProperties(), "reuseSession", 89 this.reuseSession); 90 91 String resourceInfo = endpoint.getEndpointURI().getResourceInfo(); 96 boolean topic = (resourceInfo != null && "topic".equalsIgnoreCase(resourceInfo)); 97 98 useMultipleReceivers = !topic; 102 103 try 104 { 105 redeliveryHandler = this.connector.createRedeliveryHandler(); 106 redeliveryHandler.setConnector(this.connector); 107 } 108 catch (Exception e) 109 { 110 throw new InitialisationException(e, this); 111 } 112 113 } 114 115 public void doConnect() throws Exception 116 { 117 if (connector.isConnected()) 118 { 119 121 createConsumer(); 126 } 130 } 131 132 public void doDisconnect() throws Exception 133 { 134 if (connector.isConnected()) 135 { 136 closeConsumer(true); 137 } 138 } 139 140 143 public void poll() throws Exception 144 { 145 try 146 { 147 JmsThreadContext ctx = context.getContext(); 148 if (ctx.consumer == null) 150 { 151 createConsumer(); 152 } 153 super.poll(); 155 } 156 catch (Exception e) 157 { 158 closeConsumer(true); 160 throw e; 161 } 162 finally 163 { 164 closeConsumer(false); 166 } 167 } 168 169 174 protected List getMessages() throws Exception 175 { 176 JmsThreadContext ctx = context.getContext(); 179 180 UMOTransaction tx = TransactionCoordination.getInstance().getTransaction(); 181 if (tx != null) 182 { 183 tx.bindResource(connector.getConnection(), ctx.session); 184 } 185 186 Message message = null; 188 try 189 { 190 message = ctx.consumer.receive(timeout); 191 } 192 catch (JMSException e) 193 { 194 if (!this.isConnected()) 196 { 197 } 199 else 200 { 201 throw e; 202 } 203 } 204 if (message == null) 205 { 206 if (tx != null) 207 { 208 tx.setRollbackOnly(); 209 } 210 return null; 211 } 212 message = connector.preProcessMessage(message, ctx.session); 213 214 if (logger.isDebugEnabled()) 216 { 217 logger.debug("Message received it is of type: " + message.getClass().getName()); 218 if (message.getJMSDestination() != null) 219 { 220 logger.debug("Message received on " + message.getJMSDestination() + " (" 221 + message.getJMSDestination().getClass().getName() + ")"); 222 } 223 else 224 { 225 logger.debug("Message received on unknown destination"); 226 } 227 logger.debug("Message CorrelationId is: " + message.getJMSCorrelationID()); 228 logger.debug("Jms Message Id is: " + message.getJMSMessageID()); 229 } 230 231 if (message.getJMSRedelivered()) 232 { 233 if (logger.isDebugEnabled()) 234 { 235 logger.debug("Message with correlationId: " + message.getJMSCorrelationID() 236 + " is redelivered. handing off to Exception Handler"); 237 } 238 redeliveryHandler.handleRedelivery(message); 239 } 240 241 if (tx instanceof JmsClientAcknowledgeTransaction) 242 { 243 tx.bindResource(message, null); 244 } 245 246 UMOMessageAdapter adapter = connector.getMessageAdapter(message); 247 routeMessage(new MuleMessage(adapter)); 248 return null; 249 } 250 251 256 protected void processMessage(Object msg) throws Exception 257 { 258 } 261 262 protected void closeConsumer(boolean force) 263 { 264 JmsThreadContext ctx = context.getContext(); 265 if (ctx == null) 266 { 267 return; 268 } 269 if (force || !reuseSession || !reuseConsumer) 271 { 272 connector.closeQuietly(ctx.consumer); 273 ctx.consumer = null; 274 } 275 if (force || !reuseSession) 278 { 279 connector.closeQuietly(ctx.session); 280 ctx.session = null; 281 } 282 } 283 284 289 protected void createConsumer() throws Exception 290 { 291 try 292 { 293 JmsSupport jmsSupport = this.connector.getJmsSupport(); 294 JmsThreadContext ctx = context.getContext(); 295 if (ctx.session == null) 297 { 298 ctx.session = this.connector.getSession(endpoint); 299 } 300 301 String resourceInfo = endpoint.getEndpointURI().getResourceInfo(); 303 boolean topic = (resourceInfo != null && "topic".equalsIgnoreCase(resourceInfo)); 304 Destination dest = jmsSupport.createDestination(ctx.session, endpoint.getEndpointURI() 305 .getAddress(), topic); 306 307 String selector = null; 309 if (endpoint.getFilter() != null && endpoint.getFilter() instanceof JmsSelectorFilter) 310 { 311 selector = ((JmsSelectorFilter)endpoint.getFilter()).getExpression(); 312 } 313 else if (endpoint.getProperties() != null) 314 { 315 selector = (String )endpoint.getProperties().get(JmsConstants.JMS_SELECTOR_PROPERTY); 318 } 319 String tempDurable = (String )endpoint.getProperties().get("durable"); 320 boolean durable = connector.isDurable(); 321 if (tempDurable != null) 322 { 323 durable = Boolean.valueOf(tempDurable).booleanValue(); 324 } 325 326 String durableName = (String )endpoint.getProperties().get("durableName"); 328 if (durableName == null && durable && dest instanceof Topic ) 329 { 330 durableName = "mule." + connector.getName() + "." + endpoint.getEndpointURI().getAddress(); 331 logger.debug("Jms Connector for this receiver is durable but no durable name has been specified. Defaulting to: " 332 + durableName); 333 } 334 335 ctx.consumer = jmsSupport.createConsumer(ctx.session, dest, selector, connector.isNoLocal(), 337 durableName, topic); 338 } 339 catch (JMSException e) 340 { 341 throw new ConnectException(e, this); 342 } 343 } 344 } 345 | Popular Tags |