1 17 package org.apache.servicemix.eip.patterns; 18 19 import java.io.Serializable ; 20 import java.util.Date ; 21 22 import javax.jbi.messaging.MessageExchange; 23 import javax.jbi.messaging.NormalizedMessage; 24 import javax.xml.namespace.QName ; 25 import javax.xml.transform.dom.DOMSource ; 26 27 import org.apache.servicemix.eip.support.AbstractAggregator; 28 import org.apache.servicemix.eip.support.AbstractSplitter; 29 import org.apache.servicemix.expression.Expression; 30 import org.apache.servicemix.expression.PropertyExpression; 31 import org.apache.servicemix.jbi.jaxp.SourceTransformer; 32 import org.w3c.dom.Document ; 33 import org.w3c.dom.Element ; 34 import org.w3c.dom.Node ; 35 36 50 public class SplitAggregator extends AbstractAggregator { 51 52 private Expression count = new PropertyExpression(AbstractSplitter.SPLITTER_COUNT); 53 private Expression index = new PropertyExpression(AbstractSplitter.SPLITTER_INDEX); 54 private Expression corrId = new PropertyExpression(AbstractSplitter.SPLITTER_CORRID); 55 56 private QName aggregateElementName = new QName ("aggregate"); 57 private QName messageElementName = new QName ("message"); 58 private String countAttribute = "count"; 59 private String indexAttribute = "index"; 60 61 private long timeout; 62 63 66 public QName getAggregateElementName() { 67 return aggregateElementName; 68 } 69 70 73 public void setAggregateElementName(QName aggregateElementName) { 74 this.aggregateElementName = aggregateElementName; 75 } 76 77 80 public Expression getCorrId() { 81 return corrId; 82 } 83 84 87 public void setCorrId(Expression corrId) { 88 this.corrId = corrId; 89 } 90 91 94 public Expression getCount() { 95 return count; 96 } 97 98 101 public void setCount(Expression count) { 102 this.count = count; 103 } 104 105 108 public String getCountAttribute() { 109 return countAttribute; 110 } 111 112 115 public void setCountAttribute(String countAttribute) { 116 this.countAttribute = countAttribute; 117 } 118 119 122 public Expression getIndex() { 123 return index; 124 } 125 126 129 public void setIndex(Expression index) { 130 this.index = index; 131 } 132 133 136 public String getIndexAttribute() { 137 return indexAttribute; 138 } 139 140 143 public void setIndexAttribute(String indexAttribute) { 144 this.indexAttribute = indexAttribute; 145 } 146 147 150 public QName getMessageElementName() { 151 return messageElementName; 152 } 153 154 157 public void setMessageElementName(QName messageElementName) { 158 this.messageElementName = messageElementName; 159 } 160 161 164 public long getTimeout() { 165 return timeout; 166 } 167 168 171 public void setTimeout(long timeout) { 172 this.timeout = timeout; 173 } 174 175 178 public Object createAggregation(String correlationID) { 179 return new SplitterAggregation(correlationID); 180 } 181 182 185 public String getCorrelationID(MessageExchange exchange, NormalizedMessage message) throws Exception { 186 return (String ) corrId.evaluate(exchange, message); 187 } 188 189 192 public boolean addMessage(Object aggregation, NormalizedMessage message, MessageExchange exchange) throws Exception { 193 NormalizedMessage[] messages = ((SplitterAggregation) aggregation).messages; 194 Integer count = (Integer ) SplitAggregator.this.count.evaluate(exchange, message); 196 if (count == null) { 197 throw new IllegalArgumentException ("Property " + AbstractSplitter.SPLITTER_COUNT + " not specified on message"); 198 } 199 if (messages == null) { 200 messages = new NormalizedMessage[count.intValue()]; 201 ((SplitterAggregation) aggregation).messages = messages; 202 } else if (count.intValue() != messages.length) { 203 throw new IllegalArgumentException ("Property " + AbstractSplitter.SPLITTER_COUNT + " is not consistent (received " + count.intValue() + ", was " + messages.length + ")"); 204 } 205 Integer index = (Integer ) SplitAggregator.this.index.evaluate(exchange, message); 206 if (index == null) { 207 throw new IllegalArgumentException ("Property " + AbstractSplitter.SPLITTER_INDEX + " not specified on message"); 208 } 209 if (index.intValue() < 0 || index.intValue() >= messages.length) { 210 throw new IllegalArgumentException ("Index is ouf of bound: " + index + " [0.." + messages.length + "]"); 211 } 212 if (messages[index.intValue()] != null) { 213 throw new IllegalStateException ("Message with index " + index.intValue() + " has already been received"); 214 } 215 messages[index.intValue()] = message; 217 for (int i = 0; i < messages.length; i++) { 219 if (messages[i] == null) { 220 return false; 221 } 222 } 223 return true; 224 } 225 226 229 public void buildAggregate(Object aggregation, NormalizedMessage message, MessageExchange exchange, boolean timeout) throws Exception { 230 NormalizedMessage[] messages = ((SplitterAggregation) aggregation).messages; 231 String correlationId = ((SplitterAggregation) aggregation).correlationId; 232 SourceTransformer st = new SourceTransformer(); 233 Document doc = st.createDocument(); 234 Element root = createChildElement(aggregateElementName, doc); 235 root.setAttribute(countAttribute, Integer.toString(messages.length)); 236 for (int i = 0; i < messages.length; i++) { 237 if (messages[i] != null) { 238 Element msg = createChildElement(messageElementName, root); 239 msg.setAttribute(indexAttribute, Integer.toString(i)); 240 Element elem = st.toDOMElement(messages[i]); 241 msg.appendChild(doc.importNode(elem, true)); 242 } 243 } 244 message.setContent(new DOMSource (doc)); 245 message.setProperty(AbstractSplitter.SPLITTER_CORRID, correlationId); 246 } 247 248 private Element createChildElement(QName name, Node parent) { 249 Document doc = parent instanceof Document ? (Document ) parent : parent.getOwnerDocument(); 250 Element elem; 251 if ("".equals(name.getNamespaceURI())) { 252 elem = doc.createElement(name.getLocalPart()); 253 } else { 254 elem = doc.createElementNS(name.getNamespaceURI(), 255 name.getPrefix() + ":" + name.getLocalPart()); 256 } 257 parent.appendChild(elem); 258 return elem; 259 } 260 261 264 public Date getTimeout(Object aggregation) { 265 if (timeout > 0) { 266 return new Date (System.currentTimeMillis() + timeout); 267 } 268 return null; 269 } 270 271 275 protected static class SplitterAggregation implements Serializable { 276 277 280 private static final long serialVersionUID = 8555934895155403923L; 281 282 protected NormalizedMessage[] messages; 283 protected String correlationId; 284 285 public SplitterAggregation(String correlationId) { 286 this.correlationId = correlationId; 287 } 288 289 292 public String getCorrelationId() { 293 return correlationId; 294 } 295 296 299 public void setCorrelationId(String correlationId) { 300 this.correlationId = correlationId; 301 } 302 303 306 public NormalizedMessage[] getMessages() { 307 return messages; 308 } 309 310 313 public void setMessages(NormalizedMessage[] messages) { 314 this.messages = messages; 315 } 316 317 } 318 319 } 320 | Popular Tags |