KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > AbstractMessageDispatcher


1 /*
2  * $Id: AbstractMessageDispatcher.java 4219 2006-12-09 10:15:14Z lajos $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers;
12
13 import java.beans.ExceptionListener JavaDoc;
14 import java.io.OutputStream JavaDoc;
15
16 import javax.resource.spi.work.Work JavaDoc;
17 import javax.resource.spi.work.WorkManager JavaDoc;
18
19 import org.apache.commons.logging.Log;
20 import org.apache.commons.logging.LogFactory;
21 import org.mule.MuleRuntimeException;
22 import org.mule.config.MuleProperties;
23 import org.mule.config.ThreadingProfile;
24 import org.mule.config.i18n.Message;
25 import org.mule.config.i18n.Messages;
26 import org.mule.impl.ImmutableMuleEndpoint;
27 import org.mule.impl.RequestContext;
28 import org.mule.impl.internal.notifications.ConnectionNotification;
29 import org.mule.impl.internal.notifications.MessageNotification;
30 import org.mule.impl.internal.notifications.SecurityNotification;
31 import org.mule.transaction.TransactionCoordination;
32 import org.mule.umo.TransactionException;
33 import org.mule.umo.UMOEvent;
34 import org.mule.umo.UMOException;
35 import org.mule.umo.UMOMessage;
36 import org.mule.umo.UMOTransaction;
37 import org.mule.umo.endpoint.UMOEndpointURI;
38 import org.mule.umo.endpoint.UMOImmutableEndpoint;
39 import org.mule.umo.manager.UMOWorkManager;
40 import org.mule.umo.provider.DispatchException;
41 import org.mule.umo.provider.ReceiveException;
42 import org.mule.umo.provider.UMOConnector;
43 import org.mule.umo.provider.UMOMessageDispatcher;
44 import org.mule.util.concurrent.WaitableBoolean;
45
46 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
47
48 /**
49  * <p/> <code>AbstractMessageDispatcher</code> provides a default dispatch (client)
50  * support for handling threads lifecycle and validation.
51  */

