KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > eip > patterns > Pipeline


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.servicemix.eip.patterns;

import java.net.URI;

import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.Fault;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.RobustInOnly;
import javax.wsdl.Definition;

import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.eip.support.ExchangeTarget;
import org.apache.servicemix.jbi.FaultException;
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.util.MessageUtil;
35
36 /**
37  * The Pipeline component is a bridge between an In-Only (or Robust-In-Only) MEP and
38  * an In-Out MEP.
39  * When the Pipeline receives an In-Only MEP, it will send the input in an In-Out MEP
40  * to the tranformer destination and forward the response in an In-Only MEP to the target
41  * destination.
42  * In addition, this component is fully asynchronous and uses an exchange store to provide
43  * full HA and recovery for clustered / persistent flows.
44  *
45  * @author gnodet
46  * @version $Revision: 376451 $
47  * @org.apache.xbean.XBean element="pipeline"
48  * description="A Pipeline"
49  */

50 public class Pipeline extends EIPEndpoint {
51
52     private static final String JavaDoc TRANSFORMER = "Pipeline.Transformer";
53     
54     private static final String JavaDoc CONSUMER_MEP = "Pipeline.ConsumerMEP";
55
56     /**
57      * The adress of the in-out endpoint acting as a transformer
58      */

59     private ExchangeTarget transformer;
60
61     /**
62      * The address of the target endpoint
63      */

64     private ExchangeTarget target;
65
66     /**
67      * The correlation property used by this component
68      */

69     private String JavaDoc correlationConsumer;
70
71     /**
72      * The correlation property used by this component
73      */

74     private String JavaDoc correlationTransformer;
75
76     /**
77      * The correlation property used by this component
78      */

79     private String JavaDoc correlationTarget;
80
81     /**
82      * @return Returns the target.
83      */

84     public ExchangeTarget getTarget() {
85         return target;
86     }
87
88     /**
89      * @param target The target to set.
90      */

91     public void setTarget(ExchangeTarget target) {
92         this.target = target;
93     }
94
95     /**
96      * @return Returns the transformer.
97      */

98     public ExchangeTarget getTransformer() {
99         return transformer;
100     }
101
102     /**
103      * @param transformer The transformer to set.
104      */

105     public void setTransformer(ExchangeTarget transformer) {
106         this.transformer = transformer;
107     }
108
109     /* (non-Javadoc)
110      * @see org.apache.servicemix.eip.EIPEndpoint#validate()
111      */

112     public void validate() throws DeploymentException {
113         super.validate();
114         // Check target
115
if (target == null) {
116             throw new IllegalArgumentException JavaDoc("target should be set to a valid ExchangeTarget");
117         }
118         // Check transformer
119
if (transformer == null) {
120             throw new IllegalArgumentException JavaDoc("transformer should be set to a valid ExchangeTarget");
121         }
122         // Create correlation properties
123
correlationConsumer = "Pipeline.Consumer." + getService() + "." + getEndpoint();
124         correlationTransformer = "Pipeline.Transformer." + getService() + "." + getEndpoint();
125         correlationTarget = "Pipeline.Target." + getService() + "." + getEndpoint();
126     }
127
128     /* (non-Javadoc)
129      * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
130      */

131     protected void processSync(MessageExchange exchange) throws Exception JavaDoc {
132         if (exchange instanceof InOnly == false &&
133             exchange instanceof RobustInOnly == false) {
134             fail(exchange, new UnsupportedOperationException JavaDoc("Use an InOnly or RobustInOnly MEP"));
135             return;
136         }
137         // Create exchange for target
138
InOut tme = exchangeFactory.createInOutExchange();
139         transformer.configureTarget(tme, getContext());
140         // Send in to listener and target
141
MessageUtil.transferInToIn(exchange, tme);
142         sendSync(tme);
143         // Check result
144
if (tme.getStatus() == ExchangeStatus.DONE) {
145             throw new IllegalStateException JavaDoc("Received a DONE status from the transformer");
146         }
147         // Errors must be sent back to the consumer
148
else if (tme.getStatus() == ExchangeStatus.ERROR) {
149             fail(exchange, tme.getError());
150         }
151         // Faults must be sent back to the consumer
152
else if (tme.getFault() != null) {
153             if (exchange instanceof InOnly) {
154                 // Do not use the fault has it may contain streams
155
// So just transform it to a string and send an error
156
String JavaDoc fault = new SourceTransformer().contentToString(tme.getFault());
157                 done(tme);
158                 fail(exchange, new FaultException(fault, null, null));
159             } else {
160                 Fault fault = MessageUtil.copyFault(tme);
161                 MessageUtil.transferToFault(fault, exchange);
162                 done(tme);
163                 sendSync(exchange);
164             }
165         }
166         // This should not happen
167
else if (tme.getOutMessage() == null) {
168             throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
169         }
170         // This is the answer from the transformer
171
MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
172         target.configureTarget(me, getContext());
173         MessageUtil.transferOutToIn(tme, me);
174         sendSync(me);
175         done(tme);
176         if (me.getStatus() == ExchangeStatus.DONE) {
177             done(exchange);
178         } else if (me.getStatus() == ExchangeStatus.ERROR) {
179             fail(exchange, me.getError());
180         } else if (me.getFault() != null) {
181             if (exchange instanceof InOnly) {
182                 // Do not use the fault has it may contain streams
183
// So just transform it to a string and send an error
184
String JavaDoc fault = new SourceTransformer().contentToString(me.getFault());
185                 done(me);
186                 fail(exchange, new FaultException(fault, null, null));
187             } else {
188                 Fault fault = MessageUtil.copyFault(me);
189                 MessageUtil.transferToFault(fault, exchange);
190                 done(me);
191                 sendSync(exchange);
192             }
193         } else {
194             throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
195         }
196     }
197
198     /* (non-Javadoc)
199      * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
200      */

201     protected void processAsync(MessageExchange exchange) throws Exception JavaDoc {
202         // The exchange comes from the consumer
203
if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
204             // A DONE status from the consumer can only be received
205
// when a fault has been sent
206
if (exchange.getStatus() == ExchangeStatus.DONE) {
207                 String JavaDoc transformerId = (String JavaDoc) exchange.getProperty(correlationTransformer);
208                 String JavaDoc targetId = (String JavaDoc) exchange.getProperty(correlationTarget);
209                 if (transformerId == null && targetId == null) {
210                     throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set");
211                 }
212                 // Load the exchange
213
MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
214                 done(me);
215             // Errors must be sent back to the target or transformer
216
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
217                 String JavaDoc transformerId = (String JavaDoc) exchange.getProperty(correlationTransformer);
218                 String JavaDoc targetId = (String JavaDoc) exchange.getProperty(correlationTarget);
219                 if (transformerId == null && targetId == null) {
220                     throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set");
221                 }
222                 // Load the exchange
223
MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
224                 fail(me, exchange.getError());
225             // This is a new exchange
226
} else if (exchange.getProperty(correlationTransformer) == null) {
227                 if (exchange instanceof InOnly == false && exchange instanceof RobustInOnly == false) {
228                     fail(exchange, new UnsupportedOperationException JavaDoc("Use an InOnly or RobustInOnly MEP"));
229                     return;
230                 }
231                 // Create exchange for target
232
MessageExchange tme = exchangeFactory.createInOutExchange();
233                 transformer.configureTarget(tme, getContext());
234                 // Set correlations
235
exchange.setProperty(correlationTransformer, tme.getExchangeId());
236                 tme.setProperty(correlationConsumer, exchange.getExchangeId());
237                 tme.setProperty(TRANSFORMER, Boolean.TRUE);
238                 tme.setProperty(CONSUMER_MEP, exchange.getPattern());
239                 // Put exchange to store
240
store.store(exchange.getExchangeId(), exchange);
241                 // Send in to listener and target
242
MessageUtil.transferInToIn(exchange, tme);
243                 send(tme);
244             } else {
245                 throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
246             }
247         // If the exchange comes from the transformer
248
} else if (Boolean.TRUE.equals(exchange.getProperty(TRANSFORMER))) {
249             // Retrieve the correlation id
250
String JavaDoc consumerId = (String JavaDoc) exchange.getProperty(correlationConsumer);
251             if (consumerId == null) {
252                 throw new IllegalStateException JavaDoc(correlationConsumer + " property not found");
253             }
254             // This should not happen beacause the MEP is an In-Out
255
// and the DONE status is always sent by the consumer (us)
256
if (exchange.getStatus() == ExchangeStatus.DONE) {
257                 throw new IllegalStateException JavaDoc("Received a DONE status from the transformer");
258             // Errors must be sent back to the consumer
259
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
260                 MessageExchange me = (MessageExchange) store.load(consumerId);
261                 fail(me, exchange.getError());
262             // Faults must be sent back to the consumer
263
} else if (exchange.getFault() != null) {
264                 MessageExchange me = (MessageExchange) store.load(consumerId);
265                 if (me instanceof InOnly) {
266                     // Do not use the fault has it may contain streams
267
// So just transform it to a string and send an error
268
String JavaDoc fault = new SourceTransformer().contentToString(exchange.getFault());
269                     fail(me, new FaultException(fault, null, null));
270                     done(exchange);
271                 } else {
272                     store.store(exchange.getExchangeId(), exchange);
273                     MessageUtil.transferFaultToFault(exchange, me);
274                     send(me);
275                 }
276             // This is the answer from the transformer
277
} else if (exchange.getMessage("out") != null) {
278                 // Retrieve the consumer MEP
279
URI JavaDoc mep = (URI JavaDoc) exchange.getProperty(CONSUMER_MEP);
280                 if (mep == null) {
281                     throw new IllegalStateException JavaDoc("Exchange does not carry the consumer MEP");
282                 }
283                 MessageExchange me = exchangeFactory.createExchange(mep);
284                 target.configureTarget(me, getContext());
285                 me.setProperty(correlationConsumer, consumerId);
286                 me.setProperty(correlationTransformer, exchange.getExchangeId());
287                 store.store(exchange.getExchangeId(), exchange);
288                 MessageUtil.transferOutToIn(exchange, me);
289                 send(me);
290             // This should not happen
291
} else {
292                 throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
293             }
294         // The exchange comes from the target
295
} else {
296             // Retrieve the correlation id for the consumer
297
String JavaDoc consumerId = (String JavaDoc) exchange.getProperty(correlationConsumer);
298             if (consumerId == null) {
299                 throw new IllegalStateException JavaDoc(correlationConsumer + " property not found");
300             }
301             // Retrieve the correlation id for the transformer
302
String JavaDoc transformerId = (String JavaDoc) exchange.getProperty(correlationTransformer);
303             if (transformerId == null) {
304                 throw new IllegalStateException JavaDoc(correlationTransformer + " property not found");
305             }
306             // This should be the last message received
307
if (exchange.getStatus() == ExchangeStatus.DONE) {
308                 // Need to ack the transformer
309
MessageExchange tme = (MessageExchange) store.load(transformerId);
310                 done(tme);
311                 // Need to ack the consumer
312
MessageExchange cme = (MessageExchange) store.load(consumerId);
313                 done(cme);
314             // Errors should be sent back to the consumer
315
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
316                 // Need to ack the transformer
317
MessageExchange tme = (MessageExchange) store.load(transformerId);
318                 done(tme);
319                 // Send error to consumer
320
MessageExchange cme = (MessageExchange) store.load(consumerId);
321                 fail(cme, exchange.getError());
322             // If we have a robust-in-only MEP, we can receive a fault
323
} else if (exchange.getFault() != null) {
324                 // Need to ack the transformer
325
MessageExchange tme = (MessageExchange) store.load(transformerId);
326                 done(tme);
327                 // Send fault back to consumer
328
store.store(exchange.getExchangeId(), exchange);
329                 MessageExchange cme = (MessageExchange) store.load(consumerId);
330                 cme.setProperty(correlationTarget, exchange.getExchangeId());
331                 MessageUtil.transferFaultToFault(exchange, cme);
332                 send(cme);
333             // This should not happen
334
} else {
335                 throw new IllegalStateException JavaDoc("Exchange from target has a " + ExchangeStatus.ACTIVE + " status but has no Fault message");
336             }
337         }
338     }
339     
340     protected Definition getDefinitionFromWsdlExchangeTarget() {
341         Definition rc = super.getDefinitionFromWsdlExchangeTarget();
342         if( rc !=null ) {
343             // TODO: This components wsdl is == transformer wsdl without the out message.
344
// need to massage the result wsdl so that it described an in only exchange
345
}
346         return rc;
347     }
348
349 }
350
Popular Tags