KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > eip > patterns > SplitAggregator


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.eip.patterns;
18
19 import java.io.Serializable JavaDoc;
20 import java.util.Date JavaDoc;
21
22 import javax.jbi.messaging.MessageExchange;
23 import javax.jbi.messaging.NormalizedMessage;
24 import javax.xml.namespace.QName JavaDoc;
25 import javax.xml.transform.dom.DOMSource JavaDoc;
26
27 import org.apache.servicemix.eip.support.AbstractAggregator;
28 import org.apache.servicemix.eip.support.AbstractSplitter;
29 import org.apache.servicemix.expression.Expression;
30 import org.apache.servicemix.expression.PropertyExpression;
31 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
32 import org.w3c.dom.Document JavaDoc;
33 import org.w3c.dom.Element JavaDoc;
34 import org.w3c.dom.Node JavaDoc;
35
36 /**
37  * Aggregator can be used to wait and combine several messages.
38  * This component implements the
39  * <a HREF="http://www.enterpriseintegrationpatterns.com/Aggregator.html">Aggregator</a>
40  * pattern.
41  *
42  * This aggregator collect messages with a count, index and correlationId properties.
43  * These properties are automatically set by splitters.
44  * A timeout may be specified so that the aggregator will not keep data forever if a message is missing.
45  *
46  * @author gnodet
47  * @version $Revision: 376451 $
48  * @org.apache.xbean.XBean element="split-aggregator"
49  */

