KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > eip > support > AbstractSplitter


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.eip.support;
18
19 import java.net.URI JavaDoc;
20 import java.util.Iterator JavaDoc;
21 import java.util.Set JavaDoc;
22
23 import javax.jbi.management.DeploymentException;
24 import javax.jbi.messaging.ExchangeStatus;
25 import javax.jbi.messaging.InOnly;
26 import javax.jbi.messaging.MessageExchange;
27 import javax.jbi.messaging.NormalizedMessage;
28 import javax.jbi.messaging.RobustInOnly;
29 import javax.xml.transform.Source JavaDoc;
30
31 import org.apache.servicemix.eip.EIPEndpoint;
32 import org.apache.servicemix.jbi.util.MessageUtil;
33
34 /**
35  * The AbstractSplitter is an abstract base class for Splitters.
36  * This component implements the
37  * <a HREF="http://www.enterpriseintegrationpatterns.com/Sequencer.html">Splitter</a>
38  * pattern.
39  *
40  * @author gnodet
41  * @version $Revision: 376451 $
42  */

43 public abstract class AbstractSplitter extends EIPEndpoint {
44     
45     public static final String JavaDoc SPLITTER_COUNT = "org.apache.servicemix.eip.splitter.count";
46     public static final String JavaDoc SPLITTER_INDEX = "org.apache.servicemix.eip.splitter.index";
47     public static final String JavaDoc SPLITTER_CORRID = "org.apache.servicemix.eip.splitter.corrid";
48
49     /**
50      * The address of the target endpoint
51      */

52     private ExchangeTarget target;
53     /**
54      * Indicates if faults and errors from splitted parts should be sent
55      * back to the consumer. In such a case, only the first fault or
56      * error received will be reported.
57      * Note that if the consumer is synchronous, it will be blocked
58      * until all parts have been successfully acked, or
59      * a fault or error is reported, and the exchange will be kept in the
60      * store for recovery.
61      */

62     private boolean reportErrors;
63     /**
64      * Indicates if incoming attachments should be forwarded with the new exchanges.
65      */

66     private boolean forwardAttachments;
67     /**
68      * Indicates if properties on the incoming message should be forwarded.
69      */

70     private boolean forwardProperties;
71     /**
72      * The correlation property used by this component
73      */

74     private String JavaDoc correlation;
75     /**
76      * Specifies wether exchanges for all parts are sent synchronously or not.
77      */

78     private boolean synchronous;
79     
80     /**
81      * @return the synchronous
82      */

83     public boolean isSynchronous() {
84         return synchronous;
85     }
86
87     /**
88      * @param synchronous the synchronous to set
89      */

90     public void setSynchronous(boolean synchronous) {
91         this.synchronous = synchronous;
92     }
93
94     /**
95      * @return Returns the reportErrors.
96      */

97     public boolean isReportErrors() {
98         return reportErrors;
99     }
100
101     /**
102      * @param reportErrors The reportErrors to set.
103      */

104     public void setReportErrors(boolean reportErrors) {
105         this.reportErrors = reportErrors;
106     }
107
108     /**
109      * @return Returns the target.
110      */

111     public ExchangeTarget getTarget() {
112         return target;
113     }
114
115     /**
116      * @param target The target to set.
117      */

118     public void setTarget(ExchangeTarget target) {
119         this.target = target;
120     }
121
122     /**
123      * @return Returns the forwardAttachments.
124      */

125     public boolean isForwardAttachments() {
126         return forwardAttachments;
127     }
128
129     /**
130      * @param forwardAttachments The forwardAttachments to set.
131      */

132     public void setForwardAttachments(boolean forwardAttachments) {
133         this.forwardAttachments = forwardAttachments;
134     }
135
136     /**
137      * @return Returns the forwardProperties.
138      */

139     public boolean isForwardProperties() {
140         return forwardProperties;
141     }
142
143     /**
144      * @param forwardProperties The forwardProperties to set.
145      */

146     public void setForwardProperties(boolean forwardProperties) {
147         this.forwardProperties = forwardProperties;
148     }
149
150     /* (non-Javadoc)
151      * @see org.apache.servicemix.eip.EIPEndpoint#validate()
152      */

153     public void validate() throws DeploymentException {
154         super.validate();
155         // Check target
156
if (target == null) {
157             throw new IllegalArgumentException JavaDoc("target should be set to a valid ExchangeTarget");
158         }
159         // Create correlation property
160
correlation = "Splitter.Correlation." + getContext().getComponentName();
161     }
162     
163     /* (non-Javadoc)
164      * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
165      */

166     protected void processSync(MessageExchange exchange) throws Exception JavaDoc {
167         if (exchange instanceof InOnly == false &&
168             exchange instanceof RobustInOnly == false) {
169             fail(exchange, new UnsupportedOperationException JavaDoc("Use an InOnly or RobustInOnly MEP"));
170             return;
171         }
172         MessageExchange[] parts = createParts(exchange);
173         for (int i = 0; i < parts.length; i++) {
174             target.configureTarget(parts[i], getContext());
175             if (reportErrors || isSynchronous()) {
176                 sendSync(parts[i]);
177                 if (parts[i].getStatus() == ExchangeStatus.DONE) {
178                     // nothing to do
179
} else if (parts[i].getStatus() == ExchangeStatus.ERROR) {
180                     if (reportErrors) {
181                         fail(exchange, parts[i].getError());
182                         return;
183                     }
184                 } else if (parts[i].getFault() != null) {
185                     if (reportErrors) {
186                         MessageUtil.transferToFault(MessageUtil.copyFault(parts[i]), exchange);
187                         done(parts[i]);
188                         sendSync(exchange);
189                         return;
190                     } else {
191                         done(parts[i]);
192                     }
193                 } else {
194                     throw new IllegalStateException JavaDoc("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Fault message");
195                 }
196             } else {
197                 send(parts[i]);
198             }
199         }
200         done(exchange);
201     }
202
203     /* (non-Javadoc)
204      * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
205      */

206     protected void processAsync(MessageExchange exchange) throws Exception JavaDoc {
207         // If we need to report errors, the behavior is really different,
208
// as we need to keep the incoming exchange in the store until
209
// all acks have been received
210
if (reportErrors) {
211             // TODO: implement this
212
throw new UnsupportedOperationException JavaDoc("Not implemented");
213         // We are in a simple fire-and-forget behaviour.
214
// This implementation is really efficient as we do not use
215
// the store at all.
216
} else {
217             if (exchange.getStatus() == ExchangeStatus.DONE) {
218                 return;
219             } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
220                 return;
221             } else if (exchange instanceof InOnly == false &&
222                        exchange instanceof RobustInOnly == false) {
223                 fail(exchange, new UnsupportedOperationException JavaDoc("Use an InOnly or RobustInOnly MEP"));
224             } else if (exchange.getFault() != null) {
225                 done(exchange);
226             } else {
227                 MessageExchange[] parts = createParts(exchange);
228                 for (int i = 0; i < parts.length; i++) {
229                     target.configureTarget(parts[i], getContext());
230                     send(parts[i]);
231                 }
232                 done(exchange);
233             }
234         }
235     }
236     
237     protected MessageExchange[] createParts(MessageExchange exchange) throws Exception JavaDoc {
238         NormalizedMessage in = MessageUtil.copyIn(exchange);
239         Source JavaDoc[] srcParts = split(in.getContent());
240         MessageExchange[] parts = new MessageExchange[srcParts.length];
241         for (int i = 0; i < srcParts.length; i++) {
242             parts[i] = createPart(exchange.getPattern(), in, srcParts[i]);
243             NormalizedMessage msg = parts[i].getMessage("in");
244             msg.setProperty(SPLITTER_COUNT, new Integer JavaDoc(srcParts.length));
245             msg.setProperty(SPLITTER_INDEX, new Integer JavaDoc(i));
246             msg.setProperty(SPLITTER_CORRID, exchange.getExchangeId());
247         }
248         return parts;
249     }
250     
251     protected MessageExchange createPart(URI JavaDoc pattern,
252                                          NormalizedMessage srcMessage,
253                                          Source JavaDoc content) throws Exception JavaDoc {
254         MessageExchange me = exchangeFactory.createExchange(pattern);
255         NormalizedMessage in = me.createMessage();
256         in.setContent(content);
257         me.setMessage(in, "in");
258         if (forwardAttachments) {
259             Set JavaDoc names = srcMessage.getAttachmentNames();
260             for (Iterator JavaDoc iter = names.iterator(); iter.hasNext();) {
261                 String JavaDoc name = (String JavaDoc) iter.next();
262                 in.addAttachment(name, srcMessage.getAttachment(name));
263             }
264         }
265         if (forwardProperties) {
266             Set JavaDoc names = srcMessage.getPropertyNames();
267             for (Iterator JavaDoc iter = names.iterator(); iter.hasNext();) {
268                 String JavaDoc name = (String JavaDoc) iter.next();
269                 in.setProperty(name, srcMessage.getProperty(name));
270             }
271         }
272         return me;
273     }
274
275     protected abstract Source JavaDoc[] split(Source JavaDoc main) throws Exception JavaDoc;
276
277 }
278
Popular Tags