KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > celtix > bus > transports > http > HTTPClientTransport


1 package org.objectweb.celtix.bus.transports.http;
2
3
4
5
6 import java.io.BufferedOutputStream JavaDoc;
7 import java.io.ByteArrayInputStream JavaDoc;
8 import java.io.ByteArrayOutputStream JavaDoc;
9 import java.io.FilterOutputStream JavaDoc;
10 import java.io.IOException JavaDoc;
11 import java.io.InputStream JavaDoc;
12 import java.io.OutputStream JavaDoc;
13 import java.net.HttpURLConnection JavaDoc;
14 import java.net.InetSocketAddress JavaDoc;
15 import java.net.Proxy JavaDoc;
16 import java.net.URL JavaDoc;
17 import java.net.URLConnection JavaDoc;
18 import java.util.Arrays JavaDoc;
19 import java.util.HashMap JavaDoc;
20 import java.util.List JavaDoc;
21 import java.util.Map JavaDoc;
22 import java.util.concurrent.Callable JavaDoc;
23 import java.util.concurrent.Executor JavaDoc;
24 import java.util.concurrent.Future JavaDoc;
25 import java.util.concurrent.FutureTask JavaDoc;
26 import java.util.logging.Level JavaDoc;
27 import java.util.logging.Logger JavaDoc;
28
29 import javax.net.ssl.HttpsURLConnection;
30 import javax.wsdl.Port;
31 import javax.wsdl.WSDLException;
32 import javax.xml.ws.BindingProvider;
33 import javax.xml.ws.WebServiceException;
34 import javax.xml.ws.handler.MessageContext;
35
36 import static javax.xml.ws.handler.MessageContext.HTTP_RESPONSE_CODE;
37
38 import org.mortbay.http.HttpRequest;
39 import org.mortbay.http.HttpResponse;
40 import org.mortbay.http.handler.AbstractHttpHandler;
41 import org.objectweb.celtix.Bus;
42 import org.objectweb.celtix.bindings.BindingContextUtils;
43 import org.objectweb.celtix.bindings.ClientBinding;
44 import org.objectweb.celtix.bindings.ResponseCallback;
45 import org.objectweb.celtix.bus.busimpl.ComponentCreatedEvent;
46 import org.objectweb.celtix.bus.busimpl.ComponentRemovedEvent;
47 import org.objectweb.celtix.bus.configuration.security.AuthorizationPolicy;
48 import org.objectweb.celtix.bus.configuration.security.SSLClientPolicy;
49 import org.objectweb.celtix.bus.configuration.wsdl.WsdlHttpConfigurationProvider;
50 import org.objectweb.celtix.bus.configuration.wsdl.WsdlPortProvider;
51 import org.objectweb.celtix.bus.management.counters.TransportClientCounters;
52 import org.objectweb.celtix.bus.transports.https.JettySslClientConfigurer;
53 import org.objectweb.celtix.common.logging.LogUtils;
54 import org.objectweb.celtix.common.util.Base64Utility;
55 import org.objectweb.celtix.configuration.Configuration;
56 import org.objectweb.celtix.configuration.ConfigurationBuilder;
57 import org.objectweb.celtix.configuration.ConfigurationBuilderFactory;
58 import org.objectweb.celtix.context.GenericMessageContext;
59 import org.objectweb.celtix.context.InputStreamMessageContext;
60 import org.objectweb.celtix.context.MessageContextWrapper;
61 import org.objectweb.celtix.context.ObjectMessageContext;
62 import org.objectweb.celtix.context.OutputStreamMessageContext;
63 import org.objectweb.celtix.transports.ClientTransport;
64 import org.objectweb.celtix.transports.http.configuration.HTTPClientPolicy;
65 import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
66 import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
67
68
69
70 public class HTTPClientTransport implements ClientTransport {
71
72     private static final Logger JavaDoc LOG = LogUtils.getL7dLogger(HTTPClientTransport.class);
73
74     private static final String JavaDoc PORT_CONFIGURATION_URI =
75         "http://celtix.objectweb.org/bus/jaxws/port-config";
76     private static final String JavaDoc HTTP_CLIENT_CONFIGURATION_URI =
77         "http://celtix.objectweb.org/bus/transports/http/http-client-config";
78     private static final String JavaDoc HTTP_CLIENT_CONFIGURATION_ID = "http-client";
79
80     final HTTPClientPolicy policy;
81     final SSLClientPolicy sslClientPolicy;
82     final AuthorizationPolicy authPolicy;
83     final AuthorizationPolicy proxyAuthPolicy;
84     final Configuration configuration;
85     final Configuration portConfiguration;
86     final EndpointReferenceType targetEndpoint;
87     final Bus bus;
88     final Port port;
89     final HTTPTransportFactory factory;
90    
91     URL JavaDoc url;
92     TransportClientCounters counters;
93
94     private JettyHTTPServerEngine decoupledEngine;
95     private EndpointReferenceType decoupledEndpoint;
96     private String JavaDoc decoupledAddress;
97     private URL JavaDoc decoupledURL;
98     private ClientBinding clientBinding;
99     private ResponseCallback responseCallback;
100
101     public HTTPClientTransport(Bus b,
102                                EndpointReferenceType ref,
103                                ClientBinding binding,
104                                HTTPTransportFactory f)
105         throws WSDLException, IOException JavaDoc {
106
107         bus = b;
108         portConfiguration = getPortConfiguration(bus, ref);
109         String JavaDoc address = portConfiguration.getString("address");
110         EndpointReferenceUtils.setAddress(ref, address);
111         targetEndpoint = ref;
112         clientBinding = binding;
113         factory = f;
114         url = new URL JavaDoc(address);
115         counters = new TransportClientCounters("HTTPClientTransport");
116
117         port = EndpointReferenceUtils.getPort(bus.getWSDLManager(), ref);
118         configuration = createConfiguration(portConfiguration);
119         policy = getClientPolicy(configuration);
120         authPolicy = getAuthPolicy("authorization", configuration);
121         proxyAuthPolicy = getAuthPolicy("proxyAuthorization", configuration);
122         sslClientPolicy = getSSLClientPolicy(configuration);
123         bus.sendEvent(new ComponentCreatedEvent(this));
124
125     }
126
127     private HTTPClientPolicy getClientPolicy(Configuration conf) {
128         HTTPClientPolicy pol = conf.getObject(HTTPClientPolicy.class, "httpClient");
129         if (pol == null) {
130             pol = new HTTPClientPolicy();
131         }
132         return pol;
133     }
134     private AuthorizationPolicy getAuthPolicy(String JavaDoc type, Configuration conf) {
135         AuthorizationPolicy pol = conf.getObject(AuthorizationPolicy.class, type);
136         if (pol == null) {
137             pol = new AuthorizationPolicy();
138         }
139         return pol;
140     }
141     
142     private SSLClientPolicy getSSLClientPolicy(Configuration conf) {
143         SSLClientPolicy pol = conf.getObject(SSLClientPolicy.class, "sslClient");
144         if (pol == null) {
145             pol = new SSLClientPolicy();
146         }
147         return pol;
148     }
149        
150     public EndpointReferenceType getTargetEndpoint() {
151         return targetEndpoint;
152     }
153
154     public synchronized EndpointReferenceType getDecoupledEndpoint() throws IOException JavaDoc {
155         if (decoupledEndpoint == null && policy.getDecoupledEndpoint() != null) {
156             decoupledEndpoint = setUpDecoupledEndpoint();
157         }
158         return decoupledEndpoint;
159     }
160
161     public Port getPort() {
162         return port;
163     }
164
165     public OutputStreamMessageContext createOutputStreamContext(MessageContext context) throws IOException JavaDoc {
166         return new HTTPClientOutputStreamContext(url, policy, authPolicy,
167                                                  proxyAuthPolicy, sslClientPolicy,
168                                                  context,
169                                                  portConfiguration);
170     }
171
172     public void finalPrepareOutputStreamContext(OutputStreamMessageContext context) throws IOException JavaDoc {
173         HTTPClientOutputStreamContext ctx = (HTTPClientOutputStreamContext)context;
174         ctx.flushHeaders();
175     }
176
177     public void invokeOneway(OutputStreamMessageContext context) throws IOException JavaDoc {
178         try {
179             HTTPClientOutputStreamContext ctx = (HTTPClientOutputStreamContext)context;
180             context.getOutputStream().close();
181             ctx.getCorrespondingInputStreamContext().getInputStream().close();
182             counters.getInvokeOneWay().increase();
183         } catch (Exception JavaDoc ex) {
184             counters.getInvokeError().increase();
185             throw new IOException JavaDoc(ex.getMessage());
186         }
187     }
188
189     public InputStreamMessageContext invoke(OutputStreamMessageContext context) throws IOException JavaDoc {
190         try {
191             context.getOutputStream().close();
192             HTTPClientOutputStreamContext requestContext = (HTTPClientOutputStreamContext)context;
193             counters.getInvoke().increase();
194             return getResponseContext(requestContext);
195         } catch (Exception JavaDoc ex) {
196             counters.getInvokeError().increase();
197             throw new IOException JavaDoc(ex.getMessage());
198         }
199     }
200
201     public Future JavaDoc<InputStreamMessageContext> invokeAsync(OutputStreamMessageContext context,
202                                                          Executor JavaDoc executor)
203         throws IOException JavaDoc {
204         try {
205             context.getOutputStream().close();
206             HTTPClientOutputStreamContext ctx = (HTTPClientOutputStreamContext)context;
207             FutureTask JavaDoc<InputStreamMessageContext> f = new FutureTask JavaDoc<InputStreamMessageContext>(
208                 getInputStreamMessageContextCallable(ctx));
209             // client (service) must always have an executor associated with it
210
executor.execute(f);
211             counters.getInvokeAsync().increase();
212             return f;
213         } catch (Exception JavaDoc ex) {
214             counters.getInvokeError().increase();
215             throw new IOException JavaDoc(ex.getMessage());
216         }
217     }
218     
219     public ResponseCallback getResponseCallback() {
220         return responseCallback;
221     }
222
223     public void shutdown() {
224         if (url != null) {
225             try {
226                 URLConnection JavaDoc connect = url.openConnection();
227                 if (connect instanceof HttpURLConnection JavaDoc) {
228                     ((HttpURLConnection JavaDoc)connect).disconnect();
229                 }
230             } catch (IOException JavaDoc ex) {
231                 //ignore
232
}
233             url = null;
234         }
235         
236         if (decoupledURL != null && decoupledEngine != null) {
237             try {
238                 DecoupledHandler decoupledHandler =
239                     (DecoupledHandler)decoupledEngine.getServant(decoupledAddress);
240                 if (decoupledHandler != null) {
241                     decoupledHandler.release();
242                 }
243             } catch (IOException JavaDoc ioe) {
244                 // ignore
245
}
246         }
247
248         bus.sendEvent(new ComponentRemovedEvent(this));
249     }
250
251     protected InputStreamMessageContext getResponseContext(
252                                  HTTPClientOutputStreamContext requestContext)
253         throws IOException JavaDoc {
254         InputStreamMessageContext responseContext = null;
255         if (hasDecoupledEndpoint()) {
256             int responseCode = getResponseCode(requestContext.connection);
257             if (responseCode == HttpURLConnection.HTTP_ACCEPTED) {
258                 // server transport was rebased on decoupled response endpoint,
259
// dispatch this partial response immediately as it may include
260
// piggybacked content
261
responseContext =
262                     requestContext.getCorrespondingInputStreamContext();
263                 BindingContextUtils.storeDecoupledResponse(responseContext, true);
264             } else {
265                 // request failed *before* server transport was rebased on
266
// decoupled response endpoint
267
responseContext = requestContext.getCorrespondingInputStreamContext();
268             }
269         } else {
270             responseContext = requestContext.getCorrespondingInputStreamContext();
271         }
272         return responseContext;
273     }
274     
275     private EndpointReferenceType setUpDecoupledEndpoint() {
276         EndpointReferenceType reference =
277             EndpointReferenceUtils.getEndpointReference(policy.getDecoupledEndpoint());
278         if (reference != null) {
279             decoupledAddress = reference.getAddress().getValue();
280             LOG.info("creating decoupled endpoint: " + decoupledAddress);
281             try {
282                 decoupledURL = new URL JavaDoc(decoupledAddress);
283                 decoupledEngine =
284                     JettyHTTPServerEngine.getForPort(bus,
285                                                      decoupledURL.getProtocol(),
286                                                      decoupledURL.getPort());
287                 DecoupledHandler decoupledHandler =
288                     (DecoupledHandler)decoupledEngine.getServant(decoupledAddress);
289                 if (decoupledHandler == null) {
290                     responseCallback = clientBinding.createResponseCallback();
291                     decoupledEngine.addServant(decoupledAddress,
292                                                new DecoupledHandler(responseCallback));
293                 } else {
294                     responseCallback = decoupledHandler.duplicate();
295                 }
296
297             } catch (Exception JavaDoc e) {
298                 // REVISIT move message to localizable Messages.properties
299
LOG.log(Level.WARNING, "decoupled endpoint creation failed: ", e);
300             }
301         }
302         return reference;
303     }
304
305
306     protected synchronized boolean hasDecoupledEndpoint() {
307         return decoupledEndpoint != null;
308     }
309
310     protected static Configuration getPortConfiguration(Bus bus, EndpointReferenceType ref) {
311         Configuration busConfiguration = bus.getConfiguration();
312         String JavaDoc id = EndpointReferenceUtils.getServiceName(ref).toString()
313             + "/" + EndpointReferenceUtils.getPortName(ref);
314         Configuration portConfiguration = busConfiguration
315             .getChild(PORT_CONFIGURATION_URI,
316                       id);
317         
318         if (portConfiguration == null) {
319             ConfigurationBuilder cb = ConfigurationBuilderFactory.getBuilder(null);
320             portConfiguration = cb.getConfiguration(PORT_CONFIGURATION_URI, id,
321                                                     bus.getConfiguration());
322             if (null == portConfiguration) {
323                 portConfiguration = cb.buildConfiguration(PORT_CONFIGURATION_URI, id,
324                                                           bus.getConfiguration());
325             }
326
327             // add the additional provider
328
Port port = null;
329             try {
330                 port = EndpointReferenceUtils.getPort(bus.getWSDLManager(), ref);
331             } catch (WSDLException ex) {
332                 throw new WebServiceException("Could not get port from wsdl", ex);
333             }
334             portConfiguration.getProviders().add(new WsdlPortProvider(port));
335         }
336         return portConfiguration;
337     }
338
339     private Configuration createConfiguration(Configuration portCfg) {
340         ConfigurationBuilder cb = ConfigurationBuilderFactory.getBuilder(null);
341         Configuration cfg = cb.getConfiguration(HTTP_CLIENT_CONFIGURATION_URI,
342                                                 HTTP_CLIENT_CONFIGURATION_ID,
343                                                 portCfg);
344         if (null == cfg) {
345             cfg = cb.buildConfiguration(HTTP_CLIENT_CONFIGURATION_URI,
346                                         HTTP_CLIENT_CONFIGURATION_ID,
347                                         portCfg);
348         }
349         // register the additional provider
350
if (null != port) {
351             cfg.getProviders().add(new WsdlHttpConfigurationProvider(port, false));
352         }
353         return cfg;
354     }
355
356     protected static int getResponseCode(URLConnection JavaDoc connection) throws IOException JavaDoc {
357         int responseCode = HttpURLConnection.HTTP_OK;
358         if (connection instanceof HttpURLConnection JavaDoc) {
359             HttpURLConnection JavaDoc hc = (HttpURLConnection JavaDoc)connection;
360             responseCode = hc.getResponseCode();
361         } else {
362             if (connection.getHeaderField(HTTP_RESPONSE_CODE) != null) {
363                 responseCode =
364                     Integer.parseInt(connection.getHeaderField(HTTP_RESPONSE_CODE));
365             }
366         }
367         return responseCode;
368     }
369     
370     protected InputStreamMessageContextCallable getInputStreamMessageContextCallable(
371                                                HTTPClientOutputStreamContext context) {
372         return new InputStreamMessageContextCallable(context);
373     }
374
375     protected static class HTTPClientOutputStreamContext
376         extends MessageContextWrapper
377         implements OutputStreamMessageContext {
378
379         URLConnection JavaDoc connection;
380         WrappedOutputStream origOut;
381         OutputStream JavaDoc out;
382         HTTPClientInputStreamContext inputStreamContext;
383         HTTPClientPolicy policy;
384         AuthorizationPolicy authPolicy;
385         AuthorizationPolicy proxyAuthPolicy;
386         SSLClientPolicy sslClientPolicy;
387         Configuration portConfiguration;
388
389         @SuppressWarnings JavaDoc("unchecked")
390         public HTTPClientOutputStreamContext(URL JavaDoc url,
391                                              HTTPClientPolicy p,
392                                              AuthorizationPolicy ap,
393                                              AuthorizationPolicy pap,
394                                              SSLClientPolicy sslcp,
395                                              MessageContext ctx,
396                                              Configuration configParam)
397             throws IOException JavaDoc {
398             super(ctx);
399
400             Map JavaDoc<String JavaDoc, List JavaDoc<String JavaDoc>> headers = (Map JavaDoc<String JavaDoc, List JavaDoc<String JavaDoc>>)super.get(HTTP_REQUEST_HEADERS);
401             if (null == headers) {
402                 headers = new HashMap JavaDoc<String JavaDoc, List JavaDoc<String JavaDoc>>();
403                 super.put(HTTP_REQUEST_HEADERS, headers);
404             }
405
406             policy = p;
407             authPolicy = ap;
408             proxyAuthPolicy = pap;
409             sslClientPolicy = sslcp;
410             portConfiguration = configParam;
411             String JavaDoc value = (String JavaDoc)ctx.get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY);
412             if (value != null) {
413                 url = new URL JavaDoc(value);
414             }
415
416             if (policy.isSetProxyServer()) {
417                 Proxy JavaDoc proxy = new Proxy JavaDoc(Proxy.Type.valueOf(policy.getProxyServerType().toString()),
418                                         new InetSocketAddress JavaDoc(policy.getProxyServer(),
419                                                               policy.getProxyServerPort()));
420                 connection = url.openConnection(proxy);
421             } else {
422                 connection = url.openConnection();
423             }
424             connection.setDoOutput(true);
425
426             if (connection instanceof HttpURLConnection JavaDoc) {
427                 HttpURLConnection JavaDoc hc = (HttpURLConnection JavaDoc)connection;
428                 hc.setRequestMethod("POST");
429             }
430
431             connection.setConnectTimeout((int)policy.getConnectionTimeout());
432             connection.setReadTimeout((int)policy.getReceiveTimeout());
433
434             connection.setUseCaches(false);
435             if (connection instanceof HttpURLConnection JavaDoc) {
436                 HttpURLConnection JavaDoc hc = (HttpURLConnection JavaDoc)connection;
437                 if (policy.isAutoRedirect()) {
438                     //cannot use chunking if autoredirect as the request will need to be
439
//completely cached locally and resent to the redirect target
440
hc.setInstanceFollowRedirects(true);
441                 } else {
442                     hc.setInstanceFollowRedirects(false);
443                     if (policy.isAllowChunking()) {
444                         hc.setChunkedStreamingMode(2048);
445                     }
446                 }
447             }
448             setPolicies(headers);
449             if (connection instanceof HttpsURLConnection) {
450                 setSSLPolicies();
451             }
452
453             origOut = new WrappedOutputStream();
454             out = origOut;
455         }
456         
457         private void setSSLPolicies() {
458             JettySslClientConfigurer sslClientConfigurer =
459                 new JettySslClientConfigurer(sslClientPolicy, connection, portConfiguration);
460             sslClientConfigurer.configure();
461         }
462         
463         private void setPolicies(Map JavaDoc<String JavaDoc, List JavaDoc<String JavaDoc>> headers) {
464             String JavaDoc userName = (String JavaDoc)get(BindingProvider.USERNAME_PROPERTY);
465             if (userName == null && authPolicy.isSetUserName()) {
466                 userName = authPolicy.getUserName();
467             }
468             if (userName != null) {
469                 String JavaDoc passwd = (String JavaDoc)get(BindingProvider.PASSWORD_PROPERTY);
470                 if (passwd == null && authPolicy.isSetPassword()) {
471                     passwd = authPolicy.getPassword();
472                 }
473                 userName += ":";
474                 if (passwd != null) {
475                     userName += passwd;
476                 }
477                 userName = Base64Utility.encode(userName.getBytes());
478                 headers.put("Authorization",
479                             Arrays.asList(new String JavaDoc[] {"Basic " + userName}));
480             } else if (authPolicy.isSetAuthorizationType() && authPolicy.isSetAuthorization()) {
481                 String JavaDoc type = authPolicy.getAuthorizationType();
482                 type += " ";
483                 type += authPolicy.getAuthorization();
484                 headers.put("Authorization",
485                             Arrays.asList(new String JavaDoc[] {type}));
486             }
487             if (proxyAuthPolicy.isSetUserName()) {
488                 userName = proxyAuthPolicy.getUserName();
489                 if (userName != null) {
490                     String JavaDoc passwd = "";
491                     if (proxyAuthPolicy.isSetPassword()) {
492                         passwd = proxyAuthPolicy.getPassword();
493                     }
494                     userName += ":";
495                     if (passwd != null) {
496                         userName += passwd;
497                     }
498                     userName = Base64Utility.encode(userName.getBytes());
499                     headers.put("Proxy-Authorization",
500                                 Arrays.asList(new String JavaDoc[] {"Basic " + userName}));
501                 } else if (proxyAuthPolicy.isSetAuthorizationType() && proxyAuthPolicy.isSetAuthorization()) {
502                     String JavaDoc type = proxyAuthPolicy.getAuthorizationType();
503                     type += " ";
504                     type += proxyAuthPolicy.getAuthorization();
505                     headers.put("Proxy-Authorization",
506                                 Arrays.asList(new String JavaDoc[] {type}));
507                 }
508             }
509             if (policy.isSetCacheControl()) {
510                 headers.put("Cache-Control",
511                             Arrays.asList(new String JavaDoc[] {policy.getCacheControl().value()}));
512             }
513             if (policy.isSetHost()) {
514                 headers.put("Host",
515                             Arrays.asList(new String JavaDoc[] {policy.getHost()}));
516             }
517             if (policy.isSetConnection()) {
518                 headers.put("Connection",
519                             Arrays.asList(new String JavaDoc[] {policy.getConnection().value()}));
520             }
521             if (policy.isSetAccept()) {
522                 headers.put("Accept",
523                             Arrays.asList(new String JavaDoc[] {policy.getAccept()}));
524             }
525             if (policy.isSetAcceptEncoding()) {
526                 headers.put("Accept-Encoding",
527                             Arrays.asList(new String JavaDoc[] {policy.getAcceptEncoding()}));
528             }
529             if (policy.isSetAcceptLanguage()) {
530                 headers.put("Accept-Language",
531                             Arrays.asList(new String JavaDoc[] {policy.getAcceptLanguage()}));
532             }
533             if (policy.isSetContentType()) {
534                 headers.put("Content-Type",
535                             Arrays.asList(new String JavaDoc[] {policy.getContentType()}));
536             }
537             if (policy.isSetCookie()) {
538                 headers.put("Cookie",
539                             Arrays.asList(new String JavaDoc[] {policy.getCookie()}));
540             }
541             if (policy.isSetBrowserType()) {
542                 headers.put("BrowserType",
543                             Arrays.asList(new String JavaDoc[] {policy.getBrowserType()}));
544             }
545             if (policy.isSetReferer()) {
546                 headers.put("Referer",
547                             Arrays.asList(new String JavaDoc[] {policy.getReferer()}));
548             }
549         }
550
551         @SuppressWarnings JavaDoc("unchecked")
552         void flushHeaders() throws IOException JavaDoc {
553             Map JavaDoc<String JavaDoc, List JavaDoc<String JavaDoc>> headers = (Map JavaDoc<String JavaDoc, List JavaDoc<String JavaDoc>>)super.get(HTTP_REQUEST_HEADERS);
554             if (null != headers) {
555                 for (String JavaDoc header : headers.keySet()) {
556                     List JavaDoc<String JavaDoc> headerList = headers.get(header);
557                     for (String JavaDoc string : headerList) {
558                         connection.addRequestProperty(header, string);
559                     }
560                 }
561             }
562
563             origOut.resetOut(new BufferedOutputStream JavaDoc(connection.getOutputStream(), 1024));
564         }
565
566         public void setFault(boolean isFault) {
567             //nothing to do
568
}
569
570         public boolean isFault() {
571             return false;
572         }
573
574         public void setOneWay(boolean isOneWay) {
575             put(ONEWAY_MESSAGE_TF, isOneWay);
576         }
577
578         public boolean isOneWay() {
579             return ((Boolean JavaDoc)get(ONEWAY_MESSAGE_TF)).booleanValue();
580         }
581
582         public OutputStream JavaDoc getOutputStream() {
583             return out;
584         }
585
586         public void setOutputStream(OutputStream JavaDoc o) {
587             out = o;
588         }
589
590         public InputStreamMessageContext getCorrespondingInputStreamContext() throws IOException JavaDoc {
591             if (inputStreamContext == null) {
592                 inputStreamContext = new HTTPClientInputStreamContext(connection);
593             }
594             return inputStreamContext;
595         }
596
597         private class WrappedOutputStream extends FilterOutputStream JavaDoc {
598             WrappedOutputStream() {
599                 super(new ByteArrayOutputStream JavaDoc());
600             }
601             void resetOut(OutputStream JavaDoc newOut) throws IOException JavaDoc {
602                 ByteArrayOutputStream JavaDoc bout = (ByteArrayOutputStream JavaDoc)out;
603                 if (bout.size() > 0) {
604                     bout.writeTo(newOut);
605                 }
606                 out = newOut;
607             }
608
609
610             public void close() throws IOException JavaDoc {
611                 out.flush();
612                 if (inputStreamContext != null) {
613                     inputStreamContext.initialise();
614                 }
615             }
616         }
617     }
618
619     static class HTTPClientInputStreamContext
620         extends GenericMessageContext
621         implements InputStreamMessageContext {
622
623         private static final long serialVersionUID = 1L;
624
625         final URLConnection JavaDoc connection;
626         InputStream JavaDoc origInputStream;
627         InputStream JavaDoc inStream;
628         private boolean initialised;
629
630         public HTTPClientInputStreamContext(URLConnection JavaDoc con) throws IOException JavaDoc {
631             connection = con;
632             initialise();
633         }
634
635         /**
636          * Calling getHeaderFields on the connection implicitly gets
637          * the InputStream from the connection. Getting the
638          * InputStream implicitly closes the output stream which
639          * renders it unwritable. The InputStream context is created
640          * before the binding is finished with it. For this reason it
641          * is necessary to initialise the InputStreamContext lazily.
642          * When the OutputStream associated with this connection is
643          * closed, it will invoke on this initialise method.
644          */

645         void initialise() throws IOException JavaDoc {
646             if (!initialised) {
647                 put(ObjectMessageContext.MESSAGE_INPUT, false);
648                 put(HTTP_RESPONSE_HEADERS, connection.getHeaderFields());
649                 put(HTTP_RESPONSE_CODE, getResponseCode(connection));
650                 if (connection instanceof HttpURLConnection JavaDoc) {
651                     HttpURLConnection JavaDoc hc = (HttpURLConnection JavaDoc)connection;
652                     origInputStream = hc.getErrorStream();
653                     if (null == origInputStream) {
654                         origInputStream = connection.getInputStream();
655                     }
656                 } else {
657                     origInputStream = connection.getInputStream();
658                 }
659
660                 inStream = origInputStream;
661                 initialised = true;
662             }
663         }
664
665         public InputStream JavaDoc getInputStream() {
666             try {
667                 initialise();
668             } catch (IOException JavaDoc ex) {
669                 throw new RuntimeException JavaDoc(ex);
670             }
671             return inStream;
672         }
673
674         public void setInputStream(InputStream JavaDoc ins) {
675             inStream = ins;
676         }
677
678         public void setFault(boolean isFault) {
679             //nothing to do
680
}
681
682         public boolean isFault() {
683             assert get(HTTP_RESPONSE_CODE) != null;
684             return ((Integer JavaDoc)get(HTTP_RESPONSE_CODE)).intValue() == 500;
685         }
686     }
687
688     static class HTTPDecoupledClientInputStreamContext
689         extends GenericMessageContext
690         implements InputStreamMessageContext {
691
692         InputStream JavaDoc inStream;
693
694         public HTTPDecoupledClientInputStreamContext(HttpRequest decoupledResponse)
695             throws IOException JavaDoc {
696             put(ObjectMessageContext.MESSAGE_INPUT, false);
697             put(HTTP_RESPONSE_HEADERS, decoupledResponse.getParameters());
698             put(HTTP_RESPONSE_CODE, HttpURLConnection.HTTP_ACCEPTED);
699             inStream = drain(decoupledResponse.getInputStream());
700         }
701
702         public InputStream JavaDoc getInputStream() {
703             return inStream;
704         }
705
706         public void setInputStream(InputStream JavaDoc ins) {
707             inStream = ins;
708         }
709
710         public void setFault(boolean isFault) {
711             //nothing to do
712
}
713
714         public boolean isFault() {
715             return false;
716         }
717
718         private static InputStream JavaDoc drain(InputStream JavaDoc r) throws IOException JavaDoc {
719             byte[] bytes = new byte[4096];
720             ByteArrayOutputStream JavaDoc w = new ByteArrayOutputStream JavaDoc();
721             try {
722                 int offset = 0;
723                 int length = r.read(bytes, offset, bytes.length - offset);
724                 while (length != -1) {
725                     offset += length;
726
727                     if (offset == bytes.length) {
728                         w.write(bytes, 0, bytes.length);
729                         offset = 0;
730                     }
731
732                     length = r.read(bytes, offset, bytes.length - offset);
733                 }
734                 if (offset != 0) {
735                     w.write(bytes, 0, offset);
736                 }
737             } finally {
738                 bytes = null;
739             }
740             return new ByteArrayInputStream JavaDoc(w.toByteArray());
741         }
742     }
743
744     private class InputStreamMessageContextCallable implements Callable JavaDoc<InputStreamMessageContext> {
745         private final HTTPClientOutputStreamContext httpClientOutputStreamContext;
746
747         InputStreamMessageContextCallable(HTTPClientOutputStreamContext ctx) {
748             httpClientOutputStreamContext = ctx;
749         }
750         public InputStreamMessageContext call() throws Exception JavaDoc {
751             return getResponseContext(httpClientOutputStreamContext);
752         }
753     }
754     
755     private class DecoupledHandler extends AbstractHttpHandler {
756         private ResponseCallback responseCallback;
757         private int refCount;
758         
759         DecoupledHandler(ResponseCallback callback) {
760             responseCallback = callback;
761         }
762         
763         synchronized ResponseCallback duplicate() {
764             refCount++;
765             return responseCallback;
766         }
767         
768         synchronized void release() {
769             if (--refCount == 0) {
770                 try {
771                     decoupledEngine.removeServant(decoupledAddress);
772                     JettyHTTPServerEngine.destroyForPort(decoupledURL.getPort());
773                 } catch (IOException JavaDoc ex) {
774                     //ignore
775
}
776             }
777         }
778         
779         public void handle(String JavaDoc pathInContext,
780                            String JavaDoc pathParams,
781                            HttpRequest req,
782                            HttpResponse resp) throws IOException JavaDoc {
783             HTTPDecoupledClientInputStreamContext ctx =
784                 new HTTPDecoupledClientInputStreamContext(req);
785             responseCallback.dispatch(ctx);
786             resp.commit();
787             req.setHandled(true);
788         }
789     }
790 }
791
Popular Tags