KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > http > processors > ProviderProcessor


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.http.processors;
18
19 import java.io.ByteArrayOutputStream JavaDoc;
20 import java.io.IOException JavaDoc;
21 import java.io.OutputStream JavaDoc;
22 import java.util.Enumeration JavaDoc;
23 import java.util.HashMap JavaDoc;
24 import java.util.Iterator JavaDoc;
25 import java.util.Map JavaDoc;
26
27 import javax.jbi.component.ComponentLifeCycle;
28 import javax.jbi.messaging.DeliveryChannel;
29 import javax.jbi.messaging.ExchangeStatus;
30 import javax.jbi.messaging.Fault;
31 import javax.jbi.messaging.InOnly;
32 import javax.jbi.messaging.InOptionalOut;
33 import javax.jbi.messaging.InOut;
34 import javax.jbi.messaging.MessageExchange;
35 import javax.jbi.messaging.NormalizedMessage;
36 import javax.servlet.http.HttpServletRequest JavaDoc;
37
38 import org.apache.commons.httpclient.Header;
39 import org.apache.commons.httpclient.HostConfiguration;
40 import org.apache.commons.httpclient.HttpClient;
41 import org.apache.commons.httpclient.HttpHost;
42 import org.apache.commons.httpclient.HttpMethod;
43 import org.apache.commons.httpclient.HttpStatus;
44 import org.apache.commons.httpclient.URI;
45 import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
46 import org.apache.commons.httpclient.methods.PostMethod;
47 import org.apache.commons.httpclient.methods.RequestEntity;
48 import org.apache.commons.httpclient.protocol.Protocol;
49 import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
50 import org.apache.servicemix.JbiConstants;
51 import org.apache.servicemix.common.ExchangeProcessor;
52 import org.apache.servicemix.http.HttpConfiguration;
53 import org.apache.servicemix.http.HttpEndpoint;
54 import org.apache.servicemix.http.HttpLifeCycle;
55 import org.apache.servicemix.soap.Context;
56 import org.apache.servicemix.soap.SoapHelper;
57 import org.apache.servicemix.soap.marshalers.SoapMessage;
58 import org.apache.servicemix.soap.marshalers.SoapReader;
59 import org.apache.servicemix.soap.marshalers.SoapWriter;
60
61 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
62
63 /**
64  *
65  * @author Guillaume Nodet
66  * @version $Revision: 370186 $
67  * @since 3.0
68  */

