package org.apache.servicemix.jbi.nmr.flow.jms;

import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import javax.jbi.JBIException;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.MessageExchange.Role;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.jbi.event.ComponentAdapter;
import org.apache.servicemix.jbi.event.ComponentEvent;
import org.apache.servicemix.jbi.event.ComponentListener;
import org.apache.servicemix.jbi.event.EndpointAdapter;
import org.apache.servicemix.jbi.event.EndpointEvent;
import org.apache.servicemix.jbi.event.EndpointListener;
import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
import org.apache.servicemix.jbi.nmr.Broker;
import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
import org.apache.servicemix.jbi.servicedesc.EndpointSupport;
import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;

/**
 * A flow that moves message exchanges over JMS using an ActiveMQ connection.
 * <p>
 * Exchanges are serialized into {@link ObjectMessage}s and sent to queues whose
 * names start with {@link #INBOUND_PREFIX}; this class consumes from one such
 * queue per container, per started component and per registered internal
 * endpoint. Endpoint registration events are additionally published on a
 * broadcast topic, and the ActiveMQ consumer advisory topic for that broadcast
 * topic is used to track which remote connections are subscribed.
 */
public class JMSFlow extends AbstractFlow implements MessageListener {

    /** Prefix of every queue name this flow consumes from or sends to. */
    private static final String INBOUND_PREFIX = "org.apache.servicemix.jms.";

    /** Broker URL used when no connection factory has been injected. */
    private String jmsURL = "peer://org.apache.servicemix?persistent=false";

    /** Optional user name; when null the connection is created without credentials. */
    private String userName;

    /** Password used together with {@link #userName}. */
    private String password;

    /** Factory used to create the connection; created lazily in {@link #init(Broker)}. */
    private ActiveMQConnectionFactory connectionFactory;

    /** The single JMS connection shared by both sessions. */
    private ActiveMQConnection connection;

    /** Name of the topic on which endpoint events are broadcast. */
    private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";

    /** Unbound producer (no default destination) used to send exchanges to queues. */
    private MessageProducer queueProducer;

    /** Producer bound to {@link #broadcastTopic}. */
    private MessageProducer topicProducer;

    private Topic broadcastTopic;

    /** Session used for the broadcast topic and its advisory topic. */
    private Session broadcastSession;

    private MessageConsumer broadcastConsumer;

    /** Session used for all inbound queues and for sending exchanges. */
    private Session inboundSession;

    private MessageConsumer advisoryConsumer;

    /** Connection ids of the consumers currently subscribed to the broadcast topic. */
    private Set subscriberSet = new CopyOnWriteArraySet();

    /** Maps an endpoint key or component name to the consumer of its inbound queue. */
    private Map consumerMap = new ConcurrentHashMap();

    private AtomicBoolean started = new AtomicBoolean(false);

    private EndpointListener endpointListener;

    private ComponentListener componentListener;

    /**
     * Returns the description of this flow.
     *
     * @return the constant string <code>"jms"</code>
     */
    public String getDescription() {
        return "jms";
    }

    /**
     * @return the broker URL used to build the default connection factory
     */
    public String getJmsURL() {
        return jmsURL;
    }

    /**
     * @param jmsURL the broker URL used to build the default connection factory
     */
    public void setJmsURL(String jmsURL) {
        this.jmsURL = jmsURL;
    }

    /**
     * @return the password used when a user name is configured
     */
    public String getPassword() {
        return password;
    }

    /**
     * @param password the password used when a user name is configured
     */
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * @return the user name used to open the connection, or null
     */
    public String getUserName() {
        return userName;
    }

    /**
     * @param userName the user name used to open the connection, or null
     */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /**
     * @return the connection factory, or null if none has been set or created yet
     */
    public ActiveMQConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    /**
     * @param connectionFactory the factory to use instead of one built from the JMS URL
     */
    public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /**
     * @return the name of the broadcast topic
     */
    public String getBroadcastDestinationName() {
        return broadcastDestinationName;
    }

    /**
     * @param broadcastDestinationName the name of the broadcast topic
     */
    public void setBroadcastDestinationName(String broadcastDestinationName) {
        this.broadcastDestinationName = broadcastDestinationName;
    }

