KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > routing > outbound > AbstractOutboundRouter


1 /*
2  * $Id: AbstractOutboundRouter.java 3798 2006-11-04 04:07:14Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.routing.outbound;
12
13 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
14 import org.apache.commons.lang.SystemUtils;
15 import org.apache.commons.logging.Log;
16 import org.apache.commons.logging.LogFactory;
17 import org.mule.MuleManager;
18 import org.mule.util.ClassUtils;
19 import org.mule.config.MuleProperties;
20 import org.mule.util.properties.PropertyExtractor;
21 import org.mule.management.stats.RouterStatistics;
22 import org.mule.routing.CorrelationPropertiesExtractor;
23 import org.mule.umo.UMOException;
24 import org.mule.umo.UMOMessage;
25 import org.mule.umo.UMOSession;
26 import org.mule.umo.UMOTransactionConfig;
27 import org.mule.umo.endpoint.UMOEndpoint;
28 import org.mule.umo.routing.UMOOutboundRouter;
29 import org.mule.util.StringMessageUtils;
30
31 import java.util.Iterator JavaDoc;
32 import java.util.List JavaDoc;
33
34 /**
35  * <code>AbstractOutboundRouter</code> is a base router class that tracks
36  * statistics about message processing through the router.
37  *
38  * @author <a HREF="mailto:ross.mason@symphonysoft.com">Ross Mason</a>
39  * @version $Revision: 3798 $
40  */

