KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > routing > outbound > ChainingRouter


1 /*
2  * $Id: ChainingRouter.java 4259 2006-12-14 03:12:07Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.routing.outbound;
12
13 import org.mule.config.i18n.Message;
14 import org.mule.config.i18n.Messages;
15 import org.mule.umo.UMOException;
16 import org.mule.umo.UMOMessage;
17 import org.mule.umo.UMOSession;
18 import org.mule.umo.endpoint.UMOEndpoint;
19 import org.mule.umo.routing.CouldNotRouteOutboundMessageException;
20 import org.mule.umo.routing.RoutePathNotFoundException;
21 import org.mule.umo.routing.RoutingException;
22
23 /**
24  * <code>ChainingRouter</code> is used to pass a Mule event through multiple
25  * endpoints using the result of the first and the input for the second.
26  */

27
28 public class ChainingRouter extends FilteringOutboundRouter
29 {
30
31     public UMOMessage route(UMOMessage message, UMOSession session, boolean synchronous)
32         throws RoutingException
33     {
34         UMOMessage resultToReturn = null;
35         if (endpoints == null || endpoints.size() == 0)
36         {
37             throw new RoutePathNotFoundException(new Message(Messages.NO_ENDPOINTS_FOR_ROUTER), message, null);
38         }
39
40         final int endpointsCount = endpoints.size();
41         if (logger.isDebugEnabled())
42         {
43             logger.debug("About to chain " + endpointsCount + " endpoints.");
44         }
45
46         // need that ref for an error message
47
UMOEndpoint endpoint = null;
48         try
49         {
50             UMOMessage intermediaryResult = message;
51
52             for (int i = 0; i < endpointsCount; i++)
53             {
54                 endpoint = getEndpoint(i, intermediaryResult);
55                 // if it's not the last endpoint in the chain,
56
// enforce the synchronous call, otherwise we lose response
57
boolean lastEndpointInChain = (i == endpointsCount - 1);
58
59                 if (logger.isDebugEnabled())
60                 {
61                     logger.debug("Sending Chained message '" + i + "': "
62                                  + (intermediaryResult == null ? "null" : intermediaryResult.toString()));
63                 }
64
65                 if (!lastEndpointInChain)
66                 {
67                     UMOMessage localResult = send(session, intermediaryResult, endpoint);
68                     // Need to propagate correlation info and replyTo, because there
69
// is no guarantee that an external system will preserve headers
70
// (in fact most will not)
71
if (localResult != null && intermediaryResult != null)
72                     {
73                         processIntermediaryResult(localResult, intermediaryResult);
74                     }
75                     intermediaryResult = localResult;
76
77                     if (logger.isDebugEnabled())
78                     {
79                         logger.debug("Received Chain result '" + i + "': "
80                                      + (intermediaryResult != null ? intermediaryResult.toString() : "null"));
81                     }
82
83                     if (intermediaryResult == null)
84                     {
85                         logger.warn("Chaining router cannot process any further endpoints. "
86                                     + "There was no result returned from endpoint invocation: " + endpoint);
87                         break;
88                     }
89                 }
90                 else
91                 {
92                     // ok, the last call,
93
// use the 'sync/async' method parameter
94
if (synchronous)
95                     {
96                         resultToReturn = send(session, intermediaryResult, endpoint);
97                         if (logger.isDebugEnabled())
98                         {
99                             logger.debug("Received final Chain result '" + i + "': "
100                                          + (resultToReturn == null ? "null" : resultToReturn.toString()));
101                         }
102                     }
103                     else
104                     {
105                         // reset the previous call result to avoid confusion
106
resultToReturn = null;
107                         dispatch(session, intermediaryResult, endpoint);
108                     }
109                 }
110             }
111
112         }
113         catch (UMOException e)
114         {
115             throw new CouldNotRouteOutboundMessageException(message, endpoint, e);
116         }
117         return resultToReturn;
118     }
119
120     public void addEndpoint(UMOEndpoint endpoint)
121     {
122         if (!endpoint.isRemoteSync())
123         {
124             logger.debug("Endpoint: "
125                          + endpoint.getEndpointURI()
126                          + " registered on chaining router needs to be RemoteSync enabled. Setting this property now");
127             endpoint.setRemoteSync(true);
128         }
129         super.addEndpoint(endpoint);
130     }
131
132     /**
133      * Process intermediary result of invocation. The method will be invoked
134      * <strong>only</strong> if both local and intermediary results are available
135      * (not null).
136      * <p/>
137      * Overriding methods must call <code>super(localResult, intermediaryResult)</code>,
138      * unless they are modifying the correlation workflow (if you know what that means,
139      * you know what you are doing and when to do it).
140      * <p/>
141      * Default implementation propagates
142      * the following properties:
143      * <ul>
144      * <li>correlationId
145      * <li>correlationSequence
146      * <li>correlationGroupSize
147      * <li>replyTo
148      * </ul>
149      * @param localResult result of the last endpoint invocation
150      * @param intermediaryResult the message travelling across the endpoints
151      */

152     protected void processIntermediaryResult(UMOMessage localResult, UMOMessage intermediaryResult)
153     {
154         localResult.setCorrelationId(intermediaryResult.getCorrelationId());
155         localResult.setCorrelationSequence(intermediaryResult.getCorrelationSequence());
156         localResult.setCorrelationGroupSize(intermediaryResult.getCorrelationGroupSize());
157         localResult.setReplyTo(intermediaryResult.getReplyTo());
158     }
159 }
160
Popular Tags