KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > impl > model > AbstractComponent


1 /*
2  * $Id: AbstractComponent.java 4219 2006-12-09 10:15:14Z lajos $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.impl.model;
12
13 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
14 import org.apache.commons.logging.Log;
15 import org.apache.commons.logging.LogFactory;
16 import org.mule.MuleManager;
17 import org.mule.config.i18n.Message;
18 import org.mule.config.i18n.Messages;
19 import org.mule.impl.DefaultComponentExceptionStrategy;
20 import org.mule.impl.MuleDescriptor;
21 import org.mule.impl.RequestContext;
22 import org.mule.impl.internal.notifications.ComponentNotification;
23 import org.mule.management.stats.ComponentStatistics;
24 import org.mule.providers.AbstractConnector;
25 import org.mule.umo.ComponentException;
26 import org.mule.umo.UMOComponent;
27 import org.mule.umo.UMODescriptor;
28 import org.mule.umo.UMOEvent;
29 import org.mule.umo.UMOException;
30 import org.mule.umo.UMOMessage;
31 import org.mule.umo.endpoint.UMOEndpoint;
32 import org.mule.umo.lifecycle.InitialisationException;
33 import org.mule.umo.model.ModelException;
34 import org.mule.umo.model.UMOModel;
35 import org.mule.umo.provider.DispatchException;
36 import org.mule.umo.provider.UMOMessageDispatcher;
37 import org.mule.umo.provider.UMOMessageReceiver;
38 import org.mule.util.concurrent.WaitableBoolean;
39
40 import java.beans.ExceptionListener JavaDoc;
41 import java.util.ArrayList JavaDoc;
42 import java.util.Iterator JavaDoc;
43 import java.util.List JavaDoc;
44
45 /**
46  * A base implementation for all UMOComponents in Mule
47  */

