KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > AbstractMessageReceiver


1 /*
2  * $Id: AbstractMessageReceiver.java 4259 2006-12-14 03:12:07Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers;
12
13 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
14 import org.apache.commons.logging.Log;
15 import org.apache.commons.logging.LogFactory;
16 import org.mule.config.ExceptionHelper;
17 import org.mule.config.i18n.Message;
18 import org.mule.config.i18n.Messages;
19 import org.mule.impl.MuleEvent;
20 import org.mule.impl.MuleMessage;
21 import org.mule.impl.MuleSession;
22 import org.mule.impl.RequestContext;
23 import org.mule.impl.ResponseOutputStream;
24 import org.mule.impl.internal.notifications.ConnectionNotification;
25 import org.mule.impl.internal.notifications.MessageNotification;
26 import org.mule.impl.internal.notifications.SecurityNotification;
27 import org.mule.transaction.TransactionCoordination;
28 import org.mule.umo.UMOComponent;
29 import org.mule.umo.UMOEvent;
30 import org.mule.umo.UMOException;
31 import org.mule.umo.UMOMessage;
32 import org.mule.umo.UMOSession;
33 import org.mule.umo.UMOTransaction;
34 import org.mule.umo.endpoint.UMOEndpoint;
35 import org.mule.umo.endpoint.UMOEndpointURI;
36 import org.mule.umo.lifecycle.InitialisationException;
37 import org.mule.umo.manager.UMOWorkManager;
38 import org.mule.umo.provider.UMOConnector;
39 import org.mule.umo.provider.UMOMessageReceiver;
40 import org.mule.umo.security.SecurityException;
41 import org.mule.umo.transformer.TransformerException;
42 import org.mule.umo.transformer.UMOTransformer;
43 import org.mule.util.StringMessageUtils;
44 import org.mule.util.concurrent.WaitableBoolean;
45
46 import java.io.OutputStream JavaDoc;
47
48 /**
49  * <code>AbstractMessageReceiver</code> provides common methods for all Message
50  * Receivers provided with Mule. A message receiver enables an endpoint to receive a
51  * message from an external system.
52  *
53  * @author <a HREF="mailto:ross.mason@symphonysoft.com">Ross Mason</a>
54  * @version $Revision: 4259 $
55  */

56 public abstract class AbstractMessageReceiver implements UMOMessageReceiver
57 {
58     /**
59      * logger used by this class
60      */

61     protected transient Log logger = LogFactory.getLog(getClass());
62
63     /**
64      * The Component with which this receiver is associated with
65      */

66     protected UMOComponent component = null;
67
68     /**
69      * The endpoint descriptor which is associated with this receiver
70      */

71     protected UMOEndpoint endpoint = null;
72
73     private InternalMessageListener listener;
74     /**
75      * the connector associated with this receiver
76      */

77     protected AbstractConnector connector = null;
78
79     protected AtomicBoolean disposing = new AtomicBoolean(false);
80
81     protected WaitableBoolean connected = new WaitableBoolean(false);
82
83     protected WaitableBoolean stopped = new WaitableBoolean(true);
84
85     private AtomicBoolean connecting = new AtomicBoolean(false);
86
87     /**
88      * Stores the endpointUri that this receiver listens on. This enpoint can be
89      * different to the endpointUri in the endpoint stored on the receiver as
90      * endpoint endpointUri may get rewritten if this endpointUri is a wildcard
91      * endpointUri such as jms.*
92      */

93     private UMOEndpointURI endpointUri;
94
95     private UMOWorkManager workManager;
96
97     protected ConnectionStrategy connectionStrategy;
98
99     /**
100      * Creates the Message Receiver
101      *
102      * @param connector the endpoint that created this listener
103      * @param component the component to associate with the receiver. When data is
104      * received the component <code>dispatchEvent</code> or
105      * <code>sendEvent</code> is used to dispatch the data to the
106      * relivant UMO.
107      * @param endpoint the provider contains the endpointUri on which the receiver
108      * will listen on. The endpointUri can be anything and is specific to
109      * the receiver implementation i.e. an email address, a directory, a
110      * jms destination or port address.
111      * @see UMOComponent
112      * @see UMOEndpoint
113      */

114     public AbstractMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint)
115         throws InitialisationException
116     {
117         setConnector(connector);
118         setComponent(component);
119         setEndpoint(endpoint);
120         listener = new DefaultInternalMessageListener();
121         endpointUri = endpoint.getEndpointURI();
122
123         workManager = this.connector.createReceiverWorkManager(endpoint.getName());
124         try
125         {
126             workManager.start();
127         }
128         catch (UMOException e)
129         {
130             throw new InitialisationException(e, this);
131         }
132
133         connectionStrategy = this.connector.getConnectionStrategy();
134     }
135
136     /*
137      * (non-Javadoc)
138      *
139      * @see org.mule.umo.provider.UMOMessageReceiver#getEndpointName()
140      */

