1 22 package org.objectweb.petals.component.common.listener; 23 24 import java.util.logging.Level ; 25 import java.util.logging.Logger ; 26 27 import javax.jbi.messaging.DeliveryChannel; 28 import javax.jbi.messaging.ExchangeStatus; 29 import javax.jbi.messaging.Fault; 30 import javax.jbi.messaging.MessageExchange; 31 import javax.jbi.messaging.MessagingException; 32 import javax.jbi.messaging.NormalizedMessage; 33 import javax.xml.namespace.QName ; 34 import javax.xml.transform.Source ; 35 36 import org.objectweb.petals.component.common.MEPConstants; 37 import org.objectweb.petals.component.common.util.SourceHelper; 38 39 54 public abstract class AbstractInternalMEProcessor implements 55 IMessageExchangeProcessor { 56 57 60 protected DeliveryChannel channel; 61 62 protected Logger l; 63 64 74 public AbstractInternalMEProcessor(DeliveryChannel channel, Logger l) { 75 this.channel = channel; 76 this.l = l; 77 } 78 79 82 public AbstractInternalMEProcessor() { 83 } 84 85 88 public boolean process(MessageExchange exchange) throws Exception { 89 if (ExchangeStatus.ACTIVE.equals(exchange.getStatus())) { 90 if (MessageExchange.Role.PROVIDER.equals(exchange.getRole())) { 91 if (MEPConstants.IN_ONLY_PATTERN.equals(exchange.getPattern())) { 92 processInOnly(exchange); 93 } else if (MEPConstants.ROBUST_IN_ONLY_PATTERN.equals(exchange 94 .getPattern())) { 95 processRobustInOnly(exchange); 96 } else if (MEPConstants.IN_OUT_PATTERN.equals(exchange 97 .getPattern())) { 98 processInOut(exchange, false); 99 } else if (MEPConstants.IN_OPTIONAL_OUT_PATTERN.equals(exchange 100 .getPattern())) { 101 processInOut(exchange, true); 102 } else { 103 Exception e = new Exception ( 104 "MessageExchangePattern not recognized :" 105 + exchange.getPattern()); 106 exchange.setError(e); 107 channel.send(exchange); 108 throw e; 109 } 110 channel.send(exchange); 111 } 112 } 113 return true; 114 } 115 116 125 protected boolean ackFaultReception(MessageExchange ex) throws Exception { 126 boolean result = false; 127 128 if (ex.getFault() != null) { 129 ex.setStatus(ExchangeStatus.DONE); 130 131 result = true; 132 } 133 return result; 134 } 135 136 142 protected Fault createFault(Exception e, MessageExchange exchange) 143 throws MessagingException { 144 Fault f = exchange.createFault(); 145 String faultString = SourceHelper.createSoapFault(e, null); 146 Source content = SourceHelper.createSource(faultString); 147 f.setContent(content); 148 return f; 149 } 150 151 156 protected NormalizedMessage createNormalizedMessage(MessageExchange exchange) 157 throws MessagingException { 158 return exchange.createMessage(); 159 } 160 161 protected void logError(Throwable e) { 162 if (l != null) 163 l.log(Level.SEVERE, e.getMessage(), e); 164 else 165 e.printStackTrace(); 166 } 167 168 protected void processInOnly(MessageExchange exchange) throws Exception { 169 try { 170 sendInMessage(exchange.getEndpoint().getServiceName(), exchange 171 .getOperation(), exchange.getMessage("in")); 172 } catch (Exception e) { 173 logError(new Exception ( 175 "Error occured while processing, but InOnly message can not contains Fault.", 176 e)); 177 } 178 exchange.setStatus(ExchangeStatus.DONE); 179 } 180 181 protected void processInOut(MessageExchange exchange, boolean optional) 182 throws Exception { 183 try { 184 if (!ackFaultReception(exchange)) { 185 NormalizedMessage out = createNormalizedMessage(exchange); 186 187 boolean outTreated = sendInOutMessage(exchange.getEndpoint() 188 .getServiceName(), exchange.getOperation(), exchange 189 .getMessage("in"), out, optional); 190 191 if (outTreated) { 192 exchange.setMessage(out, "out"); 193 } else { 194 exchange.setStatus(ExchangeStatus.DONE); 195 } 196 } 197 } catch (Exception e) { 198 try { 199 exchange.setFault(createFault(e, exchange)); 200 } catch (Exception e1) { 201 logError(new Exception ( 202 "Error occured while processing, send a Fault.", e1)); 203 } 204 } 205 } 206 207 protected void processRobustInOnly(MessageExchange exchange) 208 throws Exception { 209 try { 210 sendInMessage(exchange.getEndpoint().getServiceName(), exchange 211 .getOperation(), exchange.getMessage("in")); 212 213 exchange.setStatus(ExchangeStatus.DONE); 214 } catch (Exception e) { 215 try { 216 exchange.setFault(createFault(e, exchange)); 217 } catch (Exception e1) { 218 logError(new Exception ( 219 "Error occured while processing, send a Fault.", e1)); 220 } 221 } 222 } 223 224 233 protected abstract void sendInMessage(QName service, QName operation, 234 NormalizedMessage in) throws Exception ; 235 236 252 protected abstract boolean sendInOutMessage(QName service, QName operation, 253 NormalizedMessage in, NormalizedMessage out, boolean optionalOut) 254 throws Exception ; 255 256 public DeliveryChannel getChannel() { 257 return channel; 258 } 259 260 public void setChannel(DeliveryChannel channel) { 261 this.channel = channel; 262 } 263 264 public Logger getLogger() { 265 return l; 266 } 267 268 public void setLogger(Logger l) { 269 this.l = l; 270 } 271 } 272 | Popular Tags |