KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > impl > model > seda > optimised > OptimisedMuleProxy


1 /*
2  * $Id: OptimisedMuleProxy.java 4219 2006-12-09 10:15:14Z lajos $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.impl.model.seda.optimised;
12
13 import org.apache.commons.logging.Log;
14 import org.apache.commons.logging.LogFactory;
15 import org.mule.config.i18n.Message;
16 import org.mule.config.i18n.Messages;
17 import org.mule.impl.ImmutableMuleDescriptor;
18 import org.mule.impl.MuleDescriptor;
19 import org.mule.impl.MuleMessage;
20 import org.mule.impl.RequestContext;
21 import org.mule.impl.model.MuleProxy;
22 import org.mule.management.stats.ComponentStatistics;
23 import org.mule.umo.MessagingException;
24 import org.mule.umo.UMOEvent;
25 import org.mule.umo.UMOEventContext;
26 import org.mule.umo.UMOException;
27 import org.mule.umo.UMOImmutableDescriptor;
28 import org.mule.umo.UMOMessage;
29 import org.mule.umo.lifecycle.Callable;
30 import org.mule.umo.lifecycle.Disposable;
31 import org.mule.umo.lifecycle.Startable;
32 import org.mule.umo.lifecycle.Stoppable;
33 import org.mule.umo.model.ModelException;
34 import org.mule.umo.provider.UMOMessageDispatcher;
35 import org.mule.util.ObjectPool;
36 import org.mule.util.queue.QueueSession;
37
38 /**
39  * <code>MuleProxy</code> is a proxy to a UMO. It is a poolable object that that
40  * can be executed in it's own thread.
41  *
42  * @author <a HREF="mailto:ross.mason@symphonysoft.com">Ross Mason</a>
43  * @version $Revision: 4219 $
44  */