48 public abstract class AbstractComponent implements UMOComponent
49 {
50     /**
51      * logger used by this class
52      */

53     protected transient Log logger = LogFactory.getLog(getClass());
54
55     /**
56      * The Mule descriptor associated with the component
57      */

58     protected MuleDescriptor descriptor = null;
59
60     protected ComponentStatistics stats = null;
61
62     /**
63      * Determines if the component has been stopped
64      */

65     protected AtomicBoolean stopped = new AtomicBoolean(true);
66
67     /**
68      * Determines whether stop has been called and is still in progress
69      */

70     protected WaitableBoolean stopping = new WaitableBoolean(false);
71
72     /**
73      * determines if the proxy pool has been initialised
74      */

75     protected AtomicBoolean poolInitialised = new AtomicBoolean(false);
76
77     /**
78      * The exception strategy used by the component, this is provided by the
79      * UMODescriptor
80      */

81     protected ExceptionListener JavaDoc exceptionListener = null;
82
83     /**
84      * Determines if the component has been initilised
85      */

86     protected AtomicBoolean initialised = new AtomicBoolean(false);
87
88     /**
89      * The model in which this component is registered
90      */

91     protected UMOModel model;
92
93     /**
94      * Determines if the component has been paused
95      */

96     protected WaitableBoolean paused = new WaitableBoolean(false);
97
98     /**
99      * Default constructor
100      */

101     public AbstractComponent(MuleDescriptor descriptor, UMOModel model)
102     {
103         if (descriptor == null)
104         {
105             throw new IllegalArgumentException JavaDoc("Descriptor cannot be null");
106         }
107         this.descriptor = descriptor;
108         this.model = MuleManager.getInstance().getModel();
109     }
110
111     /**
112      * Initialise the component. The component will first create a Mule UMO from the
113      * UMODescriptor and then initialise a pool based on the attributes in the
114      * UMODescriptor.
115      *
116      * @throws org.mule.umo.lifecycle.InitialisationException if the component fails
117      * to initialise
118      * @see org.mule.umo.UMODescriptor
119      */

120     public final synchronized void initialise() throws InitialisationException
121     {
122         if (initialised.get())
123         {
124             throw new InitialisationException(new Message(Messages.OBJECT_X_ALREADY_INITIALISED,
125                 "Component '" + descriptor.getName() + "'"), this);
126         }
127         descriptor.initialise();
128
129         this.exceptionListener = descriptor.getExceptionListener();
130
131         // initialise statistics
132
stats = new ComponentStatistics(getName(), descriptor.getPoolingProfile().getMaxActive(),
133             descriptor.getThreadingProfile().getMaxThreadsActive());
134
135         stats.setEnabled(((MuleManager)MuleManager.getInstance()).getStatistics().isEnabled());
136         ((MuleManager)MuleManager.getInstance()).getStatistics().add(stats);
137         stats.setOutboundRouterStat(getDescriptor().getOutboundRouter().getStatistics());
138         stats.setInboundRouterStat(getDescriptor().getInboundRouter().getStatistics());
139
140         doInitialise();
141         initialised.set(true);
142         fireComponentNotification(ComponentNotification.COMPONENT_INITIALISED);
143
144     }
145
146     protected void fireComponentNotification(int action)
147     {
148         MuleManager.getInstance().fireNotification(new ComponentNotification(descriptor, action));
149     }
150
151     void finaliseEvent(UMOEvent event)
152     {
153         logger.debug("Finalising event for: " + descriptor.getName() + " event endpointUri is: "
154                      + event.getEndpoint().getEndpointURI());
155         // queue.remove(event);
156
}
157
158     public void forceStop() throws UMOException
159     {
160         if (!stopped.get())
161         {
162             logger.debug("Stopping UMOComponent");
163             stopping.set(true);
164             fireComponentNotification(ComponentNotification.COMPONENT_STOPPING);
165             doForceStop();
166             stopped.set(true);
167             stopping.set(false);
168             fireComponentNotification(ComponentNotification.COMPONENT_STOPPED);
169         }
170     }
171
172     public void stop() throws UMOException
173     {
174         if (!stopped.get())
175         {
176             logger.debug("Stopping UMOComponent");
177             stopping.set(true);
178             fireComponentNotification(ComponentNotification.COMPONENT_STOPPING);
179
180             // Unregister Listeners for the component
181
unregisterListeners();
182             if (MuleManager.getInstance().getQueueManager().getQueueSession().getQueue(
183                 descriptor.getName() + ".component").size() > 0)
184             {
185                 try
186                 {
187                     stopping.whenFalse(null);
188                 }
189                 catch (InterruptedException JavaDoc e)
190                 {
191                     // we can ignore this
192
}
193             }
194
195             doStop();
196             stopped.set(true);
197             fireComponentNotification(ComponentNotification.COMPONENT_STOPPED);
198         }
199     }
200
201     public void start() throws UMOException
202     {
203         start(false);
204     }
205
206     /**
207      * Starts a Mule Component.
208      *
209      * @param startPaused - Start component in a "paused" state (messages are
210      * received but not processed).
211      */

212     protected void start(boolean startPaused) throws UMOException
213     {
214
215         // Create the receivers for the component but do not start them yet.
216
registerListeners();
217
218         // We connect the receivers _before_ starting the component because there may
219
// be
220
// some initialization required for the component which needs to have them
221
// connected.
222
// For example, the org.mule.providers.soap.glue.GlueMessageReceiver adds
223
// InitialisationCallbacks within its doConnect() method (see MULE-804).
224
connectListeners();
225
226         // Start (and pause) the component.
227
if (stopped.get())
228         {
229             stopped.set(false);
230             paused.set(false);
231             doStart();
232         }
233         fireComponentNotification(ComponentNotification.COMPONENT_STARTED);
234         if (startPaused)
235         {
236             pause();
237         }
238
239         // We start the receivers _after_ starting the component because if a message
240
// gets routed to the component before it is started,
241
// org.mule.impl.model.AbstractComponent.dispatchEvent() will throw a
242
// ComponentException with message COMPONENT_X_IS_STOPPED (see MULE-526).
243
startListeners();
244     }
245
246     /**
247      * Pauses event processing for a single Mule Component. Unlike stop(), a paused
248      * component will still consume messages from the underlying transport, but those
249      * messages will be queued until the component is resumed.
250      */

251     public final void pause() throws UMOException
252     {
253
254         doPause();
255         paused.set(true);
256         fireComponentNotification(ComponentNotification.COMPONENT_PAUSED);
257     }
258
259     /**
260      * Resumes a single Mule Component that has been paused. If the component is not
261      * paused nothing is executed.
262      */

263     public final void resume() throws UMOException
264     {
265         doResume();
266         paused.set(false);
267         fireComponentNotification(ComponentNotification.COMPONENT_RESUMED);
268     }
269
270     /**
271      * Determines if the component is in a paused state
272      *
273      * @return True if the component is in a paused state, false otherwise
274      */

275     public boolean isPaused()
276     {
277         return paused.get();
278     }
279
280     /**
281      * Custom components can execute code necessary to put the component in a paused
282      * state here. If a developer overloads this method the doResume() method MUST
283      * also be overloaded to avoid inconsistent state in the component
284      *
285      * @throws UMOException
286      */

287     protected void doPause() throws UMOException
288     {
289         // template method
290
}
291
292     /**
293      * Custom components can execute code necessary to resume a component once it has
294      * been paused If a developer overloads this method the doPause() method MUST
295      * also be overloaded to avoid inconsistent state in the component
296      *
297      * @throws UMOException
298      */

299     protected void doResume() throws UMOException
300     {
301         // template method
302
}
303
304     public final void dispose()
305     {
306         try
307         {
308             if (!stopped.get())
309             {
310                 stop();
311             }
312         }
313         catch (UMOException e)
314         {
315             logger.error("Failed to stop component: " + descriptor.getName(), e);
316         }
317         doDispose();
318         fireComponentNotification(ComponentNotification.COMPONENT_DISPOSED);
319         ((MuleManager)MuleManager.getInstance()).getStatistics().remove(stats);
320     }
321
322     public ComponentStatistics getStatistics()
323     {
324         return stats;
325     }
326
327     /*
328      * (non-Javadoc)
329      *
330      * @see org.mule.umo.UMOSession#getDescriptor()
331      */

332     public UMODescriptor getDescriptor()
333     {
334         return descriptor;
335     }
336
337     public void dispatchEvent(UMOEvent event) throws UMOException
338     {
339         if (stopping.get() || stopped.get())
340         {
341             throw new ComponentException(new Message(Messages.COMPONENT_X_IS_STOPPED,
342                 getDescriptor().getName()), event.getMessage(), this);
343         }
344
345         try
346         {
347             waitIfPaused(event);
348         }
349         catch (InterruptedException JavaDoc e)
350         {
351             throw new ComponentException(event.getMessage(), this, e);
352         }
353
354         // Dispatching event to an inbound endpoint
355
// in the MuleSession#dispatchEvent
356
if (!event.getEndpoint().canReceive())
357         {
358             UMOMessageDispatcher dispatcher = event.getEndpoint().getConnector().getDispatcher(
359                 event.getEndpoint());
360             try
361             {
362                 dispatcher.dispatch(event);
363             }
364             catch (Exception JavaDoc e)
365             {
366                 throw new DispatchException(event.getMessage(), event.getEndpoint(), e);
367             }
368             return;
369         }
370
371         // Dispatching event to the component
372
if (stats.isEnabled())
373         {
374             stats.incReceivedEventASync();
375         }
376         if (logger.isDebugEnabled())
377         {
378             logger.debug("Component: " + descriptor.getName() + " has received asynchronous event on: "
379                          + event.getEndpoint().getEndpointURI());
380         }
381
382         doDispatch(event);
383     }
384
385     public UMOMessage sendEvent(UMOEvent event) throws UMOException
386     {
387         if (stopping.get() || stopped.get())
388         {
389             throw new ComponentException(new Message(Messages.COMPONENT_X_IS_STOPPED,
390                 getDescriptor().getName()), event.getMessage(), this);
391         }
392
393         try
394         {
395             waitIfPaused(event);
396         }
397         catch (InterruptedException JavaDoc e)
398         {
399             throw new ComponentException(event.getMessage(), this, e);
400         }
401
402         if (stats.isEnabled())
403         {
404             stats.incReceivedEventSync();
405         }
406         if (logger.isDebugEnabled())
407         {
408             logger.debug("Component: " + descriptor.getName() + " has received synchronous event on: "
409                          + event.getEndpoint().getEndpointURI());
410         }
411         RequestContext.setEvent(event);
412         return doSend(event);
413     }
414
415     /**
416      * Called before an event is sent or dispatched to a component, it will block
417      * until resume() is called. Users can override this method if they want to
418      * handle pausing differently e.g. implement a store and forward policy
419      *
420      * @param event the current event being passed to the component
421      * @throws InterruptedException if the thread is interrupted
422      */

423     protected void waitIfPaused(UMOEvent event) throws InterruptedException JavaDoc
424     {
425         if (logger.isDebugEnabled() && paused.get())
426         {
427             logger.debug("Component: " + descriptor.getName()
428                          + " is paused. Blocking call until resume is called");
429         }
430         paused.whenFalse(null);
431     }
432
433     /**
434      * @return the Mule descriptor name which is associated with the component
435      */

436     public String JavaDoc getName()
437     {
438         return descriptor.getName();
439     }
440
441     /*
442      * (non-Javadoc)
443      *
444      * @see java.lang.Object#toString()
445      */

446     public String JavaDoc toString()
447     {
448         return descriptor.getName();
449     }
450
451     public boolean isStopped()
452     {
453         return stopped.get();
454     }
455
456     public boolean isStopping()
457     {
458         return stopping.get();
459     }
460
461     protected void handleException(Exception JavaDoc e)
462     {
463         if (exceptionListener instanceof DefaultComponentExceptionStrategy)
464         {
465             if (((DefaultComponentExceptionStrategy)exceptionListener).getComponent() == null)
466             {
467                 ((DefaultComponentExceptionStrategy)exceptionListener).setComponent(this);
468             }
469         }
470         exceptionListener.exceptionThrown(e);
471     }
472
473     /**
474      * Provides a consistent mechanism for custom models to create components.
475      *
476      * @return
477      * @throws UMOException
478      */

479     protected Object JavaDoc lookupComponent() throws UMOException
480     {
481         return ComponentFactory.createComponent(getDescriptor());
482     }
483
484     protected void doForceStop() throws UMOException
485     {
486         // template method
487
}
488
489     protected void doStop() throws UMOException
490     {
491         // template method
492
}
493
494     protected void doStart() throws UMOException
495     {
496         // template method
497
}
498
499     protected void doDispose()
500     {
501         // template method
502
}
503
504     protected void doInitialise() throws InitialisationException
505     {
506         // template method
507
}
508
509     public boolean isStarted()
510     {
511         return !stopped.get();
512     }
513
514     protected abstract UMOMessage doSend(UMOEvent event) throws UMOException;
515
516     protected abstract void doDispatch(UMOEvent event) throws UMOException;
517
518     /**
519      * Gets the underlying instance form this component Where the Component
520      * implmentation provides pooling this is no 1-2-1 mapping between UMOComponent
521      * and instance, so this method will return the object in initial state. <p/> If
522      * the underlying component is Container managed in Spring or another IoC
523      * container then the object instance in the IoC container will be returned
524      *
525      * @return the underlying instance form this component
526      */

527     public Object JavaDoc getInstance() throws UMOException
528     {
529         return lookupComponent();
530     }
531
532     protected void registerListeners() throws UMOException
533     {
534         UMOEndpoint endpoint;
535         List JavaDoc endpoints = getIncomingEndpoints();
536
537         for (Iterator JavaDoc it = endpoints.iterator(); it.hasNext();)
538         {
539             endpoint = (UMOEndpoint)it.next();
540             try
541             {
542                 endpoint.getConnector().registerListener(this, endpoint);
543             }
544             catch (UMOException e)
545             {
546                 throw e;
547             }
548             catch (Exception JavaDoc e)
549             {
550                 throw new ModelException(new Message(Messages.FAILED_TO_REGISTER_X_ON_ENDPOINT_X,
551                     getDescriptor().getName(), endpoint.getEndpointURI()), e);
552             }
553         }
554     }
555
556     protected void unregisterListeners() throws UMOException
557     {
558         UMOEndpoint endpoint;
559         List JavaDoc endpoints = getIncomingEndpoints();
560
561         for (Iterator JavaDoc it = endpoints.iterator(); it.hasNext();)
562         {
563             endpoint = (UMOEndpoint)it.next();
564             try
565             {
566                 endpoint.getConnector().unregisterListener(this, endpoint);
567             }
568             catch (UMOException e)
569             {
570                 throw e;
571             }
572             catch (Exception JavaDoc e)
573             {
574                 throw new ModelException(new Message(Messages.FAILED_TO_UNREGISTER_X_ON_ENDPOINT_X,
575                     getDescriptor().getName(), endpoint.getEndpointURI()), e);
576             }
577         }
578     }
579
580     protected void startListeners() throws UMOException
581     {
582         UMOEndpoint endpoint;
583         List JavaDoc endpoints = getIncomingEndpoints();
584
585         for (Iterator JavaDoc it = endpoints.iterator(); it.hasNext();)
586         {
587             endpoint = (UMOEndpoint)it.next();
588             UMOMessageReceiver receiver = ((AbstractConnector)endpoint.getConnector()).getReceiver(this,
589                 endpoint);
590             if (receiver != null && endpoint.getConnector().isStarted()
591                 && endpoint.getInitialState().equals(UMOEndpoint.INITIAL_STATE_STARTED))
592             {
593                 receiver.start();
594             }
595         }
596     }
597
598     protected void stopListeners() throws UMOException
599     {
600         UMOEndpoint endpoint;
601         List JavaDoc endpoints = getIncomingEndpoints();
602
603         for (Iterator JavaDoc it = endpoints.iterator(); it.hasNext();)
604         {
605             endpoint = (UMOEndpoint)it.next();
606             UMOMessageReceiver receiver = ((AbstractConnector)endpoint.getConnector()).getReceiver(this,
607                 endpoint);
608             if (receiver != null)
609             {
610                 receiver.stop();
611             }
612         }
613     }
614
615     protected void connectListeners() throws UMOException
616     {
617         UMOEndpoint endpoint;
618         List JavaDoc endpoints = getIncomingEndpoints();
619
620         for (Iterator JavaDoc it = endpoints.iterator(); it.hasNext();)
621         {
622             endpoint = (UMOEndpoint)it.next();
623             UMOMessageReceiver receiver = ((AbstractConnector)endpoint.getConnector()).getReceiver(this,
624                 endpoint);
625             if (receiver != null)
626             {
627                 try
628                 {
629                     receiver.connect();
630                 }
631                 catch (Exception JavaDoc e)
632                 {
633                     throw new ModelException(
634                         Message.createStaticMessage("Failed to connect listener for endpoint "
635                                                     + endpoint.getName()), e);
636                 }
637             }
638         }
639     }
640
641     protected void disconnectListeners() throws UMOException
642     {
643         UMOEndpoint endpoint;
644         List JavaDoc endpoints = getIncomingEndpoints();
645
646         for (Iterator JavaDoc it = endpoints.iterator(); it.hasNext();)
647         {
648             endpoint = (UMOEndpoint)it.next();
649             UMOMessageReceiver receiver = ((AbstractConnector)endpoint.getConnector()).getReceiver(this,
650                 endpoint);
651             if (receiver != null)
652             {
653                 try
654                 {
655                     receiver.disconnect();
656                 }
657                 catch (Exception JavaDoc e)
658                 {
659                     throw new ModelException(
660                         Message.createStaticMessage("Failed to connect listener for endpoint "
661                                                     + endpoint.getName()), e);
662                 }
663             }
664         }
665     }
666
667     /**
668      * Returns a list of all incoming endpoints on a component.
669      */

670     protected List JavaDoc getIncomingEndpoints()
671     {
672         List JavaDoc endpoints = new ArrayList JavaDoc();
673
674         // Add inbound endpoints
675
endpoints.addAll(getDescriptor().getInboundRouter().getEndpoints());
676         // Add the (deprecated) single inbound endpoint.
677
if (getDescriptor().getInboundEndpoint() != null)
678         {
679             endpoints.add(getDescriptor().getInboundEndpoint());
680         }
681
682         // Add response endpoints
683
if (getDescriptor().getResponseRouter() != null
684             && getDescriptor().getResponseRouter().getEndpoints() != null)
685         {
686             endpoints.addAll(getDescriptor().getResponseRouter().getEndpoints());
687         }
688         return endpoints;
689     }
690
691 }
692
Popular Tags