KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > impl > model > seda > SedaComponent


1 /*
2  * $Id: SedaComponent.java 3798 2006-11-04 04:07:14Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.impl.model.seda;
12
13 import org.mule.MuleManager;
14 import org.mule.MuleRuntimeException;
15 import org.mule.config.PoolingProfile;
16 import org.mule.config.ThreadingProfile;
17 import org.mule.config.i18n.Message;
18 import org.mule.config.i18n.Messages;
19 import org.mule.impl.FailedToQueueEventException;
20 import org.mule.impl.MuleDescriptor;
21 import org.mule.impl.MuleEvent;
22 import org.mule.impl.model.AbstractComponent;
23 import org.mule.impl.model.DefaultMuleProxy;
24 import org.mule.impl.model.MuleProxy;
25 import org.mule.umo.ComponentException;
26 import org.mule.umo.UMOEvent;
27 import org.mule.umo.UMOException;
28 import org.mule.umo.UMOMessage;
29 import org.mule.umo.lifecycle.InitialisationException;
30 import org.mule.umo.lifecycle.LifecycleException;
31 import org.mule.umo.manager.UMOWorkManager;
32 import org.mule.util.ObjectPool;
33 import org.mule.util.queue.QueueSession;
34
35 import javax.resource.spi.work.Work JavaDoc;
36 import javax.resource.spi.work.WorkEvent JavaDoc;
37 import javax.resource.spi.work.WorkException JavaDoc;
38 import javax.resource.spi.work.WorkListener JavaDoc;
39 import javax.resource.spi.work.WorkManager JavaDoc;
40 import java.util.NoSuchElementException JavaDoc;
41
42 /**
43  * A Seda component runs inside a Seda Model and is responsible for managing a Seda
44  * Queue and thread pool for a Mule sevice component. In Seda terms this is
45  * equivilent to a stage.
46  *
47  * @author <a HREF="mailto:ross.mason@symphonysoft.com">Ross Mason</a>
48  * @version $Revision: 3798 $
49  */