50 public class SplitAggregator extends AbstractAggregator {
51
52     private Expression count = new PropertyExpression(AbstractSplitter.SPLITTER_COUNT);
53     private Expression index = new PropertyExpression(AbstractSplitter.SPLITTER_INDEX);
54     private Expression corrId = new PropertyExpression(AbstractSplitter.SPLITTER_CORRID);
55     
56     private QName JavaDoc aggregateElementName = new QName JavaDoc("aggregate");
57     private QName JavaDoc messageElementName = new QName JavaDoc("message");
58     private String JavaDoc countAttribute = "count";
59     private String JavaDoc indexAttribute = "index";
60     
61     private long timeout;
62     
63     /**
64      * @return the aggregateElementName
65      */

66     public QName JavaDoc getAggregateElementName() {
67         return aggregateElementName;
68     }
69
70     /**
71      * @param aggregateElementName the aggregateElementName to set
72      */

73     public void setAggregateElementName(QName JavaDoc aggregateElementName) {
74         this.aggregateElementName = aggregateElementName;
75     }
76
77     /**
78      * @return the corrId
79      */

80     public Expression getCorrId() {
81         return corrId;
82     }
83
84     /**
85      * @param corrId the corrId to set
86      */

87     public void setCorrId(Expression corrId) {
88         this.corrId = corrId;
89     }
90
91     /**
92      * @return the count
93      */

94     public Expression getCount() {
95         return count;
96     }
97
98     /**
99      * @param count the count to set
100      */

101     public void setCount(Expression count) {
102         this.count = count;
103     }
104
105     /**
106      * @return the countAttribute
107      */

108     public String JavaDoc getCountAttribute() {
109         return countAttribute;
110     }
111
112     /**
113      * @param countAttribute the countAttribute to set
114      */

115     public void setCountAttribute(String JavaDoc countAttribute) {
116         this.countAttribute = countAttribute;
117     }
118
119     /**
120      * @return the index
121      */

122     public Expression getIndex() {
123         return index;
124     }
125
126     /**
127      * @param index the index to set
128      */

129     public void setIndex(Expression index) {
130         this.index = index;
131     }
132
133     /**
134      * @return the indexAttribute
135      */

136     public String JavaDoc getIndexAttribute() {
137         return indexAttribute;
138     }
139
140     /**
141      * @param indexAttribute the indexAttribute to set
142      */

143     public void setIndexAttribute(String JavaDoc indexAttribute) {
144         this.indexAttribute = indexAttribute;
145     }
146
147     /**
148      * @return the messageElementName
149      */

150     public QName JavaDoc getMessageElementName() {
151         return messageElementName;
152     }
153
154     /**
155      * @param messageElementName the messageElementName to set
156      */

157     public void setMessageElementName(QName JavaDoc messageElementName) {
158         this.messageElementName = messageElementName;
159     }
160
161     /**
162      * @return the timeout
163      */

164     public long getTimeout() {
165         return timeout;
166     }
167
168     /**
169      * @param timeout the timeout to set
170      */

171     public void setTimeout(long timeout) {
172         this.timeout = timeout;
173     }
174
175     /*(non-Javadoc)
176      * @see org.apache.servicemix.eip.support.AggregationFactory#createAggregation(java.lang.String)
177      */

178     public Object JavaDoc createAggregation(String JavaDoc correlationID) {
179         return new SplitterAggregation(correlationID);
180     }
181
182     /*(non-Javadoc)
183      * @see org.apache.servicemix.eip.support.AggregationFactory#getCorrelationID(javax.jbi.messaging.MessageExchange, javax.jbi.messaging.NormalizedMessage)
184      */

185     public String JavaDoc getCorrelationID(MessageExchange exchange, NormalizedMessage message) throws Exception JavaDoc {
186         return (String JavaDoc) corrId.evaluate(exchange, message);
187     }
188     
189     /* (non-Javadoc)
190      * @see org.apache.servicemix.eip.support.Aggregation#addMessage(javax.jbi.messaging.NormalizedMessage, javax.jbi.messaging.MessageExchange)
191      */

192     public boolean addMessage(Object JavaDoc aggregation, NormalizedMessage message, MessageExchange exchange) throws Exception JavaDoc {
193         NormalizedMessage[] messages = ((SplitterAggregation) aggregation).messages;
194         // Retrieve count, index
195
Integer JavaDoc count = (Integer JavaDoc) SplitAggregator.this.count.evaluate(exchange, message);
196         if (count == null) {
197             throw new IllegalArgumentException JavaDoc("Property " + AbstractSplitter.SPLITTER_COUNT + " not specified on message");
198         }
199         if (messages == null) {
200             messages = new NormalizedMessage[count.intValue()];
201             ((SplitterAggregation) aggregation).messages = messages;
202         } else if (count.intValue() != messages.length) {
203             throw new IllegalArgumentException JavaDoc("Property " + AbstractSplitter.SPLITTER_COUNT + " is not consistent (received " + count.intValue() + ", was " + messages.length + ")");
204         }
205         Integer JavaDoc index = (Integer JavaDoc) SplitAggregator.this.index.evaluate(exchange, message);
206         if (index == null) {
207             throw new IllegalArgumentException JavaDoc("Property " + AbstractSplitter.SPLITTER_INDEX + " not specified on message");
208         }
209         if (index.intValue() < 0 || index.intValue() >= messages.length) {
210             throw new IllegalArgumentException JavaDoc("Index is ouf of bound: " + index + " [0.." + messages.length + "]");
211         }
212         if (messages[index.intValue()] != null) {
213             throw new IllegalStateException JavaDoc("Message with index " + index.intValue() + " has already been received");
214         }
215         // Store message
216
messages[index.intValue()] = message;
217         // Check if all messages have been received
218
for (int i = 0; i < messages.length; i++) {
219             if (messages[i] == null) {
220                 return false;
221             }
222         }
223         return true;
224     }
225
226     /* (non-Javadoc)
227      * @see org.apache.servicemix.eip.support.Aggregation#buildAggregate(javax.jbi.messaging.NormalizedMessage, javax.jbi.messaging.MessageExchange, boolean)
228      */

229     public void buildAggregate(Object JavaDoc aggregation, NormalizedMessage message, MessageExchange exchange, boolean timeout) throws Exception JavaDoc {
230         NormalizedMessage[] messages = ((SplitterAggregation) aggregation).messages;
231         String JavaDoc correlationId = ((SplitterAggregation) aggregation).correlationId;
232         SourceTransformer st = new SourceTransformer();
233         Document JavaDoc doc = st.createDocument();
234         Element JavaDoc root = createChildElement(aggregateElementName, doc);
235         root.setAttribute(countAttribute, Integer.toString(messages.length));
236         for (int i = 0; i < messages.length; i++) {
237             if (messages[i] != null) {
238                 Element JavaDoc msg = createChildElement(messageElementName, root);
239                 msg.setAttribute(indexAttribute, Integer.toString(i));
240                 Element JavaDoc elem = st.toDOMElement(messages[i]);
241                 msg.appendChild(doc.importNode(elem, true));
242             }
243         }
244         message.setContent(new DOMSource JavaDoc(doc));
245         message.setProperty(AbstractSplitter.SPLITTER_CORRID, correlationId);
246     }
247     
248     private Element JavaDoc createChildElement(QName JavaDoc name, Node JavaDoc parent) {
249         Document JavaDoc doc = parent instanceof Document JavaDoc ? (Document JavaDoc) parent : parent.getOwnerDocument();
250         Element JavaDoc elem;
251         if ("".equals(name.getNamespaceURI())) {
252             elem = doc.createElement(name.getLocalPart());
253         } else {
254             elem = doc.createElementNS(name.getNamespaceURI(),
255                                        name.getPrefix() + ":" + name.getLocalPart());
256         }
257         parent.appendChild(elem);
258         return elem;
259     }
260
261     /* (non-Javadoc)
262      * @see org.apache.servicemix.eip.support.Aggregation#getTimeout()
263      */

264     public Date JavaDoc getTimeout(Object JavaDoc aggregation) {
265         if (timeout > 0) {
266             return new Date JavaDoc(System.currentTimeMillis() + timeout);
267         }
268         return null;
269     }
270     
271     /**
272      *
273      * @author gnodet
274      */

275     protected static class SplitterAggregation implements Serializable JavaDoc {
276
277         /**
278          * Serial version UID
279          */

280         private static final long serialVersionUID = 8555934895155403923L;
281         
282         protected NormalizedMessage[] messages;
283         protected String JavaDoc correlationId;
284       
285         public SplitterAggregation(String JavaDoc correlationId) {
286             this.correlationId = correlationId;
287         }
288         
289         /**
290          * @return the correlationId
291          */

292         public String JavaDoc getCorrelationId() {
293             return correlationId;
294         }
295
296         /**
297          * @param correlationId the correlationId to set
298          */

299         public void setCorrelationId(String JavaDoc correlationId) {
300             this.correlationId = correlationId;
301         }
302
303         /**
304          * @return the messages
305          */

306         public NormalizedMessage[] getMessages() {
307             return messages;
308         }
309
310         /**
311          * @param messages the messages to set
312          */

313         public void setMessages(NormalizedMessage[] messages) {
314             this.messages = messages;
315         }
316
317     }
318     
319 }
320
Popular Tags