52 public abstract class AbstractMessageDispatcher implements UMOMessageDispatcher, ExceptionListener JavaDoc
53 {
54     /**
55      * logger used by this class
56      */

57     protected transient Log logger = LogFactory.getLog(getClass());
58
59     /**
60      * Thread pool of Connector sessions
61      */

62     protected UMOWorkManager workManager = null;
63
64     protected final UMOImmutableEndpoint endpoint;
65     protected final AbstractConnector connector;
66
67     protected boolean disposed = false;
68
69     protected boolean doThreading = true;
70
71     protected ConnectionStrategy connectionStrategy;
72
73     protected final WaitableBoolean connected = new WaitableBoolean(false);
74     private final AtomicBoolean connecting = new AtomicBoolean(false);
75
76     public AbstractMessageDispatcher(UMOImmutableEndpoint endpoint)
77     {
78         this.endpoint = endpoint;
79         this.connector = (AbstractConnector)endpoint.getConnector();
80
81         connectionStrategy = connector.getConnectionStrategy();
82         if (connectionStrategy instanceof AbstractConnectionStrategy)
83         {
84             // We don't want to do threading in the dispatcher because we're either
85
// already running in a worker thread (asynchronous) or we need to
86
// complete the operation in a single thread
87
((AbstractConnectionStrategy)connectionStrategy).setDoThreading(false);
88         }
89
90         ThreadingProfile profile = connector.getDispatcherThreadingProfile();
91         doThreading = profile.isDoThreading();
92         if (doThreading)
93         {
94             workManager = connector.createDispatcherWorkManager(connector.getName() + ".dispatchers");
95             try
96             {
97                 workManager.start();
98             }
99             catch (UMOException e)
100             {
101                 dispose();
102                 throw new MuleRuntimeException(new Message(Messages.FAILED_TO_START_X, "WorkManager"), e);
103             }
104         }
105     }
106
107     /*
108      * (non-Javadoc)
109      *
110      * @see org.mule.umo.provider.UMOMessageDispatcher#dispatch(org.mule.umo.UMOEvent)
111      */

112     public final void dispatch(UMOEvent event) throws DispatchException
113     {
114         try
115         {
116             event.setSynchronous(false);
117             event.getMessage().setProperty(MuleProperties.MULE_ENDPOINT_PROPERTY,
118                 event.getEndpoint().getEndpointURI().toString());
119             RequestContext.setEvent(event);
120             // Apply Security filter if one is set
121
UMOImmutableEndpoint endpoint = event.getEndpoint();
122             if (endpoint.getSecurityFilter() != null)
123             {
124                 try
125                 {
126                     endpoint.getSecurityFilter().authenticate(event);
127                 }
128                 catch (org.mule.umo.security.SecurityException e)
129                 {
130                     logger.warn("Outbound Request was made but was not authenticated: " + e.getMessage(), e);
131                     connector.fireNotification(new SecurityNotification(e,
132                         SecurityNotification.ADMIN_EVENT_ACTION_START_RANGE));
133                     connector.handleException(e);
134                     return;
135                 }
136                 catch (UMOException e)
137                 {
138                     dispose();
139                     throw new DispatchException(event.getMessage(), event.getEndpoint(), e);
140                 }
141             }
142             // the security filter may update the payload so we need to get the
143
// latest event again
144
event = RequestContext.getEvent();
145
146             try
147             {
148                 UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
149                 if (doThreading && !event.isSynchronous() && tx == null)
150                 {
151                     workManager.scheduleWork(new Worker(event), WorkManager.INDEFINITE, null, connector);
152                 }
153                 else
154                 {
155                     // Make sure we are connected
156
connectionStrategy.connect(this);
157                     doDispatch(event);
158                     if (connector.isEnableMessageEvents())
159                     {
160                         String JavaDoc component = null;
161                         if (event.getComponent() != null)
162                         {
163                             component = event.getComponent().getDescriptor().getName();
164                         }
165                         connector.fireNotification(new MessageNotification(event.getMessage(),
166                             event.getEndpoint(), component, MessageNotification.MESSAGE_DISPATCHED));
167                     }
168                 }
169             }
170             catch (DispatchException e)
171             {
172                 dispose();
173                 throw e;
174             }
175             catch (Exception JavaDoc e)
176             {
177                 dispose();
178                 throw new DispatchException(event.getMessage(), event.getEndpoint(), e);
179             }
180         }
181         finally
182         {
183             if (connector.isCreateDispatcherPerRequest())
184             {
185                 dispose();
186             }
187         }
188     }
189
190     public final UMOMessage send(UMOEvent event) throws DispatchException
191     {
192         try
193         {
194             // No point continuing if the component has rolledback the transaction
195
if (isTransactionRollback())
196             {
197                 return event.getMessage();
198             }
199             event.setSynchronous(true);
200             event.getMessage().setProperty(MuleProperties.MULE_ENDPOINT_PROPERTY,
201                 event.getEndpoint().getEndpointURI().toString());
202             RequestContext.setEvent(event);
203             // Apply Security filter if one is set
204
UMOImmutableEndpoint endpoint = event.getEndpoint();
205             if (endpoint.getSecurityFilter() != null)
206             {
207                 try
208                 {
209                     endpoint.getSecurityFilter().authenticate(event);
210                 }
211                 catch (org.mule.umo.security.SecurityException e)
212                 {
213                     logger.warn("Outbound Request was made but was not authenticated: " + e.getMessage(), e);
214                     connector.fireNotification(new SecurityNotification(e,
215                         SecurityNotification.SECURITY_AUTHENTICATION_FAILED));
216                     connector.handleException(e);
217                     return event.getMessage();
218                 }
219                 catch (UMOException e)
220                 {
221                     dispose();
222                     throw new DispatchException(event.getMessage(), event.getEndpoint(), e);
223                 }
224             }
225             // the security filter may update the payload so we need to get the
226
// latest event again
227
event = RequestContext.getEvent();
228             try
229             {
230                 // Make sure we are connected
231
connectionStrategy.connect(this);
232
233                 UMOMessage result = doSend(event);
234                 if (connector.isEnableMessageEvents())
235                 {
236                     String JavaDoc component = null;
237                     if (event.getComponent() != null)
238                     {
239                         component = event.getComponent().getDescriptor().getName();
240                     }
241                     connector.fireNotification(new MessageNotification(event.getMessage(),
242                         event.getEndpoint(), component, MessageNotification.MESSAGE_SENT));
243                 }
244                 // Once a dispatcher has done its work we need to romve this property
245
// so that
246
// it is not propagated to the next request
247
if (result != null)
248                 {
249                     result.removeProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY);
250                 }
251                 return result;
252             }
253             catch (DispatchException e)
254             {
255                 dispose();
256                 throw e;
257             }
258             catch (Exception JavaDoc e)
259             {
260                 dispose();
261                 throw new DispatchException(event.getMessage(), event.getEndpoint(), e);
262             }
263         }
264         finally
265         {
266             if (connector.isCreateDispatcherPerRequest())
267             {
268                 dispose();
269             }
270         }
271     }
272
273     /**
274      * Make a specific request to the underlying transport
275      *
276      * @param endpointUri the endpoint URI to use when connecting to the resource
277      * @param timeout the maximum time the operation should block before returning.
278      * The call should return immediately if there is data available. If
279      * no data becomes available before the timeout elapses, null will be
280      * returned
281      * @return the result of the request wrapped in a UMOMessage object. Null will be
282      * returned if no data was avaialable
283      * @throws Exception if the call to the underlying protocal cuases an exception
284      * //@deprecated Use receive(UMOImmutableEndpoint endpoint, long
285      * timeout)
286      */