50 public class SedaComponent extends AbstractComponent implements Work JavaDoc, WorkListener JavaDoc
51 {
52     /**
53      * Serial version
54      */

55     private static final long serialVersionUID = 7711976708670893015L;
56
57     /**
58      * A pool of available Mule Proxies. Component pooling has been disabled on the
59      * SEDAModel, this pool will be null anf the 'componentProxy' will be used.
60      */

61     protected ObjectPool proxyPool = null;
62
63     /**
64      * Is created only if component pooling is turned off on the SEDAModel. in this
65      * scenario all requests are serviced by this component, unless
66      * 'componentPerRequest' flag is set on the model
67      */

68     protected MuleProxy componentProxy = null;
69
70     protected UMOWorkManager workManager;
71
72     protected String JavaDoc descriptorQueueName;
73
74     /**
75      * The time out used for taking from the Seda Queue
76      */

77     protected int queueTimeout = 0;
78
79     /**
80      * Whether component objects should be pooled or a single instance should be used
81      */

82     protected boolean enablePooling = true;
83
84     /**
85      * If this is set to true a new component will be created for every request
86      */

87     protected boolean componentPerRequest = false;
88
89     /**
90      * Creates a new SEDA component
91      *
92      * @param descriptor The descriptor of the component to creat
93      * @param model the model in which the component is registered
94      */

95     public SedaComponent(MuleDescriptor descriptor, SedaModel model)
96     {
97         super(descriptor, model);
98         descriptorQueueName = descriptor.getName() + ".component";
99         queueTimeout = model.getQueueTimeout();
100         enablePooling = model.isEnablePooling();
101         componentPerRequest = model.isComponentPerRequest();
102     }
103
104     /**
105      * Initialise the component. The component will first create a Mule UMO from the
106      * UMODescriptor and then initialise a pool based on the attributes in the
107      * UMODescriptor.
108      *
109      * @throws org.mule.umo.lifecycle.InitialisationException if the component fails
110      * to initialise
111      * @see org.mule.umo.UMODescriptor
112      */

113     public synchronized void doInitialise() throws InitialisationException
114     {
115         // Create thread pool
116
ThreadingProfile tp = descriptor.getThreadingProfile();
117         workManager = tp.createWorkManager(descriptor.getName());
118         try
119         {
120             // Setup event Queue (used for VM execution)
121
descriptor.getQueueProfile().configureQueue(descriptor.getName());
122         }
123         catch (InitialisationException e)
124         {
125             throw e;
126         }
127         catch (Throwable JavaDoc e)
128         {
129             throw new InitialisationException(
130                 new Message(Messages.X_FAILED_TO_INITIALISE, "Component Queue"), e, this);
131         }
132     }
133
134     protected void initialisePool() throws InitialisationException
135     {
136         try
137         {
138             // Initialise the proxy pool
139
proxyPool = descriptor.getPoolingProfile().getPoolFactory().createPool(descriptor);
140
141             if (descriptor.getPoolingProfile().getInitialisationPolicy() == PoolingProfile.POOL_INITIALISE_ALL_COMPONENTS)
142             {
143                 int threads = descriptor.getPoolingProfile().getMaxActive();
144                 for (int i = 0; i < threads; i++)
145                 {
146                     proxyPool.returnObject(proxyPool.borrowObject());
147                 }
148             }
149             else if (descriptor.getPoolingProfile().getInitialisationPolicy() == PoolingProfile.POOL_INITIALISE_ONE_COMPONENT)
150             {
151                 proxyPool.returnObject(proxyPool.borrowObject());
152             }
153
154             poolInitialised.set(true);
155         }
156         catch (Exception JavaDoc e)
157         {
158             throw new InitialisationException(new Message(Messages.X_FAILED_TO_INITIALISE, "Proxy Pool"), e,
159                 this);
160         }
161     }
162
163     protected MuleProxy createComponentProxy() throws InitialisationException
164     {
165         try
166         {
167             Object JavaDoc component = lookupComponent();
168             MuleProxy componentProxy = new DefaultMuleProxy(component, descriptor, null);
169             getStatistics().setComponentPoolSize(-1);
170             componentProxy.setStatistics(getStatistics());
171             componentProxy.start();
172             return componentProxy;
173         }
174         catch (UMOException e)
175         {
176             throw new InitialisationException(e, this);
177         }
178     }
179
180     public void doForceStop() throws UMOException
181     {
182         doStop();
183     }
184
185     public void doStop() throws UMOException
186     {
187         workManager.stop();
188         if (proxyPool != null)
189         {
190             try
191             {
192                 proxyPool.stop();
193                 proxyPool.clearPool();
194             }
195             catch (Exception JavaDoc e)
196             {
197                 logger.error("Failed to stop component pool: " + e.getMessage(), e);
198             }
199             poolInitialised.set(false);
200         }
201         else if (componentProxy != null)
202         {
203             componentProxy.stop();
204         }
205     }
206
207     public void doStart() throws UMOException
208     {
209
210         try
211         {
212             // Need to initialise the pool only after all listerner have
213
// been registered and initialised so we need to delay until now
214
if (!poolInitialised.get() && enablePooling)
215             {
216                 initialisePool();
217                 proxyPool.start();
218             }
219             else if (!componentPerRequest)
220             {
221                 componentProxy = createComponentProxy();
222             }
223             workManager.start();
224             workManager.scheduleWork(this, WorkManager.INDEFINITE, null, this);
225         }
226         catch (Exception JavaDoc e)
227         {
228             throw new LifecycleException(new Message(Messages.FAILED_TO_START_X, "Component: "
229                                                                                  + descriptor.getName()), e,
230                 this);
231         }
232     }
233
234     protected void doDispose()
235     {
236
237         try
238         {
239             // threadPool.awaitTerminationAfterShutdown();
240
if (workManager != null)
241             {
242                 workManager.dispose();
243             }
244         }
245         catch (Exception JavaDoc e)
246         {
247             logger.error("Component Thread Pool did not close properly: " + e);
248         }
249         try
250         {
251             if (proxyPool != null)
252             {
253                 proxyPool.clearPool();
254             }
255             else if (componentProxy != null)
256             {
257                 componentProxy.dispose();
258             }
259         }
260         catch (Exception JavaDoc e)
261         {
262             logger.error("Proxy Pool did not close properly: " + e);
263         }
264     }
265
266     protected void doDispatch(UMOEvent event) throws UMOException
267     {
268         // Dispatching event to the component
269
if (stats.isEnabled())
270         {
271             stats.incReceivedEventASync();
272         }
273         if (logger.isDebugEnabled())
274         {
275             logger.debug("Component: " + descriptor.getName() + " has received asynchronous event on: "
276                          + event.getEndpoint().getEndpointURI());
277         }
278
279         // Block until we can queue the next event
280
try
281         {
282             enqueue(event);
283             if (stats.isEnabled())
284             {
285                 stats.incQueuedEvent();
286             }
287         }
288         catch (Exception JavaDoc e)
289         {
290             FailedToQueueEventException e1 = new FailedToQueueEventException(new Message(
291                 Messages.INTERRUPTED_QUEUING_EVENT_FOR_X, getName()), event.getMessage(), this, e);
292             handleException(e1);
293         }
294
295         if (logger.isTraceEnabled())
296         {
297             logger.trace("Event added to queue for: " + descriptor.getName());
298         }
299     }
300
301     public UMOMessage doSend(UMOEvent event) throws UMOException
302     {
303
304         UMOMessage result = null;
305         MuleProxy proxy = null;
306         try
307         {
308             if (proxyPool != null)
309             {
310                 proxy = (MuleProxy)proxyPool.borrowObject();
311                 getStatistics().setComponentPoolSize(proxyPool.getSize());
312             }
313             else if (componentPerRequest)
314             {
315                 proxy = createComponentProxy();
316             }
317             else
318             {
319                 proxy = componentProxy;
320             }
321
322             proxy.setStatistics(getStatistics());
323
324             if (logger.isDebugEnabled())
325             {
326                 logger.debug(this + " : got proxy for " + event.getId() + " = " + proxy);
327             }
328             result = (UMOMessage)proxy.onCall(event);
329         }
330         catch (UMOException e)
331         {
332             throw e;
333         }
334         catch (Exception JavaDoc e)
335         {
336             throw new ComponentException(event.getMessage(), this, e);
337         }
338         finally
339         {
340             try
341             {
342                 if (proxy != null)
343                 {
344                     if (proxyPool != null)
345                     {
346                         proxyPool.returnObject(proxy);
347                     }
348                     else if (componentPerRequest)
349                     {
350                         proxy.dispose();
351                     }
352                 }
353             }
354             catch (Exception JavaDoc e)
355             {
356                 // noinspection ThrowFromFinallyBlock
357
throw new ComponentException(event.getMessage(), this, e);
358             }
359
360             if (proxyPool != null)
361             {
362                 getStatistics().setComponentPoolSize(proxyPool.getSize());
363             }
364         }
365         return result;
366     }
367
368     /**
369      * @return the pool of Mule UMOs initialised in this component
370      */

371     ObjectPool getProxyPool()
372     {
373         return proxyPool;
374     }
375
376     public int getQueueSize()
377     {
378         QueueSession queueSession = MuleManager.getInstance().getQueueManager().getQueueSession();
379         return queueSession.getQueue(descriptor.getName()).size();
380     }
381
382     /**
383      * While the component isn't stopped this runs a continuous loop checking for new
384      * events in the queue
385      */

386     public void run()
387     {
388         MuleEvent event = null;
389         MuleProxy proxy = null;
390         QueueSession queueSession = MuleManager.getInstance().getQueueManager().getQueueSession();
391
392         while (!stopped.get())
393         {
394             try
395             {
396                 // Wait if the component is paused
397
paused.whenFalse(null);
398
399                 // If we're doing a draining stop, read all events from the queue
400
// before stopping
401
if (stopping.get())
402                 {
403                     if (queueSession == null || queueSession.getQueue(descriptorQueueName).size() == 0)
404                     {
405                         stopping.set(false);
406                         break;
407                     }
408                 }
409
410                 event = (MuleEvent)dequeue();
411                 if (event != null)
412                 {
413                     if (stats.isEnabled())
414                     {
415                         stats.decQueuedEvent();
416                     }
417
418                     if (logger.isDebugEnabled())
419                     {
420                         logger.debug("Component: " + descriptor.getName() + " dequeued event on: "
421                                      + event.getEndpoint().getEndpointURI());
422                     }
423
424                     if (proxyPool != null)
425                     {
426                         proxy = (MuleProxy)proxyPool.borrowObject();
427                         getStatistics().setComponentPoolSize(proxyPool.getSize());
428                     }
429                     else if (componentPerRequest)
430                     {
431                         proxy = createComponentProxy();
432                     }
433                     else
434                     {
435                         proxy = componentProxy;
436                     }
437
438                     proxy.setStatistics(getStatistics());
439                     proxy.start();
440                     proxy.onEvent(queueSession, event);
441                     workManager.scheduleWork(proxy, WorkManager.INDEFINITE, null, this);
442                 }
443             }
444             catch (Exception JavaDoc e)
445             {
446                 if (proxy != null && proxyPool != null)
447                 {
448                     try
449                     {
450                         proxyPool.returnObject(proxy);
451                     }
452                     catch (Exception JavaDoc e2)
453                     {
454                         logger.info("Failed to return proxy to pool", e2);
455                     }
456                 }
457
458                 if (e instanceof InterruptedException JavaDoc)
459                 {
460                     stopping.set(false);
461                     break;
462                 }
463                 else if (e instanceof NoSuchElementException JavaDoc)
464                 {
465                     handleException(new ComponentException(new Message(Messages.PROXY_POOL_TIMED_OUT),
466                         (event == null ? null : event.getMessage()), this, e));
467                 }
468                 else if (e instanceof UMOException)
469                 {
470                     handleException(e);
471                 }
472                 else if (e instanceof WorkException JavaDoc)
473                 {
474                     handleException(new ComponentException(new Message(
475                         Messages.EVENT_PROCESSING_FAILED_FOR_X, descriptor.getName()), (event == null
476                                     ? null : event.getMessage()), this, e));
477                 }
478                 else
479                 {
480                     handleException(new ComponentException(new Message(Messages.FAILED_TO_GET_POOLED_OBJECT),
481                         (event == null ? null : event.getMessage()), this, e));
482                 }
483             }
484             finally
485             {
486                 stopping.set(false);
487                 if (proxy != null && componentPerRequest)
488                 {
489                     proxy.dispose();
490                 }
491             }
492         }
493     }
494
495     public void release()
496     {
497         stopping.set(false);
498     }
499
500     protected void enqueue(UMOEvent event) throws Exception JavaDoc
501     {
502         QueueSession session = MuleManager.getInstance().getQueueManager().getQueueSession();
503         session.getQueue(descriptorQueueName).put(event);
504     }
505
506     protected UMOEvent dequeue() throws Exception JavaDoc
507     {
508         // Wait until an event is available
509
QueueSession queueSession = MuleManager.getInstance().getQueueManager().getQueueSession();
510         return (UMOEvent)queueSession.getQueue(descriptorQueueName).poll(queueTimeout);
511     }
512
513     public void workAccepted(WorkEvent JavaDoc event)
514     {
515         handleWorkException(event, "workAccepted");
516     }
517
518     public void workRejected(WorkEvent JavaDoc event)
519     {
520         handleWorkException(event, "workRejected");
521     }
522
523     public void workStarted(WorkEvent JavaDoc event)
524     {
525         handleWorkException(event, "workStarted");
526     }
527
528     public void workCompleted(WorkEvent JavaDoc event)
529     {
530         handleWorkException(event, "workCompleted");
531     }
532
533     protected void handleWorkException(WorkEvent JavaDoc event, String JavaDoc type)
534     {
535         Throwable JavaDoc e;
536
537         if (event != null && event.getException() != null)
538         {
539             e = event.getException();
540         }
541         else
542         {
543             return;
544         }
545
546         if (event.getException().getCause() != null)
547         {
548             e = event.getException().getCause();
549         }
550
551         logger.error("Work caused exception on '" + type + "'. Work being executed was: "
552                      + event.getWork().toString());
553
554         if (e instanceof Exception JavaDoc)
555         {
556             handleException((Exception JavaDoc)e);
557         }
558         else
559         {
560             throw new MuleRuntimeException(new Message(Messages.COMPONENT_CAUSED_ERROR_IS_X, getName()), e);
561         }
562     }
563
564 }
565
Popular Tags