KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > AbstractConnector


1 /*
2  * $Id: AbstractConnector.java 3982 2006-11-22 14:28:01Z lajos $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers;
12
13 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
14 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap;
15 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
16 import org.apache.commons.beanutils.BeanUtils;
17 import org.apache.commons.lang.StringUtils;
18 import org.apache.commons.logging.Log;
19 import org.apache.commons.logging.LogFactory;
20 import org.mule.MuleManager;
21 import org.mule.MuleRuntimeException;
22 import org.mule.config.ThreadingProfile;
23 import org.mule.config.i18n.Message;
24 import org.mule.config.i18n.Messages;
25 import org.mule.impl.AlreadyInitialisedException;
26 import org.mule.impl.DefaultExceptionStrategy;
27 import org.mule.impl.MuleSessionHandler;
28 import org.mule.impl.internal.notifications.ConnectionNotification;
29 import org.mule.routing.filters.WildcardFilter;
30 import org.mule.umo.UMOComponent;
31 import org.mule.umo.UMOException;
32 import org.mule.umo.endpoint.UMOEndpoint;
33 import org.mule.umo.endpoint.UMOEndpointURI;
34 import org.mule.umo.endpoint.UMOImmutableEndpoint;
35 import org.mule.umo.lifecycle.DisposeException;
36 import org.mule.umo.lifecycle.Initialisable;
37 import org.mule.umo.lifecycle.InitialisationException;
38 import org.mule.umo.manager.UMOServerNotification;
39 import org.mule.umo.manager.UMOWorkManager;
40 import org.mule.umo.provider.ConnectorException;
41 import org.mule.umo.provider.UMOConnectable;
42 import org.mule.umo.provider.UMOConnector;
43 import org.mule.umo.provider.UMOMessageDispatcher;
44 import org.mule.umo.provider.UMOMessageDispatcherFactory;
45 import org.mule.umo.provider.UMOMessageReceiver;
46 import org.mule.umo.provider.UMOSessionHandler;
47 import org.mule.umo.transformer.UMOTransformer;
48 import org.mule.util.concurrent.WaitableBoolean;
49
50 import javax.resource.spi.work.WorkEvent JavaDoc;
51 import javax.resource.spi.work.WorkListener JavaDoc;
52 import java.beans.ExceptionListener JavaDoc;
53 import java.util.ArrayList JavaDoc;
54 import java.util.Collections JavaDoc;
55 import java.util.Iterator JavaDoc;
56 import java.util.List JavaDoc;
57 import java.util.Map JavaDoc;
58
59 /**
60  * <code>AbstractConnector</code> provides base functionality for all connectors
61  * provided with Mule. Connectors are the mechanism used to connect to external
62  * systems and protocols in order to send and receive data. <p/> The
63  * <code>AbstractConnector</code> provides getter and setter methods for endpoint
64  * name, transport name and protocol. It also provides methods to stop and start
65  * connecotors and sets up a dispatcher threadpool which allows deriving connectors
66  * the possibility to dispatch work to separate threads. This functionality is
67  * controlled with the <i> doThreading</i> property on the threadingProfiles for
68  * dispachers and receivers. The lifecycle for a connector is -
69  * <ol>
70  * <li>Create
71  * <li>Initialise
72  * <li>Connect
73  * <li>Connect receivers
74  * <li>Start
75  * <li>Start Receivers
76  * <li>Stop
77  * <li>Stop Receivers
78  * <li>Disconnect
79  * <li>Disconnect Receivers
80  * <li>Dispose
81  * <li>Dispose Receivers
82  * </ol>
83  *
84  * @author <a HREF="mailto:ross.mason@symphonysoft.com">Ross Mason</a>
85  * @version $Revision: 3982 $
86  */

