1 17 package org.apache.servicemix.eip.patterns; 18 19 import javax.jbi.management.DeploymentException; 20 import javax.jbi.messaging.ExchangeStatus; 21 import javax.jbi.messaging.Fault; 22 import javax.jbi.messaging.InOnly; 23 import javax.jbi.messaging.MessageExchange; 24 import javax.jbi.messaging.NormalizedMessage; 25 import javax.jbi.messaging.RobustInOnly; 26 27 import org.apache.servicemix.eip.EIPEndpoint; 28 import org.apache.servicemix.eip.support.ExchangeTarget; 29 import org.apache.servicemix.eip.support.Predicate; 30 import org.apache.servicemix.jbi.util.MessageUtil; 31 32 43 public class MessageFilter extends EIPEndpoint { 44 45 48 private ExchangeTarget target; 49 52 private Predicate filter; 53 56 private String correlation; 57 66 private boolean reportErrors; 67 68 71 public ExchangeTarget getTarget() { 72 return target; 73 } 74 75 78 public void setTarget(ExchangeTarget target) { 79 this.target = target; 80 } 81 82 85 public Predicate getFilter() { 86 return filter; 87 } 88 89 92 public void setFilter(Predicate filter) { 93 this.filter = filter; 94 } 95 96 99 public boolean isReportErrors() { 100 return reportErrors; 101 } 102 103 106 public void setReportErrors(boolean reportErrors) { 107 this.reportErrors = reportErrors; 108 } 109 110 113 public void validate() throws DeploymentException { 114 super.validate(); 115 if (target == null) { 117 throw new IllegalArgumentException ("target should be set to a valid ExchangeTarget"); 118 } 119 if (filter == null) { 121 throw new IllegalArgumentException ("filter property should be set"); 122 } 123 correlation = "MessageFilter.Correlation." + getService() + "." + getEndpoint(); 125 } 126 127 130 protected void processSync(MessageExchange exchange) throws Exception { 131 if (exchange instanceof InOnly == false && 132 exchange instanceof RobustInOnly == false) { 133 fail(exchange, new UnsupportedOperationException ("Use an InOnly or RobustInOnly MEP")); 134 } else { 135 NormalizedMessage in = MessageUtil.copyIn(exchange); 136 MessageExchange me = exchangeFactory.createExchange(exchange.getPattern()); 137 target.configureTarget(me, getContext()); 138 MessageUtil.transferToIn(in, me); 139 if (filter.matches(me)) { 140 sendSync(me); 141 if (me.getStatus() == ExchangeStatus.ERROR && reportErrors) { 142 fail(exchange, me.getError()); 143 } else if (me.getStatus() == ExchangeStatus.DONE) { 144 done(exchange); 145 } else if (me.getFault() != null && reportErrors) { 146 Fault fault = MessageUtil.copyFault(me); 147 done(me); 148 MessageUtil.transferToFault(fault, exchange); 149 sendSync(exchange); 150 } 151 } else { 152 done(exchange); 153 } 154 } 155 } 156 157 160 protected void processAsync(MessageExchange exchange) throws Exception { 161 if (reportErrors) { 165 throw new UnsupportedOperationException ("Not implemented"); 167 } else { 171 if (exchange.getStatus() == ExchangeStatus.DONE) { 172 return; 173 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 174 return; 175 } else if (exchange instanceof InOnly == false && 176 exchange instanceof RobustInOnly == false) { 177 fail(exchange, new UnsupportedOperationException ("Use an InOnly or RobustInOnly MEP")); 178 } else if (exchange.getFault() != null) { 179 done(exchange); 180 } else { 181 NormalizedMessage in = MessageUtil.copyIn(exchange); 182 MessageExchange me = exchangeFactory.createExchange(exchange.getPattern()); 183 target.configureTarget(me, getContext()); 184 MessageUtil.transferToIn(in, me); 185 if (filter.matches(me)) { 186 send(me); 187 } 188 done(exchange); 189 } 190 } 191 } 192 193 } 194 | Popular Tags |