45
46 public class OptimisedMuleProxy implements MuleProxy
47 {
48     /**
49      * logger used by this class
50      */

51     private static Log logger = LogFactory.getLog(OptimisedMuleProxy.class);
52
53     /**
54      * Holds the current event being processed
55      */

56     private UMOEvent event;
57
58     /**
59      * holds the UMO descriptor
60      */

61     private ImmutableMuleDescriptor descriptor;
62
63     /**
64      * Determines if the proxy is suspended
65      */

66     private boolean suspended = true;
67
68     private ObjectPool proxyPool;
69
70     private ComponentStatistics stat = null;
71
72     // Never read locally
73
// private QueueSession queueSession = null;
74

75     private Callable umo;
76
77     private boolean started = false;
78     private boolean disposed = false;
79
80     /**
81      * Constructs a Proxy using the UMO's AbstractMessageDispatcher and the UMO
82      * itself
83      *
84      * @param component the underlying object that with receive events
85      * @param descriptor the UMOComponent descriptor associated with the component
86      */

87     public OptimisedMuleProxy(Callable component, MuleDescriptor descriptor, ObjectPool proxyPool)
88         throws UMOException
89     {
90         this.descriptor = new ImmutableMuleDescriptor(descriptor);
91         this.proxyPool = proxyPool;
92         umo = component;
93     }
94
95     public void start() throws UMOException
96     {
97         checkDisposed();
98         if (!started && umo instanceof Startable)
99         {
100             try
101             {
102                 ((Startable)umo).start();
103                 started = true;
104             }
105             catch (Exception JavaDoc e)
106             {
107                 throw new ModelException(
108                     new Message(Messages.FAILED_TO_START_X, "Component '" + descriptor.getName() + "'"), e);
109             }
110         }
111
112     }
113
114     public boolean isStarted()
115     {
116         return started;
117     }
118
119     public void stop() throws UMOException
120     {
121         checkDisposed();
122
123         if (started && umo instanceof Stoppable)
124         {
125             started = false;
126             try
127             {
128                 ((Stoppable)umo).stop();
129             }
130             catch (Exception JavaDoc e)
131             {
132                 throw new ModelException(
133                     new Message(Messages.FAILED_TO_STOP_X, "Component '" + descriptor.getName() + "'"), e);
134             }
135         }
136     }
137
138     public void dispose()
139     {
140         checkDisposed();
141         if (umo instanceof Disposable)
142         {
143             ((Disposable)umo).dispose();
144             disposed = true;
145         }
146     }
147
148     private void checkDisposed()
149     {
150         if (disposed)
151         {
152             throw new IllegalStateException JavaDoc("Components Disposed Of");
153         }
154     }
155
156     /**
157      * Sets the current event being processed
158      *
159      * @param event the event being processed
160      */

161     public void onEvent(QueueSession session, UMOEvent event)
162     {
163         // this.queueSession = session;
164
this.event = event;
165     }
166
167     public ComponentStatistics getStatistics()
168     {
169         return stat;
170     }
171
172     public void setStatistics(ComponentStatistics stat)
173     {
174         this.stat = stat;
175     }
176
177     /**
178      * Makes a synchronous call on the UMO
179      *
180      * @param event the event to pass to the UMO
181      * @return the return event from the UMO
182      * @throws org.mule.umo.UMOException if the call fails
183      */

184     public Object JavaDoc onCall(UMOEvent event) throws UMOException
185     {
186         if (logger.isTraceEnabled())
187         {
188             logger.trace("MuleProxy: sync call for Mule UMO " + descriptor.getName());
189         }
190
191         UMOMessage returnMessage = null;
192         try
193         {
194             if (event.getEndpoint().canReceive())
195             {
196                 // RequestContext.setEvent(event);
197
// Object replyTo = event.getMessage().getReplyTo();
198
// ReplyToHandler replyToHandler = null;
199
// if (replyTo != null) {
200
// replyToHandler = ((AbstractConnector)
201
// event.getEndpoint().getConnector()).getReplyToHandler();
202
// }
203

204                 // stats
205
long startTime = 0;
206                 if (stat.isEnabled())
207                 {
208                     startTime = System.currentTimeMillis();
209                 }
210                 returnMessage = invokeUmo(RequestContext.getEventContext());
211                 // stats
212
if (stat.isEnabled())
213                 {
214                     stat.addExecutionTime(System.currentTimeMillis() - startTime);
215                 }
216                 // this is the request event
217
event = RequestContext.getEvent();
218                 if (event.isStopFurtherProcessing())
219                 {
220                     logger.debug("Event stop further processing has been set, no outbound routing will be performed.");
221                 }
222                 if (returnMessage != null && !event.isStopFurtherProcessing())
223                 {
224                     // Map context = RequestContext.clearProperties();
225
// if (context != null) {
226
// returnMessage.addProperties(context);
227
// }
228
if (descriptor.getOutboundRouter().hasEndpoints())
229                     {
230                         UMOMessage outboundReturnMessage = descriptor.getOutboundRouter().route(
231                             returnMessage, event.getSession(), event.isSynchronous());
232                         if (outboundReturnMessage != null)
233                         {
234                             returnMessage = outboundReturnMessage;
235                         }
236                     }
237                     else
238                     {
239                         logger.debug("Outbound router on component '" + descriptor.getName()
240                                      + "' doesn't have any endpoints configured.");
241                     }
242                 }
243
244                 // Process Response Router
245
// if (returnMessage != null && descriptor.getResponseRouter() !=
246
// null) {
247
// logger.debug("Waiting for response router message");
248
// returnMessage =
249
// descriptor.getResponseRouter().getResponse(returnMessage);
250
// }
251
//
252
// // process repltyTo if there is one
253
// if (returnMessage != null && replyToHandler != null) {
254
// String requestor = (String)
255
// returnMessage.getProperty(MuleProperties.MULE_REPLY_TO_REQUESTOR_PROPERTY);
256
// if ((requestor != null && !requestor.equals(descriptor.getName()))
257
// || requestor == null) {
258
// replyToHandler.processReplyTo(event, returnMessage, replyTo);
259
// }
260
// }
261

262             }
263             else
264             {
265                 returnMessage = event.getSession().sendEvent(event);
266                 // processReplyTo(returnMessage);
267
}
268
269             // stats
270
if (stat.isEnabled())
271             {
272                 stat.incSentEventSync();
273             }
274         }
275         catch (Exception JavaDoc e)
276         {
277             event.getSession().setValid(false);
278             if (e instanceof UMOException)
279             {
280                 handleException(e);
281             }
282             else
283             {
284                 handleException(new MessagingException(new Message(Messages.EVENT_PROCESSING_FAILED_FOR_X,
285                     descriptor.getName()), event.getMessage(), e));
286             }
287         }
288         return returnMessage;
289     }
290
291     protected UMOMessage invokeUmo(UMOEventContext context) throws Exception JavaDoc
292     {
293         Object JavaDoc result = umo.onCall(RequestContext.getEventContext());
294         if (result != null)
295         {
296             if (result instanceof UMOMessage)
297             {
298                 return (UMOMessage)result;
299             }
300             else
301             {
302                 return new MuleMessage(result, context.getMessage());
303             }
304         }
305         return null;
306     }
307
308     /**
309      * When an exception occurs this method can be called to invoke the configured
310      * UMOExceptionStrategy on the UMO
311      *
312      * @param exception If the UMOExceptionStrategy implementation fails
313      */

314     public void handleException(Exception JavaDoc exception)
315     {
316         descriptor.getExceptionListener().exceptionThrown(exception);
317     }
318
319     /*
320      * (non-Javadoc)
321      *
322      * @see java.lang.Object#toString()
323      */

324     public String JavaDoc toString()
325     {
326         return "optimised proxy for: " + descriptor.toString();
327     }
328
329     /**
330      * Determines if the proxy is suspended
331      *
332      * @return true if the proxy (and the UMO) are suspended
333      */

334     public boolean isSuspended()
335     {
336         return suspended;
337     }
338
339     /**
340      * Controls the suspension of the UMO event processing
341      */

342     public void suspend()
343     {
344         suspended = true;
345     }
346
347     /**
348      * Triggers the UMO to resume processing of events if it is suspended
349      */

350     public void resume()
351     {
352         suspended = false;
353     }
354
355     // private void processReplyTo(UMOMessage returnMessage) throws UMOException
356
// {
357
// if (returnMessage != null && returnMessage.getReplyTo() != null) {
358
// logger.info("sending reply to: " + returnMessage.getReplyTo());
359
// UMOEndpointURI endpointUri = new
360
// MuleEndpointURI(returnMessage.getReplyTo().toString());
361
//
362
// // get the endpointUri for this uri
363
// UMOEndpoint endpoint = MuleEndpoint.getOrCreateEndpointForUri(endpointUri,
364
// UMOEndpoint.ENDPOINT_TYPE_SENDER);
365
//
366
// // Create the replyTo event asynchronous
367
// UMOEvent replyToEvent = new MuleEvent(returnMessage, endpoint,
368
// event.getSession(), false);
369
// // make sure remove the replyTo property as not cause a a forever
370
// // replyto loop
371
// replyToEvent.removeProperty(MuleProperties.MULE_REPLY_TO_PROPERTY);
372
//
373
// // queue the event
374
// onEvent(queueSession, replyToEvent);
375
// logger.info("reply to sent: " + returnMessage.getReplyTo());
376
// if (stat.isEnabled()) {
377
// stat.incSentReplyToEvent();
378
// }
379
// }
380
// }
381

382     /*
383      * (non-Javadoc)
384      *
385      * @see java.lang.Runnable#run()
386      */

387     public void run()
388     {
389         if (logger.isTraceEnabled())
390         {
391             logger.trace("MuleProxy: async onEvent for Mule UMO " + descriptor.getName());
392         }
393
394         try
395         {
396             if (event.getEndpoint().canReceive())
397             {
398                 // dispatch the next receiver
399
RequestContext.setEvent(event);
400                 // Object replyTo = event.getMessage().getReplyTo();
401
// ReplyToHandler replyToHandler = null;
402
// if (replyTo != null) {
403
// replyToHandler = ((AbstractConnector)
404
// event.getEndpoint().getConnector()).getReplyToHandler();
405
// }
406
// InterceptorsInvoker invoker = new
407
// InterceptorsInvoker(interceptorList, descriptor,
408
// event.getMessage());
409

410                 // do stats
411
long startTime = 0;
412                 if (stat.isEnabled())
413                 {
414                     startTime = System.currentTimeMillis();
415                 }
416                 UMOMessage result = invokeUmo(RequestContext.getEventContext());
417                 if (stat.isEnabled())
418                 {
419                     stat.addExecutionTime(System.currentTimeMillis() - startTime);
420                 }
421                 // processResponse(result, replyTo, replyToHandler);
422
event = RequestContext.getEvent();
423                 if (result != null && !event.isStopFurtherProcessing())
424                 {
425                     descriptor.getOutboundRouter().route(result, event.getSession(), event.isSynchronous());
426                 }
427
428                 // process repltyTo if there is one
429
// if (result != null && replyToHandler != null) {
430
// String requestor = (String)
431
// result.getProperty(MuleProperties.MULE_REPLY_TO_REQUESTOR_PROPERTY);
432
// if ((requestor != null && !requestor.equals(descriptor.getName()))
433
// || requestor == null) {
434
// replyToHandler.processReplyTo(event, result, replyTo);
435
// }
436
// }
437
}
438             else
439             {
440                 UMOMessageDispatcher dispatcher = event.getEndpoint().getConnector().getDispatcher(
441                     event.getEndpoint());
442                 dispatcher.dispatch(event);
443             }
444
445             if (stat.isEnabled())
446             {
447                 stat.incSentEventASync();
448             }
449         }
450         catch (Exception JavaDoc e)
451         {
452             event.getSession().setValid(false);
453             if (e instanceof UMOException)
454             {
455                 handleException(e);
456             }
457             else
458             {
459                 handleException(new MessagingException(new Message(Messages.EVENT_PROCESSING_FAILED_FOR_X,
460                     descriptor.getName()), event.getMessage(), e));
461             }
462         }
463         finally
464         {
465
466             try
467             {
468                 proxyPool.returnObject(this);
469             }
470             catch (Exception JavaDoc e2)
471             {
472                 logger.error("Failed to return proxy: " + e2.getMessage(), e2);
473             }
474             getStatistics().setComponentPoolSize(proxyPool.getSize());
475         }
476     }
477
478     public void release()
479     {
480         // nothing to do
481
}
482
483     /*
484      * (non-Javadoc)
485      *
486      * @see org.mule.umo.UMOLifecycleAdapter#getDescriptor()
487      */

488     public UMOImmutableDescriptor getDescriptor()
489     {
490         return descriptor;
491     }
492 }
493
Popular Tags