1 17 package org.apache.servicemix.http.processors; 18 19 import java.net.URI ; 20 import java.util.Enumeration ; 21 import java.util.HashMap ; 22 import java.util.Map ; 23 24 import javax.jbi.component.ComponentContext; 25 import javax.jbi.messaging.DeliveryChannel; 26 import javax.jbi.messaging.ExchangeStatus; 27 import javax.jbi.messaging.MessageExchange; 28 import javax.jbi.messaging.NormalizedMessage; 29 import javax.security.auth.Subject ; 30 import javax.servlet.http.HttpServletRequest ; 31 import javax.servlet.http.HttpServletResponse ; 32 import javax.xml.namespace.QName ; 33 import javax.xml.transform.dom.DOMSource ; 34 import javax.xml.transform.stream.StreamResult ; 35 36 import org.apache.commons.logging.Log; 37 import org.apache.commons.logging.LogFactory; 38 import org.apache.servicemix.JbiConstants; 39 import org.apache.servicemix.common.BaseLifeCycle; 40 import org.apache.servicemix.common.ExchangeProcessor; 41 import org.apache.servicemix.http.ContextManager; 42 import org.apache.servicemix.http.HttpEndpoint; 43 import org.apache.servicemix.http.HttpLifeCycle; 44 import org.apache.servicemix.http.HttpProcessor; 45 import org.apache.servicemix.http.SslParameters; 46 import org.apache.servicemix.http.jetty.JaasJettyPrincipal; 47 import org.apache.servicemix.jbi.jaxp.SourceTransformer; 48 import org.apache.servicemix.soap.Context; 49 import org.apache.servicemix.soap.SoapFault; 50 import org.apache.servicemix.soap.SoapHelper; 51 import org.apache.servicemix.soap.marshalers.JBIMarshaler; 52 import org.apache.servicemix.soap.marshalers.SoapMessage; 53 import org.apache.servicemix.soap.marshalers.SoapWriter; 54 import org.mortbay.util.ajax.Continuation; 55 import org.mortbay.util.ajax.ContinuationSupport; 56 import org.w3c.dom.Node ; 57 58 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 59 60 public class ConsumerProcessor implements ExchangeProcessor, HttpProcessor { 61 62 public static final URI IN_ONLY = URI.create("http://www.w3.org/2004/08/wsdl/in-only"); 63 public static final URI IN_OUT = URI.create("http://www.w3.org/2004/08/wsdl/in-out"); 64 public static final URI ROBUST_IN_ONLY = URI.create("http://www.w3.org/2004/08/wsdl/robust-in-only"); 65 66 private static Log log = LogFactory.getLog(ConsumerProcessor.class); 67 68 protected HttpEndpoint endpoint; 69 protected Object httpContext; 70 protected ComponentContext context; 71 protected DeliveryChannel channel; 72 protected SoapHelper soapHelper; 73 protected Map locks; 74 protected Map exchanges; 75 76 public ConsumerProcessor(HttpEndpoint endpoint) { 77 this.endpoint = endpoint; 78 this.soapHelper = new SoapHelper(endpoint); 79 this.locks = new ConcurrentHashMap(); 80 this.exchanges = new ConcurrentHashMap(); 81 } 82 83 public SslParameters getSsl() { 84 return this.endpoint.getSsl(); 85 } 86 87 public String getAuthMethod() { 88 return this.endpoint.getAuthMethod(); 89 } 90 91 public void process(MessageExchange exchange) throws Exception { 92 Continuation cont = (Continuation) locks.remove(exchange.getExchangeId()); 93 if (cont != null) { 94 synchronized (cont) { 95 if (log.isDebugEnabled()) { 96 log.debug("Resuming continuation for exchange: " + exchange.getExchangeId()); 97 } 98 exchanges.put(exchange.getExchangeId(), exchange); 99 cont.resume(); 100 } 101 } else { 102 throw new IllegalStateException ("Exchange not found"); 103 } 104 } 105 106 public void start() throws Exception { 107 Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); 108 String url = endpoint.getLocationURI(); 109 context = endpoint.getServiceUnit().getComponent().getComponentContext(); 110 channel = context.getDeliveryChannel(); 111 httpContext = getServerManager().createContext(url, this); 112 } 113 114 public void stop() throws Exception { 115 getServerManager().remove(httpContext); 116 } 117 118 public void process(HttpServletRequest request, HttpServletResponse response) throws Exception { 119 if (log.isDebugEnabled()) { 120 log.debug("Receiving HTTP request: " + request); 121 } 122 if ("GET".equals(request.getMethod())) { 123 String query = request.getQueryString(); 124 if (query != null && query.trim().equalsIgnoreCase("wsdl")) { 125 String uri = request.getRequestURI(); 126 if (!uri.endsWith("/")) { 127 uri += "/"; 128 } 129 uri += "main.wsdl"; 130 response.sendRedirect(uri); 131 return; 132 } 133 String path = request.getPathInfo(); 134 if (path.lastIndexOf('/') >= 0) { 135 path = path.substring(path.lastIndexOf('/') + 1); 136 } 137 Node node = (Node ) endpoint.getWsdls().get(path); 138 generateDocument(response, node); 139 return; 140 } 141 if (!"POST".equals(request.getMethod())) { 142 response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED, request.getMethod() + " not supported"); 143 return; 144 } 145 Continuation cont = ContinuationSupport.getContinuation(request, null); 147 MessageExchange exchange; 148 if (!cont.isPending()) { 150 try { 151 SoapMessage message = soapHelper.getSoapMarshaler().createReader().read( 152 request.getInputStream(), 153 request.getHeader(Constants.HEADER_CONTENT_TYPE)); 154 Context context = soapHelper.createContext(message); 155 if (request.getUserPrincipal() != null) { 156 if (request.getUserPrincipal() instanceof JaasJettyPrincipal) { 157 Subject subject = ((JaasJettyPrincipal) request.getUserPrincipal()).getSubject(); 158 context.getInMessage().setSubject(subject); 159 } else { 160 context.getInMessage().addPrincipal(request.getUserPrincipal()); 161 } 162 } 163 request.setAttribute(Context.class.getName(), context); 164 exchange = soapHelper.onReceive(context); 165 NormalizedMessage inMessage = exchange.getMessage("in"); 166 inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(request)); 167 locks.put(exchange.getExchangeId(), cont); 168 request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId()); 169 synchronized (cont) { 170 ((BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle()).sendConsumerExchange(exchange, endpoint); 171 if (exchanges.remove(exchange.getExchangeId()) == null) { 172 if (log.isDebugEnabled()) { 173 log.debug("Suspending continuation for exchange: " + exchange.getExchangeId()); 174 } 175 boolean result = cont.suspend(1000 * 60); if (!result) { 178 throw new Exception ("Error sending exchange: aborted"); 179 } 180 } 181 request.removeAttribute(MessageExchange.class.getName()); 182 } 183 } catch (SoapFault fault) { 184 sendFault(fault, request, response); 185 return; 186 } 187 } else { 188 String id = (String ) request.getAttribute(MessageExchange.class.getName()); 189 exchange = (MessageExchange) exchanges.remove(id); 190 request.removeAttribute(MessageExchange.class.getName()); 191 boolean result = cont.suspend(0); 192 if (exchange == null) { 194 throw new IllegalStateException ("Exchange not found"); 195 } 196 if (!result) { 197 throw new Exception ("Timeout"); 198 } 199 } 200 if (exchange.getStatus() == ExchangeStatus.ERROR) { 201 if (exchange.getError() != null) { 202 throw new Exception (exchange.getError()); 203 } else { 204 throw new Exception ("Unknown Error"); 205 } 206 } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) { 207 try { 208 if (exchange.getFault() != null) { 209 SoapFault fault = new SoapFault( 210 (QName ) exchange.getFault().getProperty(JBIMarshaler.SOAP_FAULT_CODE), 211 (QName ) exchange.getFault().getProperty(JBIMarshaler.SOAP_FAULT_SUBCODE), 212 (String ) exchange.getFault().getProperty(JBIMarshaler.SOAP_FAULT_REASON), 213 (URI ) exchange.getFault().getProperty(JBIMarshaler.SOAP_FAULT_NODE), 214 (URI ) exchange.getFault().getProperty(JBIMarshaler.SOAP_FAULT_ROLE), 215 exchange.getFault().getContent()); 216 sendFault(fault, request, response); 217 } else { 218 NormalizedMessage outMsg = exchange.getMessage("out"); 219 if (outMsg != null) { 220 Context context = (Context) request.getAttribute(Context.class.getName()); 221 SoapMessage out = soapHelper.onReply(context, outMsg); 222 SoapWriter writer = soapHelper.getSoapMarshaler().createWriter(out); 223 response.setContentType(writer.getContentType()); 224 writer.write(response.getOutputStream()); 225 } 226 } 227 } finally { 228 exchange.setStatus(ExchangeStatus.DONE); 229 channel.send(exchange); 230 } 231 } else if (exchange.getStatus() == ExchangeStatus.DONE) { 232 response.setStatus(HttpServletResponse.SC_ACCEPTED); 234 } 235 } 236 237 protected void sendFault(SoapFault fault, HttpServletRequest request, HttpServletResponse response) throws Exception { 238 if (SoapFault.SENDER.equals(fault.getCode())) { 239 response.setStatus(HttpServletResponse.SC_BAD_REQUEST); 240 } else { 241 response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); 242 } 243 Context context = (Context) request.getAttribute(Context.class.getName()); 244 SoapMessage soapFault = soapHelper.onFault(context, fault); 245 SoapWriter writer = soapHelper.getSoapMarshaler().createWriter(soapFault); 246 response.setContentType(writer.getContentType()); 247 writer.write(response.getOutputStream()); 248 } 249 250 protected Map getHeaders(HttpServletRequest request) { 251 Map headers = new HashMap (); 252 Enumeration enumeration = request.getHeaderNames(); 253 while (enumeration.hasMoreElements()) { 254 String name = (String ) enumeration.nextElement(); 255 String value = request.getHeader(name); 256 headers.put(name, value); 257 } 258 return headers; 259 } 260 261 protected ContextManager getServerManager() { 262 HttpLifeCycle lf = (HttpLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle(); 263 return lf.getServer(); 264 } 265 266 protected void generateDocument(HttpServletResponse response, Node node) throws Exception { 267 if (node == null) { 268 response.sendError(HttpServletResponse.SC_NOT_FOUND, "Unable to find requested resource"); 269 return; 270 } 271 response.setStatus(200); 272 response.setContentType("text/xml"); 273 new SourceTransformer().toResult(new DOMSource (node), new StreamResult (response.getOutputStream())); 274 } 275 276 } 277 | Popular Tags |