287     public final UMOMessage receive(UMOEndpointURI endpointUri, long timeout) throws Exception JavaDoc
288     {
289         return receive(new ImmutableMuleEndpoint(endpointUri.toString(), true), timeout);
290     }
291
292     /**
293      * Make a specific request to the underlying transport
294      *
295      * @param endpoint the endpoint to use when connecting to the resource
296      * @param timeout the maximum time the operation should block before returning.
297      * The call should return immediately if there is data available. If
298      * no data becomes available before the timeout elapses, null will be
299      * returned
300      * @return the result of the request wrapped in a UMOMessage object. Null will be
301      * returned if no data was avaialable
302      * @throws Exception if the call to the underlying protocal cuases an exception
303      */

304     public final UMOMessage receive(UMOImmutableEndpoint endpoint, long timeout) throws Exception JavaDoc
305     {
306         try
307         {
308             try
309             {
310                 // Make sure we are connected
311
connectionStrategy.connect(this);
312                 UMOMessage result = doReceive(endpoint, timeout);
313                 if (result != null && connector.isEnableMessageEvents())
314                 {
315                     String JavaDoc component = null;
316                     connector.fireNotification(new MessageNotification(result, endpoint, component,
317                         MessageNotification.MESSAGE_RECEIVED));
318                 }
319                 return result;
320             }
321             catch (DispatchException e)
322             {
323                 dispose();
324                 throw e;
325             }
326             catch (Exception JavaDoc e)
327             {
328                 dispose();
329                 throw new ReceiveException(endpoint, timeout, e);
330             }
331         }
332         finally
333         {
334             if (connector.isCreateDispatcherPerRequest())
335             {
336                 dispose();
337             }
338         }
339
340     }
341
342     /*
343      * (non-Javadoc)
344      *
345      * @see org.mule.util.ExceptionListener#onException(java.lang.Throwable)
346      */

347     public void exceptionThrown(Exception JavaDoc e)
348     {
349         try
350         {
351             getConnector().handleException(e);
352         }
353         finally
354         {
355             dispose();
356         }
357     }
358
359     public final boolean isDisposed()
360     {
361         return disposed;
362     }
363
364     /**
365      * Template method to destroy any resources held by the Message Dispatcher
366      */