41 public abstract class AbstractOutboundRouter implements UMOOutboundRouter
42 {
43     public static final int ENABLE_CORRELATION_IF_NOT_SET = 0;
44     public static final int ENABLE_CORRELATION_ALWAYS = 1;
45     public static final int ENABLE_CORRELATION_NEVER = 2;
46     /**
47      * logger used by this class
48      */

49     protected transient Log logger = LogFactory.getLog(getClass());
50
51     protected List endpoints = new CopyOnWriteArrayList();
52
53     protected String JavaDoc replyTo = null;
54
55     protected int enableCorrelation = ENABLE_CORRELATION_IF_NOT_SET;
56
57     protected PropertyExtractor propertyExtractor = new CorrelationPropertiesExtractor();
58
59     protected RouterStatistics routerStatistics;
60
61     protected UMOTransactionConfig transactionConfig;
62
63     public void dispatch(UMOSession session, UMOMessage message, UMOEndpoint endpoint) throws UMOException
64     {
65         setMessageProperties(session, message, endpoint);
66
67         if (logger.isDebugEnabled())
68         {
69             try
70             {
71                 logger.debug("Message being sent to: " + endpoint.getEndpointURI() + " Message payload: \n"
72                              + StringMessageUtils.truncate(message.getPayloadAsString(), 100, false));
73             }
74             catch (Exception JavaDoc e)
75             {
76                 logger.debug("Message being sent to: " + endpoint.getEndpointURI()
77                              + " Message payload: \n(unable to retrieve payload: " + e.getMessage());
78             }
79         }
80
81         session.dispatchEvent(message, endpoint);
82         if (routerStatistics != null)
83         {
84             if (routerStatistics.isEnabled())
85             {
86                 routerStatistics.incrementRoutedMessage(endpoint);
87             }
88         }
89     }
90
91     public UMOMessage send(UMOSession session, UMOMessage message, UMOEndpoint endpoint) throws UMOException
92     {
93
94         if (replyTo != null)
95         {
96             logger.debug("event was dispatched synchronously, but there is a ReplyTo endpoint set, so using asynchronous dispatch");
97             dispatch(session, message, endpoint);
98             return null;
99         }
100         setMessageProperties(session, message, endpoint);
101
102         if (logger.isDebugEnabled())
103         {
104             logger.debug("Message being sent to: " + endpoint.getEndpointURI());
105             logger.debug(message);
106         }
107         if (logger.isTraceEnabled())
108         {
109             try
110             {
111                 logger.trace("Message payload: \n" + message.getPayloadAsString());
112             }
113             catch (Exception JavaDoc e)
114             {
115                 // ignore
116
}
117         }
118         UMOMessage result = session.sendEvent(message, endpoint);
119         if (routerStatistics != null)
120         {
121             if (routerStatistics.isEnabled())
122             {
123                 routerStatistics.incrementRoutedMessage(endpoint);
124             }
125         }
126
127         if (logger.isDebugEnabled())
128         {
129             logger.debug("Response message from sending to: " + endpoint.getEndpointURI());
130             logger.debug(message);
131         }
132         if (logger.isTraceEnabled())
133         {
134             try
135             {
136                 logger.trace("Message payload: \n" + message.getPayloadAsString());
137             }
138             catch (Exception JavaDoc e)
139             {
140                 // ignore
141
}
142         }
143         return result;
144     }
145
146     protected void setMessageProperties(UMOSession session, UMOMessage message, UMOEndpoint endpoint)
147     {
148         if (replyTo != null)
149         {
150             // if replyTo is set we'll probably want the correlationId set as
151
// well
152
message.setReplyTo(replyTo);
153             message.setProperty(MuleProperties.MULE_REPLY_TO_REQUESTOR_PROPERTY, session.getComponent()
154                 .getDescriptor()
155                 .getName());
156             if (logger.isDebugEnabled())
157             {
158                 logger.debug("Setting replyTo=" + replyTo + " for outbound endpoint: "
159                              + endpoint.getEndpointURI());
160             }
161         }
162         if (enableCorrelation != ENABLE_CORRELATION_NEVER)
163         {
164             boolean correlationSet = message.getCorrelationId() != null;
165             if (correlationSet && (enableCorrelation == ENABLE_CORRELATION_IF_NOT_SET))
166             {
167                 if (logger.isDebugEnabled())
168                 {
169                     logger.debug("CorrelationId is already set to '" + message.getCorrelationId()
170                                  + "' , not setting it again");
171                 }
172                 return;
173             }
174             else if (correlationSet)
175             {
176                 if (logger.isDebugEnabled())
177                 {
178                     logger.debug("CorrelationId is already set to '" + message.getCorrelationId()
179                                  + "', but router is configured to overwrite it");
180                 }
181             }
182             else
183             {
184                 if (logger.isDebugEnabled())
185                 {
186                     logger.debug("No CorrelationId is set on the message, will set a new Id");
187                 }
188             }
189
190             String JavaDoc correlation;
191             Object JavaDoc o = propertyExtractor.getProperty(MuleProperties.MULE_CORRELATION_ID_PROPERTY, message);
192             if (logger.isDebugEnabled())
193             {
194                 logger.debug("Extracted correlation Id as: " + o);
195             }
196             correlation = o.toString();
197
198             if (logger.isDebugEnabled())
199             {
200                 StringBuffer JavaDoc buf = new StringBuffer JavaDoc();
201                 buf.append("Setting Correlation info on Outbound router for endpoint: ").append(
202                     endpoint.getEndpointURI());
203                 buf.append(SystemUtils.LINE_SEPARATOR).append("Id=").append(correlation);
204                 // buf.append(", ").append("Seq=").append(seq);
205
// buf.append(", ").append("Group Size=").append(group);
206
logger.debug(buf.toString());
207             }
208             message.setCorrelationId(correlation);
209             // message.setCorrelationGroupSize(group);
210
// message.setCorrelationSequence(seq);
211
}
212     }
213
214     public List getEndpoints()
215     {
216         return endpoints;
217     }
218
219     public void setEndpoints(List endpoints)
220     {
221         // this.endpoints = new CopyOnWriteArrayList(endpoints);
222
for (Iterator JavaDoc iterator = endpoints.iterator(); iterator.hasNext();)
223         {
224             UMOEndpoint umoEndpoint = (UMOEndpoint)iterator.next();
225             addEndpoint(umoEndpoint);
226         }
227     }
228
229     public void addEndpoint(UMOEndpoint endpoint)
230     {
231         endpoint.setType(UMOEndpoint.ENDPOINT_TYPE_SENDER);
232         endpoints.add(endpoint);
233     }
234
235     public boolean removeEndpoint(UMOEndpoint endpoint)
236     {
237         return endpoints.remove(endpoint);
238     }
239
240     public String JavaDoc getReplyTo()
241     {
242         return replyTo;
243     }
244
245     public void setReplyTo(String JavaDoc replyTo)
246     {
247         if (replyTo != null)
248         {
249             this.replyTo = MuleManager.getInstance().lookupEndpointIdentifier(replyTo, replyTo);
250         }
251         else
252         {
253             this.replyTo = null;
254         }
255     }
256
257     public RouterStatistics getRouterStatistics()
258     {
259         return routerStatistics;
260     }
261
262     public void setRouterStatistics(RouterStatistics routerStatistics)
263     {
264         this.routerStatistics = routerStatistics;
265     }
266
267     public int getEnableCorrelation()
268     {
269         return enableCorrelation;
270     }
271
272     public void setEnableCorrelation(int enableCorrelation)
273     {
274         this.enableCorrelation = enableCorrelation;
275     }
276
277     public void setEnableCorrelationAsString(String JavaDoc enableCorrelation)
278     {
279         if (enableCorrelation != null)
280         {
281             if (enableCorrelation.equals("ALWAYS"))
282             {
283                 this.enableCorrelation = ENABLE_CORRELATION_ALWAYS;
284             }
285             else if (enableCorrelation.equals("NEVER"))
286             {
287                 this.enableCorrelation = ENABLE_CORRELATION_NEVER;
288             }
289             else if (enableCorrelation.equals("IF_NOT_SET"))
290             {
291                 this.enableCorrelation = ENABLE_CORRELATION_IF_NOT_SET;
292             }
293             else
294             {
295                 throw new IllegalArgumentException JavaDoc("Value for enableCorrelation not recognised: "
296                                                    + enableCorrelation);
297             }
298         }
299     }
300
301     public PropertyExtractor getPropertyExtractor()
302     {
303         return propertyExtractor;
304     }
305
306     public void setPropertyExtractor(PropertyExtractor propertyExtractor)
307     {
308         this.propertyExtractor = propertyExtractor;
309     }
310
311     public void setPropertyExtractorAsString(String JavaDoc className)
312     {
313         try
314         {
315             this.propertyExtractor = (PropertyExtractor)ClassUtils.instanciateClass(className, null,
316                 getClass());
317         }
318         catch (Exception JavaDoc ex)
319         {
320             throw new IllegalArgumentException JavaDoc("Couldn't instanciate property extractor class " + className);
321         }
322     }
323
324     public UMOTransactionConfig getTransactionConfig()
325     {
326         return transactionConfig;
327     }
328
329     public void setTransactionConfig(UMOTransactionConfig transactionConfig)
330     {
331         this.transactionConfig = transactionConfig;
332     }
333
334     public boolean isDynamicEndpoints()
335     {
336         return false;
337     }
338 }
339
Popular Tags