1 10 11 package org.mule.extras.spring.events; 12 13 import java.beans.ExceptionListener ; 14 import java.util.ArrayList ; 15 import java.util.Iterator ; 16 import java.util.List ; 17 import java.util.Map ; 18 import java.util.Set ; 19 20 import org.apache.commons.logging.Log; 21 import org.apache.commons.logging.LogFactory; 22 import org.mule.MuleManager; 23 import org.mule.MuleRuntimeException; 24 import org.mule.config.MuleConfiguration; 25 import org.mule.config.ThreadingProfile; 26 import org.mule.config.builders.QuickConfigurationBuilder; 27 import org.mule.config.i18n.Message; 28 import org.mule.extras.spring.SpringContainerContext; 29 import org.mule.impl.MuleDescriptor; 30 import org.mule.impl.MuleEvent; 31 import org.mule.impl.MuleMessage; 32 import org.mule.impl.MuleSession; 33 import org.mule.impl.RequestContext; 34 import org.mule.impl.endpoint.MuleEndpoint; 35 import org.mule.impl.endpoint.MuleEndpointURI; 36 import org.mule.providers.AbstractConnector; 37 import org.mule.routing.filters.ObjectFilter; 38 import org.mule.routing.filters.WildcardFilter; 39 import org.mule.umo.UMOComponent; 40 import org.mule.umo.UMODescriptor; 41 import org.mule.umo.UMOEventContext; 42 import org.mule.umo.UMOException; 43 import org.mule.umo.UMOSession; 44 import org.mule.umo.endpoint.MalformedEndpointException; 45 import org.mule.umo.endpoint.UMOEndpoint; 46 import org.mule.umo.endpoint.UMOEndpointURI; 47 import org.mule.umo.lifecycle.InitialisationException; 48 import org.mule.umo.manager.UMOManager; 49 import org.mule.umo.model.UMOModel; 50 import org.mule.umo.provider.UMOConnector; 51 import org.mule.umo.provider.UMOMessageDispatcher; 52 import org.mule.umo.routing.UMOInboundMessageRouter; 53 import org.mule.umo.transformer.TransformerException; 54 import org.mule.umo.transformer.UMOTransformer; 55 import org.mule.util.ClassUtils; 56 import org.mule.util.MapUtils; 57 import org.springframework.beans.BeansException; 58 import org.springframework.context.ApplicationContext; 59 import org.springframework.context.ApplicationContextAware; 60 import org.springframework.context.ApplicationEvent; 61 import org.springframework.context.ApplicationListener; 62 import org.springframework.context.event.ApplicationEventMulticaster; 63 import org.springframework.context.event.ContextClosedEvent; 64 import org.springframework.context.event.ContextRefreshedEvent; 65 import org.springframework.context.support.AbstractApplicationContext; 66 67 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet; 68 import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; 69 70 117 118 public class MuleEventMulticaster implements ApplicationEventMulticaster, ApplicationContextAware 119 { 120 public static final String EVENT_MULTICASTER_DESCRIPTOR_NAME = "muleEventMulticasterDescriptor"; 121 122 125 protected static Log logger = LogFactory.getLog(MuleEventMulticaster.class); 126 127 130 protected final Set listeners = new CopyOnWriteArraySet(); 131 132 135 protected boolean asynchronous = false; 136 137 140 protected ExecutorService asyncPool = null; 141 142 147 protected Map endpointMappings = null; 148 149 156 protected String [] subscriptions = null; 157 158 161 protected ApplicationContext applicationContext; 162 163 166 protected UMODescriptor descriptor; 167 168 171 protected UMOComponent component; 172 173 176 protected Class subscriptionFilter = WildcardFilter.class; 177 178 181 protected ExceptionListener exceptionListener = new LoggingExceptionListener(); 182 183 194 public void addApplicationListener(ApplicationListener listener) 195 { 196 Object listenerToAdd = listener; 197 198 if (asynchronous) 199 { 200 listenerToAdd = new AsynchronousEventListener(asyncPool, listener); 201 } 202 203 listeners.add(listenerToAdd); 204 } 205 206 211 public void removeApplicationListener(ApplicationListener listener) 212 { 213 for (Iterator iterator = listeners.iterator(); iterator.hasNext();) 214 { 215 ApplicationListener applicationListener = (ApplicationListener)iterator.next(); 216 if (applicationListener instanceof AsynchronousEventListener) 217 { 218 if (((AsynchronousEventListener)applicationListener).getListener().equals(listener)) 219 { 220 listeners.remove(applicationListener); 221 return; 222 } 223 } 224 else 225 { 226 if (applicationListener.equals(listener)) 227 { 228 listeners.remove(applicationListener); 229 return; 230 } 231 } 232 } 233 listeners.remove(listener); 234 } 235 236 239 public void removeAllListeners() 240 { 241 listeners.clear(); 242 } 243 244 257 public void multicastEvent(ApplicationEvent e) 258 { 259 MuleApplicationEvent muleEvent = null; 260 if (e instanceof ContextRefreshedEvent) 262 { 263 if (MuleManager.isInstanciated() && !MuleManager.getInstance().isInitialised()) 266 { 267 try 268 { 269 registerMulticasterDescriptor(); 270 } 271 catch (UMOException ex) 272 { 273 throw new MuleRuntimeException(new Message("spring", 1), ex); 274 } 275 } 276 else 277 { 278 initMule(); 279 } 280 } 281 else if (e instanceof ContextClosedEvent) 282 { 283 MuleManager.getInstance().dispose(); 284 return; 285 } 286 else if (e instanceof MuleApplicationEvent) 287 { 288 muleEvent = (MuleApplicationEvent)e; 289 if (muleEvent.getMuleEventContext() == null) 292 { 293 try 294 { 295 dispatchEvent(muleEvent); 296 } 297 catch (ApplicationEventException e1) 298 { 299 exceptionListener.exceptionThrown(e1); 300 } 301 return; 302 } 303 } 304 305 for (Iterator iterator = listeners.iterator(); iterator.hasNext();) 306 { 307 ApplicationListener listener = (ApplicationListener)iterator.next(); 308 if (muleEvent != null) 309 { 310 if (listener instanceof AsynchronousEventListener) 314 { 315 AsynchronousEventListener asyncListener = (AsynchronousEventListener)listener; 316 if (asyncListener.getListener() instanceof MuleSubscriptionEventListener) 317 { 318 if (isSubscriptionMatch(muleEvent.getEndpoint(), 319 ((MuleSubscriptionEventListener)asyncListener.getListener()).getSubscriptions())) 320 { 321 asyncListener.onApplicationEvent(muleEvent); 322 } 323 } 324 else if (asyncListener.getListener() instanceof MuleEventListener) 325 { 326 asyncListener.onApplicationEvent(muleEvent); 327 } 328 else if (!(asyncListener.getListener() instanceof MuleEventListener)) 329 { 330 asyncListener.onApplicationEvent(e); 331 } 332 } 334 else if (listener instanceof MuleSubscriptionEventListener) 335 { 336 if (isSubscriptionMatch(muleEvent.getEndpoint(), 337 ((MuleSubscriptionEventListener)listener).getSubscriptions())) 338 { 339 listener.onApplicationEvent(muleEvent); 340 } 341 } 342 else if (listener instanceof MuleEventListener) 343 { 344 listener.onApplicationEvent(muleEvent); 345 } 346 } 347 else if (listener instanceof AsynchronousEventListener 348 && !(((AsynchronousEventListener)listener).getListener() instanceof MuleEventListener)) 349 { 350 listener.onApplicationEvent(e); 351 } 352 else if (!(listener instanceof MuleEventListener)) 353 { 354 listener.onApplicationEvent(e); 355 } 356 else 357 { 358 for (int i = 0; i < listener.getClass().getInterfaces().length; i++) 361 { 362 if (listener.getClass().getInterfaces()[i].equals(ApplicationListener.class)) 363 { 364 listener.onApplicationEvent(e); 365 break; 366 } 367 } 368 369 } 370 } 371 } 372 373 380 private boolean isSubscriptionMatch(String endpoint, String [] subscriptions) 381 { 382 for (int i = 0; i < subscriptions.length; i++) 383 { 384 String subscription = MapUtils.getString(MuleManager.getInstance().getEndpointIdentifiers(), 385 subscriptions[i], subscriptions[i]); 386 387 404 ObjectFilter filter = createFilter(subscription); 405 if (filter.accept(endpoint)) 406 { 407 return true; 408 } 409 } 410 return false; 411 } 412 413 418 public boolean isAsynchronous() 419 { 420 return asynchronous; 421 } 422 423 428 public void setAsynchronous(boolean asynchronous) 429 { 430 this.asynchronous = asynchronous; 431 if (asynchronous) 432 { 433 if (asyncPool == null) 434 { 435 asyncPool = MuleManager.getConfiguration().getDefaultThreadingProfile().createPool( 436 "spring-events"); 437 } 438 } 439 else 440 { 441 if (asyncPool != null) 442 { 443 asyncPool.shutdown(); 444 asyncPool = null; 445 } 446 } 447 } 448 449 455 public void onMuleEvent(UMOEventContext context) throws TransformerException, MalformedEndpointException 456 { 457 multicastEvent(new MuleApplicationEvent(context.getTransformedMessage(), context, applicationContext)); 458 context.setStopFurtherProcessing(true); 459 } 460 461 468 protected void dispatchEvent(MuleApplicationEvent applicationEvent) throws ApplicationEventException 469 { 470 UMOEndpoint endpoint = null; 471 try 472 { 473 endpoint = MuleEndpoint.getOrCreateEndpointForUri(applicationEvent.getEndpoint(), 474 UMOEndpoint.ENDPOINT_TYPE_SENDER); 475 } 476 catch (UMOException e) 477 { 478 throw new ApplicationEventException("Failed to get endpoint for endpointUri: " 479 + applicationEvent.getEndpoint(), e); 480 } 481 if (endpoint != null) 482 { 483 try 484 { 485 489 MuleMessage message = new MuleMessage(applicationEvent.getSource(), 490 applicationEvent.getProperties()); 491 if (applicationEvent.getMuleEventContext() != null) 494 { 495 applicationEvent.getMuleEventContext().setStopFurtherProcessing(true); 497 applicationEvent.getMuleEventContext().dispatchEvent(message, endpoint); 498 } 499 else 500 { 501 UMOSession session = new MuleSession(message, 502 ((AbstractConnector)endpoint.getConnector()).getSessionHandler(), component); 503 RequestContext.setEvent(new MuleEvent(message, endpoint, session, false)); 504 if (endpoint.getTransformer() != null) 506 { 507 message = new MuleMessage(endpoint.getTransformer().transform( 508 applicationEvent.getSource()), applicationEvent.getProperties()); 509 } 510 UMOMessageDispatcher dispatcher = endpoint.getConnector().getDispatcher(endpoint); 511 dispatcher.dispatch(new MuleEvent(message, endpoint, session, false)); 512 } 513 } 514 catch (Exception e1) 515 { 516 throw new ApplicationEventException("Failed to dispatch event: " + e1.getMessage(), e1); 517 } 518 } 519 else 520 { 521 throw new ApplicationEventException("Failed endpoint using name: " 522 + applicationEvent.getEndpoint()); 523 } 524 } 525 526 532 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException 533 { 534 this.applicationContext = applicationContext; 535 } 536 537 protected void initMule() 538 { 539 try 540 { 541 if (applicationContext.containsBean(EVENT_MULTICASTER_DESCRIPTOR_NAME)) 544 { 545 descriptor = (UMODescriptor)applicationContext.getBean(EVENT_MULTICASTER_DESCRIPTOR_NAME); 546 } 547 if (applicationContext.containsBean("muleManager")) 550 { 551 registerMulticasterDescriptor(); 553 return; 554 } 555 UMOManager manager = MuleManager.getInstance(); 556 Map map = applicationContext.getBeansOfType(MuleConfiguration.class); 557 if (map != null && map.size() > 0) 558 { 559 MuleManager.setConfiguration((MuleConfiguration)map.values().iterator().next()); 560 } 561 if (!manager.isStarted()) 562 { 563 MuleManager.getConfiguration().setSynchronous(!asynchronous); 564 registerEndpointMappings(); 566 } 567 SpringContainerContext containerContext = new SpringContainerContext(); 569 containerContext.setBeanFactory(applicationContext); 570 manager.setContainerContext(null); 571 manager.setContainerContext(containerContext); 572 573 registerConnectors(); 575 576 registerTransformers(); 578 579 registerGlobalEndpoints(); 580 581 registerMulticasterDescriptor(); 583 584 if (!manager.isStarted()) 585 { 586 manager.start(); 587 } 588 } 589 catch (UMOException e) 590 { 591 throw new MuleRuntimeException(new Message("spring", 1), e); 592 } 593 } 594 595 protected void registerMulticasterDescriptor() throws UMOException 596 { 597 if (descriptor == null) 599 { 600 descriptor = getDefaultDescriptor(); 601 setSubscriptionsOnDescriptor((MuleDescriptor)descriptor); 602 component = MuleManager.getInstance().getModel().registerComponent(descriptor); 603 } 604 } 605 606 protected void setSubscriptionsOnDescriptor(MuleDescriptor descriptor) throws UMOException 607 { 608 String [] subscriptions; 609 List endpoints = new ArrayList (); 610 for (Iterator iterator = listeners.iterator(); iterator.hasNext();) 611 { 612 ApplicationListener listener = (ApplicationListener)iterator.next(); 613 if (listener instanceof AsynchronousEventListener) 614 { 615 listener = ((AsynchronousEventListener)listener).getListener(); 616 } 617 if (listener instanceof MuleSubscriptionEventListener) 618 { 619 subscriptions = ((MuleSubscriptionEventListener)listener).getSubscriptions(); 620 for (int i = 0; i < subscriptions.length; i++) 621 { 622 if (subscriptions[i].indexOf("*") == -1 && MuleEndpointURI.isMuleUri(subscriptions[i])) 623 { 624 boolean isSoap = registerAsSoap(subscriptions[i], listener); 625 626 if (!isSoap) 627 { 628 endpoints.add(subscriptions[i]); 629 } 630 } 631 } 632 } 633 } 634 if (endpoints.size() > 0) 635 { 636 String endpoint; 637 for (Iterator iterator = endpoints.iterator(); iterator.hasNext();) 638 { 639 endpoint = (String )iterator.next(); 640 descriptor.getInboundRouter().addEndpoint(new MuleEndpoint(endpoint, true)); 641 } 642 } 643 } 644 645 private boolean registerAsSoap(String endpoint, Object listener) throws UMOException 646 { 647 if (endpoint.startsWith("glue") || endpoint.startsWith("soap") || endpoint.startsWith("axis")) 648 { 649 UMOEndpointURI ep = new MuleEndpointURI(endpoint); 650 QuickConfigurationBuilder builder = new QuickConfigurationBuilder(); 651 652 String serviceName = null; 654 if (ep.getPath() != null) 655 { 656 String path = ep.getPath(); 657 if (path.endsWith("/")) 658 { 659 path = path.substring(0, path.length() - 1); 660 } 661 int i = path.lastIndexOf("/"); 662 if (i > -1) 663 { 664 serviceName = path.substring(i + 1); 665 } 666 } 667 else 668 { 669 serviceName = descriptor.getName(); 670 } 671 String newEndpoint = endpoint; 673 int i = newEndpoint.indexOf(serviceName); 674 newEndpoint = newEndpoint.substring(0, i - 1); 675 builder.registerComponentInstance(listener, serviceName, new MuleEndpointURI(newEndpoint)); 676 return true; 677 } 678 else 679 { 680 return false; 681 } 682 } 683 684 protected void registerEndpointMappings() throws InitialisationException 685 { 686 if (endpointMappings != null) 688 { 689 Map.Entry entry = null; 690 for (Iterator iterator = endpointMappings.entrySet().iterator(); iterator.hasNext();) 691 { 692 entry = (Map.Entry )iterator.next(); 693 MuleManager.getInstance().registerEndpointIdentifier((String )entry.getKey(), 694 (String )entry.getValue()); 695 } 696 } 697 } 698 699 protected void registerConnectors() throws UMOException 700 { 701 if (!MuleManager.getInstance().isInitialised()) 702 { 703 Map connectors = applicationContext.getBeansOfType(UMOConnector.class, true, true); 705 if (connectors.size() > 0) 706 { 707 Map.Entry entry; 708 UMOConnector c; 709 for (Iterator iterator = connectors.entrySet().iterator(); iterator.hasNext();) 710 { 711 entry = (Map.Entry )iterator.next(); 712 c = (UMOConnector)entry.getValue(); 713 if (c.getName() == null) 714 { 715 c.setName(entry.getKey().toString()); 716 } 717 MuleManager.getInstance().registerConnector(c); 718 } 719 } 720 } 721 } 722 723 protected void registerGlobalEndpoints() throws UMOException 724 { 725 if (!MuleManager.getInstance().isInitialised()) 726 { 727 Map endpoints = applicationContext.getBeansOfType(UMOEndpoint.class, true, true); 729 if (endpoints.size() > 0) 730 { 731 Map.Entry entry; 732 UMOEndpoint endpoint; 733 for (Iterator iterator = endpoints.entrySet().iterator(); iterator.hasNext();) 734 { 735 entry = (Map.Entry )iterator.next(); 736 endpoint = (UMOEndpoint)entry.getValue(); 737 if (endpoint.getName() == null) 738 { 739 endpoint.setName(entry.getKey().toString()); 740 } 741 MuleManager.getInstance().registerEndpoint(endpoint); 742 } 743 } 744 } 745 } 746 747 protected void registerTransformers() throws UMOException 748 { 749 if (!MuleManager.getInstance().isInitialised()) 750 { 751 Map transformers = applicationContext.getBeansOfType(UMOTransformer.class, true, true); 753 if (transformers.size() > 0) 754 { 755 Map.Entry entry; 756 UMOTransformer t; 757 for (Iterator iterator = transformers.entrySet().iterator(); iterator.hasNext();) 758 { 759 entry = (Map.Entry )iterator.next(); 760 t = (UMOTransformer)entry.getValue(); 761 if (t.getName() == null) 762 { 763 t.setName(entry.getKey().toString()); 764 } 765 MuleManager.getInstance().registerTransformer(t); 766 } 767 } 768 } 769 } 770 771 protected UMODescriptor getDefaultDescriptor() throws UMOException 772 { 773 UMOModel model = MuleManager.getInstance().getModel(); 776 UMODescriptor descriptor = model.getDescriptor(EVENT_MULTICASTER_DESCRIPTOR_NAME); 777 if (descriptor != null) 778 { 779 model.unregisterComponent(descriptor); 780 } 781 descriptor = new MuleDescriptor(EVENT_MULTICASTER_DESCRIPTOR_NAME); 782 if (subscriptions == null) 783 { 784 logger.info("No receive endpoints have been set, using default '*'"); 785 descriptor.setInboundEndpoint(new MuleEndpoint("vm://*", true)); 786 } 787 else 788 { 789 UMOInboundMessageRouter messageRouter = descriptor.getInboundRouter(); 791 792 for (int i = 0; i < subscriptions.length; i++) 793 { 794 String subscription = subscriptions[i]; 795 UMOEndpointURI endpointUri = new MuleEndpointURI(subscription); 796 UMOEndpoint endpoint = MuleEndpoint.getOrCreateEndpointForUri(endpointUri, 797 UMOEndpoint.ENDPOINT_TYPE_RECEIVER); 798 if (!asynchronous) 799 { 800 endpoint.setSynchronous(true); 801 } 802 messageRouter.addEndpoint(endpoint); 803 } 804 } 805 descriptor.setImplementation(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME); 807 return descriptor; 808 } 809 810 protected ObjectFilter createFilter(String pattern) 811 { 812 try 813 { 814 if (getSubscriptionFilter() == null) 815 { 816 setSubscriptionFilter(WildcardFilter.class); 817 } 818 ObjectFilter filter = (ObjectFilter)ClassUtils.instanciateClass(getSubscriptionFilter(), 819 new Object []{pattern}); 820 return filter; 821 } 822 catch (Exception e) 823 { 824 exceptionListener.exceptionThrown(e); 825 return new WildcardFilter(pattern); 826 } 827 } 828 829 835 public Class getSubscriptionFilter() 836 { 837 return subscriptionFilter; 838 } 839 840 845 public void setSubscriptionFilter(Class subscriptionFilter) 846 { 847 this.subscriptionFilter = subscriptionFilter; 848 } 849 850 857 public Map getEndpointMappings() 858 { 859 return endpointMappings; 860 } 861 862 869 public void setEndpointMappings(Map endpointMappings) 870 { 871 this.endpointMappings = endpointMappings; 872 } 873 874 883 public String [] getSubscriptions() 884 { 885 return subscriptions; 886 } 887 888 897 public void setSubscriptions(String [] subscriptions) 898 { 899 this.subscriptions = subscriptions; 900 } 901 902 protected void setExceptionListener(ExceptionListener listener) 903 { 904 if (listener != null) 905 { 906 this.exceptionListener = listener; 907 } 908 else 909 { 910 throw new NullPointerException ("exceptionListener"); 911 } 912 } 913 914 private class LoggingExceptionListener implements ExceptionListener 915 { 916 public void exceptionThrown(Exception e) 917 { 918 logger.error(e.getMessage(), e); 919 } 920 } 921 } 922 | Popular Tags |