367     public final synchronized void dispose()
368     {
369         if (!disposed)
370         {
371             try
372             {
373                 try
374                 {
375                     disconnect();
376                 }
377                 catch (Exception JavaDoc e)
378                 {
379                     logger.warn(e.getMessage(), e);
380                 }
381                 doDispose();
382                 if (workManager != null)
383                 {
384                     workManager.dispose();
385                 }
386             }
387             finally
388             {
389                 disposed = true;
390             }
391         }
392     }
393
394     public UMOConnector getConnector()
395     {
396         return connector;
397     }
398
399     /**
400      * RemoteSync causes the message dispatch to wait for a response to an event on a
401      * response channel after it sends the event. The following rules apply to
402      * RemoteSync 1. The connector has to support remoteSync. Some transports do not
403      * have the notion of a response channel 2. Check if the endpoint has been
404      * configured for remoteSync 3. Check if the REMOTE_SYNC message header has been
405      * set 4. Finally, if the current component has a response router configured,
406      * that the router will handle the response channel event and we should not try
407      * and receive a response in the Message dispatcher If remotesync should not be
408      * used we must remove the REMOTE_SYNC header Note the MuleClient will
409      * automatically set the REMOTE_SYNC header when client.send(..) is called so
410      * that results are returned from remote invocations too.
411      *
412      * @param event the current event
413      * @return true if a response channel should be used to get a resposne from the
414      * event dispatch.
415      */

416     protected boolean useRemoteSync(UMOEvent event)
417     {
418         boolean remoteSync = false;
419         if (event.getEndpoint().getConnector().isRemoteSyncEnabled())
420         {
421             remoteSync = event.getEndpoint().isRemoteSync()
422                             || event.getMessage().getBooleanProperty(
423                                 MuleProperties.MULE_REMOTE_SYNC_PROPERTY, false);
424             if (remoteSync)
425             {
426                 // component will be null for client calls
427
if (event.getComponent() != null)
428                 {
429                     remoteSync = event.getComponent().getDescriptor().getResponseRouter() == null;
430                 }
431             }
432         }
433         if (!remoteSync)
434         {
435             event.getMessage().removeProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY);
436         }
437         return remoteSync;
438     }
439
440     /**
441      * Well get the output stream (if any) for this type of transport. Typically this
442      * will be called only when Streaming is being used on an outbound endpoint
443      *
444      * @param endpoint the endpoint that releates to this Dispatcher
445      * @param message the current message being processed
446      * @return the output stream to use for this request or null if the transport
447      * does not support streaming
448      * @throws org.mule.umo.UMOException
449      */

450     public OutputStream JavaDoc getOutputStream(UMOImmutableEndpoint endpoint, UMOMessage message)
451         throws UMOException
452     {
453         return null;
454     }
455
456     public synchronized void connect() throws Exception JavaDoc
457     {
458         if (connected.get())
459         {
460             return;
461         }
462
463         if (disposed)
464         {
465             if (logger.isWarnEnabled())
466             {
467                 logger.warn("Dispatcher has been disposed. Cannot connector resource");
468             }
469         }
470
471         if (logger.isDebugEnabled())
472         {
473             logger.debug("Attempting to connect to: " + endpoint.getEndpointURI());
474         }
475
476         if (connecting.compareAndSet(false, true))
477         {
478             connectionStrategy.connect(this);
479             logger.info("Successfully connected to: " + endpoint.getEndpointURI());
480             return;
481         }
482
483         try
484         {
485             doConnect(endpoint);
486             connector.fireNotification(new ConnectionNotification(this, getConnectEventId(endpoint),
487                 ConnectionNotification.CONNECTION_CONNECTED));
488         }
489         catch (Exception JavaDoc e)
490         {
491             connector.fireNotification(new ConnectionNotification(this, getConnectEventId(endpoint),
492                 ConnectionNotification.CONNECTION_FAILED));
493             if (e instanceof ConnectException)
494             {
495                 throw (ConnectException)e;
496             }
497             else
498             {
499                 throw new ConnectException(e, this);
500             }
501         }
502
503         connected.set(true);
504         connecting.set(false);
505     }
506
507     public synchronized void disconnect() throws Exception JavaDoc
508     {
509         if (logger.isDebugEnabled())
510         {
511             logger.debug("Disconnecting from: " + endpoint.getEndpointURI());
512         }
513
514         connector.fireNotification(new ConnectionNotification(this, getConnectEventId(endpoint),
515             ConnectionNotification.CONNECTION_DISCONNECTED));
516         connected.set(false);
517         doDisconnect();
518         logger.info("Disconnected from: " + endpoint.getEndpointURI());
519     }
520
521     protected String JavaDoc getConnectEventId(UMOImmutableEndpoint endpoint)
522     {
523         return connector.getName() + ".dispatcher (" + endpoint.getEndpointURI() + ")";
524     }
525
526     public final boolean isConnected()
527     {
528         return connected.get();
529     }
530
531     /**
532      * Returns a string identifying the underlying resource
533      *
534      * @return
535      */

