1 package org.objectweb.celtix.bus.transports.http; 2 3 import java.io.IOException ; 4 import java.io.InputStream ; 5 import java.net.URL ; 6 import java.util.concurrent.Callable ; 7 import java.util.concurrent.Executor ; 8 import java.util.concurrent.ExecutorService ; 9 import java.util.concurrent.Executors ; 10 import java.util.concurrent.Future ; 11 import java.util.concurrent.TimeUnit ; 12 import java.util.concurrent.TimeoutException ; 13 import java.util.concurrent.locks.Condition ; 14 import java.util.concurrent.locks.Lock ; 15 import java.util.concurrent.locks.ReentrantLock ; 16 17 import javax.wsdl.WSDLException; 18 import javax.xml.namespace.QName ; 19 20 import junit.extensions.TestSetup; 21 import junit.framework.Test; 22 import junit.framework.TestCase; 23 import junit.framework.TestSuite; 24 25 import org.easymock.classextension.EasyMock; 26 import org.objectweb.celtix.Bus; 27 import org.objectweb.celtix.BusException; 28 import org.objectweb.celtix.bindings.ClientBinding; 29 import org.objectweb.celtix.bus.busimpl.ComponentCreatedEvent; 30 import org.objectweb.celtix.bus.busimpl.ComponentRemovedEvent; 31 import org.objectweb.celtix.bus.configuration.ConfigurationEventFilter; 32 import org.objectweb.celtix.bus.transports.TestResponseCallback; 33 import org.objectweb.celtix.bus.transports.TransportFactoryManagerImpl; 34 import org.objectweb.celtix.bus.workqueue.WorkQueueManagerImpl; 35 import org.objectweb.celtix.bus.wsdl.WSDLManagerImpl; 36 import org.objectweb.celtix.buslifecycle.BusLifeCycleManager; 37 import org.objectweb.celtix.configuration.Configuration; 38 import org.objectweb.celtix.configuration.types.ClassNamespaceMappingListType; 39 import org.objectweb.celtix.configuration.types.ClassNamespaceMappingType; 40 import org.objectweb.celtix.configuration.types.ObjectFactory; 41 import org.objectweb.celtix.context.GenericMessageContext; 42 import org.objectweb.celtix.context.InputStreamMessageContext; 43 import org.objectweb.celtix.context.OutputStreamMessageContext; 44 import org.objectweb.celtix.transports.ClientTransport; 45 import org.objectweb.celtix.transports.ServerTransport; 46 import org.objectweb.celtix.transports.ServerTransportCallback; 47 import org.objectweb.celtix.transports.TransportFactoryManager; 48 import org.objectweb.celtix.ws.addressing.EndpointReferenceType; 49 import org.objectweb.celtix.wsdl.EndpointReferenceUtils; 50 import org.objectweb.celtix.wsdl.WSDLManager; 51 import static org.easymock.EasyMock.isA; 52 53 public class HTTPTransportTest extends TestCase { 54 55 private static final QName SERVICE_NAME = new 56 QName ("http://objectweb.org/hello_world_soap_http", "SOAPService"); 57 private static final String PORT_NAME = "SoapPort"; 58 private static final String ADDRESS = "http://localhost:9000/SoapContext/SoapPort"; 59 private static final String DECOUPLED_ADDRESS = "http://localhost:9999/decoupled"; 60 private static final int DECOUPLED_PORT = 9999; 61 62 private static final URL WSDL_URL = HTTPTransportTest.class.getResource("/wsdl/hello_world.wsdl"); 63 64 private static boolean first = true; 65 66 Bus bus; 67 private WSDLManager wsdlManager; 68 private WorkQueueManagerImpl queueManager; 69 private ExecutorService executorService; 70 private TestResponseCallback responseCallback; 71 private HTTPTransportFactory factory; 72 private Lock partialResponseReceivedLock; 73 private Condition partialResponseReceivedCondition; 74 private boolean partialResponseReceivedNotified; 75 private ClientBinding clientBinding; 76 77 public HTTPTransportTest(String arg0) { 78 super(arg0); 79 } 80 81 public static Test suite() throws Exception { 82 TestSuite suite = new TestSuite(HTTPTransportTest.class); 83 return new TestSetup(suite) { 84 protected void tearDown() throws Exception { 85 super.tearDown(); 86 JettyHTTPServerEngine.destroyForPort(9000); 87 } 88 }; 89 } 90 91 92 public static void main(String [] args) { 93 junit.textui.TestRunner.run(HTTPTransportTest.class); 94 } 95 96 public void setUp() throws BusException { 97 bus = EasyMock.createMock(Bus.class); 98 wsdlManager = new WSDLManagerImpl(null); 99 partialResponseReceivedLock = new ReentrantLock (); 100 partialResponseReceivedCondition = partialResponseReceivedLock.newCondition(); 101 partialResponseReceivedNotified = false; 102 responseCallback = new TestResponseCallback(); 103 clientBinding = EasyMock.createMock(ClientBinding.class); 104 } 105 106 public void tearDown() throws Exception { 107 EasyMock.reset(bus); 108 try { 109 bus.removeListener(isA(JettyHTTPServerTransport.class)); 110 } catch (BusException e) { 111 } 113 EasyMock.expectLastCall(); 114 checkBusRemovedEvent(); 115 EasyMock.replay(bus); 116 117 if (queueManager != null) { 118 queueManager.shutdown(false); 119 } 120 if (executorService != null) { 121 executorService.shutdownNow(); 122 } 123 JettyHTTPServerEngine.destroyForPort(DECOUPLED_PORT); 124 } 125 126 int readBytes(byte bytes[], InputStream ins) throws IOException { 127 int len = ins.read(bytes); 128 int total = 0; 129 while (len != -1) { 130 total += len; 131 len = ins.read(bytes, total, bytes.length - total); 132 } 133 return total; 134 } 135 136 137 public void testInvokeOneway() throws Exception { 138 doTestInvokeOneway(false); 139 } 140 141 public void testInvokeOnewayDecoupled() throws Exception { 142 doTestInvokeOneway(true); 143 } 144 145 public void testInvoke() throws Exception { 146 doTestInvoke(false); 147 doTestInvoke(false); 148 } 149 150 151 public void testInvokeDecoupled() throws Exception { 152 doTestInvoke(false, true, ADDRESS); 153 } 154 155 public void testInvokeUsingAutomaticWorkQueue() throws Exception { 156 doTestInvoke(true); 157 } 158 159 public void testInvokeDecoupledUsingAutomaticWorkQueue() throws Exception { 160 doTestInvoke(true, true, ADDRESS); 161 } 162 163 public void testInvokeAsync() throws Exception { 164 doTestInvokeAsync(false); 165 } 166 167 public void testInvokeAsyncDecoupled() throws Exception { 168 doTestInvokeAsync(false, true); 169 } 170 171 public void testInvokeAsyncUsingAutomaticWorkQueue() throws Exception { 172 doTestInvokeAsync(true); 173 } 174 175 public void testInvokeAsyncDecoupledUsingAutomaticWorkQueue() throws Exception { 176 doTestInvokeAsync(true, true); 177 } 178 179 public void testInputStreamMessageContextCallable() throws Exception { 180 factory = createTransportFactory(); 181 HTTPClientTransport.HTTPClientOutputStreamContext octx = 182 EasyMock.createMock(HTTPClientTransport.HTTPClientOutputStreamContext.class); 183 HTTPClientTransport.HTTPClientInputStreamContext ictx = 184 EasyMock.createMock(HTTPClientTransport.HTTPClientInputStreamContext.class); 185 octx.getCorrespondingInputStreamContext(); 186 EasyMock.expectLastCall().andReturn(ictx); 187 EasyMock.replay(octx); 188 HTTPClientTransport client = (HTTPClientTransport) 189 createClientTransport(WSDL_URL, SERVICE_NAME, PORT_NAME, ADDRESS, false); 190 191 Callable c = client.getInputStreamMessageContextCallable(octx); 192 assertNotNull(c); 193 InputStreamMessageContext result = (InputStreamMessageContext)c.call(); 194 assertEquals(result, ictx); 195 } 196 197 public void doTestInvokeOneway(boolean decoupled) throws Exception { 198 199 factory = createTransportFactory(); 200 201 ServerTransport server = 202 createServerTransport(WSDL_URL, SERVICE_NAME, PORT_NAME, ADDRESS); 203 byte[] buffer = new byte[64]; 204 activateServer(server, false, 200, buffer, true, decoupled); 205 206 ClientTransport client = 207 createClientTransport(WSDL_URL, SERVICE_NAME, PORT_NAME, ADDRESS, decoupled); 208 byte outBytes[] = "Hello World!!!".getBytes(); 209 210 long start = System.currentTimeMillis(); 211 OutputStreamMessageContext octx = doRequest(client, outBytes, true, decoupled); 212 client.invokeOneway(octx); 213 long stop = System.currentTimeMillis(); 214 215 octx = doRequest(client, outBytes, false, decoupled); 216 client.invokeOneway(octx); 217 octx = doRequest(client, outBytes, false, decoupled); 218 client.invokeOneway(octx); 219 long stop2 = System.currentTimeMillis(); 220 221 server.deactivate(); 222 EasyMock.reset(bus); 223 checkBusRemovedEvent(); 224 EasyMock.replay(bus); 225 client.shutdown(); 226 227 assertTrue("Total one call: " + (stop - start), (stop - start) < 400); 228 assertTrue("Total: " + (stop2 - start), (stop2 - start) < 600); 229 assertEquals(new String (outBytes), new String (buffer, 0, outBytes.length)); 230 Thread.sleep(200); 231 } 232 233 public void doTestInvoke(final boolean useAutomaticWorkQueue) throws Exception { 234 doTestInvoke(useAutomaticWorkQueue, false, ADDRESS); 235 } 236 237 public void doTestInvoke(final boolean useAutomaticWorkQueue, 238 final boolean decoupled, 239 final String address) throws Exception { 240 241 factory = createTransportFactory(); 242 243 ServerTransport server = 244 createServerTransport(WSDL_URL, SERVICE_NAME, PORT_NAME, address); 245 246 activateServer(server, useAutomaticWorkQueue, 0, null, false, decoupled); 247 ClientTransport client = 249 createClientTransport(WSDL_URL, SERVICE_NAME, PORT_NAME, address, decoupled); 250 doRequestResponse(client, "Hello World".getBytes(), true, decoupled); 251 252 byte outBytes[] = new byte[5000]; 254 for (int x = 0; x < outBytes.length; x++) { 255 outBytes[x] = (byte)('a' + (x % 26)); 256 } 257 client = 258 createClientTransport(WSDL_URL, SERVICE_NAME, PORT_NAME, address, decoupled); 259 doRequestResponse(client, outBytes, false, decoupled); 260 261 server.deactivate(); 262 outBytes = "HelloWorld".getBytes(); 263 264 try { 265 OutputStreamMessageContext octx = client.createOutputStreamContext(new GenericMessageContext()); 266 client.finalPrepareOutputStreamContext(octx); 267 octx.getOutputStream().write(outBytes); 268 octx.getOutputStream().close(); 269 InputStreamMessageContext ictx = client.invoke(octx); 270 byte bytes[] = new byte[10000]; 271 int len = ictx.getInputStream().read(bytes); 272 if (len != -1 273 && new String (bytes, 0, len).indexOf("HTTP Status 503") == -1 274 && new String (bytes, 0, len).indexOf("Error 404") == -1) { 275 fail("was able to process a message after the servant was deactivated: " + len 276 + " - " + new String (bytes)); 277 } 278 } catch (IOException ex) { 279 } 281 activateServer(server, useAutomaticWorkQueue, 0, null, false, decoupled); 282 doRequestResponse(client, "Hello World 3".getBytes(), false, decoupled); 283 server.deactivate(); 284 activateServer(server, useAutomaticWorkQueue, 0, null, false, decoupled); 285 doRequestResponse(client, "Hello World 4".getBytes(), false, decoupled); 286 server.deactivate(); 287 EasyMock.reset(bus); 288 checkBusRemovedEvent(); 289 EasyMock.replay(bus); 290 client.shutdown(); 291 } 292 293 public void doTestInvokeAsync(final boolean useAutomaticWorkQueue) throws Exception { 294 doTestInvokeAsync(useAutomaticWorkQueue, false); 295 } 296 297 public void doTestInvokeAsync(final boolean useAutomaticWorkQueue, boolean decoupled) throws Exception { 298 299 Executor executor = null; 300 if (useAutomaticWorkQueue) { 301 queueManager = new WorkQueueManagerImpl(bus); 302 executor = queueManager.getAutomaticWorkQueue(); 303 } else { 304 executorService = Executors.newFixedThreadPool(1); 305 executor = executorService; 306 } 307 factory = createTransportFactory(); 308 309 ServerTransport server = 310 createServerTransport(WSDL_URL, SERVICE_NAME, PORT_NAME, ADDRESS); 311 activateServer(server, false, 400, null, false, decoupled); 312 313 ClientTransport client = 314 createClientTransport(WSDL_URL, SERVICE_NAME, PORT_NAME, ADDRESS, decoupled); 315 byte outBytes[] = "Hello World!!!".getBytes(); 316 317 OutputStreamMessageContext octx = doRequest(client, outBytes, true, decoupled); 319 Future <InputStreamMessageContext> f = client.invokeAsync(octx, executor); 320 assertNotNull(f); 321 int i = 0; 322 while (i < 10) { 323 Thread.sleep(100); 324 if (f.isDone()) { 325 break; 326 } 327 i++; 328 } 329 assertTrue(f.isDone()); 330 InputStreamMessageContext ictx = f.get(); 331 doResponse(client, ictx, outBytes, decoupled); 332 333 octx = doRequest(client, outBytes, false, decoupled); 335 f = client.invokeAsync(octx, executor); 336 ictx = f.get(); 337 assertTrue(f.isDone()); 338 doResponse(client, ictx, outBytes, decoupled); 339 340 boolean timeoutImplemented = false; 342 if (timeoutImplemented) { 343 octx = doRequest(client, outBytes, false, decoupled); 344 f = client.invokeAsync(octx, executor); 345 try { 346 ictx = f.get(200, TimeUnit.MILLISECONDS); 347 fail("Expected TimeoutException not thrown."); 348 } catch (TimeoutException ex) { 349 } 351 assertTrue(!f.isDone()); 352 } 353 server.deactivate(); 354 } 355 356 public void testInvokeNoContext() throws Exception { 357 boolean oldFirst = first; 358 try { 359 first = true; 360 doTestInvoke(false, false, "http://localhost:9888"); 361 } finally { 362 first = oldFirst; 363 JettyHTTPServerEngine.destroyForPort(9888); 364 } 365 } 366 367 368 private void checkBusCreatedEvent() { 369 370 bus.sendEvent(isA(ComponentCreatedEvent.class)); 371 372 EasyMock.expectLastCall(); 373 } 374 375 private void checkBusRemovedEvent() { 376 377 bus.sendEvent(isA(ComponentRemovedEvent.class)); 378 379 EasyMock.expectLastCall(); 380 } 381 382 private void activateServer(ServerTransport server, 383 final boolean useAutomaticWorkQueue, 384 final int delay, 385 final byte[] buffer, 386 final boolean oneWay, 387 final boolean decoupled) throws Exception { 388 ServerTransportCallback callback = new TestServerTransportCallback(server, 389 useAutomaticWorkQueue, 390 delay, 391 buffer, 392 oneWay, 393 decoupled); 394 EasyMock.reset(bus); 395 Configuration bc = EasyMock.createMock(Configuration.class); 396 bus.getConfiguration(); 397 EasyMock.expectLastCall().andReturn(bc); 398 server.activate(callback); 399 } 400 401 private void doRequestResponse(ClientTransport client, 402 byte outBytes[], 403 boolean initial, 404 boolean decoupled) 405 throws Exception { 406 OutputStreamMessageContext octx = doRequest(client, outBytes, initial, decoupled); 407 InputStreamMessageContext ictx = client.invoke(octx); 408 doResponse(client, ictx, outBytes, decoupled); 409 } 410 411 private OutputStreamMessageContext doRequest(ClientTransport client, 412 byte outBytes[], 413 boolean initial, 414 boolean decoupled) throws Exception { 415 if (decoupled) { 416 if (initial) { 417 assertFalse(((HTTPClientTransport)client).hasDecoupledEndpoint()); 418 EasyMock.reset(bus); 419 Configuration lc = EasyMock.createMock(Configuration.class); 420 bus.getConfiguration(); 421 EasyMock.expectLastCall().andReturn(lc); 422 EasyMock.replay(bus); 423 EasyMock.reset(clientBinding); 424 clientBinding.createResponseCallback(); 425 EasyMock.expectLastCall().andReturn(responseCallback); 426 EasyMock.replay(clientBinding); 427 } 428 429 EndpointReferenceType decoupledEndpoint = client.getDecoupledEndpoint(); 430 assertNotNull(decoupledEndpoint); 431 assertNotNull(decoupledEndpoint.getAddress()); 432 assertEquals(decoupledEndpoint.getAddress().getValue(), DECOUPLED_ADDRESS); 433 assertTrue(((HTTPClientTransport)client).hasDecoupledEndpoint()); 434 assertSame(responseCallback, client.getResponseCallback()); 435 436 if (initial) { 437 EasyMock.verify(bus); 438 EasyMock.verify(clientBinding); 439 } 440 } 441 OutputStreamMessageContext octx = client.createOutputStreamContext(new GenericMessageContext()); 442 client.finalPrepareOutputStreamContext(octx); 443 octx.getOutputStream().write(outBytes); 444 return octx; 445 } 446 447 private void doResponse(ClientTransport client, 448 InputStreamMessageContext ictx, 449 byte outBytes[], 450 boolean decoupled) throws Exception { 451 if (decoupled) { 452 signalPartialResponseReceived(); 453 doResponse(client, responseCallback.waitForNextResponse(), outBytes); 454 } else { 455 doResponse(client, ictx, outBytes); 456 } 457 } 458 459 private void doResponse(ClientTransport client, 460 InputStreamMessageContext ictx, byte outBytes[]) throws Exception { 461 byte bytes[] = new byte[10000]; 462 int len = readBytes(bytes, ictx.getInputStream()); 463 assertTrue("Did not read anything " + len, len > 0); 464 assertEquals(new String (outBytes), new String (bytes, 0, len)); 465 } 466 467 private void awaitPartialResponseReceived() throws Exception { 468 partialResponseReceivedLock.lock(); 469 try { 470 while (!partialResponseReceivedNotified) { 471 partialResponseReceivedCondition.await(); 472 } 473 } finally { 474 partialResponseReceivedNotified = false; 475 partialResponseReceivedLock.unlock(); 476 } 477 } 478 479 private void signalPartialResponseReceived() throws Exception { 480 partialResponseReceivedLock.lock(); 481 try { 482 partialResponseReceivedNotified = true; 483 partialResponseReceivedCondition.signal(); 484 } finally { 485 partialResponseReceivedLock.unlock(); 486 } 487 } 488 489 private HTTPTransportFactory createTransportFactory() throws BusException { 490 EasyMock.reset(bus); 491 Configuration bc = EasyMock.createMock(Configuration.class); 492 493 String transportId = "http://celtix.objectweb.org/transports/http/configuration"; 494 ObjectFactory of = new ObjectFactory(); 495 ClassNamespaceMappingListType mappings = of.createClassNamespaceMappingListType(); 496 ClassNamespaceMappingType mapping = of.createClassNamespaceMappingType(); 497 mapping.setClassname("org.objectweb.celtix.bus.transports.http.HTTPTransportFactory"); 498 mapping.getNamespace().add(transportId); 499 mappings.getMap().add(mapping); 500 501 bus.getWSDLManager(); 502 EasyMock.expectLastCall().andReturn(wsdlManager); 503 bus.getWSDLManager(); 504 EasyMock.expectLastCall().andReturn(wsdlManager); 505 bus.getWSDLManager(); 506 EasyMock.expectLastCall().andReturn(wsdlManager); 507 508 BusLifeCycleManager lifecycleManager = EasyMock.createNiceMock(BusLifeCycleManager.class); 509 bus.getLifeCycleManager(); 510 EasyMock.expectLastCall().andReturn(lifecycleManager); 511 bus.getConfiguration(); 512 EasyMock.expectLastCall().andReturn(bc); 513 bc.getObject("transportFactories"); 514 EasyMock.expectLastCall().andReturn(mappings); 515 checkBusCreatedEvent(); 517 EasyMock.replay(bus); 518 EasyMock.replay(bc); 519 520 TransportFactoryManager tfm = new TransportFactoryManagerImpl(bus); 521 return (HTTPTransportFactory)tfm.getTransportFactory(transportId); 522 } 523 524 private ClientTransport createClientTransport(URL wsdlUrl, 525 QName serviceName, 526 String portName, 527 String address, 528 boolean decoupled) 529 throws WSDLException, IOException { 530 EasyMock.reset(bus); 531 532 Configuration bc = EasyMock.createMock(Configuration.class); 533 Configuration pc = EasyMock.createMock(Configuration.class); 534 535 bus.getConfiguration(); 536 EasyMock.expectLastCall().andReturn(bc); 537 String id = serviceName.toString() + "/" + portName; 538 bc.getChild("http://celtix.objectweb.org/bus/jaxws/port-config", id); 539 EasyMock.expectLastCall().andReturn(pc); 540 pc.getChild("http://celtix.objectweb.org/bus/transports/http/http-client-config", "http-client"); 541 EasyMock.expectLastCall().andReturn(null); 542 bus.getWSDLManager(); 543 EasyMock.expectLastCall().andReturn(wsdlManager); 544 pc.getString("address"); 545 EasyMock.expectLastCall().andReturn(address); 546 547 checkBusCreatedEvent(); 548 549 EasyMock.replay(bus); 550 EasyMock.replay(bc); 551 EasyMock.replay(pc); 552 553 EndpointReferenceType ref = EndpointReferenceUtils 554 .getEndpointReference(wsdlUrl, serviceName, portName); 555 ClientTransport transport = factory.createClientTransport(ref, clientBinding); 556 if (decoupled) { 557 ((HTTPClientTransport)transport).policy.setDecoupledEndpoint(DECOUPLED_ADDRESS); 558 } 559 560 EasyMock.verify(bus); 561 EasyMock.verify(bc); 562 EasyMock.verify(pc); 563 return transport; 564 565 } 566 567 private ServerTransport createServerTransport(URL wsdlUrl, 568 QName serviceName, 569 String portName, 570 String address) 571 throws WSDLException, IOException { 572 573 URL url = new URL (address); 574 575 EasyMock.reset(bus); 576 577 Configuration bc = EasyMock.createMock(Configuration.class); 578 Configuration ec = EasyMock.createMock(Configuration.class); 579 580 bus.getConfiguration(); 581 EasyMock.expectLastCall().andReturn(bc); 582 bc.getChild("http://celtix.objectweb.org/bus/jaxws/endpoint-config", serviceName.toString()); 583 EasyMock.expectLastCall().andReturn(ec); 584 ec.getChild("http://celtix.objectweb.org/bus/transports/http/http-server-config", "http-server"); 585 EasyMock.expectLastCall().andReturn(null); 586 bus.getWSDLManager(); 587 EasyMock.expectLastCall().andReturn(wsdlManager); 588 if (first) { 589 bus.getConfiguration(); 591 EasyMock.expectLastCall().andReturn(bc); 592 bc.getChild("http://celtix.objectweb.org/bus/transports/http/http-listener-config", 593 "http-listener." + url.getPort()); 594 EasyMock.expectLastCall().andReturn(null); 595 first = false; 596 } 597 598 try { 599 bus.addListener(isA(JettyHTTPServerTransport.class), 600 isA(ConfigurationEventFilter.class)); 601 } catch (BusException e) { 602 } 604 EasyMock.expectLastCall(); 605 606 checkBusCreatedEvent(); 607 608 EasyMock.replay(bus); 609 EasyMock.replay(bc); 610 EasyMock.replay(ec); 611 612 EndpointReferenceType ref = EndpointReferenceUtils.getEndpointReference(wsdlUrl, serviceName, 613 portName); 614 EndpointReferenceUtils.setAddress(ref, address); 615 ServerTransport transport = factory.createServerTransport(ref); 616 617 EasyMock.verify(bus); 618 EasyMock.verify(bc); 619 EasyMock.verify(ec); 620 621 return transport; 622 623 } 624 625 private class TestServerTransportCallback implements ServerTransportCallback { 626 private ServerTransport server; 627 private boolean useAutomaticWorkQueue; 628 private int delay; 629 private byte[] buffer; 630 private boolean oneWay; 631 private boolean decoupled; 632 633 TestServerTransportCallback(ServerTransport s, 634 boolean uaq, 635 int d, 636 byte[] b, 637 boolean ow, 638 boolean dc) { 639 server = s; 640 useAutomaticWorkQueue = uaq; 641 delay = d; 642 buffer = b; 643 oneWay = ow; 644 decoupled = dc; 645 } 646 647 public void dispatch(InputStreamMessageContext ctx, ServerTransport transport) { 648 try { 649 byte[] bytes = buffer; 650 if (null == bytes) { 651 bytes = new byte[10000]; 652 } 653 int total = readBytes(bytes, ctx.getInputStream()); 654 655 OutputStreamMessageContext octx = null; 656 if (decoupled) { 657 EndpointReferenceType ref = new EndpointReferenceType(); 658 EndpointReferenceUtils.setAddress(ref, DECOUPLED_ADDRESS); 659 octx = server.rebase(ctx, ref); 660 server.finalPrepareOutputStreamContext(octx); 661 octx.getOutputStream().flush(); 662 octx.getOutputStream().close(); 663 assertEquals(ctx.get(HTTPServerInputStreamContext.HTTP_RESPONSE), ref); 664 if (!oneWay) { 665 awaitPartialResponseReceived(); 666 } 667 } 668 669 if (oneWay) { 670 octx = transport.createOutputStreamContext(ctx); 671 octx.setOneWay(oneWay); 672 transport.finalPrepareOutputStreamContext(octx); 673 octx.getOutputStream().close(); 674 transport.postDispatch(ctx, octx); 675 } 676 677 if (delay > 0) { 679 Thread.sleep(delay); 680 } 681 682 if (!oneWay) { 683 octx = transport.createOutputStreamContext(ctx); 684 octx.setOneWay(oneWay); 685 transport.finalPrepareOutputStreamContext(octx); 686 octx.getOutputStream().write(bytes, 0, total); 687 octx.getOutputStream().flush(); 688 octx.getOutputStream().close(); 689 transport.postDispatch(ctx, octx); 690 } 691 } catch (Exception ex) { 692 ex.printStackTrace(); 693 } 694 } 695 public synchronized Executor getExecutor() { 696 EasyMock.reset(bus); 697 checkBusCreatedEvent(); 698 EasyMock.replay(bus); 699 if (useAutomaticWorkQueue) { 700 if (queueManager == null) { 701 queueManager = new WorkQueueManagerImpl(bus); 702 } 703 return queueManager.getAutomaticWorkQueue(); 704 } else { 705 return null; 706 } 707 } 708 } 709 } 710 | Popular Tags |