1 17 package org.apache.servicemix.http.processors; 18 19 import java.io.ByteArrayOutputStream ; 20 import java.io.IOException ; 21 import java.io.OutputStream ; 22 import java.util.Enumeration ; 23 import java.util.HashMap ; 24 import java.util.Iterator ; 25 import java.util.Map ; 26 27 import javax.jbi.component.ComponentLifeCycle; 28 import javax.jbi.messaging.DeliveryChannel; 29 import javax.jbi.messaging.ExchangeStatus; 30 import javax.jbi.messaging.Fault; 31 import javax.jbi.messaging.InOnly; 32 import javax.jbi.messaging.InOptionalOut; 33 import javax.jbi.messaging.InOut; 34 import javax.jbi.messaging.MessageExchange; 35 import javax.jbi.messaging.NormalizedMessage; 36 import javax.servlet.http.HttpServletRequest ; 37 38 import org.apache.commons.httpclient.Header; 39 import org.apache.commons.httpclient.HostConfiguration; 40 import org.apache.commons.httpclient.HttpClient; 41 import org.apache.commons.httpclient.HttpHost; 42 import org.apache.commons.httpclient.HttpMethod; 43 import org.apache.commons.httpclient.HttpStatus; 44 import org.apache.commons.httpclient.URI; 45 import org.apache.commons.httpclient.methods.ByteArrayRequestEntity; 46 import org.apache.commons.httpclient.methods.PostMethod; 47 import org.apache.commons.httpclient.methods.RequestEntity; 48 import org.apache.commons.httpclient.protocol.Protocol; 49 import org.apache.commons.httpclient.protocol.ProtocolSocketFactory; 50 import org.apache.servicemix.JbiConstants; 51 import org.apache.servicemix.common.ExchangeProcessor; 52 import org.apache.servicemix.http.HttpConfiguration; 53 import org.apache.servicemix.http.HttpEndpoint; 54 import org.apache.servicemix.http.HttpLifeCycle; 55 import org.apache.servicemix.soap.Context; 56 import org.apache.servicemix.soap.SoapHelper; 57 import org.apache.servicemix.soap.marshalers.SoapMessage; 58 import org.apache.servicemix.soap.marshalers.SoapReader; 59 import org.apache.servicemix.soap.marshalers.SoapWriter; 60 61 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 62 63 69 public class ProviderProcessor implements ExchangeProcessor { 70 71 protected HttpEndpoint endpoint; 72 protected HostConfiguration host; 73 protected SoapHelper soapHelper; 74 protected DeliveryChannel channel; 75 private String relUri; 76 private Map methods; 77 78 public ProviderProcessor(HttpEndpoint endpoint) { 79 this.endpoint = endpoint; 80 this.soapHelper = new SoapHelper(endpoint); 81 java.net.URI uri = java.net.URI.create(endpoint.getLocationURI()); 82 relUri = uri.getPath(); 83 if (!relUri.startsWith("/")) { 84 relUri = "/" + relUri; 85 } 86 if (uri.getQuery() != null) { 87 relUri += "?" + uri.getQuery(); 88 } 89 if (uri.getFragment() != null) { 90 relUri += "#" + uri.getFragment(); 91 } 92 this.methods = new ConcurrentHashMap(); 93 } 94 95 public void process(MessageExchange exchange) throws Exception { 96 if (exchange.getStatus() == ExchangeStatus.DONE || 97 exchange.getStatus() == ExchangeStatus.ERROR) { 98 PostMethod method = (PostMethod) methods.remove(exchange.getExchangeId()); 99 if (method != null) { 100 method.releaseConnection(); 101 } 102 return; 103 } 104 boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC)); 105 NormalizedMessage nm = exchange.getMessage("in"); 106 if (nm == null) { 107 throw new IllegalStateException ("Exchange has no input message"); 108 } 109 PostMethod method = new PostMethod(relUri); 110 SoapMessage soapMessage = new SoapMessage(); 111 soapHelper.getJBIMarshaler().fromNMS(soapMessage, nm); 112 Context context = soapHelper.createContext(soapMessage); 113 soapHelper.onSend(context); 114 SoapWriter writer = soapHelper.getSoapMarshaler().createWriter(soapMessage); 115 Map headers = (Map ) nm.getProperty(JbiConstants.PROTOCOL_HEADERS); 116 if (headers != null) { 117 for (Iterator it = headers.keySet().iterator(); it.hasNext();) { 118 String name = (String ) it.next(); 119 String value = (String ) headers.get(name); 120 method.addRequestHeader(name, value); 121 } 122 } 123 RequestEntity entity = writeMessage(writer); 124 method.removeRequestHeader(Constants.HEADER_CONTENT_TYPE); 126 method.addRequestHeader(Constants.HEADER_CONTENT_TYPE, entity.getContentType()); 127 if (entity.getContentLength() < 0) { 128 method.removeRequestHeader(Constants.HEADER_CONTENT_LENGTH); 129 } else { 130 method.setRequestHeader(Constants.HEADER_CONTENT_LENGTH, Long.toString(entity.getContentLength())); 131 } 132 if (endpoint.isSoap() && method.getRequestHeader(Constants.HEADER_SOAP_ACTION) == null) { 133 if (endpoint.getSoapAction() != null) { 134 method.setRequestHeader(Constants.HEADER_SOAP_ACTION, endpoint.getSoapAction()); 135 } else { 136 method.setRequestHeader(Constants.HEADER_SOAP_ACTION, "\"\""); 137 } 138 } 139 method.setRequestEntity(entity); 140 boolean close = true; 141 try { 142 if (endpoint.getBasicAuthentication() != null) { 151 endpoint.getBasicAuthentication().applyCredentials( getClient() ); 152 } 153 int response = getClient().executeMethod(host, method); 154 if (response != HttpStatus.SC_OK && response != HttpStatus.SC_ACCEPTED) { 155 if (exchange instanceof InOnly == false) { 156 SoapReader reader = soapHelper.getSoapMarshaler().createReader(); 157 Header contentType = method.getResponseHeader(Constants.HEADER_CONTENT_TYPE); 158 soapMessage = reader.read(method.getResponseBodyAsStream(), 159 contentType != null ? contentType.getValue() : null); 160 context.setFaultMessage(soapMessage); 161 soapHelper.onAnswer(context); 162 Fault fault = exchange.createFault(); 163 fault.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(method)); 164 soapHelper.getJBIMarshaler().toNMS(fault, soapMessage); 165 exchange.setFault(fault); 166 if (txSync) { 167 channel.sendSync(exchange); 168 } else { 169 methods.put(exchange.getExchangeId(), method); 170 channel.send(exchange); 171 close = false; 172 } 173 return; 174 } else { 175 throw new Exception ("Invalid status response: " + response); 176 } 177 } 178 if (exchange instanceof InOut) { 179 NormalizedMessage msg = exchange.createMessage(); 180 SoapReader reader = soapHelper.getSoapMarshaler().createReader(); 181 Header contentType = method.getResponseHeader(Constants.HEADER_CONTENT_TYPE); 182 soapMessage = reader.read(method.getResponseBodyAsStream(), 183 contentType != null ? contentType.getValue() : null); 184 context.setOutMessage(soapMessage); 185 soapHelper.onAnswer(context); 186 msg.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(method)); 187 soapHelper.getJBIMarshaler().toNMS(msg, soapMessage); 188 ((InOut) exchange).setOutMessage(msg); 189 if (txSync) { 190 channel.sendSync(exchange); 191 } else { 192 methods.put(exchange.getExchangeId(), method); 193 channel.send(exchange); 194 close = false; 195 } 196 } else if (exchange instanceof InOptionalOut) { 197 if (method.getResponseContentLength() == 0) { 198 exchange.setStatus(ExchangeStatus.DONE); 199 channel.send(exchange); 200 } else { 201 NormalizedMessage msg = exchange.createMessage(); 202 SoapReader reader = soapHelper.getSoapMarshaler().createReader(); 203 soapMessage = reader.read(method.getResponseBodyAsStream(), 204 method.getResponseHeader(Constants.HEADER_CONTENT_TYPE).getValue()); 205 context.setOutMessage(soapMessage); 206 soapHelper.onAnswer(context); 207 msg.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(method)); 208 soapHelper.getJBIMarshaler().toNMS(msg, soapMessage); 209 ((InOptionalOut) exchange).setOutMessage(msg); 210 if (txSync) { 211 channel.sendSync(exchange); 212 } else { 213 methods.put(exchange.getExchangeId(), method); 214 channel.send(exchange); 215 close = false; 216 } 217 } 218 } else { 219 exchange.setStatus(ExchangeStatus.DONE); 220 channel.send(exchange); 221 } 222 } finally { 223 if (close) { 224 method.releaseConnection(); 225 } 226 } 227 } 228 229 public void start() throws Exception { 230 URI uri = new URI(endpoint.getLocationURI(), false); 231 if (uri.getScheme().equals("https")) { 232 ProtocolSocketFactory sf = new CommonsHttpSSLSocketFactory( 233 endpoint.getSsl(), 234 endpoint.getKeystoreManager()); 235 Protocol protocol = new Protocol("https", sf, 443); 236 HttpHost host = new HttpHost(uri.getHost(), uri.getPort(), protocol); 237 this.host = new HostConfiguration(); 238 this.host.setHost(host); 239 } else { 240 this.host = new HostConfiguration(); 241 this.host.setHost(uri.getHost(), uri.getPort()); 242 } 243 channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel(); 244 } 245 246 protected HttpConfiguration getConfiguration(HttpEndpoint endpoint) { 247 ComponentLifeCycle lf = endpoint.getServiceUnit().getComponent().getLifeCycle(); 248 return ((HttpLifeCycle) lf).getConfiguration(); 249 } 250 251 public void stop() throws Exception { 252 } 253 254 protected Map getHeaders(HttpServletRequest request) { 255 Map headers = new HashMap (); 256 Enumeration enumeration = request.getHeaderNames(); 257 while (enumeration.hasMoreElements()) { 258 String name = (String ) enumeration.nextElement(); 259 String value = request.getHeader(name); 260 headers.put(name, value); 261 } 262 return headers; 263 } 264 265 protected Map getHeaders(HttpMethod method) { 266 Map headers = new HashMap (); 267 Header[] h = method.getResponseHeaders(); 268 for (int i = 0; i < h.length; i++) { 269 headers.put(h[i].getName(), h[i].getValue()); 270 } 271 return headers; 272 } 273 274 protected RequestEntity writeMessage(SoapWriter writer) throws Exception { 275 HttpLifeCycle lf = (HttpLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle(); 276 if (lf.getConfiguration().isStreamingEnabled()) { 277 return new StreamingRequestEntity(writer); 278 } else { 279 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 280 writer.write(baos); 281 return new ByteArrayRequestEntity(baos.toByteArray(), writer.getContentType()); 282 } 283 } 284 285 protected HttpClient getClient() { 286 HttpLifeCycle lf = (HttpLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle(); 287 return lf.getClient(); 288 } 289 290 public static class StreamingRequestEntity implements RequestEntity { 291 292 private SoapWriter writer; 293 294 public StreamingRequestEntity(SoapWriter writer) { 295 this.writer = writer; 296 } 297 298 public boolean isRepeatable() { 299 return false; 300 } 301 302 public void writeRequest(OutputStream out) throws IOException { 303 try { 304 writer.write(out); 305 out.flush(); 306 } catch (Exception e) { 307 throw (IOException ) new IOException ("Could not write request").initCause(e); 308 } 309 } 310 311 public long getContentLength() { 312 return -1; 314 } 315 316 public String getContentType() { 317 return writer.getContentType(); 318 } 319 320 } 321 } 322 | Popular Tags |