1 package org.objectweb.celtix.bus.transports.jms; 2 3 import java.io.IOException ; 4 import java.io.InputStream ; 5 import java.net.URL ; 6 import java.util.List ; 7 import java.util.concurrent.Executor ; 8 9 import javax.wsdl.WSDLException; 10 import javax.xml.namespace.QName ; 11 import javax.xml.ws.handler.MessageContext; 12 13 import junit.framework.Test; 14 import junit.framework.TestCase; 15 import junit.framework.TestSuite; 16 17 import org.objectweb.celtix.Bus; 18 import org.objectweb.celtix.BusException; 19 import org.objectweb.celtix.bus.bindings.TestClientBinding; 20 import org.objectweb.celtix.bus.transports.TransportFactoryManagerImpl; 21 import org.objectweb.celtix.bus.workqueue.WorkQueueManagerImpl; 22 23 import org.objectweb.celtix.configuration.Configuration; 24 import org.objectweb.celtix.configuration.ConfigurationBuilder; 25 import org.objectweb.celtix.configuration.ConfigurationBuilderFactory; 26 import org.objectweb.celtix.configuration.types.ClassNamespaceMappingListType; 27 import org.objectweb.celtix.configuration.types.ClassNamespaceMappingType; 28 import org.objectweb.celtix.configuration.types.ObjectFactory; 29 import org.objectweb.celtix.context.GenericMessageContext; 30 import org.objectweb.celtix.context.InputStreamMessageContext; 31 import org.objectweb.celtix.context.OutputStreamMessageContext; 32 import org.objectweb.celtix.transports.ClientTransport; 33 import org.objectweb.celtix.transports.ServerTransport; 34 import org.objectweb.celtix.transports.ServerTransportCallback; 35 import org.objectweb.celtix.transports.TransportFactory; 36 import org.objectweb.celtix.transports.TransportFactoryManager; 37 import org.objectweb.celtix.transports.jms.context.JMSMessageHeadersType; 38 import org.objectweb.celtix.transports.jms.context.JMSPropertyType; 39 import org.objectweb.celtix.ws.addressing.EndpointReferenceType; 40 import org.objectweb.celtix.wsdl.EndpointReferenceUtils; 41 42 public class JMSTransportTest extends TestCase { 43 44 public static final String JMSTRANSPORT_SKIP_RESPONSE = "JMSTransport.skipResponse"; 45 private ServerTransportCallback callback; 46 private ServerTransportCallback callback1; 47 private Bus bus; 48 private String serverRcvdInOneWayCall; 49 private WorkQueueManagerImpl wqm; 50 51 public JMSTransportTest(String arg0) { 52 super(arg0); 53 } 54 55 public static void main(String [] args) { 56 junit.textui.TestRunner.run(JMSTransportTest.suite()); 57 } 58 59 public static Test suite() { 60 TestSuite suite = new TestSuite(JMSTransportTest.class); 61 return new JMSBrokerSetup(suite, "tcp://localhost:61500"); 62 } 63 64 public void setUp() throws Exception { 65 bus = Bus.init(); 66 } 67 68 public void tearDown() throws Exception { 69 if (wqm != null) { 70 wqm.shutdown(true); 71 } 72 if (bus != null) { 73 bus.shutdown(true); 74 } 75 } 76 77 public void testOneWayTextQueueJMSTransport() throws Exception { 78 QName serviceName = new QName ("http://celtix.objectweb.org/hello_world_jms", 79 "HelloWorldOneWayQueueService"); 80 doOneWayTestJMSTranport(false, serviceName, "HelloWorldOneWayQueuePort", 81 "dynamicQueues/test.jmstransport.oneway"); 82 } 83 84 public void testPubSubJMSTransport() throws Exception { 85 QName serviceName = new QName ("http://celtix.objectweb.org/hello_world_jms", 86 "HelloWorldPubSubService"); 87 doOneWayTestJMSTranport(false, serviceName, "HelloWorldPubSubPort", 88 "dynamicTopics/test.jmstransport.oneway.topic"); 89 } 90 91 public void testTwoWayTextQueueJMSTransport() throws Exception { 92 QName serviceName = new QName ("http://celtix.objectweb.org/hello_world_jms", "HelloWorldService"); 93 doTestJMSTransport(false, serviceName, "HelloWorldPort", "dynamicQueues/test.jmstransport.text"); 94 } 95 96 public void testTwoWayBinaryQueueJMSTransport() throws Exception { 97 QName serviceName = new QName ("http://celtix.objectweb.org/hello_world_jms", 98 "HelloWorldQueueBinMsgService"); 99 doTestJMSTransport(false, serviceName, "HelloWorldQueueBinMsgPort", 100 "dynamicQueues/test.jmstransport.binary"); 101 } 102 103 public void test2WayStaticReplyQTextMessageJMSTransport() throws Exception { 104 QName serviceName = 105 new QName ("http://celtix.objectweb.org/hello_world_jms", 106 "HWStaticReplyQTextMsgService"); 107 doTestJMSTransport(false, serviceName, "HWStaticReplyQTextPort", 108 "dynamicQueues/test.jmstransport.text"); 109 } 110 111 private int readBytes(byte bytes[], InputStream ins) throws IOException { 112 int len = ins.read(bytes); 113 int total = 0; 114 while (len != -1) { 115 total += len; 116 len = ins.read(bytes, total, bytes.length - total); 117 } 118 return total; 119 } 120 121 122 public class TestServerTransportCallback implements ServerTransportCallback { 123 boolean useAutomaticWorkQueue; 124 125 public TestServerTransportCallback(boolean useAutoWQ) { 126 useAutomaticWorkQueue = useAutoWQ; 127 } 128 129 public void dispatch(InputStreamMessageContext ctx, ServerTransport transport) { 130 131 try { 132 byte bytes[] = new byte[10000]; 133 if (ctx.containsKey(JMSConstants.JMS_SERVER_HEADERS)) { 134 JMSMessageHeadersType msgHdr = 135 (JMSMessageHeadersType)ctx.get(JMSConstants.JMS_SERVER_HEADERS); 136 if (msgHdr.getProperty().contains(JMSTRANSPORT_SKIP_RESPONSE)) { 137 return; 139 } 140 } 141 142 int total = readBytes(bytes, ctx.getInputStream()); 143 144 JMSOutputStreamContext octx = 145 (JMSOutputStreamContext)transport.createOutputStreamContext(ctx); 146 octx.setOneWay(false); 147 transport.finalPrepareOutputStreamContext(octx); 148 octx.getOutputStream().write(bytes, 0, total); 149 octx.getOutputStream().flush(); 150 151 MessageContext replyCtx = new GenericMessageContext(); 152 ctx.put("ObjectMessageContext.MESSAGE_INPUT", Boolean.TRUE); 153 replyCtx.putAll(ctx); 154 replyCtx.put("ObjectMessageContext.MESSAGE_INPUT", Boolean.TRUE); 155 156 ((JMSServerTransport)transport).postDispatch(replyCtx, octx); 157 octx.getOutputStream().close(); 158 } catch (Exception ex) { 159 } 161 } 162 163 public Executor getExecutor() { 164 if (useAutomaticWorkQueue) { 165 if (wqm == null) { 166 wqm = new WorkQueueManagerImpl(bus); 167 } 168 return wqm.getAutomaticWorkQueue(); 169 } else { 170 return null; 171 } 172 } 173 } 174 175 public void setupCallbackObject(final boolean useAutomaticWorkQueue) { 176 callback = new TestServerTransportCallback(useAutomaticWorkQueue); 177 } 178 179 public void doTestJMSTransport(final boolean useAutomaticWorkQueue, 180 QName serviceName, 181 String portName, 182 String jndiDestinationName) 183 throws Exception { 184 185 String address = "jms:ConnectionFactory#" + jndiDestinationName; 186 URL wsdlUrl = getClass().getResource("/wsdl/jms_test.wsdl"); 187 assertNotNull(wsdlUrl); 188 189 createConfiguration(wsdlUrl, serviceName, portName); 190 TransportFactory factory = createTransportFactory(); 191 192 ServerTransport server = createServerTransport(factory, wsdlUrl, serviceName, 193 portName, address); 194 setupCallbackObject(useAutomaticWorkQueue); 195 196 server.activate(callback); 197 198 ClientTransport client = createClientTransport(factory, wsdlUrl, serviceName, portName); 199 assertTrue("targetEndpoint address mismatch. Expected : " + address 200 + " received : " + client.getTargetEndpoint(), 201 address.equals(client.getTargetEndpoint().getAddress().getValue())); 202 203 OutputStreamMessageContext octx = null; 204 byte outBytes[] = "Hello World!!!".getBytes(); 205 InputStreamMessageContext ictx = doClientInvoke(client, octx, outBytes, false); 206 207 byte bytes[] = new byte[10000]; 208 int len = ictx.getInputStream().read(bytes); 209 assertTrue("Did not read anything " + len, len > 0); 210 assertEquals(new String (outBytes), new String (bytes, 0, len)); 211 212 outBytes = new byte[5000]; 214 for (int x = 0; x < outBytes.length; x++) { 215 outBytes[x] = (byte)('a' + (x % 26)); 216 } 217 218 ictx = doClientInvoke(client, octx, outBytes, false); 219 int total = readBytes(bytes, ictx.getInputStream()); 220 221 assertTrue("Did not read anything " + total, total > 0); 222 assertEquals(new String (outBytes), new String (bytes, 0, total)); 223 224 outBytes = "Hello World!!!".getBytes(); 225 226 server.deactivate(); 227 228 try { 229 ictx = doClientInvoke(client, octx, outBytes, true); 230 len = ictx.getInputStream().read(bytes); 231 232 if (len != -1) { 233 fail("was able to process a message after the servant was deactivated: " + len 234 + " - " + new String (bytes)); 235 } 236 } catch (IOException ex) { 237 } 239 240 server.activate(callback); 241 242 outBytes = "New String and must match with response".getBytes(); 243 ictx = doClientInvoke(client, octx, outBytes, false); 244 len = ictx.getInputStream().read(bytes); 245 assertTrue("Did not read anything " + len, len > 0); 246 assertEquals(new String (outBytes), new String (bytes, 0, len)); 247 server.shutdown(); 248 client.shutdown(); 249 } 250 251 public InputStreamMessageContext doClientInvoke(ClientTransport client, 252 OutputStreamMessageContext octx, 253 byte[] outBytes, 254 boolean insertContextInfo) 255 throws Exception { 256 octx = client.createOutputStreamContext(new GenericMessageContext()); 257 client.finalPrepareOutputStreamContext(octx); 258 octx.getOutputStream().write(outBytes); 259 if (insertContextInfo) { 260 insertContextInfo(octx); 261 } 262 return client.invoke(octx); 263 } 264 265 public void insertContextInfo(OutputStreamMessageContext octx) { 266 JMSMessageHeadersType requestHeader = new JMSMessageHeadersType(); 268 requestHeader.setTimeToLive(100L); 269 List <JMSPropertyType> props = requestHeader.getProperty(); 270 JMSPropertyType skipResponseProperty = new JMSPropertyType(); 271 skipResponseProperty.setName(JMSTRANSPORT_SKIP_RESPONSE); 272 skipResponseProperty.setValue("true"); 273 props.add(skipResponseProperty); 274 octx.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, requestHeader); 275 octx.put(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT, new Long (10)); 276 277 } 278 279 public void setupOneWayCallbackObject(final boolean useAutomaticWorkQueue) { 280 callback1 = new ServerTransportCallback() { 281 public void dispatch(InputStreamMessageContext ctx, ServerTransport transport) { 282 try { 283 byte bytes[] = new byte[10000]; 284 readBytes(bytes, ctx.getInputStream()); 285 286 JMSOutputStreamContext octx = 287 (JMSOutputStreamContext)transport.createOutputStreamContext(ctx); 288 octx.setOneWay(true); 289 transport.finalPrepareOutputStreamContext(octx); 290 serverRcvdInOneWayCall = new String (bytes); 291 292 MessageContext replyCtx = new GenericMessageContext(); 293 ctx.put("ObjectMessageContext.MESSAGE_INPUT", Boolean.TRUE); 294 replyCtx.putAll(ctx); 295 replyCtx.put("ObjectMessageContext.MESSAGE_INPUT", Boolean.TRUE); 296 297 ((JMSServerTransport)transport).postDispatch(replyCtx, octx); 298 octx.getOutputStream().close(); 299 } catch (Exception ex) { 300 ex.printStackTrace(); 301 } 302 } 303 public Executor getExecutor() { 304 if (useAutomaticWorkQueue) { 305 if (wqm == null) { 306 wqm = new WorkQueueManagerImpl(bus); 307 } 308 return wqm.getAutomaticWorkQueue(); 309 } else { 310 return null; 311 } 312 313 } 314 }; 315 } 316 317 public void doOneWayTestJMSTranport(final boolean useAutomaticWorkQueue, 318 QName serviceName, 319 String portName, 320 String jndiDestinationName) 321 throws Exception { 322 323 String address = "jms:ConnectionFactory#" + jndiDestinationName; 324 URL wsdlUrl = getClass().getResource("/wsdl/jms_test.wsdl"); 325 assertNotNull(wsdlUrl); 326 327 createConfiguration(wsdlUrl, serviceName, portName); 328 TransportFactory factory = createTransportFactory(); 329 setupOneWayCallbackObject(useAutomaticWorkQueue); 330 331 ServerTransport server = createServerTransport(factory, wsdlUrl, serviceName, 332 portName, address); 333 334 335 server.activate(callback1); 336 337 ClientTransport client = createClientTransport(factory, wsdlUrl, serviceName, portName); 338 339 assertTrue("targetEndpoint address mismatch. Expected : " + address 340 + " received : " + client.getTargetEndpoint(), 341 address.equals(client.getTargetEndpoint().getAddress().getValue())); 342 OutputStreamMessageContext octx = 343 client.createOutputStreamContext(new GenericMessageContext()); 344 client.finalPrepareOutputStreamContext(octx); 345 byte outBytes[] = "Hello World!!!".getBytes(); 346 octx.getOutputStream().write(outBytes); 347 client.invokeOneway(octx); 348 Thread.sleep(500L); 349 assertEquals(new String (outBytes), 350 serverRcvdInOneWayCall.substring(0, outBytes.length)); 351 352 server.shutdown(); 353 client.shutdown(); 354 } 355 356 private TransportFactory createTransportFactory() throws BusException { 357 String transportId = "http://celtix.objectweb.org/transports/jms"; 358 ObjectFactory of = new ObjectFactory(); 359 ClassNamespaceMappingListType mappings = of.createClassNamespaceMappingListType(); 360 ClassNamespaceMappingType mapping = of.createClassNamespaceMappingType(); 361 mapping.setClassname("org.objectweb.celtix.bus.transports.jms.JMSTransportFactory"); 362 mapping.getNamespace().add(transportId); 363 mappings.getMap().add(mapping); 364 TransportFactoryManager tfm = new TransportFactoryManagerImpl(bus); 365 return tfm.getTransportFactory(transportId); 366 } 367 368 private ClientTransport createClientTransport(TransportFactory factory, URL wsdlUrl, 369 QName serviceName, String portName) 370 throws WSDLException, IOException { 371 EndpointReferenceType ref = EndpointReferenceUtils 372 .getEndpointReference(wsdlUrl, serviceName, portName); 373 ClientTransport transport = 374 factory.createClientTransport(ref, new TestClientBinding(bus, ref)); 375 376 return transport; 377 } 378 379 private ServerTransport createServerTransport(TransportFactory factory, URL wsdlUrl, QName serviceName, 380 String portName, String address) throws WSDLException, 381 IOException { 382 383 EndpointReferenceType ref = EndpointReferenceUtils.getEndpointReference(wsdlUrl, serviceName, 384 portName); 385 EndpointReferenceUtils.setAddress(ref, address); 386 return factory.createServerTransport(ref); 387 } 388 389 394 private void createConfiguration(URL wsdlUrl, QName serviceName, 395 String portName) throws WSDLException, 396 IOException , BusException { 397 assert bus != null; 398 EndpointReferenceType ref = EndpointReferenceUtils.getEndpointReference(wsdlUrl, serviceName, 399 portName); 400 Configuration busCfg = bus.getConfiguration(); 401 assert null != busCfg; 402 String id = EndpointReferenceUtils.getServiceName(ref).toString(); 403 ConfigurationBuilder cb = ConfigurationBuilderFactory.getBuilder(null); 404 cb.buildConfiguration(JMSConstants.ENDPOINT_CONFIGURATION_URI, id, busCfg); 405 cb.buildConfiguration(JMSConstants.PORT_CONFIGURATION_URI, 406 id + "/" + EndpointReferenceUtils.getPortName(ref).toString(), 407 busCfg); 408 } 409 } 410 | Popular Tags |