1 10 11 package org.mule.providers.jms; 12 13 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; 14 15 import javax.jms.DeliveryMode ; 16 import javax.jms.Destination ; 17 import javax.jms.Message ; 18 import javax.jms.MessageConsumer ; 19 import javax.jms.MessageListener ; 20 import javax.jms.MessageProducer ; 21 import javax.jms.Session ; 22 import javax.jms.TemporaryQueue ; 23 import javax.jms.TemporaryTopic ; 24 25 import org.apache.commons.collections.MapUtils; 26 import org.mule.MuleException; 27 import org.mule.config.i18n.Messages; 28 import org.mule.impl.MuleMessage; 29 import org.mule.providers.AbstractMessageDispatcher; 30 import org.mule.transaction.IllegalTransactionStateException; 31 import org.mule.umo.UMOEvent; 32 import org.mule.umo.UMOException; 33 import org.mule.umo.UMOMessage; 34 import org.mule.umo.endpoint.UMOEndpointURI; 35 import org.mule.umo.endpoint.UMOImmutableEndpoint; 36 import org.mule.umo.provider.DispatchException; 37 import org.mule.umo.provider.UMOConnector; 38 import org.mule.umo.provider.UMOMessageAdapter; 39 import org.mule.util.concurrent.Latch; 40 import org.mule.util.concurrent.WaitableBoolean; 41 42 48 public class JmsMessageDispatcher extends AbstractMessageDispatcher 49 { 50 51 private JmsConnector connector; 52 private Session delegateSession; 53 private Session cachedSession; 54 55 public JmsMessageDispatcher(UMOImmutableEndpoint endpoint) 56 { 57 super(endpoint); 58 this.connector = (JmsConnector)endpoint.getConnector(); 59 } 60 61 67 protected void doDispatch(UMOEvent event) throws Exception 68 { 69 dispatchMessage(event); 70 } 71 72 protected void doConnect(UMOImmutableEndpoint endpoint) throws Exception 73 { 74 } 76 77 protected void doDisconnect() throws Exception 78 { 79 } 81 82 private UMOMessage dispatchMessage(UMOEvent event) throws Exception 83 { 84 Session session = null; 85 MessageProducer producer = null; 86 MessageConsumer consumer = null; 87 Destination replyTo = null; 88 boolean transacted = false; 89 boolean cached = false; 90 boolean remoteSync = useRemoteSync(event); 91 92 if (logger.isDebugEnabled()) 93 { 94 logger.debug("dispatching on endpoint: " + event.getEndpoint().getEndpointURI() 95 + ". Event id is: " + event.getId()); 96 } 97 98 try 99 { 100 session = connector.getSessionFromTransaction(); 102 if (session != null) 103 { 104 transacted = true; 105 106 if (remoteSync) 109 { 110 throw new IllegalTransactionStateException(new org.mule.config.i18n.Message("jms", 2)); 111 } 112 } 113 else if (event.getMessage().getBooleanProperty(JmsConstants.CACHE_JMS_SESSIONS_PROPERTY, 116 connector.isCacheJmsSessions())) 117 { 118 cached = true; 119 if (cachedSession != null) 120 { 121 session = cachedSession; 122 } 123 else 124 { 125 session = connector.getSession(event.getEndpoint()); 127 cachedSession = session; 128 } 129 } 130 else 131 { 132 session = connector.getSession(event.getEndpoint()); 134 if (event.getEndpoint().getTransactionConfig().isTransacted()) 135 { 136 transacted = true; 137 } 138 } 139 140 147 UMOEndpointURI endpointUri = event.getEndpoint().getEndpointURI(); 148 149 boolean topic = false; 152 String resourceInfo = endpointUri.getResourceInfo(); 153 topic = (resourceInfo != null && JmsConstants.TOPIC_PROPERTY.equalsIgnoreCase(resourceInfo)); 154 if (!topic) 156 { 157 topic = MapUtils.getBooleanValue(event.getEndpoint().getProperties(), 158 JmsConstants.TOPIC_PROPERTY, false); 159 } 160 161 Destination dest = connector.getJmsSupport().createDestination(session, endpointUri.getAddress(), 162 topic); 163 producer = connector.getJmsSupport().createProducer(session, dest, topic); 164 165 Object message = event.getTransformedMessage(); 166 if (!(message instanceof Message )) 167 { 168 throw new DispatchException(new org.mule.config.i18n.Message( 169 Messages.MESSAGE_NOT_X_IT_IS_TYPE_X_CHECK_TRANSFORMER_ON_X, "JMS message", 170 message.getClass().getName(), connector.getName()), event.getMessage(), 171 event.getEndpoint()); 172 } 173 174 Message msg = (Message )message; 175 if (event.getMessage().getCorrelationId() != null) 176 { 177 msg.setJMSCorrelationID(event.getMessage().getCorrelationId()); 178 } 179 180 UMOMessage eventMsg = event.getMessage(); 181 182 if (connector.supportsProperty(JmsConstants.JMS_REPLY_TO)) 184 { 185 Object tempReplyTo = eventMsg.removeProperty(JmsConstants.JMS_REPLY_TO); 186 if (tempReplyTo != null) 187 { 188 if (tempReplyTo instanceof Destination ) 189 { 190 replyTo = (Destination )tempReplyTo; 191 } 192 else 193 { 194 boolean replyToTopic = false; 195 String reply = tempReplyTo.toString(); 196 int i = reply.indexOf(":"); 197 if (i > -1) 198 { 199 String qtype = reply.substring(0, i); 200 replyToTopic = "topic".equalsIgnoreCase(qtype); 201 reply = reply.substring(i + 1); 202 } 203 replyTo = connector.getJmsSupport().createDestination(session, reply, replyToTopic); 204 } 205 } 206 if (remoteSync && replyTo == null) 208 { 209 replyTo = connector.getJmsSupport().createTemporaryDestination(session, topic); 210 } 211 if (replyTo != null) 213 { 214 msg.setJMSReplyTo(replyTo); 215 } 216 217 if (remoteSync) 219 { 220 consumer = connector.getJmsSupport().createConsumer(session, replyTo, topic); 221 } 222 } 223 224 String ttlString = (String )eventMsg.removeProperty(JmsConstants.TIME_TO_LIVE_PROPERTY); 226 String priorityString = (String )eventMsg.removeProperty(JmsConstants.PRIORITY_PROPERTY); 227 String persistentDeliveryString = (String )eventMsg.removeProperty(JmsConstants.PERSISTENT_DELIVERY_PROPERTY); 228 229 long ttl = Message.DEFAULT_TIME_TO_LIVE; 230 int priority = Message.DEFAULT_PRIORITY; 231 boolean persistent = Message.DEFAULT_DELIVERY_MODE == DeliveryMode.PERSISTENT; 232 233 if (ttlString != null) 234 { 235 ttl = Long.parseLong(ttlString); 236 } 237 if (priorityString != null) 238 { 239 priority = Integer.parseInt(priorityString); 240 } 241 if (persistentDeliveryString != null) 242 { 243 persistent = Boolean.valueOf(persistentDeliveryString).booleanValue(); 244 } 245 246 if (logger.isDebugEnabled()) 247 { 248 logger.debug("Sending message of type " + msg.getClass().getName()); 249 } 250 251 if (consumer != null && topic) 252 { 253 Latch l = new Latch(); 255 ReplyToListener listener = new ReplyToListener(l); 256 consumer.setMessageListener(listener); 257 258 connector.getJmsSupport().send(producer, msg, persistent, priority, ttl, topic); 259 260 int timeout = event.getTimeout(); 261 262 if (logger.isDebugEnabled()) 263 { 264 logger.debug("Waiting for return event for: " + timeout + " ms on " + replyTo); 265 } 266 267 l.await(timeout, TimeUnit.MILLISECONDS); 268 consumer.setMessageListener(null); 269 listener.release(); 270 Message result = listener.getMessage(); 271 if (result == null) 272 { 273 logger.debug("No message was returned via replyTo destination"); 274 return null; 275 } 276 else 277 { 278 UMOMessageAdapter adapter = connector.getMessageAdapter(result); 279 return new MuleMessage(JmsMessageUtils.toObject(result, connector.getSpecification()), 280 adapter); 281 } 282 } 283 else 284 { 285 connector.getJmsSupport().send(producer, msg, persistent, priority, ttl, topic); 286 if (consumer != null) 287 { 288 int timeout = event.getTimeout(); 289 290 if (logger.isDebugEnabled()) 291 { 292 logger.debug("Waiting for return event for: " + timeout + " ms on " + replyTo); 293 } 294 295 Message result = consumer.receive(timeout); 296 if (result == null) 297 { 298 logger.debug("No message was returned via replyTo destination"); 299 return null; 300 } 301 else 302 { 303 UMOMessageAdapter adapter = connector.getMessageAdapter(result); 304 return new MuleMessage( 305 JmsMessageUtils.toObject(result, connector.getSpecification()), adapter); 306 } 307 } 308 } 309 return null; 310 } 311 finally 312 { 313 connector.closeQuietly(consumer); 314 connector.closeQuietly(producer); 315 316 if (replyTo != null && (replyTo instanceof TemporaryQueue || replyTo instanceof TemporaryTopic )) 319 { 320 if (replyTo instanceof TemporaryQueue ) 321 { 322 connector.closeQuietly((TemporaryQueue )replyTo); 323 } 324 else 325 { 326 connector.closeQuietly((TemporaryTopic )replyTo); 329 } 330 } 331 332 if (session != null && !cached && !transacted) 335 { 336 connector.closeQuietly(session); 337 } 338 } 339 } 340 341 347 protected UMOMessage doSend(UMOEvent event) throws Exception 348 { 349 UMOMessage message = dispatchMessage(event); 350 return message; 351 } 352 353 365 protected UMOMessage doReceive(UMOImmutableEndpoint endpoint, long timeout) throws Exception 366 { 367 368 Session session = null; 369 Destination dest = null; 370 MessageConsumer consumer = null; 371 try 372 { 373 boolean topic = false; 374 String resourceInfo = endpoint.getEndpointURI().getResourceInfo(); 375 topic = (resourceInfo != null && JmsConstants.TOPIC_PROPERTY.equalsIgnoreCase(resourceInfo)); 376 377 session = connector.getSession(false, topic); 378 dest = connector.getJmsSupport().createDestination(session, 379 endpoint.getEndpointURI().getAddress(), topic); 380 consumer = connector.getJmsSupport().createConsumer(session, dest, topic); 381 382 try 383 { 384 Message message = null; 385 if (timeout == RECEIVE_NO_WAIT) 386 { 387 message = consumer.receiveNoWait(); 388 } 389 else if (timeout == RECEIVE_WAIT_INDEFINITELY) 390 { 391 message = consumer.receive(); 392 } 393 else 394 { 395 message = consumer.receive(timeout); 396 } 397 if (message == null) 398 { 399 return null; 400 } 401 402 message = connector.preProcessMessage(message, session); 403 404 return new MuleMessage(connector.getMessageAdapter(message)); 405 } 406 catch (Exception e) 407 { 408 connector.handleException(e); 409 return null; 410 } 411 } 412 finally 413 { 414 connector.closeQuietly(consumer); 415 connector.closeQuietly(session); 416 } 417 } 418 419 424 public synchronized Object getDelegateSession() throws UMOException 425 { 426 try 427 { 428 Session session = connector.getSessionFromTransaction(); 431 if (session != null) 432 { 433 return session; 434 } 435 if (delegateSession == null) 438 { 439 delegateSession = connector.getSession(false, false); 440 } 441 return delegateSession; 442 } 443 catch (Exception e) 444 { 445 throw new MuleException(new org.mule.config.i18n.Message("jms", 3), e); 446 } 447 } 448 449 454 public UMOConnector getConnector() 455 { 456 return connector; 457 } 458 459 protected void doDispose() 460 { 461 } 463 464 private class ReplyToListener implements MessageListener 465 { 466 private final Latch latch; 467 private volatile Message message; 468 private final WaitableBoolean released = new WaitableBoolean(false); 469 470 public ReplyToListener(Latch latch) 471 { 472 this.latch = latch; 473 } 474 475 public Message getMessage() 476 { 477 return message; 478 } 479 480 public void release() 481 { 482 released.set(true); 483 } 484 485 public void onMessage(Message message) 486 { 487 this.message = message; 488 latch.countDown(); 489 try 490 { 491 released.whenTrue(null); 492 } 493 catch (InterruptedException e) 494 { 495 } 497 } 498 499 } 500 501 } 502 | Popular Tags |