1 17 package org.apache.servicemix.jbi.nmr.flow.jca; 18 19 import java.io.Serializable ; 20 import java.util.Map ; 21 import java.util.Set ; 22 23 import javax.jbi.JBIException; 24 import javax.jbi.messaging.MessageExchange; 25 import javax.jbi.messaging.MessagingException; 26 import javax.jbi.messaging.MessageExchange.Role; 27 import javax.jbi.servicedesc.ServiceEndpoint; 28 import javax.jms.Connection ; 29 import javax.jms.ConnectionFactory ; 30 import javax.jms.DeliveryMode ; 31 import javax.jms.Destination ; 32 import javax.jms.JMSException ; 33 import javax.jms.Message ; 34 import javax.jms.MessageConsumer ; 35 import javax.jms.MessageListener ; 36 import javax.jms.MessageProducer ; 37 import javax.jms.ObjectMessage ; 38 import javax.jms.Session ; 39 import javax.jms.Topic ; 40 import javax.resource.spi.BootstrapContext ; 41 import javax.resource.spi.ConnectionManager ; 42 import javax.resource.spi.ResourceAdapter ; 43 import javax.resource.spi.ResourceAdapterInternalException ; 44 import javax.transaction.Status ; 45 import javax.transaction.SystemException ; 46 import javax.transaction.TransactionManager ; 47 48 import org.apache.activemq.advisory.AdvisorySupport; 49 import org.apache.activemq.command.ActiveMQDestination; 50 import org.apache.activemq.command.ActiveMQMessage; 51 import org.apache.activemq.command.ActiveMQQueue; 52 import org.apache.activemq.command.ActiveMQTopic; 53 import org.apache.activemq.command.ConsumerId; 54 import org.apache.activemq.command.ConsumerInfo; 55 import org.apache.activemq.command.RemoveInfo; 56 import org.apache.activemq.ra.ActiveMQActivationSpec; 57 import org.apache.activemq.ra.ActiveMQManagedConnectionFactory; 58 import org.apache.activemq.ra.ActiveMQResourceAdapter; 59 import org.apache.geronimo.connector.BootstrapContextImpl; 60 import org.apache.geronimo.connector.outbound.connectionmanagerconfig.SinglePool; 61 import org.apache.geronimo.connector.outbound.connectionmanagerconfig.XATransactions; 62 import org.apache.geronimo.connector.work.GeronimoWorkManager; 63 import org.apache.geronimo.transaction.context.TransactionContextManager; 64 import org.apache.servicemix.JbiConstants; 65 import org.apache.servicemix.jbi.container.SpringJBIContainer; 66 import org.apache.servicemix.jbi.event.ComponentAdapter; 67 import org.apache.servicemix.jbi.event.ComponentEvent; 68 import org.apache.servicemix.jbi.event.ComponentListener; 69 import org.apache.servicemix.jbi.event.EndpointAdapter; 70 import org.apache.servicemix.jbi.event.EndpointEvent; 71 import org.apache.servicemix.jbi.event.EndpointListener; 72 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl; 73 import org.apache.servicemix.jbi.nmr.Broker; 74 import org.apache.servicemix.jbi.nmr.flow.AbstractFlow; 75 import org.apache.servicemix.jbi.servicedesc.EndpointSupport; 76 import org.apache.servicemix.jbi.servicedesc.InternalEndpoint; 77 import org.jencks.JCAConnector; 78 import org.jencks.SingletonEndpointFactory; 79 import org.jencks.factory.ConnectionManagerFactoryBean; 80 import org.springframework.context.ApplicationContext; 81 82 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 83 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet; 84 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; 85 86 92 public class JCAFlow extends AbstractFlow implements MessageListener { 93 94 private static final String INBOUND_PREFIX = "org.apache.servicemix.jca."; 95 private String jmsURL = "tcp://localhost:61616"; 96 private String userName; 97 private String password; 98 private ConnectionFactory connectionFactory; 99 private Connection connection; 100 private String broadcastDestinationName = "org.apache.servicemix.JCAFlow"; 101 private Topic broadcastTopic; 102 private Map connectorMap = new ConcurrentHashMap(); 103 private AtomicBoolean started = new AtomicBoolean(false); 104 private Set subscriberSet=new CopyOnWriteArraySet(); 105 private TransactionContextManager transactionContextManager; 106 private ConnectionManager connectionManager; 107 private BootstrapContext bootstrapContext; 108 private ResourceAdapter resourceAdapter; 109 private JCAConnector containerConnector; 110 private JCAConnector broadcastConnector; 111 private Session broadcastSession; 112 private Topic advisoryTopic; 113 private MessageConsumer advisoryConsumer; 114 115 private EndpointListener endpointListener; 116 117 private ComponentListener componentListener; 118 119 124 public String getDescription() { 125 return "jca"; 126 } 127 128 133 public String getJmsURL() { 134 return jmsURL; 135 } 136 137 142 public void setJmsURL(String jmsURL) { 143 this.jmsURL = jmsURL; 144 } 145 146 151 public String getPassword() { 152 return password; 153 } 154 155 160 public void setPassword(String password) { 161 this.password = password; 162 } 163 164 169 public String getUserName() { 170 return userName; 171 } 172 173 178 public void setUserName(String userName) { 179 this.userName = userName; 180 } 181 182 187 public ConnectionFactory getConnectionFactory() { 188 return connectionFactory; 189 } 190 191 196 public void setConnectionFactory(ConnectionFactory connectoFactory) { 197 this.connectionFactory = connectoFactory; 198 } 199 200 205 public String getBroadcastDestinationName() { 206 return broadcastDestinationName; 207 } 208 209 214 public void setBroadcastDestinationName(String broadcastDestinationName) { 215 this.broadcastDestinationName = broadcastDestinationName; 216 } 217 218 protected ResourceAdapter createResourceAdapter() throws ResourceAdapterInternalException { 219 ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter(); 220 ra.setServerUrl(jmsURL); 221 ra.start(getBootstrapContext()); 222 return ra; 223 } 224 225 public TransactionManager getTransactionManager() { 226 return (TransactionManager ) broker.getContainer().getTransactionManager(); 227 } 228 229 235 public void init(Broker broker) throws JBIException { 236 log.debug(broker.getContainer().getName() + ": Initializing jca flow"); 237 super.init(broker); 238 endpointListener = new EndpointAdapter() { 240 public void internalEndpointRegistered(EndpointEvent event) { 241 onInternalEndpointRegistered(event, true); 242 } 243 244 public void internalEndpointUnregistered(EndpointEvent event) { 245 onInternalEndpointUnregistered(event, true); 246 } 247 }; 248 broker.getContainer().addListener(endpointListener); 249 componentListener = new ComponentAdapter() { 251 public void componentStarted(ComponentEvent event) { 252 onComponentStarted(event); 253 } 254 public void componentStopped(ComponentEvent event) { 255 onComponentStopped(event); 256 } 257 }; 258 broker.getContainer().addListener(componentListener); 259 try { 260 resourceAdapter = createResourceAdapter(); 261 262 ActiveMQActivationSpec ac = new ActiveMQActivationSpec(); 264 ac.setDestinationType("javax.jms.Queue"); 265 ac.setDestination(INBOUND_PREFIX + broker.getContainer().getName()); 266 containerConnector = new JCAConnector(); 267 containerConnector.setBootstrapContext(getBootstrapContext()); 268 containerConnector.setActivationSpec(ac); 269 containerConnector.setResourceAdapter(resourceAdapter); 270 containerConnector.setEndpointFactory(new SingletonEndpointFactory(this, getTransactionManager())); 271 containerConnector.start(); 272 273 ActiveMQManagedConnectionFactory mcf = new ActiveMQManagedConnectionFactory(); 275 mcf.setResourceAdapter(resourceAdapter); 276 connectionFactory = (ConnectionFactory ) mcf.createConnectionFactory(getConnectionManager()); 277 278 connection = ((ActiveMQResourceAdapter) resourceAdapter).makeConnection(); 280 connection.start(); 281 broadcastTopic = new ActiveMQTopic(broadcastDestinationName); 282 283 broadcastSession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 284 broadcastTopic = new ActiveMQTopic(broadcastDestinationName); 285 advisoryTopic=AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic); 286 } 287 catch (Exception e) { 288 log.error("Failed to initialize JCAFlow", e); 289 throw new JBIException(e); 290 } 291 } 292 293 298 public void start() throws JBIException { 299 if (started.compareAndSet(false, true)) { 300 super.start(); 301 try { 302 ActiveMQActivationSpec ac = new ActiveMQActivationSpec(); 304 ac.setDestinationType("javax.jms.Topic"); 305 ac.setDestination(broadcastDestinationName); 306 broadcastConnector = new JCAConnector(); 307 broadcastConnector.setBootstrapContext(getBootstrapContext()); 308 broadcastConnector.setActivationSpec(ac); 309 broadcastConnector.setResourceAdapter(resourceAdapter); 310 broadcastConnector.setEndpointFactory(new SingletonEndpointFactory(new MessageListener () { 311 public void onMessage(Message message) { 312 try { 313 Object obj = ((ObjectMessage ) message).getObject(); 314 if (obj instanceof EndpointEvent) { 315 EndpointEvent event = (EndpointEvent) obj; 316 String container = ((InternalEndpoint) event.getEndpoint()).getComponentNameSpace().getContainerName(); 317 if (!getBroker().getContainer().getName().equals(container)) { 318 if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) { 319 onRemoteEndpointRegistered(event); 320 } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) { 321 onRemoteEndpointUnregistered(event); 322 } 323 } 324 } 325 } catch (Exception e) { 326 log.error("Error processing incoming broadcast message", e); 327 } 328 } 329 })); 330 broadcastConnector.start(); 331 332 advisoryConsumer = broadcastSession.createConsumer(advisoryTopic); 333 advisoryConsumer.setMessageListener(new MessageListener () { 334 public void onMessage(Message message) { 335 if (started.get()) { 336 onAdvisoryMessage(((ActiveMQMessage) message).getDataStructure()); 337 } 338 } 339 }); 340 } 341 catch (Exception e) { 342 throw new JBIException("JMSException caught in start: " + e.getMessage(), e); 343 } 344 } 345 } 346 347 352 public void stop() throws JBIException { 353 if (started.compareAndSet(true, false)) { 354 super.stop(); 355 try { 356 advisoryConsumer.close(); 357 } 358 catch (JMSException e) { 359 log.debug("JMSException caught in stop" ,e); 360 } 361 } 362 } 363 364 public void shutDown() throws JBIException { 365 super.shutDown(); 366 stop(); 367 broker.getContainer().removeListener(endpointListener); 369 broker.getContainer().removeListener(componentListener); 371 while (!connectorMap.isEmpty()) { 373 JCAConnector connector = (JCAConnector) connectorMap.remove(connectorMap.keySet().iterator().next()); 374 try { 375 connector.destroy(); 376 } catch (Exception e) { 377 log.debug("Error closing jca connector", e); 378 } 379 } 380 try { 381 containerConnector.destroy(); 382 } catch (Exception e) { 383 log.debug("Error closing jca connector", e); 384 } 385 try { 386 broadcastConnector.destroy(); 387 } catch (Exception e) { 388 log.debug("Error closing jca connector", e); 389 } 390 resourceAdapter.stop(); 392 if (this.connection != null) { 393 try { 394 this.connection.close(); 395 } 396 catch (JMSException e) { 397 log.debug("Error closing JMS Connection", e); 398 } 399 } 400 } 401 402 407 public int numberInNetwork() { 408 return subscriberSet.size(); 409 } 410 411 416 public boolean canHandle(MessageExchange me) { 417 if (isTransacted(me) && isSynchronous(me)) { 418 return false; 419 } 420 return true; 421 } 422 423 public void onInternalEndpointRegistered(EndpointEvent event, boolean broadcast) { 424 if (!started.get()) { 425 return; 426 } 427 try { 428 String key = EndpointSupport.getKey(event.getEndpoint()); 429 if(!connectorMap.containsKey(key)){ 430 ActiveMQActivationSpec ac = new ActiveMQActivationSpec(); 431 ac.setDestinationType("javax.jms.Queue"); 432 ac.setDestination(INBOUND_PREFIX + key); 433 JCAConnector connector = new JCAConnector(); 434 connector.setBootstrapContext(getBootstrapContext()); 435 connector.setActivationSpec(ac); 436 connector.setResourceAdapter(resourceAdapter); 437 connector.setEndpointFactory(new SingletonEndpointFactory(this, getTransactionManager())); 438 connector.start(); 439 connectorMap.put(key, connector); 440 } 441 if (broadcast) { 443 log.debug(broker.getContainer().getName() + ": broadcasting info for " + event); 444 sendJmsMessage(broadcastTopic, event, false, false); 445 } 446 } catch (Exception e) { 447 log.error("Cannot create consumer for " + event.getEndpoint(), e); 448 } 449 } 450 451 public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) { 452 try{ 453 String key = EndpointSupport.getKey(event.getEndpoint()); 454 JCAConnector connector=(JCAConnector) connectorMap.remove(key); 455 if(connector!=null){ 456 connector.destroy(); 457 } 458 if (broadcast) { 460 log.debug(broker.getContainer().getName() + ": broadcasting info for " + event); 461 sendJmsMessage(broadcastTopic, event, false, false); 462 } 463 } catch (Exception e) { 464 log.error("Cannot destroy consumer for " + event, e); 465 } 466 } 467 468 public void onComponentStarted(ComponentEvent event) { 469 if (!started.get()) { 470 return; 471 } 472 try { 473 String key = event.getComponent().getName(); 474 if(!connectorMap.containsKey(key)){ 475 ActiveMQActivationSpec ac = new ActiveMQActivationSpec(); 476 ac.setDestinationType("javax.jms.Queue"); 477 ac.setDestination(INBOUND_PREFIX + key); 478 JCAConnector connector = new JCAConnector(); 479 connector.setBootstrapContext(getBootstrapContext()); 480 connector.setActivationSpec(ac); 481 connector.setResourceAdapter(resourceAdapter); 482 connector.setEndpointFactory(new SingletonEndpointFactory(this, getTransactionManager())); 483 connector.start(); 484 connectorMap.put(key, connector); 485 } 486 } catch (Exception e) { 487 log.error("Cannot create consumer for component " + event.getComponent().getName(), e); 488 } 489 } 490 491 public void onComponentStopped(ComponentEvent event) { 492 try { 493 String key = event.getComponent().getName(); 494 JCAConnector connector = (JCAConnector) connectorMap.remove(key); 495 if (connector != null){ 496 connector.destroy(); 497 } 498 } catch (Exception e) { 499 log.error("Cannot destroy consumer for component " + event.getComponent().getName(), e); 500 } 501 } 502 503 public void onRemoteEndpointRegistered(EndpointEvent event) { 504 log.debug(broker.getContainer().getName() + ": adding remote endpoint: " + event.getEndpoint()); 505 broker.getContainer().getRegistry().registerRemoteEndpoint(event.getEndpoint()); 506 } 507 508 public void onRemoteEndpointUnregistered(EndpointEvent event) { 509 log.debug(broker.getContainer().getName() + ": removing remote endpoint: " + event.getEndpoint()); 510 broker.getContainer().getRegistry().unregisterRemoteEndpoint(event.getEndpoint()); 511 } 512 513 519 protected void doSend(MessageExchangeImpl me) throws MessagingException { 520 doRouting(me); 521 } 522 523 529 public void doRouting(final MessageExchangeImpl me) throws MessagingException { 530 try { 532 String destination; 533 if (me.getRole() == Role.PROVIDER) { 534 if (me.getDestinationId() == null) { 535 destination = INBOUND_PREFIX + EndpointSupport.getKey(me.getEndpoint()); 536 } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_PROVIDER)) && !isSynchronous(me)) { 537 destination = INBOUND_PREFIX + me.getDestinationId().getName(); 538 } else { 539 destination = INBOUND_PREFIX + me.getDestinationId().getContainerName(); 540 } 541 } else { 542 if (me.getSourceId() == null) { 543 throw new IllegalStateException ("No sourceId set on the exchange"); 544 } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER)) && !isSynchronous(me)) { 545 if (me.getProperty(JbiConstants.SENDER_ENDPOINT) != null) { 550 destination = INBOUND_PREFIX + me.getProperty(JbiConstants.SENDER_ENDPOINT); 551 } else { 552 destination = INBOUND_PREFIX + me.getSourceId().getName(); 553 } 554 } else { 555 destination = INBOUND_PREFIX + me.getSourceId().getContainerName(); 556 } 557 } 558 if (me.isTransacted()) { 559 me.setTxState(MessageExchangeImpl.TX_STATE_ENLISTED); 560 } 561 sendJmsMessage(new ActiveMQQueue(destination), me, isPersistent(me), me.isTransacted()); 562 } catch (JMSException e) { 563 log.error("Failed to send exchange: " + me + " internal JMS Network", e); 564 throw new MessagingException(e); 565 } catch (SystemException e) { 566 log.error("Failed to send exchange: " + me + " transaction problem", e); 567 throw new MessagingException(e); 568 } 569 } 570 571 576 public void onMessage(Message message) { 577 try { 578 if (message != null && started.get()) { 579 ObjectMessage objMsg = (ObjectMessage ) message; 580 final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject(); 581 TransactionManager tm = (TransactionManager ) getTransactionManager(); 586 if (tm != null) { 587 me.setTransactionContext(tm.getTransaction()); 588 } 589 if (me.getDestinationId() == null) { 590 ServiceEndpoint se = me.getEndpoint(); 591 se = broker.getContainer().getRegistry() 592 .getInternalEndpoint(se.getServiceName(), se.getEndpointName()); 593 me.setEndpoint(se); 594 me.setDestinationId(((InternalEndpoint) se).getComponentNameSpace()); 595 } 596 super.doRouting(me); 597 } 598 } 599 catch (JMSException jmsEx) { 600 log.error("Caught an exception unpacking JMS Message: ", jmsEx); 601 } 602 catch (MessagingException e) { 603 log.error("Caught an exception routing ExchangePacket: ", e); 604 } 605 catch (SystemException e) { 606 log.error("Caught an exception acessing transaction context: ", e); 607 } 608 } 609 610 protected void onAdvisoryMessage(Object obj) { 611 if (obj instanceof ConsumerInfo) { 612 ConsumerInfo info = (ConsumerInfo) obj; 613 subscriberSet.add(info.getConsumerId().getConnectionId()); 614 ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null); 615 for (int i = 0; i < endpoints.length; i++) { 616 if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) { 617 onInternalEndpointRegistered(new EndpointEvent(endpoints[i], 618 EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true); 619 } 620 } 621 } else if (obj instanceof RemoveInfo) { 622 ConsumerId id = (ConsumerId) ((RemoveInfo) obj).getObjectId(); 623 subscriberSet.remove(id.getConnectionId()); 624 removeAllPackets(id.getConnectionId()); 625 } 626 } 627 628 private void removeAllPackets(String containerName) { 629 } 631 632 public ConnectionManager getConnectionManager() throws Exception { 633 if (connectionManager == null) { 634 ConnectionManagerFactoryBean cmfb = new ConnectionManagerFactoryBean(); 635 cmfb.setTransactionContextManager(getTransactionContextManager()); 636 cmfb.setPoolingSupport(new SinglePool( 637 16, 0, 100, 1, true, true, true)); cmfb.setTransactionSupport(new XATransactions( 645 true, false)); cmfb.afterPropertiesSet(); 648 connectionManager = (ConnectionManager ) cmfb.getObject(); 649 } 650 return connectionManager; 651 } 652 653 public void setConnectionManager(ConnectionManager connectionManager) { 654 this.connectionManager = connectionManager; 655 } 656 657 public TransactionContextManager getTransactionContextManager() { 658 if (transactionContextManager == null) { 659 if (broker != null && broker.getContainer() instanceof SpringJBIContainer) { 660 ApplicationContext applicationContext = ((SpringJBIContainer) broker.getContainer()).getApplicationContext(); 661 if (applicationContext != null) { 662 Map map = applicationContext.getBeansOfType(TransactionContextManager.class); 663 if( map.size() == 1) { 664 transactionContextManager = (TransactionContextManager) map.values().iterator().next(); 665 } 666 } 667 } 668 } 669 return transactionContextManager; 670 } 671 672 public void setTransactionContextManager( 673 TransactionContextManager transactionContextManager) { 674 this.transactionContextManager = transactionContextManager; 675 } 676 677 public BootstrapContext getBootstrapContext() { 678 if (bootstrapContext == null) { 679 GeronimoWorkManager wm = (GeronimoWorkManager) broker.getContainer().getWorkManager(); 680 bootstrapContext = new BootstrapContextImpl(wm); 681 } 682 return bootstrapContext; 683 } 684 685 public void setBootstrapContext(BootstrapContext bootstrapContext) { 686 this.bootstrapContext = bootstrapContext; 687 } 688 689 public String toString(){ 690 return broker.getContainer().getName() + " JCAFlow"; 691 } 692 693 private void sendJmsMessage(Destination dest, Serializable object, boolean persistent, boolean transacted) throws JMSException , SystemException { 694 if (transacted) { 695 TransactionManager tm = (TransactionManager ) getBroker().getContainer().getTransactionManager(); 696 if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK) { 697 return; 698 } 699 } 700 Connection connection = connectionFactory.createConnection(); 701 try { 702 Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); 703 ObjectMessage msg = session.createObjectMessage(object); 704 MessageProducer producer = session.createProducer(dest); 705 producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); 706 producer.send(msg); 707 } finally { 708 connection.close(); 709 } 710 } 711 712 } 713 | Popular Tags |