KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > impl > model > DefaultMuleProxy


1 /*
2  * $Id: DefaultMuleProxy.java 4219 2006-12-09 10:15:14Z lajos $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.impl.model;
12
13 import org.apache.commons.logging.Log;
14 import org.apache.commons.logging.LogFactory;
15 import org.mule.MuleManager;
16 import org.mule.config.MuleProperties;
17 import org.mule.config.i18n.Message;
18 import org.mule.config.i18n.Messages;
19 import org.mule.impl.ImmutableMuleDescriptor;
20 import org.mule.impl.InterceptorsInvoker;
21 import org.mule.impl.MuleDescriptor;
22 import org.mule.impl.MuleEvent;
23 import org.mule.impl.MuleMessage;
24 import org.mule.impl.RequestContext;
25 import org.mule.impl.endpoint.MuleEndpoint;
26 import org.mule.impl.endpoint.MuleEndpointURI;
27 import org.mule.impl.message.ExceptionPayload;
28 import org.mule.management.stats.ComponentStatistics;
29 import org.mule.providers.AbstractConnector;
30 import org.mule.providers.NullPayload;
31 import org.mule.providers.ReplyToHandler;
32 import org.mule.umo.MessagingException;
33 import org.mule.umo.UMOEvent;
34 import org.mule.umo.UMOException;
35 import org.mule.umo.UMOExceptionPayload;
36 import org.mule.umo.UMOImmutableDescriptor;
37 import org.mule.umo.UMOInterceptor;
38 import org.mule.umo.UMOMessage;
39 import org.mule.umo.endpoint.UMOEndpoint;
40 import org.mule.umo.endpoint.UMOEndpointURI;
41 import org.mule.umo.endpoint.UMOImmutableEndpoint;
42 import org.mule.umo.lifecycle.Disposable;
43 import org.mule.umo.lifecycle.Initialisable;
44 import org.mule.umo.lifecycle.UMOLifecycleAdapter;
45 import org.mule.umo.model.ModelException;
46 import org.mule.umo.model.UMOEntryPointResolver;
47 import org.mule.umo.model.UMOModel;
48 import org.mule.umo.provider.UMOMessageDispatcher;
49 import org.mule.util.ObjectPool;
50 import org.mule.util.queue.QueueSession;
51
52 import java.util.ArrayList JavaDoc;
53 import java.util.Iterator JavaDoc;
54 import java.util.List JavaDoc;
55 import java.util.Map JavaDoc;
56
57 /**
58  * <code>MuleProxy</code> is a proxy to a UMO. It is a poolable object that that
59  * can be executed in it's own thread.
60  */