87 public abstract class AbstractConnector
88     implements UMOConnector, ExceptionListener JavaDoc, UMOConnectable, WorkListener JavaDoc
89 {
90     /**
91      * logger used by this class
92      */

93     protected transient Log logger = LogFactory.getLog(getClass());
94
95     /**
96      * Specifies if the endpoint started
97      */

98     protected AtomicBoolean started = new AtomicBoolean(false);
99
100     /**
101      * True once the endpoint has been initialsed
102      */

103     protected AtomicBoolean initialised = new AtomicBoolean(false);
104
105     /**
106      * The name that identifies the endpoint
107      */

108     protected String JavaDoc name = null;
109
110     /**
111      * The exception strategy used by this connector
112      */

113     protected ExceptionListener JavaDoc exceptionListener = null;
114
115     /**
116      * Determines in the connector is alive and well
117      */

118     protected AtomicBoolean disposed = new AtomicBoolean(false);
119
120     /**
121      * Determines in connector has been told to dispose
122      */

123     protected AtomicBoolean disposing = new AtomicBoolean(false);
124
125     /**
126      * Factory used to create dispatchers for this connector
127      */

128     protected UMOMessageDispatcherFactory dispatcherFactory;
129
130     /**
131      * A pool of dispa tchers for this connector, the pool is keyed on endpointUri
132      */

133     protected ConcurrentMap dispatchers;
134
135     /**
136      * The collection of listeners on this connector. Keyed by entrypoint
137      */

138     protected ConcurrentMap receivers;
139
140     /**
141      * Defines the dispatcher threading model
142      */

143     private ThreadingProfile dispatcherThreadingProfile = null;
144
145     /**
146      * Defines the receiver threading model
147      */

148     private ThreadingProfile receiverThreadingProfile = null;
149
150     /**
151      * Determines whether dispatchers should be disposed straight away of after every
152      * request or cached
153      */

154     protected boolean createDispatcherPerRequest = false;
155
156     /**
157      * For better throughput when using TransactedMessageReceivers. This will create
158      * an number of receiver threads based on the ThreadingProfile configured fro the
159      * receiver. This property is user by transports that support transactions,
160      * specifically MessageReceivers that extend the
161      * TransactedPollingMessageReceiver.
162      */

163     protected boolean createMultipleTransactedReceivers = true;
164
165     /**
166      * The service descriptor can define a default inbound transformer to be used on
167      * an endpoint if no other is set
168      */

169     protected UMOTransformer defaultInboundTransformer = null;
170
171     /**
172      * The service descriptor can define a default outbound transformer to be used on
173      * an endpoint if no other is set
174      */

175     protected UMOTransformer defaultOutboundTransformer = null;
176
177     /**
178      * For some connectors such as http, a response transformer is required or where
179      * a replyTo needs a trnasformer
180      */

181     protected UMOTransformer defaultResponseTransformer = null;
182
183     private ConnectionStrategy connectionStrategy;
184
185     protected WaitableBoolean connected = new WaitableBoolean(false);
186
187     protected WaitableBoolean connecting = new WaitableBoolean(false);
188
189     /**
190      * If the connect method was called via the start method, this will be set so
191      * that when the connector comes on line it will be started
192      */

193     protected WaitableBoolean startOnConnect = new WaitableBoolean(false);
194
195     /**
196      * Whether to fire message notifications for every message that is sent or
197      * received from this connector
198      */

199     private boolean enableMessageEvents = false;
200
201     private List JavaDoc supportedProtocols;
202
203     /**
204      * A shared work manager for all receivers registered with this connector if
205      * <code>useSingleReceiverThreadPool</code> is set to true
206      */

207     private UMOWorkManager receiverWorkManager = null;
208
209     /**
210      * A shared work manager for all dispatchers created for this connector if
211      * <code>useSingleDispatcherThreadPool</code> is set to true
212      */

213     private UMOWorkManager dispatcherWorkManager = null;
214
215     /**
216      * Should a single receiver thread pool be created for all receivers It is
217      * recommended that if you have a lot of receivers being registered per connector
218      * that this should be set to true
219      */

220     private boolean useSingleReceiverThreadPool = false;
221
222     /**
223      * Should a single dispatcher thread pool be created for all distachers It is
224      * recommended that if you have a lot of dispatcher being created per connector
225      * that this should be set to true i.e. many different outbound endpoints
226      */

227     private boolean useSingleDispatcherThreadPool = false;
228
229     /**
230      * The flag determines if the connector is being used on the server side or
231      * client. If true receiver threads will be given a slightly higher priority.
232      */

233     protected boolean serverSide = true;
234
235     /**
236      * The strategy used for reading and writing session information to and fromt he
237      * transport
238      */

239     protected UMOSessionHandler sessionHandler = new MuleSessionHandler();
240
241     public AbstractConnector()
242     {
243         // make sure we always have an exception strategy
244
exceptionListener = new DefaultExceptionStrategy();
245         dispatchers = new ConcurrentHashMap();
246         receivers = new ConcurrentHashMap();
247         connectionStrategy = MuleManager.getConfiguration().getConnectionStrategy();
248         enableMessageEvents = MuleManager.getConfiguration().isEnableMessageEvents();
249         supportedProtocols = new ArrayList JavaDoc();
250
251         // Always add the default protocol
252
supportedProtocols.add(getProtocol().toLowerCase());
253
254     }
255
256     /*
257      * (non-Javadoc)
258      *
259      * @see org.mule.providers.UMOConnector#getName()
260      */

261     public String JavaDoc getName()
262     {
263         return name;
264     }
265
266     /*
267      * (non-Javadoc)
268      *
269      * @see org.mule.providers.UMOConnector#setName(java.lang.String)
270      */

271     public void setName(String JavaDoc newName)
272     {
273         if (newName == null)
274         {
275             throw new IllegalArgumentException JavaDoc(new Message(Messages.X_IS_NULL, "Connector name").toString());
276         }
277         if (logger.isDebugEnabled())
278         {
279             logger.debug("Set UMOConnector name to: " + newName);
280         }
281         name = newName;
282     }
283
284     /*
285      * (non-Javadoc)
286      *
287      * @see org.mule.providers.UMOConnector#create(java.util.HashMap)
288      */

289     public final synchronized void initialise() throws InitialisationException
290     {
291         if (initialised.get())
292         {
293             throw new AlreadyInitialisedException("Connector '" + getName() + "'", this);
294         }
295
296         if (logger.isInfoEnabled())
297         {
298             logger.info("Initialising " + getClass().getName());
299         }
300
301         doInitialise();
302         if (exceptionListener instanceof Initialisable)
303         {
304             ((Initialisable)exceptionListener).initialise();
305         }
306         initialised.set(true);
307     }
308
309     public abstract String JavaDoc getProtocol();
310
311     /*
312      * (non-Javadoc)
313      *
314      * @see org.mule.umo.provider.UMOConnector#start()
315      */

316     public final void startConnector() throws UMOException
317     {
318         checkDisposed();
319         if (!isStarted())
320         {
321             if (!isConnected())
322             {
323                 startOnConnect.set(true);
324                 getConnectionStrategy().connect(this);
325                 // Only start once we are connected
326
return;
327             }
328             if (logger.isInfoEnabled())
329             {
330                 logger.info("Starting Connector: " + getClass().getName());
331             }
332             doStart();
333             started.set(true);
334             if (receivers != null)
335             {
336                 for (Iterator JavaDoc iterator = receivers.values().iterator(); iterator.hasNext();)
337                 {
338                     AbstractMessageReceiver amr = (AbstractMessageReceiver)iterator.next();
339                     if (logger.isDebugEnabled())
340                     {
341                         logger.debug("Starting receiver on endpoint: " + amr.getEndpoint().getEndpointURI());
342                     }
343                     amr.start();
344                 }
345             }
346
347             if (logger.isInfoEnabled())
348             {
349                 logger.info("Connector: " + getClass().getName() + " has been started");
350             }
351         }
352     }
353
354     /*
355      * (non-Javadoc)
356      *
357      * @see org.mule.umo.provider.UMOConnector#isStarted()
358      */

359     public boolean isStarted()
360     {
361         return started.get();
362     }
363
364     /*
365      * (non-Javadoc)
366      *
367      * @see org.mule.umo.provider.UMOConnector#stop()
368      */

369     public final void stopConnector() throws UMOException
370     {
371         if (isDisposed())
372         {
373             return;
374         }
375
376         if (isStarted())
377         {
378             if (logger.isInfoEnabled())
379             {
380                 logger.info("Stopping Connector: " + getClass().getName());
381             }
382             doStop();
383             started.set(false);
384
385             // Stop all the receivers on this connector (this will cause them to
386
// disconnect too)
387
if (receivers != null)
388             {
389                 for (Iterator JavaDoc iterator = receivers.values().iterator(); iterator.hasNext();)
390                 {
391                     UMOMessageReceiver mr = (UMOMessageReceiver)iterator.next();
392                     if (logger.isDebugEnabled())
393                     {
394                         logger.debug("Stopping receiver on endpoint: " + mr.getEndpoint().getEndpointURI());
395                     }
396                     mr.stop();
397                 }
398             }
399         }
400
401         if (isConnected())
402         {
403             try
404             {
405                 disconnect();
406             }
407             catch (Exception JavaDoc e)
408             {
409                 logger.error("Failed to disconnect: " + e.getMessage(), e);
410             }
411         }
412         if (logger.isInfoEnabled())
413         {
414             logger.info("Connector " + getClass().getName() + " has been stopped");
415         }
416     }
417
418     /*
419      * (non-Javadoc)
420      *
421      * @see org.mule.umo.provider.UMOConnector#shutdown()
422      */

423     public final synchronized void dispose()
424     {
425         disposing.set(true);
426         if (logger.isInfoEnabled())
427         {
428             logger.info("Disposing Connector: " + getClass().getName());
429             logger.debug("Disposing Receivers");
430         }
431         disposeReceivers();
432         disposeDispatchers();
433
434         doDispose();
435         disposed.set(true);
436
437         if (logger.isInfoEnabled())
438         {
439             logger.info("Connector " + getClass().getName() + " has been disposed.");
440         }
441     }
442
443     protected void disposeReceivers()
444     {
445         if (receivers != null)
446         {
447             UMOMessageReceiver receiver;
448             for (Iterator JavaDoc iterator = receivers.values().iterator(); iterator.hasNext();)
449             {
450                 receiver = (UMOMessageReceiver)iterator.next();
451                 try
452                 {
453                     destroyReceiver(receiver, receiver.getEndpoint());
454                 }
455                 catch (Throwable JavaDoc e)
456                 {
457                     logger.error("Failed to destroy receiver: " + e.getMessage(), e);
458                 }
459             }
460             receivers.clear();
461             receivers = null;
462             logger.debug("Receivers Disposed");
463         }
464     }
465
466     protected void disposeDispatchers()
467     {
468         if (dispatchers != null)
469         {
470             logger.debug("Disposing Dispatchers");
471             for (Iterator JavaDoc iterator = dispatchers.values().iterator(); iterator.hasNext();)
472             {
473                 UMOMessageDispatcher umoMessageDispatcher = (UMOMessageDispatcher)iterator.next();
474                 umoMessageDispatcher.dispose();
475             }
476             dispatchers.clear();
477             logger.debug("Dispatchers Disposed");
478         }
479     }
480
481     /*
482      * (non-Javadoc)
483      *
484      * @see org.mule.umo.provider.UMOConnector#isAlive()
485      */

486     public boolean isDisposed()
487     {
488         return disposed.get();
489     }
490
491     /*
492      * (non-Javadoc)
493      *
494      * @see org.mule.umo.provider.UMOConnector#handleException(java.lang.Object,
495      * java.lang.Throwable)
496      */

497     public void handleException(Exception JavaDoc exception)
498     {
499         if (exceptionListener == null)
500         {
501             throw new MuleRuntimeException(new Message(
502                 Messages.EXCEPTION_ON_CONNECTOR_X_NO_EXCEPTION_LISTENER, getName()), exception);
503         }
504         else
505         {
506             exceptionListener.exceptionThrown(exception);
507         }
508     }
509
510     /*
511      * (non-Javadoc)
512      *
513      * @see org.mule.util.ExceptionListener#onException(java.lang.Throwable)
514      */

515     public void exceptionThrown(Exception JavaDoc e)
516     {
517         handleException(e);
518     }
519
520     /**
521      * @return the ExceptionStrategy for this endpoint
522      * @see ExceptionListener
523      */

524     public ExceptionListener JavaDoc getExceptionListener()
525     {
526         return exceptionListener;
527     }
528
529     /**
530      * @param listener the ExceptionStrategy to use with this endpoint
531      * @see ExceptionListener
532      */

533     public void setExceptionListener(ExceptionListener JavaDoc listener)
534     {
535         exceptionListener = listener;
536     }
537
538     /**
539      * @return Returns the dispatcherFactory.
540      */

541     public UMOMessageDispatcherFactory getDispatcherFactory()
542     {
543         return dispatcherFactory;
544     }
545
546     /**
547      * @param dispatcherFactory The dispatcherFactory to set.
548      */

549     public void setDispatcherFactory(UMOMessageDispatcherFactory dispatcherFactory)
550     {
551         this.dispatcherFactory = dispatcherFactory;
552     }
553
554     public UMOMessageDispatcher getDispatcher(UMOImmutableEndpoint endpoint) throws UMOException
555     {
556         return getDispatcher(endpoint, /* createDispatcherIfNotExists */true);
557     }
558
559     public UMOMessageDispatcher getDispatcher(UMOImmutableEndpoint endpoint,
560                                               boolean createDispatcherIfNotExists) throws UMOException
561     {
562         checkDisposed();
563
564         if (endpoint == null)
565         {
566             throw new IllegalArgumentException JavaDoc("Endpoint must not be null");
567         }
568
569         if (!supportsProtocol(endpoint.getConnector().getProtocol()))
570         {
571             throw new IllegalArgumentException JavaDoc(new Message(
572                 Messages.CONNECTOR_SCHEME_X_INCOMPATIBLE_WITH_ENDPOINT_SCHEME_X, getProtocol(),
573                 endpoint.getEndpointURI().toString()).getMessage());
574         }
575
576         if (dispatchers == null)
577         {
578             throw new IllegalStateException JavaDoc("Dispatchers are null for connector: " + name);
579         }
580
581         synchronized (endpoint)
582         {
583             String JavaDoc endpointUriKey = endpoint.getEndpointURI().toString();
584             UMOMessageDispatcher dispatcher = (UMOMessageDispatcher)dispatchers.get(endpointUriKey);
585
586             if ((dispatcher == null || dispatcher.isDisposed()) && createDispatcherIfNotExists)
587             {
588                 dispatcher = createDispatcher(endpoint);
589                 dispatchers.put(endpointUriKey, dispatcher);
590             }
591
592             return dispatcher;
593         }
594     }
595
596     public UMOMessageDispatcher lookupDispatcher(String JavaDoc key)
597     {
598         if (key != null)
599         {
600             return (UMOMessageDispatcher)dispatchers.get(key);
601         }
602         else
603         {
604             throw new IllegalArgumentException JavaDoc("Dispatcher key must not be null");
605         }
606     }
607
608     protected void checkDisposed() throws DisposeException
609     {
610         if (isDisposed())
611         {
612             throw new DisposeException(new Message(Messages.CANT_USE_DISPOSED_CONNECTOR), this);
613         }
614     }
615
616     protected UMOMessageDispatcher createDispatcher(UMOImmutableEndpoint endpoint) throws UMOException
617     {
618         if (dispatcherFactory == null)
619         {
620             throw new ConnectorException(new Message(Messages.CONNECTOR_NOT_STARTED, name), this);
621         }
622
623         return dispatcherFactory.create(endpoint);
624     }
625
626     public UMOMessageReceiver registerListener(UMOComponent component, UMOEndpoint endpoint) throws Exception JavaDoc
627     {
628         if (endpoint == null)
629         {
630             throw new IllegalArgumentException JavaDoc("The endpoint cannot be null when registering a listener");
631         }
632
633         if (component == null)
634         {
635             throw new IllegalArgumentException JavaDoc("The component cannot be null when registering a listener");
636         }
637
638         UMOEndpointURI endpointUri = endpoint.getEndpointURI();
639         if (endpointUri == null)
640         {
641             throw new ConnectorException(new Message(Messages.ENDPOINT_NULL_FOR_LISTENER), this);
642         }
643         logger.info("registering listener: " + component.getDescriptor().getName() + " on endpointUri: "
644                     + endpointUri.toString());
645
646         UMOMessageReceiver receiver = getReceiver(component, endpoint);
647         if (receiver != null)
648         {
649             throw new ConnectorException(new Message(Messages.LISTENER_ALREADY_REGISTERED, endpointUri), this);
650         }
651         else
652         {
653             receiver = createReceiver(component, endpoint);
654             receivers.put(getReceiverKey(component, endpoint), receiver);
655         }
656         return receiver;
657     }
658
659     /**
660      * The method determines the key used to store the receiver against.
661      *
662      * @param component the component for which the endpoint is being registered
663      * @param endpoint the endpoint being registered for the component
664      * @return the key to store the newly created receiver against
665      */

666     protected Object JavaDoc getReceiverKey(UMOComponent component, UMOEndpoint endpoint)
667     {
668         return StringUtils.defaultIfEmpty(endpoint.getEndpointURI().getFilterAddress(),
669             endpoint.getEndpointURI().getAddress());
670     }
671
672     public final void unregisterListener(UMOComponent component, UMOEndpoint endpoint) throws Exception JavaDoc
673     {
674         if (component == null)
675         {
676             throw new IllegalArgumentException JavaDoc(
677                 "The component must not be null when you unregister a listener");
678         }
679
680         if (endpoint == null)
681         {
682             throw new IllegalArgumentException JavaDoc("The endpoint must not be null when you unregister a listener");
683         }
684
685         UMOEndpointURI endpointUri = endpoint.getEndpointURI();
686         if (endpointUri == null)
687         {
688             throw new IllegalArgumentException JavaDoc(
689                 "The endpointUri must not be null when you unregister a listener");
690         }
691
692         if (logger.isInfoEnabled())
693         {
694             logger.info("removing listener on endpointUri: " + endpointUri);
695         }
696
697         if (receivers != null && !receivers.isEmpty())
698         {
699             UMOMessageReceiver receiver = (UMOMessageReceiver)receivers.remove(getReceiverKey(component,
700                 endpoint));
701             if (receiver != null)
702             {
703                 destroyReceiver(receiver, endpoint);
704                 receiver.dispose();
705             }
706         }
707     }
708
709     public ThreadingProfile getDispatcherThreadingProfile()
710     {
711         if (dispatcherThreadingProfile == null)
712         {
713             dispatcherThreadingProfile = MuleManager.getConfiguration()
714                 .getMessageDispatcherThreadingProfile();
715
716         }
717         return dispatcherThreadingProfile;
718     }
719
720     public void setDispatcherThreadingProfile(ThreadingProfile dispatcherThreadingProfile)
721     {
722         this.dispatcherThreadingProfile = dispatcherThreadingProfile;
723     }
724
725     public ThreadingProfile getReceiverThreadingProfile()
726     {
727         if (receiverThreadingProfile == null)
728         {
729             receiverThreadingProfile = MuleManager.getConfiguration().getMessageReceiverThreadingProfile();
730         }
731         return receiverThreadingProfile;
732     }
733
734     public void setReceiverThreadingProfile(ThreadingProfile receiverThreadingProfile)
735     {
736         this.receiverThreadingProfile = receiverThreadingProfile;
737     }
738
739     public abstract UMOMessageReceiver createReceiver(UMOComponent component, UMOEndpoint endpoint)
740         throws Exception JavaDoc;
741
742     public void destroyReceiver(UMOMessageReceiver receiver, UMOEndpoint endpoint) throws Exception JavaDoc
743     {
744         receiver.dispose();
745     }
746
747     /**
748      * Template method to perform any work when starting the connectoe
749      *
750      * @throws UMOException if the method fails
751      */

752     protected void doStart() throws UMOException
753     {
754         // template method
755
}
756
757     /**
758      * Template method to perform any work when stopping the connectoe
759      *
760      * @throws UMOException if the method fails
761      */

762     protected void doStop() throws UMOException
763     {
764         // template method
765
}
766
767     /**
768      * Template method to perform any work when destroying the connectoe
769      */

770     protected void doDispose()
771     {
772         try
773         {
774             stopConnector();
775         }
776         catch (UMOException e)
777         {
778             logger.warn("Failed to stop during shutdown: " + e.getMessage(), e);
779         }
780     }
781
782     public void doInitialise() throws InitialisationException
783     {
784         // template method
785
}
786
787     public UMOTransformer getDefaultInboundTransformer()
788     {
789         if (defaultInboundTransformer != null)
790         {
791             try
792             {
793                 return (UMOTransformer)defaultInboundTransformer.clone();
794             }
795             catch (CloneNotSupportedException JavaDoc e)
796             {
797                 logger.error("Failed to clone default Inbound transformer");
798                 return null;
799             }
800         }
801         else
802         {
803             return null;
804         }
805     }
806
807     public void setDefaultInboundTransformer(UMOTransformer defaultInboundTransformer)
808     {
809         this.defaultInboundTransformer = defaultInboundTransformer;
810     }
811
812     public UMOTransformer getDefaultResponseTransformer()
813     {
814         if (defaultResponseTransformer != null)
815         {
816             try
817             {
818                 return (UMOTransformer)defaultResponseTransformer.clone();
819             }
820             catch (CloneNotSupportedException JavaDoc e)
821             {
822                 logger.error("Failed to clone default Outbound transformer");
823                 return null;
824             }
825         }
826         else
827         {
828             return null;
829         }
830     }
831
832     public UMOTransformer getDefaultOutboundTransformer()
833     {
834         if (defaultOutboundTransformer != null)
835         {
836             try
837             {
838                 return (UMOTransformer)defaultOutboundTransformer.clone();
839             }
840             catch (CloneNotSupportedException JavaDoc e)
841             {
842                 logger.error("Failed to clone default Outbound transformer");
843                 return null;
844             }
845         }
846         else
847         {
848             return null;
849         }
850     }
851
852     public void setDefaultOutboundTransformer(UMOTransformer defaultOutboundTransformer)
853     {
854         this.defaultOutboundTransformer = defaultOutboundTransformer;
855     }
856
857     public void setDefaultResponseTransformer(UMOTransformer defaultResponseTransformer)
858     {
859         this.defaultResponseTransformer = defaultResponseTransformer;
860     }
861
862     public ReplyToHandler getReplyToHandler()
863     {
864         return new DefaultReplyToHandler(defaultResponseTransformer);
865     }
866
867     /**
868      * @deprecated this method will go away soon.
869      */

870     public Map getDispatchers()
871     {
872         return dispatchers;
873     }
874
875     /**
876      * Fires a server notification to all registered
877      * {@link org.mule.impl.internal.notifications.CustomNotificationListener}
878      * eventManager.
879      *
880      * @param notification the notification to fire. This must be of type
881      * {@link org.mule.impl.internal.notifications.CustomNotification}
882      * otherwise an exception will be thrown.
883      * @throws UnsupportedOperationException if the notification fired is not a
884      * {@link org.mule.impl.internal.notifications.CustomNotification}
885      */

886     public void fireNotification(UMOServerNotification notification)
887     {
888         MuleManager.getInstance().fireNotification(notification);
889     }
890
891     public ConnectionStrategy getConnectionStrategy()
892     {
893         // not happy with this but each receiver needs its own instance
894
// of the connection strategy and using a factory just introduces extra
895
// implementation
896
try
897         {
898             return (ConnectionStrategy)BeanUtils.cloneBean(connectionStrategy);
899         }
900         catch (Exception JavaDoc e)
901         {
902             throw new MuleRuntimeException(new Message(Messages.FAILED_TO_CLONE_X, "connectionStrategy"), e);
903         }
904     }
905
906     public void setConnectionStrategy(ConnectionStrategy connectionStrategy)
907     {
908         this.connectionStrategy = connectionStrategy;
909     }
910
911     public boolean isDisposing()
912     {
913         return disposing.get();
914     }
915
916     public boolean isRemoteSyncEnabled()
917     {
918         return false;
919     }
920
921     public AbstractMessageReceiver getReceiver(UMOComponent component, UMOEndpoint endpoint)
922     {
923         return (AbstractMessageReceiver)receivers.get(getReceiverKey(component, endpoint));
924     }
925
926     public Map getReceivers()
927     {
928         return Collections.unmodifiableMap(receivers);
929     }
930
931     public UMOMessageReceiver lookupReceiver(String JavaDoc key)
932     {
933         if (key != null)
934         {
935             return (UMOMessageReceiver)receivers.get(key);
936         }
937         else
938         {
939             throw new IllegalArgumentException JavaDoc("Receiver key must not be null");
940         }
941     }
942
943     /** @deprecated Use lookupReceiver instead */
944     public AbstractMessageReceiver getReceiver(String JavaDoc key)
945     {
946         if (key != null)
947         {
948             return (AbstractMessageReceiver)receivers.get(key);
949         }
950         else
951         {
952             throw new IllegalArgumentException JavaDoc("Receiver key must not be null");
953         }
954     }
955
956     public AbstractMessageReceiver[] getReceivers(String JavaDoc wildcardExpression)
957     {
958
959         List JavaDoc temp = new ArrayList JavaDoc();
960         WildcardFilter filter = new WildcardFilter(wildcardExpression);
961         filter.setCaseSensitive(false);
962         for (Iterator JavaDoc iterator = receivers.keySet().iterator(); iterator.hasNext();)
963         {
964             Object JavaDoc o = iterator.next();
965             if (filter.accept(o))
966             {
967                 temp.add(receivers.get(o));
968             }
969         }
970         AbstractMessageReceiver[] result = new AbstractMessageReceiver[temp.size()];
971         return (AbstractMessageReceiver[])temp.toArray(result);
972     }
973
974     public void connect() throws Exception JavaDoc
975     {
976         if (connected.get())
977         {
978             return;
979         }
980         checkDisposed();
981         if (connecting.commit(false, true))
982         {
983             connectionStrategy.connect(this);
984             logger.info("Connected: " + getConnectionDescription());
985             // This method calls itself so the the connecting vflag is set first, the
986
// the
987
// connection is made on the second call
988
return;
989         }
990
991         try
992         {
993             doConnect();
994             fireNotification(new ConnectionNotification(this, getConnectEventId(),
995                 ConnectionNotification.CONNECTION_CONNECTED));
996         }
997         catch (Exception JavaDoc e)
998         {
999             fireNotification(new ConnectionNotification(this, getConnectEventId(),
1000                ConnectionNotification.CONNECTION_FAILED));
1001            if (e instanceof ConnectException)
1002            {
1003                throw (ConnectException)e;
1004            }
1005            else
1006            {
1007                throw new ConnectException(e, this);
1008            }
1009        }
1010        connected.set(true);
1011        connecting.set(false);
1012        if (startOnConnect.get())
1013        {
1014            startConnector();
1015        }
1016        else
1017        {
1018            for (Iterator JavaDoc iterator = receivers.values().iterator(); iterator.hasNext();)
1019            {
1020                AbstractMessageReceiver amr = (AbstractMessageReceiver)iterator.next();
1021                if (logger.isDebugEnabled())
1022                {
1023                    logger.debug("Connecting receiver on endpoint: " + amr.getEndpoint().getEndpointURI());
1024                }
1025                amr.connect();
1026            }
1027        }
1028    }
1029
1030    public void disconnect() throws Exception JavaDoc
1031    {
1032        startOnConnect.set(isStarted());
1033        fireNotification(new ConnectionNotification(this, getConnectEventId(),
1034            ConnectionNotification.CONNECTION_DISCONNECTED));
1035        connected.set(false);
1036        try
1037        {
1038            doDisconnect();
1039        }
1040        finally
1041        {
1042            stopConnector();
1043
1044        }
1045
1046        logger.info("Disconnected: " + getConnectionDescription());
1047    }
1048
1049    public String JavaDoc getConnectionDescription()
1050    {
1051        return toString();
1052    }
1053
1054    public final boolean isConnected()
1055    {
1056        return connected.get();
1057    }
1058
1059    /**
1060     * Template method where any connections should be made for the connector
1061     *
1062     * @throws Exception
1063     */

1064    public void doConnect() throws Exception JavaDoc
1065    {
1066        // template method
1067
}
1068
1069    /**
1070     * Template method where any connected resources used by the connector should be
1071     * disconnected
1072     *
1073     * @throws Exception
1074     */

1075    public void doDisconnect() throws Exception JavaDoc
1076    {
1077        // template method
1078
}
1079
1080    /**
1081     * The resource id used when firing ConnectEvents from this connector
1082     *
1083     * @return the resource id used when firing ConnectEvents from this connector
1084     */

1085    protected String JavaDoc getConnectEventId()
1086    {
1087        return getName();
1088    }
1089
1090    /**
1091     * controls whether dispatchers or cached or created per request Note that if an
1092     * exception occurs in the Dispatcher it is automatically disposed of and a new
1093     * one is created for the next request. This allows dispatchers to recover from
1094     * loss of connection and other faults.
1095     *
1096     * @param createDispatcherPerRequest whether a new dispatcher is created for
1097     * every request or not
1098     */

1099    public void setCreateDispatcherPerRequest(boolean createDispatcherPerRequest)
1100    {
1101        this.createDispatcherPerRequest = createDispatcherPerRequest;
1102    }
1103
1104    /**
1105     * controls whether dispatchers or cached or created per request Note that if an
1106     * exception occurs in the Dispatcher it is automatically disposed of and a new
1107     * one is created for the next request. This allows dispatchers to recover from
1108     * loss of connection and other faults.
1109     *
1110     * @return true if a anew dispatcher is created for every request
1111     */

1112    public boolean isCreateDispatcherPerRequest()
1113    {
1114        return createDispatcherPerRequest;
1115    }
1116
1117    /**
1118     * For better throughput when using TransactedMessageReceivers. This will create
1119     * an number of receiver threads based on the ThreadingProfile configured fro the
1120     * receiver. This property is user by transports that support transactions,
1121     * specifically MessageReceivers that extend the
1122     * TransactedPollingMessageReceiver.
1123     *
1124     * @return true if multiple receiver threads will be created for receivers on
1125     * this connection
1126     */

1127    public boolean isCreateMultipleTransactedReceivers()
1128    {
1129        return createMultipleTransactedReceivers;
1130    }
1131
1132    /**
1133     * For better throughput when using TransactedMessageReceivers. This will create
1134     * an number of receiver threads based on the ThreadingProfile configured fro the
1135     * receiver. This property is user by transports that support transactions,
1136     * specifically MessageReceivers that extend the
1137     * TransactedPollingMessageReceiver.
1138     *
1139     * @param createMultipleTransactedReceivers true if multiple receiver threads
1140     * will be created for receivers on this connection
1141     */

1142    public void setCreateMultipleTransactedReceivers(boolean createMultipleTransactedReceivers)
1143    {
1144        this.createMultipleTransactedReceivers = createMultipleTransactedReceivers;
1145    }
1146
1147    /**
1148     * Whether to fire message notifications for every message that is sent or
1149     * received from this connector
1150     */

1151    public boolean isEnableMessageEvents()
1152    {
1153        return enableMessageEvents;
1154    }
1155
1156    /**
1157     * Whether to fire message notifications for every message that is sent or
1158     * received from this connector
1159     *
1160     * @param enableMessageEvents
1161     */

1162    public void setEnableMessageEvents(boolean enableMessageEvents)
1163    {
1164        this.enableMessageEvents = enableMessageEvents;
1165    }
1166
1167    /**
1168     * Registers other protocols 'understood' by this connector. These must contain
1169     * scheme meta info. Any protocol registered must begin with the protocol of this
1170     * connector, i.e. If the connector is axis the protocol for jms over axis will
1171     * be axis:jms. Here, 'axis' is the scheme meta info and 'jms' is the protocol.
1172     * If the protocol argument does not start with the connector's protocol, it will
1173     * be appended.
1174     *
1175     * @param protocol the supported protocol to register
1176     */

1177    public void registerSupportedProtocol(String JavaDoc protocol)
1178    {
1179        protocol = protocol.toLowerCase();
1180        if (protocol.startsWith(getProtocol().toLowerCase()))
1181        {
1182            registerSupportedProtocolWithotPrefix(protocol);
1183        }
1184        else
1185        {
1186            supportedProtocols.add(getProtocol().toLowerCase() + ":" + protocol);
1187        }
1188    }
1189
1190    /**
1191     * Registers other protocols 'understood' by this connector. These must contain
1192     * scheme meta info. Unlike the <code>registerSupportedProtolcol</code> method,
1193     * this allows you to register protocols that are not prefixed with the connector
1194     * protocol. This is useful where you use a Service Finder to discover which
1195     * Transport implementation to use. For example the 'wsdl' transport is a generic
1196     * 'finder' transport that will use Axis, Xfire or Glue to create the WSDL
1197     * client. These transport protocols would be wsdl-axis, wsdl-xfire and
1198     * wsdl-glue, but they can all support 'wsdl' protocol too.
1199     *
1200     * @param protocol the supported protocol to register
1201     */

1202    protected void registerSupportedProtocolWithotPrefix(String JavaDoc protocol)
1203    {
1204        supportedProtocols.add(protocol.toLowerCase());
1205    }
1206
1207    public void unregisterSupportedProtocol(String JavaDoc protocol)
1208    {
1209        protocol = protocol.toLowerCase();
1210        if (protocol.startsWith(getProtocol().toLowerCase()))
1211        {
1212            supportedProtocols.remove(protocol);
1213        }
1214        else
1215        {
1216            supportedProtocols.remove(getProtocol().toLowerCase() + ":" + protocol);
1217        }
1218    }
1219
1220    /**
1221     * @return true if the protocol is supported by this connector.
1222     */

1223    public boolean supportsProtocol(String JavaDoc protocol)
1224    {
1225        return supportedProtocols.contains(protocol.toLowerCase());
1226    }
1227
1228    /**
1229     * Returns an unmodifiable list of the protocols supported by this connector
1230     *
1231     * @return an unmodifiable list of the protocols supported by this connector
1232     */

1233    public List JavaDoc getSupportedProtocols()
1234    {
1235        return Collections.unmodifiableList(supportedProtocols);
1236    }
1237
1238    /**
1239     * Sets A list of protocols that the connector can accept
1240     *
1241     * @param supportedProtocols
1242     */

1243    public void setSupportedProtocols(List JavaDoc supportedProtocols)
1244    {
1245        for (Iterator JavaDoc iterator = supportedProtocols.iterator(); iterator.hasNext();)
1246        {
1247            String JavaDoc s = (String JavaDoc)iterator.next();
1248            registerSupportedProtocol(s);
1249        }
1250    }
1251
1252    /**
1253     * Creates a work manager for a Message receiver. If
1254     * <code>useSingleReceiverThreadPool</code> has been set the same workManager
1255     * of all receivers will be used
1256     *
1257     * @param name The name to associate with the thread pool. No that the connector
1258     * name will be prepended and ".receiver" will be appended
1259     * @return A new work manager of an existing one if the work manager is being
1260     * shared
1261     */

1262    UMOWorkManager createReceiverWorkManager(String JavaDoc name)
1263    {
1264        UMOWorkManager wm;
1265        if (useSingleReceiverThreadPool && receiverWorkManager != null)
1266        {
1267            wm = receiverWorkManager;
1268        }
1269        else
1270        {
1271            ThreadingProfile tp = getReceiverThreadingProfile();
1272            if (serverSide)
1273            {
1274                tp.setThreadPriority(Thread.NORM_PRIORITY + 2);
1275            }
1276            wm = tp.createWorkManager(getName() + "." + name + ".receiver");
1277            if (useSingleReceiverThreadPool)
1278            {
1279                receiverWorkManager = wm;
1280            }
1281        }
1282        return wm;
1283    }
1284
1285    /**
1286     * Creates a work manager for a Message dispatcher. If
1287     * <code>useSingleDispatcherThreadPool</code> has been set the same workManager
1288     * of all dispatchers will be used
1289     *
1290     * @param name The name to associate with the thread pool. No that the connector
1291     * name will be prepended and ".dispatcher" will be appended
1292     * @return A new work manager of an existing one if the work manager is being
1293     * shared
1294     */

1295    UMOWorkManager createDispatcherWorkManager(String JavaDoc name)
1296    {
1297        UMOWorkManager wm;
1298        if (useSingleDispatcherThreadPool && dispatcherWorkManager != null)
1299        {
1300            wm = dispatcherWorkManager;
1301        }
1302        else
1303        {
1304            ThreadingProfile tp = getReceiverThreadingProfile();
1305            wm = tp.createWorkManager(getName() + "." + name + ".dispatcher");
1306            if (useSingleDispatcherThreadPool)
1307            {
1308                dispatcherWorkManager = wm;
1309            }
1310        }
1311        return wm;
1312    }
1313
1314    /**
1315     * Should a single receiver thread pool be created for all receivers It is
1316     * recommended that if you have a lot of receivers being registered per connector
1317     * that this should be set to true
1318     *
1319     * @return true is a single thread pool is being used for all receivers on this
1320     * connector
1321     */

1322    public boolean isUseSingleReceiverThreadPool()
1323    {
1324        return useSingleReceiverThreadPool;
1325    }
1326
1327    /**
1328     * Should a single dispatcher thread pool be created for all recivers It is
1329     * recommended that if you have a lot of receivers being registered per connector
1330     * that this should be set to true
1331     *
1332     * @param useSingleReceiverThreadPool true is a single thread pool is being used
1333     * for all receivers on this connector
1334     */

1335    public void setUseSingleReceiverThreadPool(boolean useSingleReceiverThreadPool)
1336    {
1337        this.useSingleReceiverThreadPool = useSingleReceiverThreadPool;
1338    }
1339
1340    /**
1341     * Should a single dispatcher thread pool be created for all distachers It is
1342     * recommended that if you have a lot of dispatcher being created per connector
1343     * that this should be set to true i.e. many different outbound endpoints
1344     *
1345     * @return true is a single thread pool is being used for all dispatchers on this
1346     * connector
1347     */

1348    public boolean isUseSingleDispatcherThreadPool()
1349    {
1350        return useSingleDispatcherThreadPool;
1351    }
1352
1353    /**
1354     * Should a single dispatcher thread pool be created for all distachers It is
1355     * recommended that if you have a lot of dispatcher being created per connector
1356     * that this should be set to true i.e. many different outbound endpoints
1357     *
1358     * @param useSingleDispatcherThreadPool true is a single thread pool is being
1359     * used for all dispatchers on this connector
1360     */

1361    public void setUseSingleDispatcherThreadPool(boolean useSingleDispatcherThreadPool)
1362    {
1363        this.useSingleDispatcherThreadPool = useSingleDispatcherThreadPool;
1364    }
1365
1366    /**
1367     * The flag determines if the connector is being used on the server side or
1368     * client. If true receiver threads will be given a slightly higher priority.
1369     *
1370     * @return true if running on the server side (default)
1371     */

1372    public boolean isServerSide()
1373    {
1374        return serverSide;
1375    }
1376
1377    /**
1378     * The flag determines if the connector is being used on the server side or
1379     * client. If true receiver threads will be given a slightly higher priority.
1380     *
1381     * @param serverSide true if running on the server side
1382     */

1383    public void setServerSide(boolean serverSide)
1384    {
1385        this.serverSide = serverSide;
1386    }
1387
1388    public UMOSessionHandler getSessionHandler()
1389    {
1390        return sessionHandler;
1391    }
1392
1393    public void setSessionHandler(UMOSessionHandler sessionHandler)
1394    {
1395        this.sessionHandler = sessionHandler;
1396    }
1397
1398    public void workAccepted(WorkEvent JavaDoc event)
1399    {
1400        handleWorkException(event, "workAccepted");
1401    }
1402
1403    public void workRejected(WorkEvent JavaDoc event)
1404    {
1405        handleWorkException(event, "workRejected");
1406    }
1407
1408    public void workStarted(WorkEvent JavaDoc event)
1409    {
1410        handleWorkException(event, "workStarted");
1411    }
1412
1413    public void workCompleted(WorkEvent JavaDoc event)
1414    {
1415        handleWorkException(event, "workCompleted");
1416    }
1417
1418    protected void handleWorkException(WorkEvent JavaDoc event, String JavaDoc type)
1419    {
1420        Throwable JavaDoc e;
1421        if (event != null && event.getException() != null)
1422        {
1423            e = event.getException();
1424        }
1425        else
1426        {
1427            return;
1428        }
1429        if (event.getException().getCause() != null)
1430        {
1431            e = event.getException().getCause();
1432        }
1433        logger.error("Work caused exception on '" + type + "'. Work being executed was: "
1434                     + event.getWork().toString());
1435        if (e instanceof Exception JavaDoc)
1436        {
1437            handleException((Exception JavaDoc)e);
1438        }
1439        else
1440        {
1441            throw new MuleRuntimeException(new Message(Messages.CONNECTOR_CAUSED_ERROR, getName()), e);
1442        }
1443    }
1444}
1445
Popular Tags