KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > eip > patterns > StaticRoutingSlip


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.eip.patterns;
18
19 import javax.jbi.management.DeploymentException;
20 import javax.jbi.messaging.ExchangeStatus;
21 import javax.jbi.messaging.Fault;
22 import javax.jbi.messaging.InOut;
23 import javax.jbi.messaging.MessageExchange;
24
25 import org.apache.servicemix.eip.EIPEndpoint;
26 import org.apache.servicemix.eip.support.ExchangeTarget;
27 import org.apache.servicemix.jbi.util.MessageUtil;
28
29 /**
30  * A RoutingSlip component can be used to route an incoming In-Out exchange
31  * through a series of target services.
32  * This component implements the
33  * <a HREF="http://www.enterpriseintegrationpatterns.com/RoutingTable.html">Routing Slip</a>
34  * pattern, with the limitation that the routing table is static.
35  * This component only uses In-Out MEPs and errors or faults sent by targets are reported
36  * back to the consumer, thus interrupting the routing process.
37  * In addition, this component is fully asynchronous and uses an exchange store to provide
38  * full HA and recovery for clustered / persistent flows.
39  *
40  * @author gnodet
41  * @version $Revision: 376451 $
42  * @org.apache.xbean.XBean element="static-routing-slip"
43  * description="A static Routing Slip"
44  */

45 public class StaticRoutingSlip extends EIPEndpoint {
46
47     /**
48      * List of target components used in the RoutingSlip
49      */

50     private ExchangeTarget[] targets;
51     /**
52      * The correlation property used by this component
53      */

54     private String JavaDoc correlation;
55     /**
56      * The current index of the target
57      */

58     private String JavaDoc index;
59     /**
60      * The id of the previous target exchange
61      */

62     private String JavaDoc previous;
63     
64     /**
65      * @return Returns the targets.
66      */

67     public ExchangeTarget[] getTargets() {
68         return targets;
69     }
70
71     /**
72      * @param targets The targets to set.
73      */

74     public void setTargets(ExchangeTarget[] targets) {
75         this.targets = targets;
76     }
77
78     /* (non-Javadoc)
79      * @see org.apache.servicemix.eip.EIPEndpoint#validate()
80      */

81     public void validate() throws DeploymentException {
82         super.validate();
83         // Check target
84
if (targets == null || targets.length == 0) {
85             throw new IllegalArgumentException JavaDoc("targets should contain at least one ExchangeTarget");
86         }
87         // Create correlation properties
88
correlation = "RoutingSlip.Correlation." + getService() + "." + getEndpoint();
89         index = "RoutingSlip.Index." + getService() + "." + getEndpoint();
90         previous = "RoutingSlip.Previous." + getService() + "." + getEndpoint();
91     }
92
93     /* (non-Javadoc)
94      * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
95      */

96     protected void processSync(MessageExchange exchange) throws Exception JavaDoc {
97         if (exchange instanceof InOut == false) {
98             throw new IllegalStateException JavaDoc("Use an InOut MEP");
99         }
100         MessageExchange current = exchange;
101         for (int i = 0; i < targets.length; i++) {
102             InOut me = exchangeFactory.createInOutExchange();
103             targets[i].configureTarget(me, getContext());
104             if (i == 0) {
105                 MessageUtil.transferInToIn(current, me);
106             } else {
107                 MessageUtil.transferOutToIn(current, me);
108             }
109             sendSync(me);
110             if (i != 0) {
111                 done(current);
112             }
113             if (me.getStatus() == ExchangeStatus.DONE) {
114                 throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.DONE);
115             } else if (me.getStatus() == ExchangeStatus.ERROR) {
116                 fail(exchange, me.getError());
117                 return;
118             } else if (me.getFault() != null) {
119                 Fault fault = MessageUtil.copyFault(me);
120                 MessageUtil.transferToFault(fault, exchange);
121                 done(me);
122                 sendSync(exchange);
123                 return;
124             } else if (me.getOutMessage() == null) {
125                 throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
126             }
127             current = me;
128         }
129         MessageUtil.transferToOut(MessageUtil.copyOut(current), exchange);
130         done(current);
131         sendSync(exchange);
132     }
133
134     /* (non-Javadoc)
135      * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
136      */