536     public String JavaDoc getConnectionDescription()
537     {
538         return endpoint.getEndpointURI().toString();
539     }
540
541     public synchronized void reconnect() throws Exception JavaDoc
542     {
543         disconnect();
544         connect();
545     }
546
547     protected abstract void doDispose();
548
549     protected abstract void doDispatch(UMOEvent event) throws Exception JavaDoc;
550
551     protected abstract UMOMessage doSend(UMOEvent event) throws Exception JavaDoc;
552
553     protected abstract void doConnect(UMOImmutableEndpoint endpoint) throws Exception JavaDoc;
554
555     protected abstract void doDisconnect() throws Exception JavaDoc;
556
557     /**
558      * Make a specific request to the underlying transport
559      *
560      * @param endpoint the endpoint to use when connecting to the resource
561      * @param timeout the maximum time the operation should block before returning.
562      * The call should return immediately if there is data available. If
563      * no data becomes available before the timeout elapses, null will be
564      * returned
565      * @return the result of the request wrapped in a UMOMessage object. Null will be
566      * returned if no data was avaialable
567      * @throws Exception if the call to the underlying protocal cuases an exception
568      */

569     protected abstract UMOMessage doReceive(UMOImmutableEndpoint endpoint, long timeout) throws Exception JavaDoc;
570
571     private class Worker implements Work JavaDoc
572     {
573         private UMOEvent event;
574
575         public Worker(UMOEvent event)
576         {
577             this.event = event;
578         }
579
580         /*
581          * (non-Javadoc)
582          *
583          * @see java.lang.Runnable#run()
584          */

585         public void run()
586         {
587             try
588             {
589                 RequestContext.setEvent(event);
590                 // Make sure we are connected
591
connectionStrategy.connect(AbstractMessageDispatcher.this);
592                 doDispatch(event);
593                 if (connector.isEnableMessageEvents())
594                 {
595                     String JavaDoc component = null;
596                     if (event.getComponent() != null)
597                     {
598                         component = event.getComponent().getDescriptor().getName();
599                     }
600                     connector.fireNotification(new MessageNotification(event.getMessage(),
601                         event.getEndpoint(), component, MessageNotification.MESSAGE_DISPATCHED));
602                 }
603             }
604             catch (Exception JavaDoc e)
605             {
606                 getConnector().handleException(e);
607             }
608         }
609
610         public void release()
611         {
612             // nothing to do
613
}
614     }
615
616     /**
617      * Checks to see if the current transaction has been rolled back
618      *
619      * @return
620      */

621     protected boolean isTransactionRollback()
622     {
623         try
624         {
625             UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
626             if (tx != null && tx.isRollbackOnly())
627             {
628                 return true;
629             }
630         }
631         catch (TransactionException e)
632         {
633             logger.warn(e.getMessage());
634         }
635         return false;
636     }
637 }
638
Popular Tags