package org.mule.providers;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.MuleManager;
import org.mule.MuleRuntimeException;
import org.mule.config.ThreadingProfile;
import org.mule.config.i18n.Message;
import org.mule.config.i18n.Messages;
import org.mule.impl.AlreadyInitialisedException;
import org.mule.impl.DefaultExceptionStrategy;
import org.mule.impl.MuleSessionHandler;
import org.mule.impl.internal.notifications.ConnectionNotification;
import org.mule.routing.filters.WildcardFilter;
import org.mule.umo.UMOComponent;
import org.mule.umo.UMOException;
import org.mule.umo.endpoint.UMOEndpoint;
import org.mule.umo.endpoint.UMOEndpointURI;
import org.mule.umo.endpoint.UMOImmutableEndpoint;
import org.mule.umo.lifecycle.DisposeException;
import org.mule.umo.lifecycle.Initialisable;
import org.mule.umo.lifecycle.InitialisationException;
import org.mule.umo.manager.UMOServerNotification;
import org.mule.umo.manager.UMOWorkManager;
import org.mule.umo.provider.ConnectorException;
import org.mule.umo.provider.UMOConnectable;
import org.mule.umo.provider.UMOConnector;
import org.mule.umo.provider.UMOMessageDispatcher;
import org.mule.umo.provider.UMOMessageDispatcherFactory;
import org.mule.umo.provider.UMOMessageReceiver;
import org.mule.umo.provider.UMOSessionHandler;
import org.mule.umo.transformer.UMOTransformer;
import org.mule.util.concurrent.WaitableBoolean;

