KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > http > processors > ConsumerProcessor


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17 package org.apache.servicemix.http.processors;
18
19 import java.net.URI JavaDoc;
20 import java.util.Enumeration JavaDoc;
21 import java.util.HashMap JavaDoc;
22 import java.util.Map JavaDoc;
23
24 import javax.jbi.component.ComponentContext;
25 import javax.jbi.messaging.DeliveryChannel;
26 import javax.jbi.messaging.ExchangeStatus;
27 import javax.jbi.messaging.MessageExchange;
28 import javax.jbi.messaging.NormalizedMessage;
29 import javax.security.auth.Subject JavaDoc;
30 import javax.servlet.http.HttpServletRequest JavaDoc;
31 import javax.servlet.http.HttpServletResponse JavaDoc;
32 import javax.xml.namespace.QName JavaDoc;
33 import javax.xml.transform.dom.DOMSource JavaDoc;
34 import javax.xml.transform.stream.StreamResult JavaDoc;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.servicemix.JbiConstants;
39 import org.apache.servicemix.common.BaseLifeCycle;
40 import org.apache.servicemix.common.ExchangeProcessor;
41 import org.apache.servicemix.http.ContextManager;
42 import org.apache.servicemix.http.HttpEndpoint;
43 import org.apache.servicemix.http.HttpLifeCycle;
44 import org.apache.servicemix.http.HttpProcessor;
45 import org.apache.servicemix.http.SslParameters;
46 import org.apache.servicemix.http.jetty.JaasJettyPrincipal;
47 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
48 import org.apache.servicemix.soap.Context;
49 import org.apache.servicemix.soap.SoapFault;
50 import org.apache.servicemix.soap.SoapHelper;
51 import org.apache.servicemix.soap.marshalers.JBIMarshaler;
52 import org.apache.servicemix.soap.marshalers.SoapMessage;
53 import org.apache.servicemix.soap.marshalers.SoapWriter;
54 import org.mortbay.util.ajax.Continuation;
55 import org.mortbay.util.ajax.ContinuationSupport;
56 import org.w3c.dom.Node JavaDoc;
57
58 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
59
60 public class ConsumerProcessor implements ExchangeProcessor, HttpProcessor {
61
62     public static final URI JavaDoc IN_ONLY = URI.create("http://www.w3.org/2004/08/wsdl/in-only");
63     public static final URI JavaDoc IN_OUT = URI.create("http://www.w3.org/2004/08/wsdl/in-out");
64     public static final URI JavaDoc ROBUST_IN_ONLY = URI.create("http://www.w3.org/2004/08/wsdl/robust-in-only");
65     
66     private static Log log = LogFactory.getLog(ConsumerProcessor.class);
67     
68     protected HttpEndpoint endpoint;
69     protected Object JavaDoc httpContext;
70     protected ComponentContext context;
71     protected DeliveryChannel channel;
72     protected SoapHelper soapHelper;
73     protected Map JavaDoc locks;
74     protected Map JavaDoc exchanges;
75         
76     public ConsumerProcessor(HttpEndpoint endpoint) {
77         this.endpoint = endpoint;
78         this.soapHelper = new SoapHelper(endpoint);
79         this.locks = new ConcurrentHashMap();
80         this.exchanges = new ConcurrentHashMap();
81     }
82     
83     public SslParameters getSsl() {
84         return this.endpoint.getSsl();
85     }
86     
87     public String JavaDoc getAuthMethod() {
88         return this.endpoint.getAuthMethod();
89     }
90     
91     public void process(MessageExchange exchange) throws Exception JavaDoc {
92         Continuation cont = (Continuation) locks.remove(exchange.getExchangeId());
93         if (cont != null) {
94             synchronized (cont) {
95                 if (log.isDebugEnabled()) {
96                     log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
97                 }
98                 exchanges.put(exchange.getExchangeId(), exchange);
99                 cont.resume();
100             }
101         } else {
102             throw new IllegalStateException JavaDoc("Exchange not found");
103         }
104     }
105
106     public void start() throws Exception JavaDoc {
107         Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
108         String JavaDoc url = endpoint.getLocationURI();
109         context = endpoint.getServiceUnit().getComponent().getComponentContext();
110         channel = context.getDeliveryChannel();
111         httpContext = getServerManager().createContext(url, this);
112     }
113
114     public void stop() throws Exception JavaDoc {
115         getServerManager().remove(httpContext);
116     }
117
118     public void process(HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response) throws Exception JavaDoc {
119         if (log.isDebugEnabled()) {
120             log.debug("Receiving HTTP request: " + request);
121         }
122         if ("GET".equals(request.getMethod())) {
123             String JavaDoc query = request.getQueryString();
124             if (query != null && query.trim().equalsIgnoreCase("wsdl")) {
125                 String JavaDoc uri = request.getRequestURI();
126                 if (!uri.endsWith("/")) {
127                     uri += "/";
128                 }
129                 uri += "main.wsdl";
130                 response.sendRedirect(uri);
131                 return;
132             }
133             String JavaDoc path = request.getPathInfo();
134             if (path.lastIndexOf('/') >= 0) {
135                 path = path.substring(path.lastIndexOf('/') + 1);
136             }
137             Node JavaDoc node = (Node JavaDoc) endpoint.getWsdls().get(path);
138             generateDocument(response, node);
139             return;
140         }
141         if (!"POST".equals(request.getMethod())) {
142             response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED, request.getMethod() + " not supported");
143             return;
144         }
145         // Not giving a specific mutex will synchronize on the contination itself
146
Continuation cont = ContinuationSupport.getContinuation(request, null);
147         MessageExchange exchange;
148         // If the continuation is not a retry
149
if (!cont.isPending()) {
150             try {
151                 SoapMessage message = soapHelper.getSoapMarshaler().createReader().read(
152                                             request.getInputStream(),
153                                             request.getHeader(Constants.HEADER_CONTENT_TYPE));
154                 Context context = soapHelper.createContext(message);
155                 if (request.getUserPrincipal() != null) {
156                     if (request.getUserPrincipal() instanceof JaasJettyPrincipal) {
157                         Subject JavaDoc subject = ((JaasJettyPrincipal) request.getUserPrincipal()).getSubject();
158                         context.getInMessage().setSubject(subject);
159                     } else {
160                         context.getInMessage().addPrincipal(request.getUserPrincipal());
161                     }
162                 }
163                 request.setAttribute(Context.class.getName(), context);
164                 exchange = soapHelper.onReceive(context);
165                 NormalizedMessage inMessage = exchange.getMessage("in");
166                 inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(request));
167                 locks.put(exchange.getExchangeId(), cont);
168                 request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId());
169                 synchronized (cont) {
170                     ((BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle()).sendConsumerExchange(exchange, endpoint);
171                     if (exchanges.remove(exchange.getExchangeId()) == null) {
172                         if (log.isDebugEnabled()) {
173                             log.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
174                         }
175                         // TODO: make this timeout configurable
176
boolean result = cont.suspend(1000 * 60); // 60 s
177
if (!result) {
178                             throw new Exception JavaDoc("Error sending exchange: aborted");
179                         }
180                     }
181                     request.removeAttribute(MessageExchange.class.getName());
182                 }
183             } catch (SoapFault fault) {
184                 sendFault(fault, request, response);
185                 return;
186             }
187         } else {
188             String JavaDoc id = (String JavaDoc) request.getAttribute(MessageExchange.class.getName());
189             exchange = (MessageExchange) exchanges.remove(id);
190             request.removeAttribute(MessageExchange.class.getName());
191             boolean result = cont.suspend(0);
192             // Check if this is a timeout
193
if (exchange == null) {
194                 throw new IllegalStateException JavaDoc("Exchange not found");
195             }
196             if (!result) {
197                 throw new Exception JavaDoc("Timeout");
198             }
199         }
200         if (exchange.getStatus() == ExchangeStatus.ERROR) {
201             if (exchange.getError() != null) {
202                 throw new Exception JavaDoc(exchange.getError());
203             } else {
204                 throw new Exception JavaDoc("Unknown Error");
205             }
206         } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
207             try {
208                 if (exchange.getFault() != null) {
209                     SoapFault fault = new SoapFault(
210                                     (QName JavaDoc) exchange.getFault().getProperty(JBIMarshaler.SOAP_FAULT_CODE),
211                                     (QName JavaDoc) exchange.getFault().getProperty(JBIMarshaler.SOAP_FAULT_SUBCODE),
212                                     (String JavaDoc) exchange.getFault().getProperty(JBIMarshaler.SOAP_FAULT_REASON),
213                                     (URI JavaDoc) exchange.getFault().getProperty(JBIMarshaler.SOAP_FAULT_NODE),
214                                     (URI JavaDoc) exchange.getFault().getProperty(JBIMarshaler.SOAP_FAULT_ROLE),
215                                     exchange.getFault().getContent());
216                     sendFault(fault, request, response);
217                 } else {
218                     NormalizedMessage outMsg = exchange.getMessage("out");
219                     if (outMsg != null) {
220                         Context context = (Context) request.getAttribute(Context.class.getName());
221                         SoapMessage out = soapHelper.onReply(context, outMsg);
222                         SoapWriter writer = soapHelper.getSoapMarshaler().createWriter(out);
223                         response.setContentType(writer.getContentType());
224                         writer.write(response.getOutputStream());
225                     }
226                 }
227             } finally {
228                 exchange.setStatus(ExchangeStatus.DONE);
229                 channel.send(exchange);
230             }
231         } else if (exchange.getStatus() == ExchangeStatus.DONE) {
232             // This happens when there is no response to send back
233
response.setStatus(HttpServletResponse.SC_ACCEPTED);
234         }
235     }
236     
237     protected void sendFault(SoapFault fault, HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response) throws Exception JavaDoc {
238         if (SoapFault.SENDER.equals(fault.getCode())) {
239             response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
240         } else {
241             response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
242         }
243         Context context = (Context) request.getAttribute(Context.class.getName());
244         SoapMessage soapFault = soapHelper.onFault(context, fault);
245         SoapWriter writer = soapHelper.getSoapMarshaler().createWriter(soapFault);
246         response.setContentType(writer.getContentType());
247         writer.write(response.getOutputStream());
248     }
249     
250     protected Map JavaDoc getHeaders(HttpServletRequest JavaDoc request) {
251         Map JavaDoc headers = new HashMap JavaDoc();
252         Enumeration JavaDoc enumeration = request.getHeaderNames();
253         while (enumeration.hasMoreElements()) {
254             String JavaDoc name = (String JavaDoc) enumeration.nextElement();
255             String JavaDoc value = request.getHeader(name);
256             headers.put(name, value);
257         }
258         return headers;
259     }
260     
261     protected ContextManager getServerManager() {
262         HttpLifeCycle lf = (HttpLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle();
263         return lf.getServer();
264     }
265     
266     protected void generateDocument(HttpServletResponse JavaDoc response, Node JavaDoc node) throws Exception JavaDoc {
267         if (node == null) {
268             response.sendError(HttpServletResponse.SC_NOT_FOUND, "Unable to find requested resource");
269             return;
270         }
271         response.setStatus(200);
272         response.setContentType("text/xml");
273         new SourceTransformer().toResult(new DOMSource JavaDoc(node), new StreamResult JavaDoc(response.getOutputStream()));
274     }
275
276 }
277
Popular Tags