141     public UMOEndpoint getEndpoint()
142     {
143         return endpoint;
144     }
145
146     /*
147      * (non-Javadoc)
148      *
149      * @see org.mule.umo.provider.UMOMessageReceiver#getExceptionListener()
150      */

151     public void handleException(Exception JavaDoc exception)
152     {
153         if (exception instanceof ConnectException)
154         {
155             logger.info("Exception caught is a ConnectException, disconnecting receiver and invoking ReconnectStrategy");
156             try
157             {
158                 disconnect();
159             }
160             catch (Exception JavaDoc e)
161             {
162                 connector.getExceptionListener().exceptionThrown(e);
163             }
164         }
165         connector.getExceptionListener().exceptionThrown(exception);
166         if (exception instanceof ConnectException)
167         {
168             try
169             {
170                 logger.warn("Reconnecting after exception: " + exception.getMessage(), exception);
171                 connectionStrategy.connect(this);
172             }
173             catch (UMOException e)
174             {
175                 connector.getExceptionListener().exceptionThrown(e);
176             }
177         }
178     }
179
180     /**
181      * This method is used to set any additional aand possibly transport specific
182      * information on the return message where it has an exception payload.
183      *
184      * @param message
185      * @param exception
186      */

187     protected void setExceptionDetails(UMOMessage message, Throwable JavaDoc exception)
188     {
189         String JavaDoc propName = ExceptionHelper.getErrorCodePropertyName(connector.getProtocol());
190         // If we dont find a error code property we can assume there are not
191
// error code mappings for this connector
192
if (propName != null)
193         {
194             String JavaDoc code = ExceptionHelper.getErrorMapping(connector.getProtocol(), exception.getClass());
195             if (logger.isDebugEnabled())
196             {
197                 logger.debug("Setting error code for: " + connector.getProtocol() + ", " + propName + "="
198                              + code);
199             }
200             message.setProperty(propName, code);
201         }
202     }
203
204     public UMOConnector getConnector()
205     {
206         return connector;
207     }
208
209     public void setConnector(UMOConnector connector)
210     {
211         if (connector != null)
212         {
213             if (connector instanceof AbstractConnector)
214             {
215                 this.connector = (AbstractConnector)connector;
216             }
217             else
218             {
219                 throw new IllegalArgumentException JavaDoc(new Message(
220                     Messages.PROPERTY_X_IS_NOT_SUPPORTED_TYPE_X_IT_IS_TYPE_X, "connector",
221                     AbstractConnector.class.getName(), connector.getClass().getName()).getMessage());
222             }
223         }
224         else
225         {
226             throw new NullPointerException JavaDoc(new Message(Messages.X_IS_NULL, "connector").getMessage());
227         }
228     }
229
230     public UMOComponent getComponent()
231     {
232         return component;
233     }
234
235     public final UMOMessage routeMessage(UMOMessage message) throws UMOException
236     {
237         return routeMessage(message, (endpoint.isSynchronous() || TransactionCoordination.getInstance()
238             .getTransaction() != null));
239     }
240
241     public final UMOMessage routeMessage(UMOMessage message, boolean synchronous) throws UMOException
242     {
243         UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
244         return routeMessage(message, tx, tx != null || synchronous, null);
245     }
246
247     public final UMOMessage routeMessage(UMOMessage message, UMOTransaction trans, boolean synchronous)
248         throws UMOException
249     {
250         return routeMessage(message, trans, synchronous, null);
251     }
252
253     public final UMOMessage routeMessage(UMOMessage message, OutputStream outputStream) throws UMOException
254     {
255         return routeMessage(message, endpoint.isSynchronous(), outputStream);
256     }
257
258     public final UMOMessage routeMessage(UMOMessage message, boolean synchronous, OutputStream outputStream)
259         throws UMOException
260     {
261         UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
262         return routeMessage(message, tx, tx != null || synchronous, outputStream);
263     }
264
265     public final UMOMessage routeMessage(UMOMessage message,
266                                          UMOTransaction trans,
267                                          boolean synchronous,
268                                          OutputStream outputStream) throws UMOException
269     {
270
271         if (connector.isEnableMessageEvents())
272         {
273             connector.fireNotification(new MessageNotification(message, endpoint, component.getDescriptor()
274                 .getName(), MessageNotification.MESSAGE_RECEIVED));
275         }
276
277         if (logger.isDebugEnabled())
278         {
279             logger.debug("Message Received from: " + endpoint.getEndpointURI());
280         }
281         if (logger.isTraceEnabled())
282         {
283             try
284             {
285                 logger.trace("Message Payload: \n"
286                              + StringMessageUtils.truncate(StringMessageUtils.toString(message.getPayload()),
287                                  200, false));
288             }
289             catch (Exception JavaDoc e)
290             {
291                 // ignore
292
}
293         }
294
295         // Apply the endpoint filter if one is configured
296
if (endpoint.getFilter() != null)
297         {
298             if (!endpoint.getFilter().accept(message))
299             {
300                 handleUnacceptedFilter(message);
301                 return null;
302             }
303         }
304         return listener.onMessage(message, trans, synchronous, outputStream);
305     }
306
307     protected UMOMessage handleUnacceptedFilter(UMOMessage message)
308     {
309         String JavaDoc messageId = null;
310         messageId = message.getUniqueId();
311
312         if (logger.isDebugEnabled())
313         {
314             logger.debug("Message " + messageId + " failed to pass filter on endpoint: " + endpoint
315                          + ". Message is being ignored");
316         }
317
318         return null;
319     }
320
321     /*
322      * (non-Javadoc)
323      *
324      * @see org.mule.umo.provider.UMOMessageReceiver#setEndpoint(org.mule.umo.endpoint.UMOEndpoint)
325      */