    /**
     * Tells whether this flow can handle the given exchange.
     *
     * @param me the exchange to check
     * @return <code>false</code> when the exchange is transacted, <code>true</code> otherwise
     */
    public boolean canHandle(MessageExchange me) {
        return !isTransacted(me);
    }

    /**
     * Initializes the flow: registers endpoint and component listeners on the
     * container, opens the JMS connection (client id = container name), and
     * creates the inbound queue consumer, the queue producer and the broadcast
     * topic producer.
     *
     * @param broker the broker this flow belongs to
     * @throws JBIException if the JMS resources cannot be created
     */
    public void init(Broker broker) throws JBIException {
        log.debug(broker.getContainer().getName() + ": Initializing jms flow");
        super.init(broker);
        // Forward local endpoint (un)registrations, broadcasting them to other containers.
        endpointListener = new EndpointAdapter() {
            public void internalEndpointRegistered(EndpointEvent event) {
                onInternalEndpointRegistered(event, true);
            }

            public void internalEndpointUnregistered(EndpointEvent event) {
                onInternalEndpointUnregistered(event, true);
            }
        };
        broker.getContainer().addListener(endpointListener);
        // Create / destroy a per-component inbound queue consumer as components start and stop.
        componentListener = new ComponentAdapter() {
            public void componentStarted(ComponentEvent event) {
                onComponentStarted(event);
            }

            public void componentStopped(ComponentEvent event) {
                onComponentStopped(event);
            }
        };
        broker.getContainer().addListener(componentListener);
        try {
            if (connectionFactory == null) {
                if (jmsURL != null) {
                    connectionFactory = new ActiveMQConnectionFactory(jmsURL);
                } else {
                    connectionFactory = new ActiveMQConnectionFactory();
                }
            }
            if (userName != null) {
                connection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
            } else {
                connection = (ActiveMQConnection) connectionFactory.createConnection();
            }
            connection.setClientID(broker.getContainer().getName());
            connection.start();
            inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = inboundSession.createQueue(INBOUND_PREFIX + broker.getContainer().getName());
            MessageConsumer inboundQueue = inboundSession.createConsumer(queue);
            inboundQueue.setMessageListener(this);
            // No default destination: the target queue is chosen per exchange in doRouting().
            queueProducer = inboundSession.createProducer(null);
            broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
            topicProducer = broadcastSession.createProducer(broadcastTopic);
            topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        } catch (JMSException e) {
            log.error("Failed to initialize JMSFlow", e);
            // Do not leak a half-initialized connection when setup fails part-way.
            closeConnection();
            throw new JBIException(e);
        }
    }

