1 package org.objectweb.celtix.bus.transports.http; 2 3 4 5 6 import java.io.BufferedOutputStream ; 7 import java.io.ByteArrayInputStream ; 8 import java.io.ByteArrayOutputStream ; 9 import java.io.FilterOutputStream ; 10 import java.io.IOException ; 11 import java.io.InputStream ; 12 import java.io.OutputStream ; 13 import java.net.HttpURLConnection ; 14 import java.net.InetSocketAddress ; 15 import java.net.Proxy ; 16 import java.net.URL ; 17 import java.net.URLConnection ; 18 import java.util.Arrays ; 19 import java.util.HashMap ; 20 import java.util.List ; 21 import java.util.Map ; 22 import java.util.concurrent.Callable ; 23 import java.util.concurrent.Executor ; 24 import java.util.concurrent.Future ; 25 import java.util.concurrent.FutureTask ; 26 import java.util.logging.Level ; 27 import java.util.logging.Logger ; 28 29 import javax.net.ssl.HttpsURLConnection; 30 import javax.wsdl.Port; 31 import javax.wsdl.WSDLException; 32 import javax.xml.ws.BindingProvider; 33 import javax.xml.ws.WebServiceException; 34 import javax.xml.ws.handler.MessageContext; 35 36 import static javax.xml.ws.handler.MessageContext.HTTP_RESPONSE_CODE; 37 38 import org.mortbay.http.HttpRequest; 39 import org.mortbay.http.HttpResponse; 40 import org.mortbay.http.handler.AbstractHttpHandler; 41 import org.objectweb.celtix.Bus; 42 import org.objectweb.celtix.bindings.BindingContextUtils; 43 import org.objectweb.celtix.bindings.ClientBinding; 44 import org.objectweb.celtix.bindings.ResponseCallback; 45 import org.objectweb.celtix.bus.busimpl.ComponentCreatedEvent; 46 import org.objectweb.celtix.bus.busimpl.ComponentRemovedEvent; 47 import org.objectweb.celtix.bus.configuration.security.AuthorizationPolicy; 48 import org.objectweb.celtix.bus.configuration.security.SSLClientPolicy; 49 import org.objectweb.celtix.bus.configuration.wsdl.WsdlHttpConfigurationProvider; 50 import org.objectweb.celtix.bus.configuration.wsdl.WsdlPortProvider; 51 import org.objectweb.celtix.bus.management.counters.TransportClientCounters; 52 import org.objectweb.celtix.bus.transports.https.JettySslClientConfigurer; 53 import org.objectweb.celtix.common.logging.LogUtils; 54 import org.objectweb.celtix.common.util.Base64Utility; 55 import org.objectweb.celtix.configuration.Configuration; 56 import org.objectweb.celtix.configuration.ConfigurationBuilder; 57 import org.objectweb.celtix.configuration.ConfigurationBuilderFactory; 58 import org.objectweb.celtix.context.GenericMessageContext; 59 import org.objectweb.celtix.context.InputStreamMessageContext; 60 import org.objectweb.celtix.context.MessageContextWrapper; 61 import org.objectweb.celtix.context.ObjectMessageContext; 62 import org.objectweb.celtix.context.OutputStreamMessageContext; 63 import org.objectweb.celtix.transports.ClientTransport; 64 import org.objectweb.celtix.transports.http.configuration.HTTPClientPolicy; 65 import org.objectweb.celtix.ws.addressing.EndpointReferenceType; 66 import org.objectweb.celtix.wsdl.EndpointReferenceUtils; 67 68 69 70 public class HTTPClientTransport implements ClientTransport { 71 72 private static final Logger LOG = LogUtils.getL7dLogger(HTTPClientTransport.class); 73 74 private static final String PORT_CONFIGURATION_URI = 75 "http://celtix.objectweb.org/bus/jaxws/port-config"; 76 private static final String HTTP_CLIENT_CONFIGURATION_URI = 77 "http://celtix.objectweb.org/bus/transports/http/http-client-config"; 78 private static final String HTTP_CLIENT_CONFIGURATION_ID = "http-client"; 79 80 final HTTPClientPolicy policy; 81 final SSLClientPolicy sslClientPolicy; 82 final AuthorizationPolicy authPolicy; 83 final AuthorizationPolicy proxyAuthPolicy; 84 final Configuration configuration; 85 final Configuration portConfiguration; 86 final EndpointReferenceType targetEndpoint; 87 final Bus bus; 88 final Port port; 89 final HTTPTransportFactory factory; 90 91 URL url; 92 TransportClientCounters counters; 93 94 private JettyHTTPServerEngine decoupledEngine; 95 private EndpointReferenceType decoupledEndpoint; 96 private String decoupledAddress; 97 private URL decoupledURL; 98 private ClientBinding clientBinding; 99 private ResponseCallback responseCallback; 100 101 public HTTPClientTransport(Bus b, 102 EndpointReferenceType ref, 103 ClientBinding binding, 104 HTTPTransportFactory f) 105 throws WSDLException, IOException { 106 107 bus = b; 108 portConfiguration = getPortConfiguration(bus, ref); 109 String address = portConfiguration.getString("address"); 110 EndpointReferenceUtils.setAddress(ref, address); 111 targetEndpoint = ref; 112 clientBinding = binding; 113 factory = f; 114 url = new URL (address); 115 counters = new TransportClientCounters("HTTPClientTransport"); 116 117 port = EndpointReferenceUtils.getPort(bus.getWSDLManager(), ref); 118 configuration = createConfiguration(portConfiguration); 119 policy = getClientPolicy(configuration); 120 authPolicy = getAuthPolicy("authorization", configuration); 121 proxyAuthPolicy = getAuthPolicy("proxyAuthorization", configuration); 122 sslClientPolicy = getSSLClientPolicy(configuration); 123 bus.sendEvent(new ComponentCreatedEvent(this)); 124 125 } 126 127 private HTTPClientPolicy getClientPolicy(Configuration conf) { 128 HTTPClientPolicy pol = conf.getObject(HTTPClientPolicy.class, "httpClient"); 129 if (pol == null) { 130 pol = new HTTPClientPolicy(); 131 } 132 return pol; 133 } 134 private AuthorizationPolicy getAuthPolicy(String type, Configuration conf) { 135 AuthorizationPolicy pol = conf.getObject(AuthorizationPolicy.class, type); 136 if (pol == null) { 137 pol = new AuthorizationPolicy(); 138 } 139 return pol; 140 } 141 142 private SSLClientPolicy getSSLClientPolicy(Configuration conf) { 143 SSLClientPolicy pol = conf.getObject(SSLClientPolicy.class, "sslClient"); 144 if (pol == null) { 145 pol = new SSLClientPolicy(); 146 } 147 return pol; 148 } 149 150 public EndpointReferenceType getTargetEndpoint() { 151 return targetEndpoint; 152 } 153 154 public synchronized EndpointReferenceType getDecoupledEndpoint() throws IOException { 155 if (decoupledEndpoint == null && policy.getDecoupledEndpoint() != null) { 156 decoupledEndpoint = setUpDecoupledEndpoint(); 157 } 158 return decoupledEndpoint; 159 } 160 161 public Port getPort() { 162 return port; 163 } 164 165 public OutputStreamMessageContext createOutputStreamContext(MessageContext context) throws IOException { 166 return new HTTPClientOutputStreamContext(url, policy, authPolicy, 167 proxyAuthPolicy, sslClientPolicy, 168 context, 169 portConfiguration); 170 } 171 172 public void finalPrepareOutputStreamContext(OutputStreamMessageContext context) throws IOException { 173 HTTPClientOutputStreamContext ctx = (HTTPClientOutputStreamContext)context; 174 ctx.flushHeaders(); 175 } 176 177 public void invokeOneway(OutputStreamMessageContext context) throws IOException { 178 try { 179 HTTPClientOutputStreamContext ctx = (HTTPClientOutputStreamContext)context; 180 context.getOutputStream().close(); 181 ctx.getCorrespondingInputStreamContext().getInputStream().close(); 182 counters.getInvokeOneWay().increase(); 183 } catch (Exception ex) { 184 counters.getInvokeError().increase(); 185 throw new IOException (ex.getMessage()); 186 } 187 } 188 189 public InputStreamMessageContext invoke(OutputStreamMessageContext context) throws IOException { 190 try { 191 context.getOutputStream().close(); 192 HTTPClientOutputStreamContext requestContext = (HTTPClientOutputStreamContext)context; 193 counters.getInvoke().increase(); 194 return getResponseContext(requestContext); 195 } catch (Exception ex) { 196 counters.getInvokeError().increase(); 197 throw new IOException (ex.getMessage()); 198 } 199 } 200 201 public Future <InputStreamMessageContext> invokeAsync(OutputStreamMessageContext context, 202 Executor executor) 203 throws IOException { 204 try { 205 context.getOutputStream().close(); 206 HTTPClientOutputStreamContext ctx = (HTTPClientOutputStreamContext)context; 207 FutureTask <InputStreamMessageContext> f = new FutureTask <InputStreamMessageContext>( 208 getInputStreamMessageContextCallable(ctx)); 209 executor.execute(f); 211 counters.getInvokeAsync().increase(); 212 return f; 213 } catch (Exception ex) { 214 counters.getInvokeError().increase(); 215 throw new IOException (ex.getMessage()); 216 } 217 } 218 219 public ResponseCallback getResponseCallback() { 220 return responseCallback; 221 } 222 223 public void shutdown() { 224 if (url != null) { 225 try { 226 URLConnection connect = url.openConnection(); 227 if (connect instanceof HttpURLConnection ) { 228 ((HttpURLConnection )connect).disconnect(); 229 } 230 } catch (IOException ex) { 231 } 233 url = null; 234 } 235 236 if (decoupledURL != null && decoupledEngine != null) { 237 try { 238 DecoupledHandler decoupledHandler = 239 (DecoupledHandler)decoupledEngine.getServant(decoupledAddress); 240 if (decoupledHandler != null) { 241 decoupledHandler.release(); 242 } 243 } catch (IOException ioe) { 244 } 246 } 247 248 bus.sendEvent(new ComponentRemovedEvent(this)); 249 } 250 251 protected InputStreamMessageContext getResponseContext( 252 HTTPClientOutputStreamContext requestContext) 253 throws IOException { 254 InputStreamMessageContext responseContext = null; 255 if (hasDecoupledEndpoint()) { 256 int responseCode = getResponseCode(requestContext.connection); 257 if (responseCode == HttpURLConnection.HTTP_ACCEPTED) { 258 responseContext = 262 requestContext.getCorrespondingInputStreamContext(); 263 BindingContextUtils.storeDecoupledResponse(responseContext, true); 264 } else { 265 responseContext = requestContext.getCorrespondingInputStreamContext(); 268 } 269 } else { 270 responseContext = requestContext.getCorrespondingInputStreamContext(); 271 } 272 return responseContext; 273 } 274 275 private EndpointReferenceType setUpDecoupledEndpoint() { 276 EndpointReferenceType reference = 277 EndpointReferenceUtils.getEndpointReference(policy.getDecoupledEndpoint()); 278 if (reference != null) { 279 decoupledAddress = reference.getAddress().getValue(); 280 LOG.info("creating decoupled endpoint: " + decoupledAddress); 281 try { 282 decoupledURL = new URL (decoupledAddress); 283 decoupledEngine = 284 JettyHTTPServerEngine.getForPort(bus, 285 decoupledURL.getProtocol(), 286 decoupledURL.getPort()); 287 DecoupledHandler decoupledHandler = 288 (DecoupledHandler)decoupledEngine.getServant(decoupledAddress); 289 if (decoupledHandler == null) { 290 responseCallback = clientBinding.createResponseCallback(); 291 decoupledEngine.addServant(decoupledAddress, 292 new DecoupledHandler(responseCallback)); 293 } else { 294 responseCallback = decoupledHandler.duplicate(); 295 } 296 297 } catch (Exception e) { 298 LOG.log(Level.WARNING, "decoupled endpoint creation failed: ", e); 300 } 301 } 302 return reference; 303 } 304 305 306 protected synchronized boolean hasDecoupledEndpoint() { 307 return decoupledEndpoint != null; 308 } 309 310 protected static Configuration getPortConfiguration(Bus bus, EndpointReferenceType ref) { 311 Configuration busConfiguration = bus.getConfiguration(); 312 String id = EndpointReferenceUtils.getServiceName(ref).toString() 313 + "/" + EndpointReferenceUtils.getPortName(ref); 314 Configuration portConfiguration = busConfiguration 315 .getChild(PORT_CONFIGURATION_URI, 316 id); 317 318 if (portConfiguration == null) { 319 ConfigurationBuilder cb = ConfigurationBuilderFactory.getBuilder(null); 320 portConfiguration = cb.getConfiguration(PORT_CONFIGURATION_URI, id, 321 bus.getConfiguration()); 322 if (null == portConfiguration) { 323 portConfiguration = cb.buildConfiguration(PORT_CONFIGURATION_URI, id, 324 bus.getConfiguration()); 325 } 326 327 Port port = null; 329 try { 330 port = EndpointReferenceUtils.getPort(bus.getWSDLManager(), ref); 331 } catch (WSDLException ex) { 332 throw new WebServiceException("Could not get port from wsdl", ex); 333 } 334 portConfiguration.getProviders().add(new WsdlPortProvider(port)); 335 } 336 return portConfiguration; 337 } 338 339 private Configuration createConfiguration(Configuration portCfg) { 340 ConfigurationBuilder cb = ConfigurationBuilderFactory.getBuilder(null); 341 Configuration cfg = cb.getConfiguration(HTTP_CLIENT_CONFIGURATION_URI, 342 HTTP_CLIENT_CONFIGURATION_ID, 343 portCfg); 344 if (null == cfg) { 345 cfg = cb.buildConfiguration(HTTP_CLIENT_CONFIGURATION_URI, 346 HTTP_CLIENT_CONFIGURATION_ID, 347 portCfg); 348 } 349 if (null != port) { 351 cfg.getProviders().add(new WsdlHttpConfigurationProvider(port, false)); 352 } 353 return cfg; 354 } 355 356 protected static int getResponseCode(URLConnection connection) throws IOException { 357 int responseCode = HttpURLConnection.HTTP_OK; 358 if (connection instanceof HttpURLConnection ) { 359 HttpURLConnection hc = (HttpURLConnection )connection; 360 responseCode = hc.getResponseCode(); 361 } else { 362 if (connection.getHeaderField(HTTP_RESPONSE_CODE) != null) { 363 responseCode = 364 Integer.parseInt(connection.getHeaderField(HTTP_RESPONSE_CODE)); 365 } 366 } 367 return responseCode; 368 } 369 370 protected InputStreamMessageContextCallable getInputStreamMessageContextCallable( 371 HTTPClientOutputStreamContext context) { 372 return new InputStreamMessageContextCallable(context); 373 } 374 375 protected static class HTTPClientOutputStreamContext 376 extends MessageContextWrapper 377 implements OutputStreamMessageContext { 378 379 URLConnection connection; 380 WrappedOutputStream origOut; 381 OutputStream out; 382 HTTPClientInputStreamContext inputStreamContext; 383 HTTPClientPolicy policy; 384 AuthorizationPolicy authPolicy; 385 AuthorizationPolicy proxyAuthPolicy; 386 SSLClientPolicy sslClientPolicy; 387 Configuration portConfiguration; 388 389 @SuppressWarnings ("unchecked") 390 public HTTPClientOutputStreamContext(URL url, 391 HTTPClientPolicy p, 392 AuthorizationPolicy ap, 393 AuthorizationPolicy pap, 394 SSLClientPolicy sslcp, 395 MessageContext ctx, 396 Configuration configParam) 397 throws IOException { 398 super(ctx); 399 400 Map <String , List <String >> headers = (Map <String , List <String >>)super.get(HTTP_REQUEST_HEADERS); 401 if (null == headers) { 402 headers = new HashMap <String , List <String >>(); 403 super.put(HTTP_REQUEST_HEADERS, headers); 404 } 405 406 policy = p; 407 authPolicy = ap; 408 proxyAuthPolicy = pap; 409 sslClientPolicy = sslcp; 410 portConfiguration = configParam; 411 String value = (String )ctx.get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY); 412 if (value != null) { 413 url = new URL (value); 414 } 415 416 if (policy.isSetProxyServer()) { 417 Proxy proxy = new Proxy (Proxy.Type.valueOf(policy.getProxyServerType().toString()), 418 new InetSocketAddress (policy.getProxyServer(), 419 policy.getProxyServerPort())); 420 connection = url.openConnection(proxy); 421 } else { 422 connection = url.openConnection(); 423 } 424 connection.setDoOutput(true); 425 426 if (connection instanceof HttpURLConnection ) { 427 HttpURLConnection hc = (HttpURLConnection )connection; 428 hc.setRequestMethod("POST"); 429 } 430 431 connection.setConnectTimeout((int)policy.getConnectionTimeout()); 432 connection.setReadTimeout((int)policy.getReceiveTimeout()); 433 434 connection.setUseCaches(false); 435 if (connection instanceof HttpURLConnection ) { 436 HttpURLConnection hc = (HttpURLConnection )connection; 437 if (policy.isAutoRedirect()) { 438 hc.setInstanceFollowRedirects(true); 441 } else { 442 hc.setInstanceFollowRedirects(false); 443 if (policy.isAllowChunking()) { 444 hc.setChunkedStreamingMode(2048); 445 } 446 } 447 } 448 setPolicies(headers); 449 if (connection instanceof HttpsURLConnection) { 450 setSSLPolicies(); 451 } 452 453 origOut = new WrappedOutputStream(); 454 out = origOut; 455 } 456 457 private void setSSLPolicies() { 458 JettySslClientConfigurer sslClientConfigurer = 459 new JettySslClientConfigurer(sslClientPolicy, connection, portConfiguration); 460 sslClientConfigurer.configure(); 461 } 462 463 private void setPolicies(Map <String , List <String >> headers) { 464 String userName = (String )get(BindingProvider.USERNAME_PROPERTY); 465 if (userName == null && authPolicy.isSetUserName()) { 466 userName = authPolicy.getUserName(); 467 } 468 if (userName != null) { 469 String passwd = (String )get(BindingProvider.PASSWORD_PROPERTY); 470 if (passwd == null && authPolicy.isSetPassword()) { 471 passwd = authPolicy.getPassword(); 472 } 473 userName += ":"; 474 if (passwd != null) { 475 userName += passwd; 476 } 477 userName = Base64Utility.encode(userName.getBytes()); 478 headers.put("Authorization", 479 Arrays.asList(new String [] {"Basic " + userName})); 480 } else if (authPolicy.isSetAuthorizationType() && authPolicy.isSetAuthorization()) { 481 String type = authPolicy.getAuthorizationType(); 482 type += " "; 483 type += authPolicy.getAuthorization(); 484 headers.put("Authorization", 485 Arrays.asList(new String [] {type})); 486 } 487 if (proxyAuthPolicy.isSetUserName()) { 488 userName = proxyAuthPolicy.getUserName(); 489 if (userName != null) { 490 String passwd = ""; 491 if (proxyAuthPolicy.isSetPassword()) { 492 passwd = proxyAuthPolicy.getPassword(); 493 } 494 userName += ":"; 495 if (passwd != null) { 496 userName += passwd; 497 } 498 userName = Base64Utility.encode(userName.getBytes()); 499 headers.put("Proxy-Authorization", 500 Arrays.asList(new String [] {"Basic " + userName})); 501 } else if (proxyAuthPolicy.isSetAuthorizationType() && proxyAuthPolicy.isSetAuthorization()) { 502 String type = proxyAuthPolicy.getAuthorizationType(); 503 type += " "; 504 type += proxyAuthPolicy.getAuthorization(); 505 headers.put("Proxy-Authorization", 506 Arrays.asList(new String [] {type})); 507 } 508 } 509 if (policy.isSetCacheControl()) { 510 headers.put("Cache-Control", 511 Arrays.asList(new String [] {policy.getCacheControl().value()})); 512 } 513 if (policy.isSetHost()) { 514 headers.put("Host", 515 Arrays.asList(new String [] {policy.getHost()})); 516 } 517 if (policy.isSetConnection()) { 518 headers.put("Connection", 519 Arrays.asList(new String [] {policy.getConnection().value()})); 520 } 521 if (policy.isSetAccept()) { 522 headers.put("Accept", 523 Arrays.asList(new String [] {policy.getAccept()})); 524 } 525 if (policy.isSetAcceptEncoding()) { 526 headers.put("Accept-Encoding", 527 Arrays.asList(new String [] {policy.getAcceptEncoding()})); 528 } 529 if (policy.isSetAcceptLanguage()) { 530 headers.put("Accept-Language", 531 Arrays.asList(new String [] {policy.getAcceptLanguage()})); 532 } 533 if (policy.isSetContentType()) { 534 headers.put("Content-Type", 535 Arrays.asList(new String [] {policy.getContentType()})); 536 } 537 if (policy.isSetCookie()) { 538 headers.put("Cookie", 539 Arrays.asList(new String [] {policy.getCookie()})); 540 } 541 if (policy.isSetBrowserType()) { 542 headers.put("BrowserType", 543 Arrays.asList(new String [] {policy.getBrowserType()})); 544 } 545 if (policy.isSetReferer()) { 546 headers.put("Referer", 547 Arrays.asList(new String [] {policy.getReferer()})); 548 } 549 } 550 551 @SuppressWarnings ("unchecked") 552 void flushHeaders() throws IOException { 553 Map <String , List <String >> headers = (Map <String , List <String >>)super.get(HTTP_REQUEST_HEADERS); 554 if (null != headers) { 555 for (String header : headers.keySet()) { 556 List <String > headerList = headers.get(header); 557 for (String string : headerList) { 558 connection.addRequestProperty(header, string); 559 } 560 } 561 } 562 563 origOut.resetOut(new BufferedOutputStream (connection.getOutputStream(), 1024)); 564 } 565 566 public void setFault(boolean isFault) { 567 } 569 570 public boolean isFault() { 571 return false; 572 } 573 574 public void setOneWay(boolean isOneWay) { 575 put(ONEWAY_MESSAGE_TF, isOneWay); 576 } 577 578 public boolean isOneWay() { 579 return ((Boolean )get(ONEWAY_MESSAGE_TF)).booleanValue(); 580 } 581 582 public OutputStream getOutputStream() { 583 return out; 584 } 585 586 public void setOutputStream(OutputStream o) { 587 out = o; 588 } 589 590 public InputStreamMessageContext getCorrespondingInputStreamContext() throws IOException { 591 if (inputStreamContext == null) { 592 inputStreamContext = new HTTPClientInputStreamContext(connection); 593 } 594 return inputStreamContext; 595 } 596 597 private class WrappedOutputStream extends FilterOutputStream { 598 WrappedOutputStream() { 599 super(new ByteArrayOutputStream ()); 600 } 601 void resetOut(OutputStream newOut) throws IOException { 602 ByteArrayOutputStream bout = (ByteArrayOutputStream )out; 603 if (bout.size() > 0) { 604 bout.writeTo(newOut); 605 } 606 out = newOut; 607 } 608 609 610 public void close() throws IOException { 611 out.flush(); 612 if (inputStreamContext != null) { 613 inputStreamContext.initialise(); 614 } 615 } 616 } 617 } 618 619 static class HTTPClientInputStreamContext 620 extends GenericMessageContext 621 implements InputStreamMessageContext { 622 623 private static final long serialVersionUID = 1L; 624 625 final URLConnection connection; 626 InputStream origInputStream; 627 InputStream inStream; 628 private boolean initialised; 629 630 public HTTPClientInputStreamContext(URLConnection con) throws IOException { 631 connection = con; 632 initialise(); 633 } 634 635 645 void initialise() throws IOException { 646 if (!initialised) { 647 put(ObjectMessageContext.MESSAGE_INPUT, false); 648 put(HTTP_RESPONSE_HEADERS, connection.getHeaderFields()); 649 put(HTTP_RESPONSE_CODE, getResponseCode(connection)); 650 if (connection instanceof HttpURLConnection ) { 651 HttpURLConnection hc = (HttpURLConnection )connection; 652 origInputStream = hc.getErrorStream(); 653 if (null == origInputStream) { 654 origInputStream = connection.getInputStream(); 655 } 656 } else { 657 origInputStream = connection.getInputStream(); 658 } 659 660 inStream = origInputStream; 661 initialised = true; 662 } 663 } 664 665 public InputStream getInputStream() { 666 try { 667 initialise(); 668 } catch (IOException ex) { 669 throw new RuntimeException (ex); 670 } 671 return inStream; 672 } 673 674 public void setInputStream(InputStream ins) { 675 inStream = ins; 676 } 677 678 public void setFault(boolean isFault) { 679 } 681 682 public boolean isFault() { 683 assert get(HTTP_RESPONSE_CODE) != null; 684 return ((Integer )get(HTTP_RESPONSE_CODE)).intValue() == 500; 685 } 686 } 687 688 static class HTTPDecoupledClientInputStreamContext 689 extends GenericMessageContext 690 implements InputStreamMessageContext { 691 692 InputStream inStream; 693 694 public HTTPDecoupledClientInputStreamContext(HttpRequest decoupledResponse) 695 throws IOException { 696 put(ObjectMessageContext.MESSAGE_INPUT, false); 697 put(HTTP_RESPONSE_HEADERS, decoupledResponse.getParameters()); 698 put(HTTP_RESPONSE_CODE, HttpURLConnection.HTTP_ACCEPTED); 699 inStream = drain(decoupledResponse.getInputStream()); 700 } 701 702 public InputStream getInputStream() { 703 return inStream; 704 } 705 706 public void setInputStream(InputStream ins) { 707 inStream = ins; 708 } 709 710 public void setFault(boolean isFault) { 711 } 713 714 public boolean isFault() { 715 return false; 716 } 717 718 private static InputStream drain(InputStream r) throws IOException { 719 byte[] bytes = new byte[4096]; 720 ByteArrayOutputStream w = new ByteArrayOutputStream (); 721 try { 722 int offset = 0; 723 int length = r.read(bytes, offset, bytes.length - offset); 724 while (length != -1) { 725 offset += length; 726 727 if (offset == bytes.length) { 728 w.write(bytes, 0, bytes.length); 729 offset = 0; 730 } 731 732 length = r.read(bytes, offset, bytes.length - offset); 733 } 734 if (offset != 0) { 735 w.write(bytes, 0, offset); 736 } 737 } finally { 738 bytes = null; 739 } 740 return new ByteArrayInputStream (w.toByteArray()); 741 } 742 } 743 744 private class InputStreamMessageContextCallable implements Callable <InputStreamMessageContext> { 745 private final HTTPClientOutputStreamContext httpClientOutputStreamContext; 746 747 InputStreamMessageContextCallable(HTTPClientOutputStreamContext ctx) { 748 httpClientOutputStreamContext = ctx; 749 } 750 public InputStreamMessageContext call() throws Exception { 751 return getResponseContext(httpClientOutputStreamContext); 752 } 753 } 754 755 private class DecoupledHandler extends AbstractHttpHandler { 756 private ResponseCallback responseCallback; 757 private int refCount; 758 759 DecoupledHandler(ResponseCallback callback) { 760 responseCallback = callback; 761 } 762 763 synchronized ResponseCallback duplicate() { 764 refCount++; 765 return responseCallback; 766 } 767 768 synchronized void release() { 769 if (--refCount == 0) { 770 try { 771 decoupledEngine.removeServant(decoupledAddress); 772 JettyHTTPServerEngine.destroyForPort(decoupledURL.getPort()); 773 } catch (IOException ex) { 774 } 776 } 777 } 778 779 public void handle(String pathInContext, 780 String pathParams, 781 HttpRequest req, 782 HttpResponse resp) throws IOException { 783 HTTPDecoupledClientInputStreamContext ctx = 784 new HTTPDecoupledClientInputStreamContext(req); 785 responseCallback.dispatch(ctx); 786 resp.commit(); 787 req.setHandled(true); 788 } 789 } 790 } 791 | Popular Tags |