1 17 package org.apache.servicemix.eip.support; 18 19 import java.net.URI ; 20 import java.util.Iterator ; 21 import java.util.Set ; 22 23 import javax.jbi.management.DeploymentException; 24 import javax.jbi.messaging.ExchangeStatus; 25 import javax.jbi.messaging.InOnly; 26 import javax.jbi.messaging.MessageExchange; 27 import javax.jbi.messaging.NormalizedMessage; 28 import javax.jbi.messaging.RobustInOnly; 29 import javax.xml.transform.Source ; 30 31 import org.apache.servicemix.eip.EIPEndpoint; 32 import org.apache.servicemix.jbi.util.MessageUtil; 33 34 43 public abstract class AbstractSplitter extends EIPEndpoint { 44 45 public static final String SPLITTER_COUNT = "org.apache.servicemix.eip.splitter.count"; 46 public static final String SPLITTER_INDEX = "org.apache.servicemix.eip.splitter.index"; 47 public static final String SPLITTER_CORRID = "org.apache.servicemix.eip.splitter.corrid"; 48 49 52 private ExchangeTarget target; 53 62 private boolean reportErrors; 63 66 private boolean forwardAttachments; 67 70 private boolean forwardProperties; 71 74 private String correlation; 75 78 private boolean synchronous; 79 80 83 public boolean isSynchronous() { 84 return synchronous; 85 } 86 87 90 public void setSynchronous(boolean synchronous) { 91 this.synchronous = synchronous; 92 } 93 94 97 public boolean isReportErrors() { 98 return reportErrors; 99 } 100 101 104 public void setReportErrors(boolean reportErrors) { 105 this.reportErrors = reportErrors; 106 } 107 108 111 public ExchangeTarget getTarget() { 112 return target; 113 } 114 115 118 public void setTarget(ExchangeTarget target) { 119 this.target = target; 120 } 121 122 125 public boolean isForwardAttachments() { 126 return forwardAttachments; 127 } 128 129 132 public void setForwardAttachments(boolean forwardAttachments) { 133 this.forwardAttachments = forwardAttachments; 134 } 135 136 139 public boolean isForwardProperties() { 140 return forwardProperties; 141 } 142 143 146 public void setForwardProperties(boolean forwardProperties) { 147 this.forwardProperties = forwardProperties; 148 } 149 150 153 public void validate() throws DeploymentException { 154 super.validate(); 155 if (target == null) { 157 throw new IllegalArgumentException ("target should be set to a valid ExchangeTarget"); 158 } 159 correlation = "Splitter.Correlation." + getContext().getComponentName(); 161 } 162 163 166 protected void processSync(MessageExchange exchange) throws Exception { 167 if (exchange instanceof InOnly == false && 168 exchange instanceof RobustInOnly == false) { 169 fail(exchange, new UnsupportedOperationException ("Use an InOnly or RobustInOnly MEP")); 170 return; 171 } 172 MessageExchange[] parts = createParts(exchange); 173 for (int i = 0; i < parts.length; i++) { 174 target.configureTarget(parts[i], getContext()); 175 if (reportErrors || isSynchronous()) { 176 sendSync(parts[i]); 177 if (parts[i].getStatus() == ExchangeStatus.DONE) { 178 } else if (parts[i].getStatus() == ExchangeStatus.ERROR) { 180 if (reportErrors) { 181 fail(exchange, parts[i].getError()); 182 return; 183 } 184 } else if (parts[i].getFault() != null) { 185 if (reportErrors) { 186 MessageUtil.transferToFault(MessageUtil.copyFault(parts[i]), exchange); 187 done(parts[i]); 188 sendSync(exchange); 189 return; 190 } else { 191 done(parts[i]); 192 } 193 } else { 194 throw new IllegalStateException ("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Fault message"); 195 } 196 } else { 197 send(parts[i]); 198 } 199 } 200 done(exchange); 201 } 202 203 206 protected void processAsync(MessageExchange exchange) throws Exception { 207 if (reportErrors) { 211 throw new UnsupportedOperationException ("Not implemented"); 213 } else { 217 if (exchange.getStatus() == ExchangeStatus.DONE) { 218 return; 219 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 220 return; 221 } else if (exchange instanceof InOnly == false && 222 exchange instanceof RobustInOnly == false) { 223 fail(exchange, new UnsupportedOperationException ("Use an InOnly or RobustInOnly MEP")); 224 } else if (exchange.getFault() != null) { 225 done(exchange); 226 } else { 227 MessageExchange[] parts = createParts(exchange); 228 for (int i = 0; i < parts.length; i++) { 229 target.configureTarget(parts[i], getContext()); 230 send(parts[i]); 231 } 232 done(exchange); 233 } 234 } 235 } 236 237 protected MessageExchange[] createParts(MessageExchange exchange) throws Exception { 238 NormalizedMessage in = MessageUtil.copyIn(exchange); 239 Source [] srcParts = split(in.getContent()); 240 MessageExchange[] parts = new MessageExchange[srcParts.length]; 241 for (int i = 0; i < srcParts.length; i++) { 242 parts[i] = createPart(exchange.getPattern(), in, srcParts[i]); 243 NormalizedMessage msg = parts[i].getMessage("in"); 244 msg.setProperty(SPLITTER_COUNT, new Integer (srcParts.length)); 245 msg.setProperty(SPLITTER_INDEX, new Integer (i)); 246 msg.setProperty(SPLITTER_CORRID, exchange.getExchangeId()); 247 } 248 return parts; 249 } 250 251 protected MessageExchange createPart(URI pattern, 252 NormalizedMessage srcMessage, 253 Source content) throws Exception { 254 MessageExchange me = exchangeFactory.createExchange(pattern); 255 NormalizedMessage in = me.createMessage(); 256 in.setContent(content); 257 me.setMessage(in, "in"); 258 if (forwardAttachments) { 259 Set names = srcMessage.getAttachmentNames(); 260 for (Iterator iter = names.iterator(); iter.hasNext();) { 261 String name = (String ) iter.next(); 262 in.addAttachment(name, srcMessage.getAttachment(name)); 263 } 264 } 265 if (forwardProperties) { 266 Set names = srcMessage.getPropertyNames(); 267 for (Iterator iter = names.iterator(); iter.hasNext();) { 268 String name = (String ) iter.next(); 269 in.setProperty(name, srcMessage.getProperty(name)); 270 } 271 } 272 return me; 273 } 274 275 protected abstract Source [] split(Source main) throws Exception ; 276 277 } 278 | Popular Tags |