137     protected void processAsync(MessageExchange exchange) throws Exception JavaDoc {
138         // This exchange comes from the consumer
139
if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
140             if (exchange.getStatus() == ExchangeStatus.DONE) {
141                 String JavaDoc correlationId = (String JavaDoc) exchange.getProperty(correlation);
142                 if (correlationId == null) {
143                     throw new IllegalStateException JavaDoc(correlation + " property not found");
144                 }
145                 // Ack last target hit
146
MessageExchange me = (MessageExchange) store.load(correlationId);
147                 done(me);
148             } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
149                 String JavaDoc correlationId = (String JavaDoc) exchange.getProperty(correlation);
150                 if (correlationId == null) {
151                     throw new IllegalStateException JavaDoc(correlation + " property not found");
152                 }
153                 // Ack last target hit
154
MessageExchange me = (MessageExchange) store.load(correlationId);
155                 done(me);
156             } else if (exchange instanceof InOut == false) {
157                 throw new IllegalStateException JavaDoc("Use an InOut MEP");
158             } else {
159                 MessageExchange me = exchangeFactory.createInOutExchange();
160                 me.setProperty(correlation, exchange.getExchangeId());
161                 me.setProperty(index, new Integer JavaDoc(0));
162                 targets[0].configureTarget(me, getContext());
163                 store.store(exchange.getExchangeId(), exchange);
164                 MessageUtil.transferInToIn(exchange, me);
165                 send(me);
166             }
167         // The exchange comes from a target
168
} else {
169             String JavaDoc correlationId = (String JavaDoc) exchange.getProperty(correlation);
170             String JavaDoc previousId = (String JavaDoc) exchange.getProperty(previous);
171             Integer JavaDoc prevIndex = (Integer JavaDoc) exchange.getProperty(index);
172             if (correlationId == null) {
173                 throw new IllegalStateException JavaDoc(correlation + " property not found");
174             }
175             if (prevIndex == null) {
176                 throw new IllegalStateException JavaDoc(previous + " property not found");
177             }
178             // This should never happen, as we can only send DONE
179
if (exchange.getStatus() == ExchangeStatus.DONE) {
180                 throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.DONE);
181             // ERROR are sent back to the consumer
182
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
183                 MessageExchange me = (MessageExchange) store.load(correlationId);
184                 fail(me, exchange.getError());
185                 // Ack the previous target
186
if (previousId != null) {
187                     me = (MessageExchange) store.load(previousId);
188                     done(me);
189                 }
190             // Faults are sent back to the consumer
191
} else if (exchange.getFault() != null) {
192                 MessageExchange me = (MessageExchange) store.load(correlationId);
193                 me.setProperty(correlation, exchange.getExchangeId());
194                 store.store(exchange.getExchangeId(), exchange);
195                 MessageUtil.transferFaultToFault(exchange, me);
196                 send(me);
197                 // Ack the previous target
198
if (previousId != null) {
199                     me = (MessageExchange) store.load(previousId);
200                     done(me);
201                 }
202             // Out message, give it to next target or back to consumer
203
} else if (exchange.getMessage("out") != null) {
204                 // This is the answer from the last target
205
if (prevIndex.intValue() == targets.length - 1) {
206                     MessageExchange me = (MessageExchange) store.load(correlationId);
207                     me.setProperty(correlation, exchange.getExchangeId());
208                     store.store(exchange.getExchangeId(), exchange);
209                     MessageUtil.transferOutToOut(exchange, me);
210                     send(me);
211                     if (previousId != null) {
212                         me = (MessageExchange) store.load(previousId);
213                         done(me);
214                     }
215                 // We still have a target to hit
216
} else {
217                     MessageExchange me = exchangeFactory.createInOutExchange();
218                     Integer JavaDoc curIndex = new Integer JavaDoc(prevIndex.intValue() + 1);
219                     me.setProperty(correlation, correlationId);
220                     me.setProperty(index, curIndex);
221                     me.setProperty(previous, exchange.getExchangeId());
222                     targets[curIndex.intValue()].configureTarget(me, getContext());
223                     store.store(exchange.getExchangeId(), exchange);
224                     MessageUtil.transferOutToIn(exchange, me);
225                     send(me);
226                     if (previousId != null) {
227                         me = (MessageExchange) store.load(previousId);
228                         done(me);
229                     }
230                 }
231             // This should not happen
232
} else {
233                 throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
234             }
235         }
236     }
237
238 }
239
Popular Tags