69 public class ProviderProcessor implements ExchangeProcessor {
70
71     protected HttpEndpoint endpoint;
72     protected HostConfiguration host;
73     protected SoapHelper soapHelper;
74     protected DeliveryChannel channel;
75     private String JavaDoc relUri;
76     private Map JavaDoc methods;
77     
78     public ProviderProcessor(HttpEndpoint endpoint) {
79         this.endpoint = endpoint;
80         this.soapHelper = new SoapHelper(endpoint);
81         java.net.URI JavaDoc uri = java.net.URI.create(endpoint.getLocationURI());
82         relUri = uri.getPath();
83         if (!relUri.startsWith("/")) {
84             relUri = "/" + relUri;
85         }
86         if (uri.getQuery() != null) {
87             relUri += "?" + uri.getQuery();
88         }
89         if (uri.getFragment() != null) {
90             relUri += "#" + uri.getFragment();
91         }
92         this.methods = new ConcurrentHashMap();
93     }
94
95     public void process(MessageExchange exchange) throws Exception JavaDoc {
96         if (exchange.getStatus() == ExchangeStatus.DONE ||
97             exchange.getStatus() == ExchangeStatus.ERROR) {
98             PostMethod method = (PostMethod) methods.remove(exchange.getExchangeId());
99             if (method != null) {
100                 method.releaseConnection();
101             }
102             return;
103         }
104         boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
105         NormalizedMessage nm = exchange.getMessage("in");
106         if (nm == null) {
107             throw new IllegalStateException JavaDoc("Exchange has no input message");
108         }
109         PostMethod method = new PostMethod(relUri);
110         SoapMessage soapMessage = new SoapMessage();
111         soapHelper.getJBIMarshaler().fromNMS(soapMessage, nm);
112         Context context = soapHelper.createContext(soapMessage);
113         soapHelper.onSend(context);
114         SoapWriter writer = soapHelper.getSoapMarshaler().createWriter(soapMessage);
115         Map JavaDoc headers = (Map JavaDoc) nm.getProperty(JbiConstants.PROTOCOL_HEADERS);
116         if (headers != null) {
117             for (Iterator JavaDoc it = headers.keySet().iterator(); it.hasNext();) {
118                 String JavaDoc name = (String JavaDoc) it.next();
119                 String JavaDoc value = (String JavaDoc) headers.get(name);
120                 method.addRequestHeader(name, value);
121             }
122         }
123         RequestEntity entity = writeMessage(writer);
124         // remove content-type header that may have been part of the in message
125
method.removeRequestHeader(Constants.HEADER_CONTENT_TYPE);
126         method.addRequestHeader(Constants.HEADER_CONTENT_TYPE, entity.getContentType());
127         if (entity.getContentLength() < 0) {
128             method.removeRequestHeader(Constants.HEADER_CONTENT_LENGTH);
129         } else {
130             method.setRequestHeader(Constants.HEADER_CONTENT_LENGTH, Long.toString(entity.getContentLength()));
131         }
132         if (endpoint.isSoap() && method.getRequestHeader(Constants.HEADER_SOAP_ACTION) == null) {
133             if (endpoint.getSoapAction() != null) {
134                 method.setRequestHeader(Constants.HEADER_SOAP_ACTION, endpoint.getSoapAction());
135             } else {
136                 method.setRequestHeader(Constants.HEADER_SOAP_ACTION, "\"\"");
137             }
138         }
139         method.setRequestEntity(entity);
140         boolean close = true;
141         try {
142             // Uncomment to avoid the http request being sent several times.
143
// Can be useful when debugging
144
//================================
145
//method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new HttpMethodRetryHandler() {
146
// public boolean retryMethod(HttpMethod method, IOException exception, int executionCount) {
147
// return false;
148
// }
149
//});
150
if (endpoint.getBasicAuthentication() != null) {
151                 endpoint.getBasicAuthentication().applyCredentials( getClient() );
152             }
153             int response = getClient().executeMethod(host, method);
154             if (response != HttpStatus.SC_OK && response != HttpStatus.SC_ACCEPTED) {
155                 if (exchange instanceof InOnly == false) {
156                     SoapReader reader = soapHelper.getSoapMarshaler().createReader();
157                     Header contentType = method.getResponseHeader(Constants.HEADER_CONTENT_TYPE);
158                     soapMessage = reader.read(method.getResponseBodyAsStream(),
159                                               contentType != null ? contentType.getValue() : null);
160                     context.setFaultMessage(soapMessage);
161                     soapHelper.onAnswer(context);
162                     Fault fault = exchange.createFault();
163                     fault.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(method));
164                     soapHelper.getJBIMarshaler().toNMS(fault, soapMessage);
165                     exchange.setFault(fault);
166                     if (txSync) {
167                         channel.sendSync(exchange);
168                     } else {
169                         methods.put(exchange.getExchangeId(), method);
170                         channel.send(exchange);
171                         close = false;
172                     }
173                     return;
174                 } else {
175                     throw new Exception JavaDoc("Invalid status response: " + response);
176                 }
177             }
178             if (exchange instanceof InOut) {
179                 NormalizedMessage msg = exchange.createMessage();
180                 SoapReader reader = soapHelper.getSoapMarshaler().createReader();
181                 Header contentType = method.getResponseHeader(Constants.HEADER_CONTENT_TYPE);
182                 soapMessage = reader.read(method.getResponseBodyAsStream(),
183                                           contentType != null ? contentType.getValue() : null);
184                 context.setOutMessage(soapMessage);
185                 soapHelper.onAnswer(context);
186                 msg.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(method));
187                 soapHelper.getJBIMarshaler().toNMS(msg, soapMessage);
188                 ((InOut) exchange).setOutMessage(msg);
189                 if (txSync) {
190                     channel.sendSync(exchange);
191                 } else {
192                     methods.put(exchange.getExchangeId(), method);
193                     channel.send(exchange);
194                     close = false;
195                 }
196             } else if (exchange instanceof InOptionalOut) {
197                 if (method.getResponseContentLength() == 0) {
198                     exchange.setStatus(ExchangeStatus.DONE);
199                     channel.send(exchange);
200                 } else {
201                     NormalizedMessage msg = exchange.createMessage();
202                     SoapReader reader = soapHelper.getSoapMarshaler().createReader();
203                     soapMessage = reader.read(method.getResponseBodyAsStream(),
204                                               method.getResponseHeader(Constants.HEADER_CONTENT_TYPE).getValue());
205                     context.setOutMessage(soapMessage);
206                     soapHelper.onAnswer(context);
207                     msg.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(method));
208                     soapHelper.getJBIMarshaler().toNMS(msg, soapMessage);
209                     ((InOptionalOut) exchange).setOutMessage(msg);
210                     if (txSync) {
211                         channel.sendSync(exchange);
212                     } else {
213                         methods.put(exchange.getExchangeId(), method);
214                         channel.send(exchange);
215                         close = false;
216                     }
217                 }
218             } else {
219                 exchange.setStatus(ExchangeStatus.DONE);
220                 channel.send(exchange);
221             }
222         } finally {
223             if (close) {
224                 method.releaseConnection();
225             }
226         }
227     }
228
229     public void start() throws Exception JavaDoc {
230         URI uri = new URI(endpoint.getLocationURI(), false);
231         if (uri.getScheme().equals("https")) {
232             ProtocolSocketFactory sf = new CommonsHttpSSLSocketFactory(
233                             endpoint.getSsl(),
234                             endpoint.getKeystoreManager());
235             Protocol protocol = new Protocol("https", sf, 443);
236             HttpHost host = new HttpHost(uri.getHost(), uri.getPort(), protocol);
237             this.host = new HostConfiguration();
238             this.host.setHost(host);
239         } else {
240             this.host = new HostConfiguration();
241             this.host.setHost(uri.getHost(), uri.getPort());
242         }
243         channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel();
244     }
245     
246     protected HttpConfiguration getConfiguration(HttpEndpoint endpoint) {
247         ComponentLifeCycle lf = endpoint.getServiceUnit().getComponent().getLifeCycle();
248         return ((HttpLifeCycle) lf).getConfiguration();
249     }
250
251     public void stop() throws Exception JavaDoc {
252     }
253
254     protected Map JavaDoc getHeaders(HttpServletRequest JavaDoc request) {
255         Map JavaDoc headers = new HashMap JavaDoc();
256         Enumeration JavaDoc enumeration = request.getHeaderNames();
257         while (enumeration.hasMoreElements()) {
258             String JavaDoc name = (String JavaDoc) enumeration.nextElement();
259             String JavaDoc value = request.getHeader(name);
260             headers.put(name, value);
261         }
262         return headers;
263     }
264
265     protected Map JavaDoc getHeaders(HttpMethod method) {
266         Map JavaDoc headers = new HashMap JavaDoc();
267         Header[] h = method.getResponseHeaders();
268         for (int i = 0; i < h.length; i++) {
269             headers.put(h[i].getName(), h[i].getValue());
270         }
271         return headers;
272     }
273     
274     protected RequestEntity writeMessage(SoapWriter writer) throws Exception JavaDoc {
275         HttpLifeCycle lf = (HttpLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle();
276         if (lf.getConfiguration().isStreamingEnabled()) {
277             return new StreamingRequestEntity(writer);
278         } else {
279             ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
280             writer.write(baos);
281             return new ByteArrayRequestEntity(baos.toByteArray(), writer.getContentType());
282         }
283     }
284
285     protected HttpClient getClient() {
286         HttpLifeCycle lf = (HttpLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle();
287         return lf.getClient();
288     }
289
290     public static class StreamingRequestEntity implements RequestEntity {
291
292         private SoapWriter writer;
293         
294         public StreamingRequestEntity(SoapWriter writer) {
295             this.writer = writer;
296         }
297         
298         public boolean isRepeatable() {
299             return false;
300         }
301
302         public void writeRequest(OutputStream JavaDoc out) throws IOException JavaDoc {
303             try {
304                 writer.write(out);
305                 out.flush();
306             } catch (Exception JavaDoc e) {
307                 throw (IOException JavaDoc) new IOException JavaDoc("Could not write request").initCause(e);
308             }
309         }
310
311         public long getContentLength() {
312             // not known so we send negative value
313
return -1;
314         }
315
316         public String JavaDoc getContentType() {
317             return writer.getContentType();
318         }
319         
320     }
321 }
322
Popular Tags