    /**
     * Starts the flow. Subscribes to the broadcast topic (ignoring messages
     * published on this same connection) and to its consumer advisory topic,
     * then creates inbound consumers for every component that is already
     * started and every local internal endpoint that is already registered.
     * Does nothing if the flow is already started.
     *
     * @throws JBIException if a JMS error occurs while subscribing
     */
    public void start() throws JBIException {
        if (started.compareAndSet(false, true)) {
            log.debug(broker.getContainer().getName() + ": Starting jms flow");
            super.start();
            try {
                // noLocal = true: do not receive events this container published itself.
                broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
                broadcastConsumer.setMessageListener(new MessageListener() {
                    public void onMessage(Message message) {
                        try {
                            Object obj = ((ObjectMessage) message).getObject();
                            if (obj instanceof EndpointEvent) {
                                EndpointEvent event = (EndpointEvent) obj;
                                String container = ((InternalEndpoint) event.getEndpoint())
                                        .getComponentNameSpace().getContainerName();
                                // Only react to events originating from other containers.
                                if (!getBroker().getContainer().getName().equals(container)) {
                                    if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
                                        onRemoteEndpointRegistered(event);
                                    } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
                                        onRemoteEndpointUnregistered(event);
                                    }
                                }
                            }
                        } catch (Exception e) {
                            log.error("Error processing incoming broadcast message", e);
                        }
                    }
                });
                Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
                advisoryConsumer = broadcastSession.createConsumer(advisoryTopic);
                advisoryConsumer.setMessageListener(new MessageListener() {
                    public void onMessage(Message message) {
                        if (started.get()) {
                            onAdvisoryMessage(((ActiveMQMessage) message).getDataStructure());
                        }
                    }
                });

                // Catch up with components that were started before this flow.
                for (Iterator it = broker.getContainer().getRegistry().getComponents().iterator(); it.hasNext();) {
                    ComponentMBeanImpl cmp = (ComponentMBeanImpl) it.next();
                    if (cmp.isStarted()) {
                        onComponentStarted(new ComponentEvent(cmp, ComponentEvent.COMPONENT_STARTED));
                    }
                }
                // Catch up with local endpoints registered before this flow was started (no broadcast).
                ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
                for (int i = 0; i < endpoints.length; i++) {
                    if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
                        onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
                                EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), false);
                    }
                }
            } catch (JMSException e) {
                // Keep the original exception as the cause so the stack trace is not lost.
                throw new JBIException("JMSException caught in start: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Stops the flow: clears the subscriber set and closes the advisory and
     * broadcast consumers. Does nothing if the flow is not started.
     *
     * @throws JBIException if the superclass fails to stop
     */
    public void stop() throws JBIException {
        if (started.compareAndSet(true, false)) {
            log.debug(broker.getContainer().getName() + ": Stopping jms flow");
            super.stop();
            for (Iterator it = subscriberSet.iterator(); it.hasNext();) {
                String id = (String) it.next();
                removeAllPackets(id);
            }
            subscriberSet.clear();
            // Close each consumer independently so one failure does not skip the other;
            // either may be null if start() failed part-way.
            closeConsumer(advisoryConsumer);
            advisoryConsumer = null;
            closeConsumer(broadcastConsumer);
            broadcastConsumer = null;
        }
    }

    /**
     * Shuts the flow down: stops it, removes the container listeners and
     * closes the JMS connection.
     *
     * @throws JBIException if the superclass or {@link #stop()} fails
     */
    public void shutDown() throws JBIException {
        super.shutDown();
        stop();
        broker.getContainer().removeListener(endpointListener);
        broker.getContainer().removeListener(componentListener);
        closeConnection();
    }

    /**
     * @return the number of connection ids currently recorded as subscribers
     *         of the broadcast topic
     */
    public int numberInNetwork() {
        return subscriberSet.size();
    }

    /**
     * Handles registration of an internal endpoint: creates a consumer on the
     * endpoint's inbound queue if none exists yet and, optionally, broadcasts
     * the event on the broadcast topic. Ignored while the flow is not started.
     * Errors are logged, not propagated.
     *
     * @param event     the registration event
     * @param broadcast whether to publish the event to other containers
     */
    public void onInternalEndpointRegistered(EndpointEvent event, boolean broadcast) {
        if (!started.get()) {
            return;
        }
        try {
            String key = EndpointSupport.getKey(event.getEndpoint());
            if (!consumerMap.containsKey(key)) {
                Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
                MessageConsumer consumer = inboundSession.createConsumer(queue);
                consumer.setMessageListener(this);
                consumerMap.put(key, consumer);
            }
            if (broadcast) {
                log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
                ObjectMessage msg = broadcastSession.createObjectMessage(event);
                topicProducer.send(msg);
            }
        } catch (Exception e) {
            log.error("Cannot create consumer for " + event.getEndpoint(), e);
        }
    }

    /**
     * Handles unregistration of an internal endpoint: closes and forgets the
     * consumer of the endpoint's inbound queue and, optionally, broadcasts the
     * event on the broadcast topic. Errors are logged, not propagated.
     *
     * @param event     the unregistration event
     * @param broadcast whether to publish the event to other containers
     */
    public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
        try {
            String key = EndpointSupport.getKey(event.getEndpoint());
            MessageConsumer consumer = (MessageConsumer) consumerMap.remove(key);
            if (consumer != null) {
                consumer.close();
            }
            if (broadcast) {
                ObjectMessage msg = broadcastSession.createObjectMessage(event);
                log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
                topicProducer.send(msg);
            }
        } catch (Exception e) {
            log.error("Cannot destroy consumer for " + event, e);
        }
    }

    /**
     * Handles a component start: creates a consumer on the queue named after
     * the component if none exists yet. Ignored while the flow is not started.
     * Errors are logged, not propagated.
     *
     * @param event the component event
     */
    public void onComponentStarted(ComponentEvent event) {
        if (!started.get()) {
            return;
        }
        try {
            String key = event.getComponent().getName();
            if (!consumerMap.containsKey(key)) {
                Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
                MessageConsumer consumer = inboundSession.createConsumer(queue);
                consumer.setMessageListener(this);
                consumerMap.put(key, consumer);
            }
        } catch (Exception e) {
            log.error("Cannot create consumer for component " + event.getComponent().getName(), e);
        }
    }

    /**
     * Handles a component stop: closes and forgets the consumer of the queue
     * named after the component. Errors are logged, not propagated.
     *
     * @param event the component event
     */
    public void onComponentStopped(ComponentEvent event) {
        try {
            String key = event.getComponent().getName();
            MessageConsumer consumer = (MessageConsumer) consumerMap.remove(key);
            if (consumer != null) {
                consumer.close();
            }
        } catch (Exception e) {
            log.error("Cannot destroy consumer for component " + event.getComponent().getName(), e);
        }
    }

    /**
     * Registers an endpoint announced by another container in the local registry.
     *
     * @param event the broadcast registration event
     */
    public void onRemoteEndpointRegistered(EndpointEvent event) {
        log.debug(broker.getContainer().getName() + ": adding remote endpoint: " + event.getEndpoint());
        broker.getContainer().getRegistry().registerRemoteEndpoint(event.getEndpoint());
    }

    /**
     * Removes an endpoint withdrawn by another container from the local registry.
     *
     * @param event the broadcast unregistration event
     */
    public void onRemoteEndpointUnregistered(EndpointEvent event) {
        log.debug(broker.getContainer().getName() + ": removing remote endpoint: " + event.getEndpoint());
        broker.getContainer().getRegistry().unregisterRemoteEndpoint(event.getEndpoint());
    }

    /**
     * Sends the exchange by delegating to {@link #doRouting(MessageExchangeImpl)}.
     *
     * @param me the exchange to send
     * @throws MessagingException if the exchange cannot be sent over JMS
     */
    protected void doSend(MessageExchangeImpl me) throws MessagingException {
        doRouting(me);
    }

    /**
     * Picks the destination queue for the exchange and sends the exchange to
     * it as an {@link ObjectMessage}.
     * <p>
     * For the provider role the queue is keyed by the endpoint (no destination
     * id yet), by the destination component name (stateless provider,
     * asynchronous exchange) or by the destination container name. For the
     * consumer role it is keyed by the sender endpoint property or the source
     * component name (stateless consumer, asynchronous exchange) or by the
     * source container name.
     *
     * @param me the exchange to route
     * @throws MessagingException    if the JMS send fails
     * @throws IllegalStateException if a consumer-role exchange has no source id
     */
    public void doRouting(MessageExchangeImpl me) throws MessagingException {
        try {
            String destination;
            if (me.getRole() == Role.PROVIDER) {
                if (me.getDestinationId() == null) {
                    destination = INBOUND_PREFIX + EndpointSupport.getKey(me.getEndpoint());
                } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_PROVIDER)) && !isSynchronous(me)) {
                    destination = INBOUND_PREFIX + me.getDestinationId().getName();
                } else {
                    destination = INBOUND_PREFIX + me.getDestinationId().getContainerName();
                }
            } else {
                if (me.getSourceId() == null) {
                    throw new IllegalStateException("No sourceId set on the exchange");
                } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER)) && !isSynchronous(me)) {
                    if (me.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
                        destination = INBOUND_PREFIX + me.getProperty(JbiConstants.SENDER_ENDPOINT);
                    } else {
                        destination = INBOUND_PREFIX + me.getSourceId().getName();
                    }
                } else {
                    destination = INBOUND_PREFIX + me.getSourceId().getContainerName();
                }
            }

            Queue queue = inboundSession.createQueue(destination);
            ObjectMessage msg = inboundSession.createObjectMessage(me);
            queueProducer.send(queue, msg);
        } catch (JMSException e) {
            log.error("Failed to send exchange: " + me + " internal JMS Network", e);
            throw new MessagingException(e);
        }
    }

    /**
     * Receives an exchange from one of the inbound queues and schedules its
     * local routing on the container's work manager. Messages are ignored when
     * null, when the flow is not started, or when they do not carry a
     * {@link MessageExchangeImpl}. Errors are logged, not propagated.
     *
     * @param message the incoming JMS message
     */
    public void onMessage(final Message message) {
        try {
            if (message != null && started.get()) {
                // Guard the casts: a ClassCastException here would escape into the JMS provider.
                if (!(message instanceof ObjectMessage)) {
                    log.warn("Ignoring unexpected JMS message type: " + message.getClass().getName());
                    return;
                }
                Object payload = ((ObjectMessage) message).getObject();
                if (!(payload instanceof MessageExchangeImpl)) {
                    log.warn("Ignoring JMS message with unexpected payload: "
                            + (payload == null ? "null" : payload.getClass().getName()));
                    return;
                }
                final MessageExchangeImpl me = (MessageExchangeImpl) payload;
                broker.getContainer().getWorkManager().scheduleWork(new Work() {
                    public void release() {
                    }

                    public void run() {
                        try {
                            if (me.getDestinationId() == null) {
                                // Resolve the endpoint against the local registry and
                                // record which component owns it.
                                ServiceEndpoint se = me.getEndpoint();
                                se = broker.getContainer().getRegistry()
                                        .getInternalEndpoint(se.getServiceName(), se.getEndpointName());
                                me.setEndpoint(se);
                                me.setDestinationId(((InternalEndpoint) se).getComponentNameSpace());
                            }
                            // Use the superclass routing, not this class's JMS routing.
                            JMSFlow.super.doRouting(me);
                        } catch (Throwable e) {
                            log.error("Caught an exception routing ExchangePacket: ", e);
                        }
                    }
                });
            }
        } catch (JMSException jmsEx) {
            log.error("Caught an exception unpacking JMS Message: ", jmsEx);
        } catch (WorkException e) {
            log.error("Caught an exception routing ExchangePacket: ", e);
        }
    }

    /**
     * Handles a consumer advisory for the broadcast topic. On a
     * {@link ConsumerInfo} the consumer's connection id is recorded and all
     * local internal endpoints are re-broadcast; on a {@link RemoveInfo} the
     * connection id is forgotten.
     *
     * @param obj the data structure carried by the advisory message
     */
    protected void onAdvisoryMessage(Object obj) {
        if (obj instanceof ConsumerInfo) {
            ConsumerInfo info = (ConsumerInfo) obj;
            subscriberSet.add(info.getConsumerId().getConnectionId());
            ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
            for (int i = 0; i < endpoints.length; i++) {
                if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
                    onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
                            EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true);
                }
            }
        } else if (obj instanceof RemoveInfo) {
            ConsumerId id = (ConsumerId) ((RemoveInfo) obj).getObjectId();
            subscriberSet.remove(id.getConnectionId());
            removeAllPackets(id.getConnectionId());
        }
    }

    /**
     * Currently a no-op; called when a subscriber goes away or the flow stops.
     *
     * @param containerName the connection id of the departed subscriber
     */
    private void removeAllPackets(String containerName) {
    }

    /** Closes the given consumer if it is not null, logging any JMS error at debug level. */
    private void closeConsumer(MessageConsumer consumer) {
        if (consumer == null) {
            return;
        }
        try {
            consumer.close();
        } catch (JMSException e) {
            log.debug("JMSException caught in stop", e);
        }
    }

    /** Closes the JMS connection if one is open and clears the field, logging any JMS error. */
    private void closeConnection() {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                log.warn("Error closing JMS Connection", e);
            } finally {
                connection = null;
            }
        }
    }
}