1 10 11 package org.mule.extras.client; 12 13 import edu.emory.mathcs.backport.java.util.concurrent.Callable; 14 import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; 15 import org.apache.commons.logging.Log; 16 import org.apache.commons.logging.LogFactory; 17 import org.mule.MuleManager; 18 import org.mule.config.ConfigurationBuilder; 19 import org.mule.config.ConfigurationException; 20 import org.mule.config.MuleConfiguration; 21 import org.mule.config.MuleProperties; 22 import org.mule.config.builders.MuleXmlConfigurationBuilder; 23 import org.mule.config.builders.QuickConfigurationBuilder; 24 import org.mule.config.i18n.Message; 25 import org.mule.config.i18n.Messages; 26 import org.mule.impl.MuleEvent; 27 import org.mule.impl.MuleMessage; 28 import org.mule.impl.MuleSession; 29 import org.mule.impl.endpoint.MuleEndpoint; 30 import org.mule.impl.endpoint.MuleEndpointURI; 31 import org.mule.impl.security.MuleCredentials; 32 import org.mule.providers.AbstractConnector; 33 import org.mule.providers.service.ConnectorFactory; 34 import org.mule.umo.FutureMessageResult; 35 import org.mule.umo.MessagingException; 36 import org.mule.umo.UMODescriptor; 37 import org.mule.umo.UMOEvent; 38 import org.mule.umo.UMOException; 39 import org.mule.umo.UMOMessage; 40 import org.mule.umo.UMOSession; 41 import org.mule.umo.endpoint.UMOEndpoint; 42 import org.mule.umo.endpoint.UMOEndpointURI; 43 import org.mule.umo.lifecycle.Disposable; 44 import org.mule.umo.manager.UMOManager; 45 import org.mule.umo.provider.DispatchException; 46 import org.mule.umo.provider.ReceiveException; 47 import org.mule.umo.provider.UMOConnector; 48 import org.mule.umo.provider.UMOStreamMessageAdapter; 49 import org.mule.umo.transformer.UMOTransformer; 50 import org.mule.util.MuleObjectHelper; 51 import org.mule.util.StringUtils; 52 53 import java.util.ArrayList ; 54 import java.util.HashMap ; 55 import java.util.Iterator ; 56 import java.util.List ; 57 import java.util.Map ; 58 59 87 public class MuleClient implements Disposable 88 { 89 92 protected static Log logger = LogFactory.getLog(MuleClient.class); 93 94 97 private UMOManager manager; 98 99 108 private ExecutorService executor = null; 109 110 private List dispatchers = new ArrayList (); 111 112 QuickConfigurationBuilder builder = null; 114 115 private MuleCredentials user; 116 117 123 public MuleClient() throws UMOException 124 { 125 init(true); 126 } 127 128 135 public MuleClient(boolean startManager) throws UMOException 136 { 137 init(startManager); 138 } 139 140 150 public MuleClient(String configResources) throws UMOException 151 { 152 this(configResources, new MuleXmlConfigurationBuilder()); 153 } 154 155 163 public MuleClient(String user, String password) throws UMOException 164 { 165 init(true); 166 this.user = new MuleCredentials(user, password.toCharArray()); 167 } 168 169 179 public MuleClient(String configResources, ConfigurationBuilder builder) throws ConfigurationException 180 { 181 if (MuleManager.isInstanciated()) 182 { 183 throw new ConfigurationException(new Message(Messages.MANAGER_IS_ALREADY_CONFIGURED)); 184 } 185 if (builder == null) 186 { 187 logger.info("Builder passed in was null, using default builder: " 188 + MuleXmlConfigurationBuilder.class.getName()); 189 builder = new MuleXmlConfigurationBuilder(); 190 } 191 manager = builder.configure(configResources, null); 192 } 193 194 206 public MuleClient(String configResources, ConfigurationBuilder builder, String user, String password) 207 throws ConfigurationException 208 { 209 this(configResources, builder); 210 this.user = new MuleCredentials(user, password.toCharArray()); 211 } 212 213 219 private void init(boolean startManager) throws UMOException 220 { 221 if (MuleManager.isInstanciated()) 225 { 226 if (logger.isInfoEnabled()) 227 { 228 logger.info("There is already a manager available to this client locally, no need to create a new one"); 229 } 230 } 231 else 232 { 233 MuleManager.getConfiguration().setClientMode(true); 234 if (logger.isInfoEnabled()) 235 { 236 logger.info("There is no manager instance available locally for this client, Creating a new Manager"); 237 } 238 } 239 240 manager = MuleManager.getInstance(); 241 builder = new QuickConfigurationBuilder(); 242 243 if (!manager.isInitialised() && startManager == true) 244 { 245 if (logger.isInfoEnabled()) logger.info("Starting Mule Manager for this client"); 246 ((MuleManager)manager).start(); 247 } 248 } 249 250 262 public void dispatch(String url, Object payload, Map messageProperties) throws UMOException 263 { 264 dispatch(url, new MuleMessage(payload, messageProperties)); 265 } 266 267 276 public void dispatch(String url, UMOMessage message) throws UMOException 277 { 278 UMOEvent event = getEvent(message, url, false, false); 279 try 280 { 281 event.getSession().dispatchEvent(event); 282 } 283 catch (UMOException e) 284 { 285 throw e; 286 } 287 catch (Exception e) 288 { 289 throw new DispatchException(new Message("client", 1), event.getMessage(), event.getEndpoint(), e); 290 } 291 } 292 293 302 public void dispatchStream(String url, UMOStreamMessageAdapter message) throws UMOException 303 { 304 UMOEvent event = getEvent(new MuleMessage(message), url, false, true); 305 try 306 { 307 event.getSession().dispatchEvent(event); 308 } 309 catch (UMOException e) 310 { 311 throw e; 312 } 313 catch (Exception e) 314 { 315 throw new DispatchException(new Message("client", 1), event.getMessage(), event.getEndpoint(), e); 316 } 317 } 318 319 public UMOStreamMessageAdapter sendStream(String url, UMOStreamMessageAdapter message) 320 throws UMOException 321 { 322 return sendStream(url, message, UMOEvent.TIMEOUT_NOT_SET_VALUE); 323 } 324 325 336 public UMOStreamMessageAdapter sendStream(String url, UMOStreamMessageAdapter message, int timeout) 337 throws UMOException 338 { 339 UMOEvent event = getEvent(new MuleMessage(message), url, true, true); 340 event.setTimeout(timeout); 341 try 342 { 343 UMOMessage result = event.getSession().sendEvent(event); 344 if (result != null) 345 { 346 if (result.getAdapter() instanceof UMOStreamMessageAdapter) 347 { 348 return (UMOStreamMessageAdapter)result.getAdapter(); 349 } 350 else 351 { 352 throw new IllegalStateException ( 354 "Mismatch of stream states. A stream was used for outbound channel, but a stream was not used for the response"); 355 } 356 } 357 } 358 catch (UMOException e) 359 { 360 throw e; 361 } 362 catch (Exception e) 363 { 364 throw new DispatchException(new Message("client", 1), event.getMessage(), event.getEndpoint(), e); 365 } 366 return null; 367 } 368 369 382 public UMOMessage sendDirect(String component, String transformers, Object payload, Map messageProperties) 383 throws UMOException 384 { 385 UMOMessage message = new MuleMessage(payload, messageProperties); 386 return sendDirect(component, transformers, message); 387 } 388 389 400 public UMOMessage sendDirect(String component, String transformers, UMOMessage message) 401 throws UMOException 402 { 403 boolean compregistered = getManager().getModel().isComponentRegistered(component); 404 if (!compregistered) 405 { 406 throw new MessagingException(new Message(Messages.X_NOT_REGISTERED_WITH_MANAGER, "Component '" 407 + component 408 + "'"), message, 409 null); 410 } 411 UMOTransformer trans = null; 412 if (transformers != null) 413 { 414 trans = MuleObjectHelper.getTransformer(transformers, ","); 415 } 416 417 if (!MuleManager.getConfiguration().isSynchronous()) 418 { 419 logger.warn("The mule manager is running synchronously, a null message payload will be returned"); 420 } 421 UMOSession session = getManager().getModel().getComponentSession(component); 422 UMOEndpoint endpoint = getDefaultClientEndpoint(session.getComponent().getDescriptor(), 423 message.getPayload()); 424 UMOEvent event = new MuleEvent(message, endpoint, session, true); 425 426 if (logger.isDebugEnabled()) 427 { 428 logger.debug("MuleClient sending event direct to: " + component + ". Event is: " + event); 429 } 430 431 UMOMessage result = event.getComponent().sendEvent(event); 432 433 if (logger.isDebugEnabled()) 434 { 435 logger.debug("Result of MuleClient sendDirect is: " 436 + (result == null ? "null" : result.getPayload())); 437 } 438 439 if (result != null && trans != null) 440 { 441 return new MuleMessage(trans.transform(result.getPayload())); 442 } 443 else 444 { 445 return result; 446 } 447 } 448 449 459 public void dispatchDirect(String component, Object payload, Map messageProperties) throws UMOException 460 { 461 dispatchDirect(component, new MuleMessage(payload, messageProperties)); 462 } 463 464 472 public void dispatchDirect(String component, UMOMessage message) throws UMOException 473 { 474 boolean compregistered = getManager().getModel().isComponentRegistered(component); 475 if (!compregistered) 476 { 477 throw new MessagingException(new Message(Messages.X_NOT_REGISTERED_WITH_MANAGER, "Component '" 478 + component 479 + "'"), message, 480 null); 481 } 482 UMOSession session = getManager().getModel().getComponentSession(component); 483 UMOEndpoint endpoint = getDefaultClientEndpoint(session.getComponent().getDescriptor(), 484 message.getPayload()); 485 UMOEvent event = new MuleEvent(message, endpoint, session, true); 486 487 if (logger.isDebugEnabled()) 488 { 489 logger.debug("MuleClient dispatching event direct to: " + component + ". Event is: " + event); 490 } 491 492 event.getComponent().dispatchEvent(event); 493 } 494 495 507 public FutureMessageResult sendAsync(final String url, final Object payload, final Map messageProperties) 508 throws UMOException 509 { 510 return sendAsync(url, payload, messageProperties, 0); 511 } 512 513 523 public FutureMessageResult sendAsync(final String url, final UMOMessage message) throws UMOException 524 { 525 return sendAsync(url, message, UMOEvent.TIMEOUT_NOT_SET_VALUE); 526 } 527 528 541 public FutureMessageResult sendAsync(final String url, 542 final Object payload, 543 final Map messageProperties, 544 final int timeout) throws UMOException 545 { 546 return sendAsync(url, new MuleMessage(payload, messageProperties), timeout); 547 } 548 549 560 public FutureMessageResult sendAsync(final String url, final UMOMessage message, final int timeout) 561 throws UMOException 562 { 563 Callable call = new Callable() 564 { 565 public Object call() throws Exception 566 { 567 return send(url, message, timeout); 568 } 569 }; 570 571 FutureMessageResult result = new FutureMessageResult(call); 572 573 if (executor != null) 574 { 575 result.setExecutor(executor); 576 } 577 578 result.execute(); 579 return result; 580 } 581 582 600 public FutureMessageResult sendDirectAsync(final String component, 601 String transformers, 602 final Object payload, 603 final Map messageProperties) throws UMOException 604 { 605 return sendDirectAsync(component, transformers, new MuleMessage(payload, messageProperties)); 606 } 607 608 624 public FutureMessageResult sendDirectAsync(final String component, 625 String transformers, 626 final UMOMessage message) throws UMOException 627 { 628 Callable call = new Callable() 629 { 630 public Object call() throws Exception 631 { 632 return sendDirect(component, null, message); 633 } 634 }; 635 636 FutureMessageResult result = new FutureMessageResult(call); 637 638 if (executor != null) 639 { 640 result.setExecutor(executor); 641 } 642 643 if (StringUtils.isNotBlank(transformers)) 644 { 645 result.setTransformer(MuleObjectHelper.getTransformer(transformers, ",")); 646 } 647 648 result.execute(); 649 return result; 650 } 651 652 666 public UMOMessage send(String url, Object payload, Map messageProperties) throws UMOException 667 { 668 return send(url, payload, messageProperties, UMOEvent.TIMEOUT_NOT_SET_VALUE); 669 } 670 671 682 public UMOMessage send(String url, UMOMessage message) throws UMOException 683 { 684 return send(url, message, UMOEvent.TIMEOUT_NOT_SET_VALUE); 685 } 686 687 703 public UMOMessage send(String url, Object payload, Map messageProperties, int timeout) 704 throws UMOException 705 { 706 if (messageProperties == null) 707 { 708 messageProperties = new HashMap (); 709 } 710 if (messageProperties.get(MuleProperties.MULE_REMOTE_SYNC_PROPERTY) == null) 711 { 712 messageProperties.put(MuleProperties.MULE_REMOTE_SYNC_PROPERTY, "true"); 713 } 714 UMOMessage message = new MuleMessage(payload, messageProperties); 715 return send(url, message, timeout); 716 } 717 718 731 public UMOMessage send(String url, UMOMessage message, int timeout) throws UMOException 732 { 733 UMOEvent event = getEvent(message, url, true, false); 734 event.setTimeout(timeout); 735 736 try 737 { 738 return event.getSession().sendEvent(event); 739 } 740 catch (UMOException e) 741 { 742 throw e; 743 } 744 catch (Exception e) 745 { 746 throw new DispatchException(new Message("client", 1), event.getMessage(), event.getEndpoint(), e); 747 } 748 } 749 750 761 public UMOMessage receive(String url, long timeout) throws UMOException 762 { 763 UMOEndpoint endpoint = getEndpoint(url, UMOEndpoint.ENDPOINT_TYPE_RECEIVER); 764 try 765 { 766 UMOMessage message = endpoint.getConnector().getDispatcher(endpoint).receive(endpoint, timeout); 767 if (message != null && endpoint.getTransformer() != null) 768 { 769 if (endpoint.getTransformer().isSourceTypeSupported(message.getPayload().getClass())) 770 { 771 message = new MuleMessage(endpoint.getTransformer().transform(message.getPayload()), 772 message); 773 } 774 } 775 return message; 776 } 777 catch (Exception e) 778 { 779 throw new ReceiveException(endpoint, timeout, e); 780 } 781 } 782 783 796 public UMOMessage receive(String url, String transformers, long timeout) throws UMOException 797 { 798 return receive(url, MuleObjectHelper.getTransformer(transformers, ","), timeout); 799 } 800 801 813 public UMOMessage receive(String url, UMOTransformer transformer, long timeout) throws UMOException 814 { 815 UMOMessage message = receive(url, timeout); 816 if (message != null && transformer != null) 817 { 818 return new MuleMessage(transformer.transform(message.getPayload())); 819 } 820 else 821 { 822 return message; 823 } 824 } 825 826 836 protected UMOEvent getEvent(UMOMessage message, String uri, boolean synchronous, boolean streaming) 837 throws UMOException 838 { 839 UMOEndpoint endpoint = getEndpoint(uri, UMOEndpoint.ENDPOINT_TYPE_SENDER); 840 if (!endpoint.getConnector().isStarted() && manager.isStarted()) 841 { 842 endpoint.getConnector().startConnector(); 843 } 844 endpoint.setStreaming(streaming); 845 try 846 { 847 MuleSession session = new MuleSession(message, 848 ((AbstractConnector)endpoint.getConnector()).getSessionHandler()); 849 850 if (user != null) 851 { 852 message.setProperty(MuleProperties.MULE_USER_PROPERTY, MuleCredentials.createHeader( 853 user.getUsername(), user.getPassword())); 854 } 855 MuleEvent event = new MuleEvent(message, endpoint, session, synchronous); 856 return event; 857 } 858 catch (Exception e) 859 { 860 throw new DispatchException(new Message(Messages.FAILED_TO_CREATE_X, "Client event"), message, 861 endpoint, e); 862 } 863 } 864 865 protected UMOEndpoint getEndpoint(String uri, String type) throws UMOException 866 { 867 UMOEndpoint endpoint = manager.lookupEndpoint(uri); 868 if (endpoint == null) 869 { 870 endpoint = MuleEndpoint.getOrCreateEndpointForUri(uri, type); 871 } 872 return endpoint; 873 } 874 875 protected UMOEndpoint getDefaultClientEndpoint(UMODescriptor descriptor, Object payload) 876 throws UMOException 877 { 878 UMOEndpoint endpoint = descriptor.getInboundEndpoint(); 880 if (endpoint != null) 881 { 882 if (endpoint.getTransformer() != null) 883 { 884 if (endpoint.getTransformer().isSourceTypeSupported(payload.getClass())) 885 { 886 return endpoint; 887 } 888 else 889 { 890 endpoint = new MuleEndpoint(endpoint); 891 endpoint.setTransformer(null); 892 return endpoint; 893 } 894 } 895 else 896 { 897 return endpoint; 898 } 899 } 900 else 901 { 902 UMOConnector connector = null; 903 UMOEndpointURI defaultEndpointUri = new MuleEndpointURI("vm://mule.client"); 904 connector = ConnectorFactory.createConnector(defaultEndpointUri); 905 manager.registerConnector(connector); 906 connector.startConnector(); 907 endpoint = new MuleEndpoint("muleClientProvider", defaultEndpointUri, connector, null, 908 UMOEndpoint.ENDPOINT_TYPE_RECEIVER, 0, null, null); 909 } 910 911 manager.registerEndpoint(endpoint); 912 return endpoint; 913 } 914 915 927 public void sendNoReceive(String url, Object payload, Map messageProperties) throws UMOException 928 { 929 if (messageProperties == null) 930 { 931 messageProperties = new HashMap (); 932 } 933 messageProperties.put(MuleProperties.MULE_REMOTE_SYNC_PROPERTY, "false"); 934 UMOMessage message = new MuleMessage(payload, messageProperties); 935 UMOEvent event = getEvent(message, url, true, false); 936 try 937 { 938 event.getSession().sendEvent(event); 939 } 940 catch (UMOException e) 941 { 942 throw e; 943 } 944 catch (Exception e) 945 { 946 throw new DispatchException(new Message("client", 1), event.getMessage(), event.getEndpoint(), e); 947 } 948 } 949 950 955 public UMOManager getManager() 956 { 957 return MuleManager.getInstance(); 958 } 959 960 972 public void registerComponent(Object component, String name, UMOEndpointURI listenerEndpoint) 973 throws UMOException 974 { 975 builder.registerComponentInstance(component, name, listenerEndpoint, null); 976 } 977 978 991 public void registerComponent(Object component, 992 String name, 993 MuleEndpointURI listenerEndpoint, 994 MuleEndpointURI sendEndpoint) throws UMOException 995 { 996 builder.registerComponentInstance(component, name, listenerEndpoint, sendEndpoint); 997 } 998 999 1014 public void registerComponent(UMODescriptor descriptor) throws UMOException 1015 { 1016 builder.registerComponent(descriptor); 1017 } 1018 1019 1031 public void unregisterComponent(String name) throws UMOException 1032 { 1033 builder.unregisterComponent(name); 1034 } 1035 1036 public RemoteDispatcher getRemoteDispatcher(String serverEndpoint) throws UMOException 1037 { 1038 RemoteDispatcher rd = new RemoteDispatcher(serverEndpoint); 1039 rd.setExecutorService(executor); 1040 dispatchers.add(rd); 1041 return rd; 1042 } 1043 1044 public RemoteDispatcher getRemoteDispatcher(String serverEndpoint, String user, String password) 1045 throws UMOException 1046 { 1047 RemoteDispatcher rd = new RemoteDispatcher(serverEndpoint, new MuleCredentials(user, 1048 password.toCharArray())); 1049 rd.setExecutorService(executor); 1050 dispatchers.add(rd); 1051 return rd; 1052 } 1053 1054 1058 public void dispose() 1059 { 1060 synchronized (dispatchers) 1061 { 1062 for (Iterator iterator = dispatchers.iterator(); iterator.hasNext();) 1063 { 1064 RemoteDispatcher remoteDispatcher = (RemoteDispatcher)iterator.next(); 1065 remoteDispatcher.dispose(); 1066 remoteDispatcher = null; 1067 } 1068 dispatchers.clear(); 1069 } 1070 if (MuleManager.getConfiguration().isClientMode()) 1072 { 1073 manager.dispose(); 1074 } 1075 } 1076 1077 public void setProperty(Object key, Object value) 1078 { 1079 manager.setProperty(key, value); 1080 } 1081 1082 public Object getProperty(Object key) 1083 { 1084 return manager.getProperty(key); 1085 } 1086 1087 public MuleConfiguration getConfiguration() 1088 { 1089 return MuleManager.getConfiguration(); 1090 } 1091} 1092 | Popular Tags |