61
62 public class DefaultMuleProxy implements MuleProxy
63 {
64     /**
65      * logger used by this class
66      */

67     private static Log logger = LogFactory.getLog(DefaultMuleProxy.class);
68
69     /**
70      * Holds the current event being processed
71      */

72     private UMOEvent event;
73
74     /**
75      * Holds the actual UMO
76      */

77     private UMOLifecycleAdapter umo;
78
79     /**
80      * holds the UMO descriptor
81      */

82     private ImmutableMuleDescriptor descriptor;
83
84     /**
85      * Determines if the proxy is suspended
86      */

87     private boolean suspended = true;
88
89     private List JavaDoc interceptorList;
90
91     private ObjectPool proxyPool;
92
93     private ComponentStatistics stat = null;
94
95     private QueueSession queueSession = null;
96
97     /**
98      * Constructs a Proxy using the UMO's AbstractMessageDispatcher and the UMO
99      * itself
100      *
101      * @param component the underlying object that with receive events
102      * @param descriptor the UMOComponent descriptor associated with the component
103      */

104     public DefaultMuleProxy(Object JavaDoc component, MuleDescriptor descriptor, ObjectPool proxyPool)
105         throws UMOException
106     {
107         this.descriptor = new ImmutableMuleDescriptor(descriptor);
108         this.proxyPool = proxyPool;
109
110         UMOModel model = MuleManager.getInstance().getModel();
111
112         UMOEntryPointResolver resolver = model.getEntryPointResolver();
113         umo = model.getLifecycleAdapterFactory().create(component, descriptor, resolver);
114
115         interceptorList = new ArrayList JavaDoc(descriptor.getInterceptors().size() + 1);
116         interceptorList.addAll(descriptor.getInterceptors());
117         interceptorList.add(umo);
118
119         for (Iterator JavaDoc iter = interceptorList.iterator(); iter.hasNext();)
120         {
121             UMOInterceptor interceptor = (UMOInterceptor)iter.next();
122             if (interceptor instanceof Initialisable)
123             {
124                 try
125                 {
126                     ((Initialisable)interceptor).initialise();
127                 }
128                 catch (Exception JavaDoc e)
129                 {
130                     throw new ModelException(new Message(Messages.X_FAILED_TO_INITIALISE,
131                         "Component '" + descriptor.getName() + "'"), e);
132                 }
133             }
134         }
135     }
136
137     public void start() throws UMOException
138     {
139         checkDisposed();
140         if (!umo.isStarted())
141         {
142             try
143             {
144                 umo.start();
145             }
146             catch (Exception JavaDoc e)
147             {
148                 throw new ModelException(
149                     new Message(Messages.FAILED_TO_START_X, "Component '" + descriptor.getName() + "'"), e);
150             }
151         }
152
153     }
154
155     public boolean isStarted()
156     {
157         return umo.isStarted();
158     }
159
160     public void stop() throws UMOException
161     {
162         checkDisposed();
163         if (umo.isStarted())
164         {
165             try
166             {
167                 umo.stop();
168             }
169             catch (Exception JavaDoc e)
170             {
171                 throw new ModelException(
172                     new Message(Messages.FAILED_TO_STOP_X, "Component '" + descriptor.getName() + "'"), e);
173             }
174         }
175     }
176
177     public void dispose()
178     {
179         checkDisposed();
180         for (Iterator JavaDoc iter = interceptorList.iterator(); iter.hasNext();)
181         {
182             UMOInterceptor interceptor = (UMOInterceptor)iter.next();
183             if (interceptor instanceof Disposable)
184             {
185                 try
186                 {
187                     ((Disposable)interceptor).dispose();
188                 }
189                 catch (Exception JavaDoc e)
190                 {
191                     logger.error(new Message(Messages.FAILED_TO_DISPOSE_X, "Component '"
192                                                                            + descriptor.getName() + "'"), e);
193                 }
194             }
195         }
196     }
197
198     private void checkDisposed()
199     {
200         if (umo.isDisposed())
201         {
202             throw new IllegalStateException JavaDoc("Component has already been disposed of");
203         }
204     }
205
206     /**
207      * Sets the current event being processed
208      *
209      * @param event the event being processed
210      */

211     public void onEvent(QueueSession session, UMOEvent event)
212     {
213         this.queueSession = session;
214         this.event = event;
215     }
216
217     public ComponentStatistics getStatistics()
218     {
219         return stat;
220     }
221
222     public void setStatistics(ComponentStatistics stat)
223     {
224         this.stat = stat;
225     }
226
227     /**
228      * Makes a synchronous call on the UMO
229      *
230      * @param event the event to pass to the UMO
231      * @return the return event from the UMO
232      * @throws UMOException if the call fails
233      */

234     public Object JavaDoc onCall(UMOEvent event) throws UMOException
235     {
236         if (logger.isTraceEnabled())
237         {
238             logger.trace("MuleProxy: sync call for Mule UMO " + descriptor.getName());
239         }
240
241         UMOMessage returnMessage = null;
242         try
243         {
244             if (event.getEndpoint().canReceive())
245             {
246                 RequestContext.setEvent(event);
247                 Object JavaDoc replyTo = event.getMessage().getReplyTo();
248                 ReplyToHandler replyToHandler = getReplyToHandler(event.getMessage(), event.getEndpoint());
249                 InterceptorsInvoker invoker = new InterceptorsInvoker(interceptorList, descriptor,
250                     event.getMessage());
251
252                 // stats
253
long startTime = 0;
254                 if (stat.isEnabled())
255                 {
256                     startTime = System.currentTimeMillis();
257                 }
258                 returnMessage = invoker.execute();
259
260                 // stats
261
if (stat.isEnabled())
262                 {
263                     stat.addExecutionTime(System.currentTimeMillis() - startTime);
264                 }
265                 // this is the request event
266
event = RequestContext.getEvent();
267                 if (event.isStopFurtherProcessing())
268                 {
269                     logger.debug("Event stop further processing has been set, no outbound routing will be performed.");
270                 }
271                 if (returnMessage != null && !event.isStopFurtherProcessing())
272                 {
273                     if (descriptor.getOutboundRouter().hasEndpoints())
274                     {
275                         UMOMessage outboundReturnMessage = descriptor.getOutboundRouter().route(
276                             returnMessage, event.getSession(), event.isSynchronous());
277                         if (outboundReturnMessage != null)
278                         {
279                             returnMessage = outboundReturnMessage;
280                         }
281                     }
282                     else
283                     {
284                         logger.debug("Outbound router on component '" + descriptor.getName()
285                                      + "' doesn't have any endpoints configured.");
286                     }
287                 }
288
289                 // Process Response Router
290
if (returnMessage != null && descriptor.getResponseRouter() != null)
291                 {
292                     logger.debug("Waiting for response router message");
293                     returnMessage = descriptor.getResponseRouter().getResponse(returnMessage);
294                 }
295
296                 // process repltyTo if there is one
297
if (returnMessage != null && replyToHandler != null)
298                 {
299                     String JavaDoc requestor = (String JavaDoc)returnMessage.getProperty(MuleProperties.MULE_REPLY_TO_REQUESTOR_PROPERTY);
300                     if ((requestor != null && !requestor.equals(descriptor.getName())) || requestor == null)
301                     {
302                         replyToHandler.processReplyTo(event, returnMessage, replyTo);
303                     }
304                 }
305
306             }
307             else
308             {
309                 returnMessage = event.getSession().sendEvent(event);
310                 processReplyTo(returnMessage);
311             }
312
313             // stats
314
if (stat.isEnabled())
315             {
316                 stat.incSentEventSync();
317             }
318         }
319         catch (Exception JavaDoc e)
320         {
321             event.getSession().setValid(false);
322             if (e instanceof MessagingException)
323             {
324                 handleException(e);
325             }
326             else
327             {
328                 handleException(new MessagingException(new Message(Messages.EVENT_PROCESSING_FAILED_FOR_X,
329                     descriptor.getName()), event.getMessage(), e));
330             }
331
332             if (returnMessage == null)
333             {
334                 returnMessage = new MuleMessage(new NullPayload(), (Map JavaDoc)null);
335             }
336             UMOExceptionPayload exceptionPayload = RequestContext.getExceptionPayload();
337             if (exceptionPayload == null)
338             {
339                 exceptionPayload = new ExceptionPayload(e);
340             }
341             returnMessage.setExceptionPayload(exceptionPayload);
342         }
343         return returnMessage;
344     }
345
346     /**
347      * When an exception occurs this method can be called to invoke the configured
348      * UMOExceptionStrategy on the UMO
349      *
350      * @param exception If the UMOExceptionStrategy implementation fails
351      */

352     public void handleException(Exception JavaDoc exception)
353     {
354         descriptor.getExceptionListener().exceptionThrown(exception);
355     }
356
357     /*
358      * (non-Javadoc)
359      *
360      * @see java.lang.Object#toString()
361      */

362     public String JavaDoc toString()
363     {
364         return "proxy for: " + descriptor.toString();
365     }
366
367     /**
368      * Determines if the proxy is suspended
369      *
370      * @return true if the proxy (and the UMO) are suspended
371      */

372     public boolean isSuspended()
373     {
374         return suspended;
375     }
376
377     /**
378      * Controls the suspension of the UMO event processing
379      */

380     public void suspend()
381     {
382         suspended = true;
383     }
384
385     /**
386      * Triggers the UMO to resume processing of events if it is suspended
387      */

388     public void resume()
389     {
390         suspended = false;
391     }
392
393     protected ReplyToHandler getReplyToHandler(UMOMessage message, UMOImmutableEndpoint endpoint)
394     {
395         Object JavaDoc replyTo = message.getReplyTo();
396         ReplyToHandler replyToHandler = null;
397         if (replyTo != null)
398         {
399             replyToHandler = ((AbstractConnector)endpoint.getConnector()).getReplyToHandler();
400             // Use the response transformer for the event if one is set
401
if (endpoint.getResponseTransformer() != null)
402             {
403                 replyToHandler.setTransformer(endpoint.getResponseTransformer());
404             }
405         }
406         return replyToHandler;
407     }
408
409     private void processReplyTo(UMOMessage returnMessage) throws UMOException
410     {
411         if (returnMessage != null && returnMessage.getReplyTo() != null)
412         {
413             if (logger.isDebugEnabled())
414             {
415                 logger.debug("sending reply to: " + returnMessage.getReplyTo());
416             }
417
418             UMOEndpointURI endpointUri = new MuleEndpointURI(returnMessage.getReplyTo().toString());
419
420             // get the endpointUri for this uri
421
UMOEndpoint endpoint = MuleEndpoint.getOrCreateEndpointForUri(endpointUri,
422                 UMOEndpoint.ENDPOINT_TYPE_SENDER);
423
424             // make sure remove the replyTo property as not cause a a forever
425
// replyto loop
426
returnMessage.removeProperty(MuleProperties.MULE_REPLY_TO_PROPERTY);
427
428             // Create the replyTo event asynchronous
429
UMOEvent replyToEvent = new MuleEvent(returnMessage, endpoint, event.getSession(), false);
430
431             // queue the event
432
onEvent(queueSession, replyToEvent);
433
434             if (logger.isDebugEnabled())
435             {
436                 logger.debug("reply to sent: " + returnMessage.getReplyTo());
437             }
438
439             if (stat.isEnabled())
440             {
441                 stat.incSentReplyToEvent();
442             }
443         }
444     }
445
446     /*
447      * (non-Javadoc)
448      *
449      * @see java.lang.Runnable#run()
450      */

451     public void run()
452     {
453         if (logger.isTraceEnabled())
454         {
455             logger.trace("MuleProxy: async onEvent for Mule UMO " + descriptor.getName());
456         }
457
458         try
459         {
460             if (event.getEndpoint().canReceive())
461             {
462                 // dispatch the next receiver
463
RequestContext.setEvent(event);
464                 Object JavaDoc replyTo = event.getMessage().getReplyTo();
465                 ReplyToHandler replyToHandler = getReplyToHandler(event.getMessage(), event.getEndpoint());
466                 InterceptorsInvoker invoker = new InterceptorsInvoker(interceptorList, descriptor,
467                     event.getMessage());
468
469                 // do stats
470
long startTime = 0;
471                 if (stat.isEnabled())
472                 {
473                     startTime = System.currentTimeMillis();
474                 }
475                 UMOMessage result = invoker.execute();
476                 if (stat.isEnabled())
477                 {
478                     stat.addExecutionTime(System.currentTimeMillis() - startTime);
479                 }
480                 // processResponse(result, replyTo, replyToHandler);
481
event = RequestContext.getEvent();
482                 if (result != null && !event.isStopFurtherProcessing())
483                 {
484                     descriptor.getOutboundRouter().route(result, event.getSession(), event.isSynchronous());
485                 }
486
487                 // process repltyTo if there is one
488
if (result != null && replyToHandler != null)
489                 {
490                     String JavaDoc requestor = (String JavaDoc)result.getProperty(MuleProperties.MULE_REPLY_TO_REQUESTOR_PROPERTY);
491                     if ((requestor != null && !requestor.equals(descriptor.getName())) || requestor == null)
492                     {
493                         replyToHandler.processReplyTo(event, result, replyTo);
494                     }
495                 }
496             }
497             else
498             {
499                 UMOMessageDispatcher dispatcher = event.getEndpoint().getConnector().getDispatcher(
500                     event.getEndpoint());
501                 dispatcher.dispatch(event);
502             }
503
504             if (stat.isEnabled())
505             {
506                 stat.incSentEventASync();
507             }
508         }
509         catch (Exception JavaDoc e)
510         {
511             event.getSession().setValid(false);
512             if (e instanceof MessagingException)
513             {
514                 handleException(e);
515             }
516             else
517             {
518                 handleException(new MessagingException(new Message(Messages.EVENT_PROCESSING_FAILED_FOR_X,
519                     descriptor.getName()), event.getMessage(), e));
520             }
521         }
522         finally
523         {
524             try
525             {
526                 proxyPool.returnObject(this);
527             }
528             catch (Exception JavaDoc e2)
529             {
530                 logger.error("Failed to return proxy: " + e2.getMessage(), e2);
531             }
532             getStatistics().setComponentPoolSize(proxyPool.getSize());
533         }
534     }
535
536     public void release()
537     {
538         // nothing to do
539
}
540
541     /*
542      * (non-Javadoc)
543      *
544      * @see org.mule.umo.UMOLifecycleAdapter#getDescriptor()
545      */

546     public UMOImmutableDescriptor getDescriptor()
547     {
548         return descriptor;
549     }
550 }
551
Popular Tags