326     public void setEndpoint(UMOEndpoint endpoint)
327     {
328         if (endpoint == null)
329         {
330             throw new IllegalArgumentException JavaDoc("Provider cannot be null");
331         }
332         this.endpoint = endpoint;
333     }
334
335     /*
336      * (non-Javadoc)
337      *
338      * @see org.mule.umo.provider.UMOMessageReceiver#setSession(org.mule.umo.UMOSession)
339      */

340     public void setComponent(UMOComponent component)
341     {
342         if (component == null)
343         {
344             throw new IllegalArgumentException JavaDoc("Component cannot be null");
345         }
346         this.component = component;
347     }
348
349     public final void dispose()
350     {
351         stop();
352         disposing.set(true);
353         doDispose();
354         workManager.dispose();
355     }
356
357     /**
358      * Template method to dispose any resources associated with this receiver. There
359      * is not need to dispose the connector as this is already done by the framework
360      */

361     protected void doDispose()
362     {
363         // nothing to do
364
}
365
366     public UMOEndpointURI getEndpointURI()
367     {
368         return endpointUri;
369     }
370
371     protected UMOWorkManager getWorkManager()
372     {
373         return workManager;
374     }
375
376     protected void setWorkManager(UMOWorkManager workManager)
377     {
378         this.workManager = workManager;
379     }
380
381     public void connect() throws Exception JavaDoc
382     {
383         if (connected.get())
384         {
385             return;
386         }
387         if (logger.isDebugEnabled())
388         {
389             logger.debug("Attempting to connect to: " + endpoint.getEndpointURI());
390         }
391         if (connecting.compareAndSet(false, true))
392         {
393             connectionStrategy.connect(this);
394             logger.info("Successfully connected to: " + endpoint.getEndpointURI());
395             return;
396         }
397
398         try
399         {
400             doConnect();
401             connector.fireNotification(new ConnectionNotification(this, getConnectEventId(),
402                 ConnectionNotification.CONNECTION_CONNECTED));
403         }
404         catch (Exception JavaDoc e)
405         {
406             connector.fireNotification(new ConnectionNotification(this, getConnectEventId(),
407                 ConnectionNotification.CONNECTION_FAILED));
408             if (e instanceof ConnectException)
409             {
410                 throw (ConnectException)e;
411             }
412             else
413             {
414                 throw new ConnectException(e, this);
415             }
416         }
417         connected.set(true);
418         connecting.set(false);
419     }
420
421     public void disconnect() throws Exception JavaDoc
422     {
423         if (logger.isDebugEnabled())
424         {
425             logger.debug("Disconnecting from: " + endpoint.getEndpointURI());
426         }
427         connector.fireNotification(new ConnectionNotification(this, getConnectEventId(),
428             ConnectionNotification.CONNECTION_DISCONNECTED));
429         connected.set(false);
430         doDisconnect();
431         logger.info("Disconnected from: " + endpoint.getEndpointURI());
432     }
433
434     public String JavaDoc getConnectionDescription()
435     {
436         return endpoint.getEndpointURI().toString();
437     }
438
439     public final void start() throws UMOException
440     {
441         if (stopped.commit(true, false))
442         {
443             if (!connected.get())
444             {
445                 connectionStrategy.connect(this);
446             }
447             doStart();
448         }
449     }
450
451     public final void stop()
452     {
453         try
454         {
455             if (connected.get())
456             {
457                 disconnect();
458             }
459         }
460         catch (Exception JavaDoc e)
461         {
462             logger.error(e.getMessage(), e);
463         }
464
465         if (stopped.commit(false, true))
466         {
467             try
468             {
469                 doStop();
470             }
471             catch (UMOException e)
472             {
473                 logger.error(e.getMessage(), e);
474             }
475
476         }
477     }
478
479     public final boolean isConnected()
480     {
481         return connected.get();
482     }
483
484     public InternalMessageListener getListener()
485     {
486         return listener;
487     }
488
489     public void setListener(InternalMessageListener listener)
490     {
491         this.listener = listener;
492     }
493
494     private class DefaultInternalMessageListener implements InternalMessageListener
495     {
496
497         public UMOMessage onMessage(UMOMessage message,
498                                     UMOTransaction trans,
499                                     boolean synchronous,
500                                     OutputStream outputStream) throws UMOException
501         {
502
503             UMOMessage resultMessage = null;
504             ResponseOutputStream ros = null;
505             if (outputStream != null)
506             {
507                 if (outputStream instanceof ResponseOutputStream)
508                 {
509                     ros = (ResponseOutputStream)outputStream;
510                 }
511                 else
512                 {
513                     ros = new ResponseOutputStream(outputStream);
514                 }
515             }
516             UMOSession session = new MuleSession(message, connector.getSessionHandler(), component);
517             UMOEvent muleEvent = new MuleEvent(message, endpoint, session, synchronous, ros);
518             RequestContext.setEvent(muleEvent);
519
520             // Apply Security filter if one is set
521
boolean authorised = false;
522             if (endpoint.getSecurityFilter() != null)
523             {
524                 try
525                 {
526                     endpoint.getSecurityFilter().authenticate(muleEvent);
527                     authorised = true;
528                 }
529                 catch (SecurityException JavaDoc e)
530                 {
531                     logger.warn("Request was made but was not authenticated: " + e.getMessage(), e);
532                     connector.fireNotification(new SecurityNotification(e,
533                         SecurityNotification.SECURITY_AUTHENTICATION_FAILED));
534                     handleException(e);
535                     resultMessage = message;
536                     // setExceptionDetails(resultMessage, e);
537
}
538             }
539             else
540             {
541                 authorised = true;
542             }
543
544             if (authorised)
545             {
546                 // the security filter may update the payload so we need to get the
547
// latest event again
548
muleEvent = RequestContext.getEvent();
549
550                 // This is a replyTo event for a current request
551
if (UMOEndpoint.ENDPOINT_TYPE_RESPONSE.equals(endpoint.getType()))
552                 {
553                     component.getDescriptor().getResponseRouter().route(muleEvent);
554                     return null;
555                 }
556                 else
557                 {
558                     resultMessage = component.getDescriptor().getInboundRouter().route(muleEvent);
559                 }
560             }
561             if (resultMessage != null)
562             {
563                 RequestContext.rewriteEvent(resultMessage);
564                 if (resultMessage.getExceptionPayload() != null)
565                 {
566                     setExceptionDetails(resultMessage, resultMessage.getExceptionPayload().getException());
567                 }
568             }
569             return applyResponseTransformer(resultMessage);
570         }
571     }
572
573     protected String JavaDoc getConnectEventId()
574     {
575         return connector.getName() + ".receiver (" + endpoint.getEndpointURI() + ")";
576     }
577
578     protected UMOMessage applyResponseTransformer(UMOMessage returnMessage) throws TransformerException
579     {
580         UMOTransformer transformer = endpoint.getResponseTransformer();
581
582         // no endpoint transformer, so check on component
583
if (transformer == null)
584         {
585             transformer = component.getDescriptor().getResponseTransformer();
586         }
587
588         // still no transformer, so do nothing.
589
if (transformer == null)
590         {
591             return returnMessage;
592         }
593
594         if (returnMessage == null)
595         {
596             if (transformer.isAcceptNull())
597             {
598                 returnMessage = new MuleMessage(new NullPayload(), RequestContext.getEventContext()
599                     .getMessage());
600             }
601             else
602             {
603                 return null;
604             }
605         }
606
607         Object JavaDoc returnPayload = returnMessage.getPayload();
608         if (transformer.isSourceTypeSupported(returnPayload.getClass()))
609         {
610             Object JavaDoc result = transformer.transform(returnPayload);
611             if (result instanceof UMOMessage)
612             {
613                 returnMessage = (UMOMessage)result;
614             }
615             else
616             {
617                 // Try and wrap the response in the correct messageAdapter, if this
618
// doesn't work for some reason
619
// just use a standard adater
620
// try {
621
// UMOMessageAdapter adapter =
622
// endpoint.getConnector().getMessageAdapter(result);
623
// returnMessage = new MuleMessage(adapter, returnMessage);
624
// } catch (MessagingException e) {
625
// if(logger.isWarnEnabled()) {
626
// logger.warn("Failed to wrap response in " +
627
// endpoint.getConnector().getProtocol() + ". Error is: " +
628
// e.getMessage());
629
// }
630
returnMessage = new MuleMessage(result, returnMessage);
631                 // }
632
//
633
}
634         }
635         else
636         {
637             if (logger.isDebugEnabled())
638             {
639                 logger.debug("Response transformer: " + transformer + " doesn't support the result payload: "
640                              + returnPayload.getClass());
641             }
642         }
643         return returnMessage;
644     }
645
646     public void doStart() throws UMOException
647     {
648         // nothing to do
649
}
650
651     public void doStop() throws UMOException
652     {
653         // nothing to do
654
}
655
656     public abstract void doConnect() throws Exception JavaDoc;
657
658     public abstract void doDisconnect() throws Exception JavaDoc;
659
660 }
661
Popular Tags