import javax.resource.spi.work.WorkEvent;
import javax.resource.spi.work.WorkListener;
import java.beans.ExceptionListener;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Base class for connectors. It keeps the life-cycle flags (initialised, started,
 * connected, disposing, disposed), the registries of message receivers and
 * dispatchers, the default transformers, the list of supported protocols and the
 * work managers used by receivers and dispatchers.
 * <p>
 * Subclasses must implement {@link #getProtocol()} and
 * {@link #createReceiver(UMOComponent, UMOEndpoint)} and may override the
 * <code>doXXX</code> template methods to hook into the life-cycle.
 */
public abstract class AbstractConnector
    implements UMOConnector, ExceptionListener, UMOConnectable, WorkListener
{
    /** Logger named after the concrete connector class. */
    protected transient Log logger = LogFactory.getLog(getClass());

    /** Set once {@link #startConnector()} has completed; cleared by {@link #stopConnector()}. */
    protected AtomicBoolean started = new AtomicBoolean(false);

    /** Set once {@link #initialise()} has completed. */
    protected AtomicBoolean initialised = new AtomicBoolean(false);

    /** The name of this connector. */
    protected String name = null;

    /** Listener that exceptions passed to {@link #handleException(Exception)} are forwarded to. */
    protected ExceptionListener exceptionListener = null;

    /** Set at the end of {@link #dispose()}. */
    protected AtomicBoolean disposed = new AtomicBoolean(false);

    /** Set at the beginning of {@link #dispose()}. */
    protected AtomicBoolean disposing = new AtomicBoolean(false);

    /** Factory used by {@link #createDispatcher(UMOImmutableEndpoint)}. */
    protected UMOMessageDispatcherFactory dispatcherFactory;

    /** Dispatchers created so far, keyed by the string form of their endpoint URI. */
    protected ConcurrentMap dispatchers;

    /**
     * Registered receivers, keyed by {@link #getReceiverKey(UMOComponent, UMOEndpoint)}.
     * Set to <code>null</code> by {@link #disposeReceivers()}.
     */
    protected ConcurrentMap receivers;

    /** Dispatcher threading profile; resolved lazily from the Mule configuration. */
    private ThreadingProfile dispatcherThreadingProfile = null;

    /** Receiver threading profile; resolved lazily from the Mule configuration. */
    private ThreadingProfile receiverThreadingProfile = null;

    /** Configuration flag exposed through its accessors; not read by this class itself. */
    protected boolean createDispatcherPerRequest = false;

    /** Configuration flag exposed through its accessors; not read by this class itself. */
    protected boolean createMultipleTransactedReceivers = true;

    /** Prototype cloned by {@link #getDefaultInboundTransformer()}. */
    protected UMOTransformer defaultInboundTransformer = null;

    /** Prototype cloned by {@link #getDefaultOutboundTransformer()}. */
    protected UMOTransformer defaultOutboundTransformer = null;

    /** Prototype cloned by {@link #getDefaultResponseTransformer()}. */
    protected UMOTransformer defaultResponseTransformer = null;

    /** Connection strategy; defaults to the one from the Mule configuration. */
    private ConnectionStrategy connectionStrategy;

    /** Set once {@link #doConnect()} has succeeded; cleared by {@link #disconnect()}. */
    protected WaitableBoolean connected = new WaitableBoolean(false);

    /** Set while a connection attempt driven by {@link #connect()} is in progress. */
    protected WaitableBoolean connecting = new WaitableBoolean(false);

    /** When set, {@link #connect()} starts the connector as soon as the connection is made. */
    protected WaitableBoolean startOnConnect = new WaitableBoolean(false);

    /** Initialised from the Mule configuration; exposed through its accessors. */
    private boolean enableMessageEvents = false;

    /** Lower-cased protocol names accepted by {@link #supportsProtocol(String)}. */
    private List supportedProtocols;

    /** Shared receiver work manager, used when {@link #useSingleReceiverThreadPool} is set. */
    private UMOWorkManager receiverWorkManager = null;

    /** Shared dispatcher work manager, used when {@link #useSingleDispatcherThreadPool} is set. */
    private UMOWorkManager dispatcherWorkManager = null;

    /** Whether all receivers share a single work manager. */
    private boolean useSingleReceiverThreadPool = false;

    /** Whether all dispatchers share a single work manager. */
    private boolean useSingleDispatcherThreadPool = false;

    /** When set, receiver work managers are created with a raised thread priority. */
    protected boolean serverSide = true;

    /** Session handler returned by {@link #getSessionHandler()}. */
    protected UMOSessionHandler sessionHandler = new MuleSessionHandler();

    /**
     * Creates the connector with a {@link DefaultExceptionStrategy}, empty receiver and
     * dispatcher registries, and the connection strategy and message-event setting taken
     * from the Mule configuration. The connector's own protocol is registered as the
     * first supported protocol.
     */
    public AbstractConnector()
    {
        exceptionListener = new DefaultExceptionStrategy();
        dispatchers = new ConcurrentHashMap();
        receivers = new ConcurrentHashMap();
        connectionStrategy = MuleManager.getConfiguration().getConnectionStrategy();
        enableMessageEvents = MuleManager.getConfiguration().isEnableMessageEvents();
        supportedProtocols = new ArrayList();

        // NOTE: getProtocol() is abstract, so subclasses must be able to answer it
        // before their own constructor has run.
        supportedProtocols.add(getProtocol().toLowerCase());
    }

    /**
     * @return the name of this connector, or <code>null</code> if none has been set
     */
    public String getName()
    {
        return name;
    }

    /**
     * Sets the name of this connector.
     *
     * @param newName the new name; must not be <code>null</code>
     * @throws IllegalArgumentException if <code>newName</code> is <code>null</code>
     */
    public void setName(String newName)
    {
        if (newName == null)
        {
            throw new IllegalArgumentException(new Message(Messages.X_IS_NULL, "Connector name").toString());
        }
        if (logger.isDebugEnabled())
        {
            logger.debug("Set UMOConnector name to: " + newName);
        }
        name = newName;
    }

    /**
     * Initialises the connector: calls {@link #doInitialise()}, then initialises the
     * exception listener if it is {@link Initialisable}, and finally marks the connector
     * as initialised.
     *
     * @throws AlreadyInitialisedException if the connector has already been initialised
     * @throws InitialisationException if initialisation fails
     */
    public final synchronized void initialise() throws InitialisationException
    {
        if (initialised.get())
        {
            throw new AlreadyInitialisedException("Connector '" + getName() + "'", this);
        }

        if (logger.isInfoEnabled())
        {
            logger.info("Initialising " + getClass().getName());
        }

        doInitialise();
        if (exceptionListener instanceof Initialisable)
        {
            ((Initialisable)exceptionListener).initialise();
        }
        initialised.set(true);
    }

    /**
     * @return the primary protocol of this connector
     */
    public abstract String getProtocol();

    /**
     * Starts the connector and all registered receivers. If the connector is not yet
     * connected, this only flags that a start is wanted and hands over to the connection
     * strategy; {@link #connect()} calls this method again once the connection is made.
     *
     * @throws DisposeException if the connector has been disposed
     * @throws UMOException if the connector or one of its receivers fails to start
     */
    public final void startConnector() throws UMOException
    {
        checkDisposed();
        if (!isStarted())
        {
            if (!isConnected())
            {
                startOnConnect.set(true);
                getConnectionStrategy().connect(this);
                // connect() re-enters startConnector() after a successful connection
                return;
            }
            if (logger.isInfoEnabled())
            {
                logger.info("Starting Connector: " + getClass().getName());
            }
            doStart();
            started.set(true);
            if (receivers != null)
            {
                for (Iterator iterator = receivers.values().iterator(); iterator.hasNext();)
                {
                    AbstractMessageReceiver amr = (AbstractMessageReceiver)iterator.next();
                    if (logger.isDebugEnabled())
                    {
                        logger.debug("Starting receiver on endpoint: " + amr.getEndpoint().getEndpointURI());
                    }
                    amr.start();
                }
            }

            if (logger.isInfoEnabled())
            {
                logger.info("Connector: " + getClass().getName() + " has been started");
            }
        }
    }

    /**
     * @return <code>true</code> if the connector has been started and not stopped since
     */
    public boolean isStarted()
    {
        return started.get();
    }

    /**
     * Stops the connector and its receivers and, if still connected, disconnects it.
     * Does nothing when the connector is already disposed. A failure to disconnect is
     * logged, not propagated.
     *
     * @throws UMOException if the connector or one of its receivers fails to stop
     */
    public final void stopConnector() throws UMOException
    {
        if (isDisposed())
        {
            return;
        }

        if (isStarted())
        {
            if (logger.isInfoEnabled())
            {
                logger.info("Stopping Connector: " + getClass().getName());
            }
            doStop();
            started.set(false);

            // receivers is null once disposeReceivers() has run
            if (receivers != null)
            {
                for (Iterator iterator = receivers.values().iterator(); iterator.hasNext();)
                {
                    UMOMessageReceiver mr = (UMOMessageReceiver)iterator.next();
                    if (logger.isDebugEnabled())
                    {
                        logger.debug("Stopping receiver on endpoint: " + mr.getEndpoint().getEndpointURI());
                    }
                    mr.stop();
                }
            }
        }

        if (isConnected())
        {
            try
            {
                disconnect();
            }
            catch (Exception e)
            {
                logger.error("Failed to disconnect: " + e.getMessage(), e);
            }
        }
        if (logger.isInfoEnabled())
        {
            logger.info("Connector " + getClass().getName() + " has been stopped");
        }
    }

    /**
     * Disposes the receivers, the dispatchers and finally the connector itself (via
     * {@link #doDispose()}), then marks the connector as disposed.
     */
    public final synchronized void dispose()
    {
        disposing.set(true);
        if (logger.isInfoEnabled())
        {
            logger.info("Disposing Connector: " + getClass().getName());
        }
        logger.debug("Disposing Receivers");
        disposeReceivers();
        disposeDispatchers();

        doDispose();
        disposed.set(true);

        if (logger.isInfoEnabled())
        {
            logger.info("Connector " + getClass().getName() + " has been disposed.");
        }
    }

    /**
     * Destroys every registered receiver, logging (not propagating) individual failures,
     * then clears the registry and sets {@link #receivers} to <code>null</code>.
     */
    protected void disposeReceivers()
    {
        if (receivers != null)
        {
            UMOMessageReceiver receiver;
            for (Iterator iterator = receivers.values().iterator(); iterator.hasNext();)
            {
                receiver = (UMOMessageReceiver)iterator.next();
                try
                {
                    destroyReceiver(receiver, receiver.getEndpoint());
                }
                catch (Throwable e)
                {
                    // keep going so that one bad receiver cannot block the others
                    logger.error("Failed to destroy receiver: " + e.getMessage(), e);
                }
            }
            receivers.clear();
            receivers = null;
            logger.debug("Receivers Disposed");
        }
    }

    /**
     * Disposes every known dispatcher and clears the dispatcher registry.
     */
    protected void disposeDispatchers()
    {
        if (dispatchers != null)
        {
            logger.debug("Disposing Dispatchers");
            for (Iterator iterator = dispatchers.values().iterator(); iterator.hasNext();)
            {
                UMOMessageDispatcher umoMessageDispatcher = (UMOMessageDispatcher)iterator.next();
                umoMessageDispatcher.dispose();
            }
            dispatchers.clear();
            logger.debug("Dispatchers Disposed");
        }
    }

    /**
     * @return <code>true</code> once {@link #dispose()} has completed
     */
    public boolean isDisposed()
    {
        return disposed.get();
    }

    /**
     * Forwards the exception to the configured exception listener.
     *
     * @param exception the exception to handle
     * @throws MuleRuntimeException if no exception listener is set
     */
    public void handleException(Exception exception)
    {
        if (exceptionListener == null)
        {
            throw new MuleRuntimeException(new Message(
                Messages.EXCEPTION_ON_CONNECTOR_X_NO_EXCEPTION_LISTENER, getName()), exception);
        }
        else
        {
            exceptionListener.exceptionThrown(exception);
        }
    }

    /**
     * {@link ExceptionListener} callback; delegates to {@link #handleException(Exception)}.
     *
     * @param e the exception that was thrown
     */
    public void exceptionThrown(Exception e)
    {
        handleException(e);
    }

    /**
     * @return the exception listener used by this connector, possibly <code>null</code>
     */
    public ExceptionListener getExceptionListener()
    {
        return exceptionListener;
    }

    /**
     * @param listener the exception listener to use on this connector
     */
    public void setExceptionListener(ExceptionListener listener)
    {
        exceptionListener = listener;
    }

    /**
     * @return the factory used to create dispatchers, possibly <code>null</code>
     */
    public UMOMessageDispatcherFactory getDispatcherFactory()
    {
        return dispatcherFactory;
    }

    /**
     * @param dispatcherFactory the factory used to create dispatchers
     */
    public void setDispatcherFactory(UMOMessageDispatcherFactory dispatcherFactory)
    {
        this.dispatcherFactory = dispatcherFactory;
    }

    /**
     * Returns the dispatcher for the endpoint, creating it if necessary.
     *
     * @param endpoint the endpoint to dispatch to; must not be <code>null</code>
     * @return the dispatcher for the endpoint
     * @throws UMOException if the connector is disposed or the dispatcher cannot be created
     */
    public UMOMessageDispatcher getDispatcher(UMOImmutableEndpoint endpoint) throws UMOException
    {
        return getDispatcher(endpoint, true);
    }

    /**
     * Returns the dispatcher registered under the endpoint's URI. When there is none, or
     * the registered one is disposed, a new dispatcher is created and registered if
     * <code>createDispatcherIfNotExists</code> is set.
     *
     * @param endpoint the endpoint to dispatch to; must not be <code>null</code>
     * @param createDispatcherIfNotExists whether a missing or disposed dispatcher should
     *            be (re)created
     * @return the dispatcher; may be <code>null</code> (or a disposed dispatcher) when
     *         creation was not requested
     * @throws IllegalArgumentException if the endpoint is <code>null</code> or its
     *             connector's protocol is not supported by this connector
     * @throws IllegalStateException if the dispatcher registry is <code>null</code>
     * @throws UMOException if the connector is disposed or the dispatcher cannot be created
     */
    public UMOMessageDispatcher getDispatcher(UMOImmutableEndpoint endpoint,
                                              boolean createDispatcherIfNotExists) throws UMOException
    {
        checkDisposed();

        if (endpoint == null)
        {
            throw new IllegalArgumentException("Endpoint must not be null");
        }

        if (!supportsProtocol(endpoint.getConnector().getProtocol()))
        {
            throw new IllegalArgumentException(new Message(
                Messages.CONNECTOR_SCHEME_X_INCOMPATIBLE_WITH_ENDPOINT_SCHEME_X, getProtocol(),
                endpoint.getEndpointURI().toString()).getMessage());
        }

        if (dispatchers == null)
        {
            throw new IllegalStateException("Dispatchers are null for connector: " + name);
        }

        // lookup-then-create is serialised per endpoint instance
        synchronized (endpoint)
        {
            String endpointUriKey = endpoint.getEndpointURI().toString();
            UMOMessageDispatcher dispatcher = (UMOMessageDispatcher)dispatchers.get(endpointUriKey);

            if ((dispatcher == null || dispatcher.isDisposed()) && createDispatcherIfNotExists)
            {
                dispatcher = createDispatcher(endpoint);
                dispatchers.put(endpointUriKey, dispatcher);
            }

            return dispatcher;
        }
    }

    /**
     * Looks up a dispatcher by its registry key.
     *
     * @param key the registry key; must not be <code>null</code>
     * @return the dispatcher registered under the key, or <code>null</code> if none
     * @throws IllegalArgumentException if the key is <code>null</code>
     */
    public UMOMessageDispatcher lookupDispatcher(String key)
    {
        if (key != null)
        {
            return (UMOMessageDispatcher)dispatchers.get(key);
        }
        else
        {
            throw new IllegalArgumentException("Dispatcher key must not be null");
        }
    }

    /**
     * @throws DisposeException if this connector has been disposed
     */
    protected void checkDisposed() throws DisposeException
    {
        if (isDisposed())
        {
            throw new DisposeException(new Message(Messages.CANT_USE_DISPOSED_CONNECTOR), this);
        }
    }

    /**
     * Creates a dispatcher for the endpoint using the dispatcher factory.
     *
     * @param endpoint the endpoint the dispatcher is for
     * @return the new dispatcher
     * @throws ConnectorException if no dispatcher factory is set
     * @throws UMOException if the factory fails to create the dispatcher
     */
    protected UMOMessageDispatcher createDispatcher(UMOImmutableEndpoint endpoint) throws UMOException
    {
        if (dispatcherFactory == null)
        {
            throw new ConnectorException(new Message(Messages.CONNECTOR_NOT_STARTED, name), this);
        }

        return dispatcherFactory.create(endpoint);
    }

    /**
     * Creates a receiver for the component/endpoint pair and registers it under
     * {@link #getReceiverKey(UMOComponent, UMOEndpoint)}.
     *
     * @param component the component that will receive events; must not be <code>null</code>
     * @param endpoint the endpoint to listen on; must not be <code>null</code>
     * @return the newly created receiver
     * @throws IllegalArgumentException if the component or endpoint is <code>null</code>
     * @throws ConnectorException if the endpoint has no URI or a receiver is already
     *             registered under the same key
     * @throws Exception if the receiver cannot be created
     */
    public UMOMessageReceiver registerListener(UMOComponent component, UMOEndpoint endpoint) throws Exception
    {
        if (endpoint == null)
        {
            throw new IllegalArgumentException("The endpoint cannot be null when registering a listener");
        }

        if (component == null)
        {
            throw new IllegalArgumentException("The component cannot be null when registering a listener");
        }

        UMOEndpointURI endpointUri = endpoint.getEndpointURI();
        if (endpointUri == null)
        {
            throw new ConnectorException(new Message(Messages.ENDPOINT_NULL_FOR_LISTENER), this);
        }
        logger.info("registering listener: " + component.getDescriptor().getName() + " on endpointUri: "
                    + endpointUri.toString());

        UMOMessageReceiver receiver = getReceiver(component, endpoint);
        if (receiver != null)
        {
            throw new ConnectorException(new Message(Messages.LISTENER_ALREADY_REGISTERED, endpointUri), this);
        }
        else
        {
            receiver = createReceiver(component, endpoint);
            receivers.put(getReceiverKey(component, endpoint), receiver);
        }
        return receiver;
    }

    /**
     * Computes the key under which a receiver is stored: the endpoint URI's filter
     * address, or its address when the filter address is empty.
     *
     * @param component the component the receiver belongs to (unused by this implementation)
     * @param endpoint the endpoint the receiver listens on
     * @return the registry key
     */
    protected Object getReceiverKey(UMOComponent component, UMOEndpoint endpoint)
    {
        return StringUtils.defaultIfEmpty(endpoint.getEndpointURI().getFilterAddress(),
            endpoint.getEndpointURI().getAddress());
    }

    /**
     * Removes and destroys the receiver registered for the component/endpoint pair, if any.
     *
     * @param component the component; must not be <code>null</code>
     * @param endpoint the endpoint, with a non-null URI; must not be <code>null</code>
     * @throws IllegalArgumentException if the component, endpoint or endpoint URI is
     *             <code>null</code>
     * @throws Exception if destroying the receiver fails
     */
    public final void unregisterListener(UMOComponent component, UMOEndpoint endpoint) throws Exception
    {
        if (component == null)
        {
            throw new IllegalArgumentException(
                "The component must not be null when you unregister a listener");
        }

        if (endpoint == null)
        {
            throw new IllegalArgumentException("The endpoint must not be null when you unregister a listener");
        }

        UMOEndpointURI endpointUri = endpoint.getEndpointURI();
        if (endpointUri == null)
        {
            throw new IllegalArgumentException(
                "The endpointUri must not be null when you unregister a listener");
        }

        if (logger.isInfoEnabled())
        {
            logger.info("removing listener on endpointUri: " + endpointUri);
        }

        if (receivers != null && !receivers.isEmpty())
        {
            UMOMessageReceiver receiver = (UMOMessageReceiver)receivers.remove(getReceiverKey(component,
                endpoint));
            if (receiver != null)
            {
                destroyReceiver(receiver, endpoint);
                // destroyReceiver() in this class already disposes; the explicit call is
                // kept because overrides of destroyReceiver() may not.
                receiver.dispose();
            }
        }
    }

    /**
     * @return the dispatcher threading profile, defaulting to the one from the Mule
     *         configuration when none has been set
     */
    public ThreadingProfile getDispatcherThreadingProfile()
    {
        if (dispatcherThreadingProfile == null)
        {
            dispatcherThreadingProfile = MuleManager.getConfiguration()
                .getMessageDispatcherThreadingProfile();
        }
        return dispatcherThreadingProfile;
    }

    /**
     * @param dispatcherThreadingProfile the threading profile to use for dispatchers
     */
    public void setDispatcherThreadingProfile(ThreadingProfile dispatcherThreadingProfile)
    {
        this.dispatcherThreadingProfile = dispatcherThreadingProfile;
    }

    /**
     * @return the receiver threading profile, defaulting to the one from the Mule
     *         configuration when none has been set
     */
    public ThreadingProfile getReceiverThreadingProfile()
    {
        if (receiverThreadingProfile == null)
        {
            receiverThreadingProfile = MuleManager.getConfiguration().getMessageReceiverThreadingProfile();
        }
        return receiverThreadingProfile;
    }

    /**
     * @param receiverThreadingProfile the threading profile to use for receivers
     */
    public void setReceiverThreadingProfile(ThreadingProfile receiverThreadingProfile)
    {
        this.receiverThreadingProfile = receiverThreadingProfile;
    }

    /**
     * Creates a message receiver for the given component and endpoint.
     *
     * @param component the component that will receive events
     * @param endpoint the endpoint to listen on
     * @return the new receiver
     * @throws Exception if the receiver cannot be created
     */
    public abstract UMOMessageReceiver createReceiver(UMOComponent component, UMOEndpoint endpoint)
        throws Exception;

    /**
     * Destroys a receiver. This implementation simply disposes it.
     *
     * @param receiver the receiver to destroy
     * @param endpoint the endpoint the receiver was listening on (unused by this implementation)
     * @throws Exception if the receiver cannot be destroyed
     */
    public void destroyReceiver(UMOMessageReceiver receiver, UMOEndpoint endpoint) throws Exception
    {
        receiver.dispose();
    }

    /**
     * Template method called by {@link #startConnector()}; does nothing by default.
     *
     * @throws UMOException if the connector cannot be started
     */
    protected void doStart() throws UMOException
    {
        // template method
    }

    /**
     * Template method called by {@link #stopConnector()}; does nothing by default.
     *
     * @throws UMOException if the connector cannot be stopped
     */
    protected void doStop() throws UMOException
    {
        // template method
    }

    /**
     * Called by {@link #dispose()}. This implementation stops the connector and logs a
     * warning if that fails.
     */
    protected void doDispose()
    {
        try
        {
            stopConnector();
        }
        catch (UMOException e)
        {
            logger.warn("Failed to stop during shutdown: " + e.getMessage(), e);
        }
    }

    /**
     * Template method called by {@link #initialise()}; does nothing by default.
     *
     * @throws InitialisationException if the connector cannot be initialised
     */
    public void doInitialise() throws InitialisationException
    {
        // template method
    }

    /**
     * @return a clone of the default inbound transformer, or <code>null</code> if none
     *         is set or it cannot be cloned
     */
    public UMOTransformer getDefaultInboundTransformer()
    {
        if (defaultInboundTransformer != null)
        {
            try
            {
                return (UMOTransformer)defaultInboundTransformer.clone();
            }
            catch (CloneNotSupportedException e)
            {
                logger.error("Failed to clone default Inbound transformer", e);
                return null;
            }
        }
        else
        {
            return null;
        }
    }

    /**
     * @param defaultInboundTransformer the prototype inbound transformer
     */
    public void setDefaultInboundTransformer(UMOTransformer defaultInboundTransformer)
    {
        this.defaultInboundTransformer = defaultInboundTransformer;
    }

    /**
     * @return a clone of the default response transformer, or <code>null</code> if none
     *         is set or it cannot be cloned
     */
    public UMOTransformer getDefaultResponseTransformer()
    {
        if (defaultResponseTransformer != null)
        {
            try
            {
                return (UMOTransformer)defaultResponseTransformer.clone();
            }
            catch (CloneNotSupportedException e)
            {
                logger.error("Failed to clone default Response transformer", e);
                return null;
            }
        }
        else
        {
            return null;
        }
    }

    /**
     * @return a clone of the default outbound transformer, or <code>null</code> if none
     *         is set or it cannot be cloned
     */
    public UMOTransformer getDefaultOutboundTransformer()
    {
        if (defaultOutboundTransformer != null)
        {
            try
            {
                return (UMOTransformer)defaultOutboundTransformer.clone();
            }
            catch (CloneNotSupportedException e)
            {
                logger.error("Failed to clone default Outbound transformer", e);
                return null;
            }
        }
        else
        {
            return null;
        }
    }

    /**
     * @param defaultOutboundTransformer the prototype outbound transformer
     */
    public void setDefaultOutboundTransformer(UMOTransformer defaultOutboundTransformer)
    {
        this.defaultOutboundTransformer = defaultOutboundTransformer;
    }

    /**
     * @param defaultResponseTransformer the prototype response transformer
     */
    public void setDefaultResponseTransformer(UMOTransformer defaultResponseTransformer)
    {
        this.defaultResponseTransformer = defaultResponseTransformer;
    }

    /**
     * @return a new {@link DefaultReplyToHandler} built around the default response
     *         transformer
     */
    public ReplyToHandler getReplyToHandler()
    {
        return new DefaultReplyToHandler(defaultResponseTransformer);
    }

    /**
     * @return the live dispatcher registry, keyed by endpoint URI string
     */
    public Map getDispatchers()
    {
        return dispatchers;
    }

    /**
     * Fires a server notification through the {@link MuleManager} instance.
     *
     * @param notification the notification to fire
     */
    public void fireNotification(UMOServerNotification notification)
    {
        MuleManager.getInstance().fireNotification(notification);
    }

    /**
     * @return a fresh copy of the configured connection strategy, so that callers do not
     *         share state with each other
     * @throws MuleRuntimeException if the strategy cannot be cloned
     */
    public ConnectionStrategy getConnectionStrategy()
    {
        try
        {
            return (ConnectionStrategy)BeanUtils.cloneBean(connectionStrategy);
        }
        catch (Exception e)
        {
            throw new MuleRuntimeException(new Message(Messages.FAILED_TO_CLONE_X, "connectionStrategy"), e);
        }
    }

    /**
     * @param connectionStrategy the connection strategy to use
     */
    public void setConnectionStrategy(ConnectionStrategy connectionStrategy)
    {
        this.connectionStrategy = connectionStrategy;
    }

    /**
     * @return <code>true</code> once {@link #dispose()} has been entered
     */
    public boolean isDisposing()
    {
        return disposing.get();
    }

    /**
     * @return <code>false</code>; subclasses may override
     */
    public boolean isRemoteSyncEnabled()
    {
        return false;
    }

    /**
     * @param component the component the receiver belongs to
     * @param endpoint the endpoint the receiver listens on
     * @return the receiver registered for the pair, or <code>null</code> if there is none
     *         or the receivers have been disposed
     */
    public AbstractMessageReceiver getReceiver(UMOComponent component, UMOEndpoint endpoint)
    {
        if (receivers == null)
        {
            return null;
        }
        return (AbstractMessageReceiver)receivers.get(getReceiverKey(component, endpoint));
    }

    /**
     * @return a read-only view of the receiver registry; empty once the receivers have
     *         been disposed
     */
    public Map getReceivers()
    {
        if (receivers == null)
        {
            return Collections.EMPTY_MAP;
        }
        return Collections.unmodifiableMap(receivers);
    }

    /**
     * @param key the registry key; must not be <code>null</code>
     * @return the receiver registered under the key, or <code>null</code> if there is
     *         none or the receivers have been disposed
     * @throws IllegalArgumentException if the key is <code>null</code>
     */
    public UMOMessageReceiver lookupReceiver(String key)
    {
        if (key != null)
        {
            return receivers == null ? null : (UMOMessageReceiver)receivers.get(key);
        }
        else
        {
            throw new IllegalArgumentException("Receiver key must not be null");
        }
    }

    /**
     * @param key the registry key; must not be <code>null</code>
     * @return the receiver registered under the key, or <code>null</code> if there is
     *         none or the receivers have been disposed
     * @throws IllegalArgumentException if the key is <code>null</code>
     */
    public AbstractMessageReceiver getReceiver(String key)
    {
        if (key != null)
        {
            return receivers == null ? null : (AbstractMessageReceiver)receivers.get(key);
        }
        else
        {
            throw new IllegalArgumentException("Receiver key must not be null");
        }
    }

    /**
     * Returns all receivers whose registry key matches the wildcard expression,
     * ignoring case.
     *
     * @param wildcardExpression the expression handed to a {@link WildcardFilter}
     * @return the matching receivers; empty when nothing matches or the receivers have
     *         been disposed
     */
    public AbstractMessageReceiver[] getReceivers(String wildcardExpression)
    {
        List temp = new ArrayList();
        if (receivers != null)
        {
            WildcardFilter filter = new WildcardFilter(wildcardExpression);
            filter.setCaseSensitive(false);
            for (Iterator iterator = receivers.keySet().iterator(); iterator.hasNext();)
            {
                Object o = iterator.next();
                if (filter.accept(o))
                {
                    temp.add(receivers.get(o));
                }
            }
        }
        AbstractMessageReceiver[] result = new AbstractMessageReceiver[temp.size()];
        return (AbstractMessageReceiver[])temp.toArray(result);
    }

    /**
     * Connects the connector. The first call flips the <code>connecting</code> flag and
     * delegates to the connection strategy, which is expected to call this method again;
     * that second call performs the actual {@link #doConnect()}. After a successful
     * connection the connector is started if a start was requested, otherwise the
     * registered receivers are connected.
     *
     * @throws DisposeException if the connector has been disposed
     * @throws ConnectException if {@link #doConnect()} fails
     * @throws Exception if starting the connector or connecting a receiver fails
     */
    public void connect() throws Exception
    {
        if (connected.get())
        {
            return;
        }
        checkDisposed();
        if (connecting.commit(false, true))
        {
            connectionStrategy.connect(this);
            logger.info("Connected: " + getConnectionDescription());
            // The strategy calls back into connect(); with the connecting flag now set,
            // that nested call does the real work below.
            return;
        }

        try
        {
            doConnect();
            fireNotification(new ConnectionNotification(this, getConnectEventId(),
                ConnectionNotification.CONNECTION_CONNECTED));
        }
        catch (Exception e)
        {
            fireNotification(new ConnectionNotification(this, getConnectEventId(),
                ConnectionNotification.CONNECTION_FAILED));
            if (e instanceof ConnectException)
            {
                throw (ConnectException)e;
            }
            else
            {
                throw new ConnectException(e, this);
            }
        }
        connected.set(true);
        connecting.set(false);
        if (startOnConnect.get())
        {
            startConnector();
        }
        else if (receivers != null)
        {
            for (Iterator iterator = receivers.values().iterator(); iterator.hasNext();)
            {
                AbstractMessageReceiver amr = (AbstractMessageReceiver)iterator.next();
                if (logger.isDebugEnabled())
                {
                    logger.debug("Connecting receiver on endpoint: " + amr.getEndpoint().getEndpointURI());
                }
                amr.connect();
            }
        }
    }

    /**
     * Disconnects the connector. Remembers whether the connector was started (so that a
     * later reconnect restarts it), fires a disconnected notification, calls
     * {@link #doDisconnect()} and always stops the connector afterwards.
     *
     * @throws Exception if disconnecting or stopping fails
     */
    public void disconnect() throws Exception
    {
        startOnConnect.set(isStarted());
        fireNotification(new ConnectionNotification(this, getConnectEventId(),
            ConnectionNotification.CONNECTION_DISCONNECTED));
        connected.set(false);
        try
        {
            doDisconnect();
        }
        finally
        {
            stopConnector();
        }

        logger.info("Disconnected: " + getConnectionDescription());
    }

    /**
     * @return a description of the connection for log messages; <code>toString()</code>
     *         by default
     */
    public String getConnectionDescription()
    {
        return toString();
    }

    /**
     * @return <code>true</code> if the connector is currently connected
     */
    public final boolean isConnected()
    {
        return connected.get();
    }

    /**
     * Template method called by {@link #connect()}; does nothing by default.
     *
     * @throws Exception if the connection cannot be made
     */
    public void doConnect() throws Exception
    {
        // template method
    }

    /**
     * Template method called by {@link #disconnect()}; does nothing by default.
     *
     * @throws Exception if the connection cannot be closed
     */
    public void doDisconnect() throws Exception
    {
        // template method
    }

    /**
     * @return the resource id used in connection notifications; the connector name by
     *         default
     */
    protected String getConnectEventId()
    {
        return getName();
    }

    /**
     * @param createDispatcherPerRequest the new value of the flag
     */
    public void setCreateDispatcherPerRequest(boolean createDispatcherPerRequest)
    {
        this.createDispatcherPerRequest = createDispatcherPerRequest;
    }

    /**
     * @return the value of the <code>createDispatcherPerRequest</code> flag
     */
    public boolean isCreateDispatcherPerRequest()
    {
        return createDispatcherPerRequest;
    }

    /**
     * @return the value of the <code>createMultipleTransactedReceivers</code> flag
     */
    public boolean isCreateMultipleTransactedReceivers()
    {
        return createMultipleTransactedReceivers;
    }

    /**
     * @param createMultipleTransactedReceivers the new value of the flag
     */
    public void setCreateMultipleTransactedReceivers(boolean createMultipleTransactedReceivers)
    {
        this.createMultipleTransactedReceivers = createMultipleTransactedReceivers;
    }

    /**
     * @return the value of the <code>enableMessageEvents</code> flag
     */
    public boolean isEnableMessageEvents()
    {
        return enableMessageEvents;
    }

    /**
     * @param enableMessageEvents the new value of the flag
     */
    public void setEnableMessageEvents(boolean enableMessageEvents)
    {
        this.enableMessageEvents = enableMessageEvents;
    }

    /**
     * Registers an additional supported protocol. Unless the (lower-cased) protocol
     * already starts with this connector's own protocol, it is stored as
     * <code>&lt;connector protocol&gt;:&lt;protocol&gt;</code>. Registering a protocol
     * that is already present has no effect.
     *
     * @param protocol the protocol to register; must not be <code>null</code>
     */
    public void registerSupportedProtocol(String protocol)
    {
        protocol = protocol.toLowerCase();
        if (protocol.startsWith(getProtocol().toLowerCase()))
        {
            registerSupportedProtocolWithotPrefix(protocol);
        }
        else
        {
            addSupportedProtocol(getProtocol().toLowerCase() + ":" + protocol);
        }
    }

    /**
     * Registers a supported protocol exactly as given (lower-cased), without prefixing
     * it with this connector's own protocol. Registering a protocol that is already
     * present has no effect.
     *
     * @param protocol the protocol to register; must not be <code>null</code>
     */
    protected void registerSupportedProtocolWithotPrefix(String protocol)
    {
        addSupportedProtocol(protocol.toLowerCase());
    }

    /**
     * Adds the protocol only if it is not registered yet, so that a single
     * {@link #unregisterSupportedProtocol(String)} call really removes it.
     */
    private void addSupportedProtocol(String protocol)
    {
        if (!supportedProtocols.contains(protocol))
        {
            supportedProtocols.add(protocol);
        }
    }

    /**
     * Removes a supported protocol, applying the same prefixing rule as
     * {@link #registerSupportedProtocol(String)}.
     *
     * @param protocol the protocol to remove; must not be <code>null</code>
     */
    public void unregisterSupportedProtocol(String protocol)
    {
        protocol = protocol.toLowerCase();
        if (protocol.startsWith(getProtocol().toLowerCase()))
        {
            supportedProtocols.remove(protocol);
        }
        else
        {
            supportedProtocols.remove(getProtocol().toLowerCase() + ":" + protocol);
        }
    }

    /**
     * @param protocol the protocol to check; must not be <code>null</code>
     * @return <code>true</code> if the lower-cased protocol is registered as supported
     */
    public boolean supportsProtocol(String protocol)
    {
        return supportedProtocols.contains(protocol.toLowerCase());
    }

    /**
     * @return a read-only view of the supported protocols
     */
    public List getSupportedProtocols()
    {
        return Collections.unmodifiableList(supportedProtocols);
    }

    /**
     * Registers every protocol in the list via {@link #registerSupportedProtocol(String)}.
     * Protocols registered earlier are kept.
     *
     * @param supportedProtocols a list of protocol names (<code>String</code>s)
     */
    public void setSupportedProtocols(List supportedProtocols)
    {
        for (Iterator iterator = supportedProtocols.iterator(); iterator.hasNext();)
        {
            String s = (String)iterator.next();
            registerSupportedProtocol(s);
        }
    }

    /**
     * Returns a work manager for a receiver. When a single receiver thread pool is in
     * use, the shared work manager is created on first use and returned thereafter;
     * otherwise a new one is created from the receiver threading profile on each call.
     * For server-side connectors the profile's thread priority is raised first.
     *
     * @param name a name for the work manager, used to build its full name
     * @return the work manager to use
     */
    UMOWorkManager createReceiverWorkManager(String name)
    {
        UMOWorkManager wm;
        if (useSingleReceiverThreadPool && receiverWorkManager != null)
        {
            wm = receiverWorkManager;
        }
        else
        {
            ThreadingProfile tp = getReceiverThreadingProfile();
            if (serverSide)
            {
                tp.setThreadPriority(Thread.NORM_PRIORITY + 2);
            }
            wm = tp.createWorkManager(getName() + "." + name + ".receiver");
            if (useSingleReceiverThreadPool)
            {
                receiverWorkManager = wm;
            }
        }
        return wm;
    }

    /**
     * Returns a work manager for a dispatcher. When a single dispatcher thread pool is
     * in use, the shared work manager is created on first use and returned thereafter;
     * otherwise a new one is created from the dispatcher threading profile on each call.
     *
     * @param name a name for the work manager, used to build its full name
     * @return the work manager to use
     */
    UMOWorkManager createDispatcherWorkManager(String name)
    {
        UMOWorkManager wm;
        if (useSingleDispatcherThreadPool && dispatcherWorkManager != null)
        {
            wm = dispatcherWorkManager;
        }
        else
        {
            // dispatchers must honour the dispatcher profile, not the receiver one
            ThreadingProfile tp = getDispatcherThreadingProfile();
            wm = tp.createWorkManager(getName() + "." + name + ".dispatcher");
            if (useSingleDispatcherThreadPool)
            {
                dispatcherWorkManager = wm;
            }
        }
        return wm;
    }

    /**
     * @return whether all receivers share a single work manager
     */
    public boolean isUseSingleReceiverThreadPool()
    {
        return useSingleReceiverThreadPool;
    }

    /**
     * @param useSingleReceiverThreadPool whether all receivers should share a single
     *            work manager
     */
    public void setUseSingleReceiverThreadPool(boolean useSingleReceiverThreadPool)
    {
        this.useSingleReceiverThreadPool = useSingleReceiverThreadPool;
    }

    /**
     * @return whether all dispatchers share a single work manager
     */
    public boolean isUseSingleDispatcherThreadPool()
    {
        return useSingleDispatcherThreadPool;
    }

    /**
     * @param useSingleDispatcherThreadPool whether all dispatchers should share a single
     *            work manager
     */
    public void setUseSingleDispatcherThreadPool(boolean useSingleDispatcherThreadPool)
    {
        this.useSingleDispatcherThreadPool = useSingleDispatcherThreadPool;
    }

    /**
     * @return the value of the <code>serverSide</code> flag
     */
    public boolean isServerSide()
    {
        return serverSide;
    }

    /**
     * @param serverSide the new value of the <code>serverSide</code> flag
     */
    public void setServerSide(boolean serverSide)
    {
        this.serverSide = serverSide;
    }

    /**
     * @return the session handler used by this connector
     */
    public UMOSessionHandler getSessionHandler()
    {
        return sessionHandler;
    }

    /**
     * @param sessionHandler the session handler to use
     */
    public void setSessionHandler(UMOSessionHandler sessionHandler)
    {
        this.sessionHandler = sessionHandler;
    }

    /** {@link WorkListener} callback; reports any exception carried by the event. */
    public void workAccepted(WorkEvent event)
    {
        handleWorkException(event, "workAccepted");
    }

    /** {@link WorkListener} callback; reports any exception carried by the event. */
    public void workRejected(WorkEvent event)
    {
        handleWorkException(event, "workRejected");
    }

    /** {@link WorkListener} callback; reports any exception carried by the event. */
    public void workStarted(WorkEvent event)
    {
        handleWorkException(event, "workStarted");
    }

    /** {@link WorkListener} callback; reports any exception carried by the event. */
    public void workCompleted(WorkEvent event)
    {
        handleWorkException(event, "workCompleted");
    }

    /**
     * Reports the exception carried by a work event, if any. The cause of the work
     * exception is preferred over the exception itself. An {@link Exception} is passed
     * to {@link #handleException(Exception)}; any other throwable is rethrown wrapped in
     * a {@link MuleRuntimeException}.
     *
     * @param event the work event; ignored when <code>null</code> or without exception
     * @param type the name of the listener callback, used in the log message
     */
    protected void handleWorkException(WorkEvent event, String type)
    {
        Throwable e;
        if (event != null && event.getException() != null)
        {
            e = event.getException();
        }
        else
        {
            return;
        }
        if (event.getException().getCause() != null)
        {
            e = event.getException().getCause();
        }
        // plain concatenation so a missing Work instance cannot break error reporting
        logger.error("Work caused exception on '" + type + "'. Work being executed was: "
                     + event.getWork());
        if (e instanceof Exception)
        {
            handleException((Exception)e);
        }
        else
        {
            throw new MuleRuntimeException(new Message(Messages.CONNECTOR_CAUSED_ERROR, getName()), e);
        }
    }
}