KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > celtix > bus > transports > jms > JMSTransportTest


1 package org.objectweb.celtix.bus.transports.jms;
2
3 import java.io.IOException JavaDoc;
4 import java.io.InputStream JavaDoc;
5 import java.net.URL JavaDoc;
6 import java.util.List JavaDoc;
7 import java.util.concurrent.Executor JavaDoc;
8
9 import javax.wsdl.WSDLException;
10 import javax.xml.namespace.QName JavaDoc;
11 import javax.xml.ws.handler.MessageContext;
12
13 import junit.framework.Test;
14 import junit.framework.TestCase;
15 import junit.framework.TestSuite;
16
17 import org.objectweb.celtix.Bus;
18 import org.objectweb.celtix.BusException;
19 import org.objectweb.celtix.bus.bindings.TestClientBinding;
20 import org.objectweb.celtix.bus.transports.TransportFactoryManagerImpl;
21 import org.objectweb.celtix.bus.workqueue.WorkQueueManagerImpl;
22
23 import org.objectweb.celtix.configuration.Configuration;
24 import org.objectweb.celtix.configuration.ConfigurationBuilder;
25 import org.objectweb.celtix.configuration.ConfigurationBuilderFactory;
26 import org.objectweb.celtix.configuration.types.ClassNamespaceMappingListType;
27 import org.objectweb.celtix.configuration.types.ClassNamespaceMappingType;
28 import org.objectweb.celtix.configuration.types.ObjectFactory;
29 import org.objectweb.celtix.context.GenericMessageContext;
30 import org.objectweb.celtix.context.InputStreamMessageContext;
31 import org.objectweb.celtix.context.OutputStreamMessageContext;
32 import org.objectweb.celtix.transports.ClientTransport;
33 import org.objectweb.celtix.transports.ServerTransport;
34 import org.objectweb.celtix.transports.ServerTransportCallback;
35 import org.objectweb.celtix.transports.TransportFactory;
36 import org.objectweb.celtix.transports.TransportFactoryManager;
37 import org.objectweb.celtix.transports.jms.context.JMSMessageHeadersType;
38 import org.objectweb.celtix.transports.jms.context.JMSPropertyType;
39 import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
40 import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
41
42 public class JMSTransportTest extends TestCase {
43
44     public static final String JavaDoc JMSTRANSPORT_SKIP_RESPONSE = "JMSTransport.skipResponse";
45     private ServerTransportCallback callback;
46     private ServerTransportCallback callback1;
47     private Bus bus;
48     private String JavaDoc serverRcvdInOneWayCall;
49     private WorkQueueManagerImpl wqm;
50
51     public JMSTransportTest(String JavaDoc arg0) {
52         super(arg0);
53     }
54
55     public static void main(String JavaDoc[] args) {
56         junit.textui.TestRunner.run(JMSTransportTest.suite());
57     }
58
59     public static Test suite() {
60         TestSuite suite = new TestSuite(JMSTransportTest.class);
61         return new JMSBrokerSetup(suite, "tcp://localhost:61500");
62     }
63
64     public void setUp() throws Exception JavaDoc {
65         bus = Bus.init();
66     }
67
68     public void tearDown() throws Exception JavaDoc {
69         if (wqm != null) {
70             wqm.shutdown(true);
71         }
72         if (bus != null) {
73             bus.shutdown(true);
74         }
75     }
76
77     public void testOneWayTextQueueJMSTransport() throws Exception JavaDoc {
78         QName JavaDoc serviceName = new QName JavaDoc("http://celtix.objectweb.org/hello_world_jms",
79                                                            "HelloWorldOneWayQueueService");
80         doOneWayTestJMSTranport(false, serviceName, "HelloWorldOneWayQueuePort",
81                                     "dynamicQueues/test.jmstransport.oneway");
82     }
83
84     public void testPubSubJMSTransport() throws Exception JavaDoc {
85         QName JavaDoc serviceName = new QName JavaDoc("http://celtix.objectweb.org/hello_world_jms",
86                                                            "HelloWorldPubSubService");
87         doOneWayTestJMSTranport(false, serviceName, "HelloWorldPubSubPort",
88                                                "dynamicTopics/test.jmstransport.oneway.topic");
89     }
90
91     public void testTwoWayTextQueueJMSTransport() throws Exception JavaDoc {
92         QName JavaDoc serviceName = new QName JavaDoc("http://celtix.objectweb.org/hello_world_jms", "HelloWorldService");
93         doTestJMSTransport(false, serviceName, "HelloWorldPort", "dynamicQueues/test.jmstransport.text");
94     }
95
96     public void testTwoWayBinaryQueueJMSTransport() throws Exception JavaDoc {
97         QName JavaDoc serviceName = new QName JavaDoc("http://celtix.objectweb.org/hello_world_jms",
98                                                            "HelloWorldQueueBinMsgService");
99         doTestJMSTransport(false, serviceName, "HelloWorldQueueBinMsgPort",
100                            "dynamicQueues/test.jmstransport.binary");
101     }
102
103     public void test2WayStaticReplyQTextMessageJMSTransport() throws Exception JavaDoc {
104         QName JavaDoc serviceName =
105             new QName JavaDoc("http://celtix.objectweb.org/hello_world_jms",
106                                      "HWStaticReplyQTextMsgService");
107         doTestJMSTransport(false, serviceName, "HWStaticReplyQTextPort",
108                            "dynamicQueues/test.jmstransport.text");
109     }
110
111     private int readBytes(byte bytes[], InputStream JavaDoc ins) throws IOException JavaDoc {
112         int len = ins.read(bytes);
113         int total = 0;
114         while (len != -1) {
115             total += len;
116             len = ins.read(bytes, total, bytes.length - total);
117         }
118         return total;
119     }
120
121
122     public class TestServerTransportCallback implements ServerTransportCallback {
123         boolean useAutomaticWorkQueue;
124
125         public TestServerTransportCallback(boolean useAutoWQ) {
126             useAutomaticWorkQueue = useAutoWQ;
127         }
128
129         public void dispatch(InputStreamMessageContext ctx, ServerTransport transport) {
130
131             try {
132                 byte bytes[] = new byte[10000];
133                 if (ctx.containsKey(JMSConstants.JMS_SERVER_HEADERS)) {
134                     JMSMessageHeadersType msgHdr =
135                         (JMSMessageHeadersType)ctx.get(JMSConstants.JMS_SERVER_HEADERS);
136                     if (msgHdr.getProperty().contains(JMSTRANSPORT_SKIP_RESPONSE)) {
137                         //no need to process the response.
138
return;
139                     }
140                 }
141
142                 int total = readBytes(bytes, ctx.getInputStream());
143
144                 JMSOutputStreamContext octx =
145                     (JMSOutputStreamContext)transport.createOutputStreamContext(ctx);
146                 octx.setOneWay(false);
147                 transport.finalPrepareOutputStreamContext(octx);
148                 octx.getOutputStream().write(bytes, 0, total);
149                 octx.getOutputStream().flush();
150
151                 MessageContext replyCtx = new GenericMessageContext();
152                 ctx.put("ObjectMessageContext.MESSAGE_INPUT", Boolean.TRUE);
153                 replyCtx.putAll(ctx);
154                 replyCtx.put("ObjectMessageContext.MESSAGE_INPUT", Boolean.TRUE);
155
156                 ((JMSServerTransport)transport).postDispatch(replyCtx, octx);
157                 octx.getOutputStream().close();
158             } catch (Exception JavaDoc ex) {
159              //
160
}
161         }
162
163         public Executor JavaDoc getExecutor() {
164             if (useAutomaticWorkQueue) {
165                 if (wqm == null) {
166                     wqm = new WorkQueueManagerImpl(bus);
167                 }
168                 return wqm.getAutomaticWorkQueue();
169             } else {
170                 return null;
171             }
172         }
173     }
174
175     public void setupCallbackObject(final boolean useAutomaticWorkQueue) {
176         callback = new TestServerTransportCallback(useAutomaticWorkQueue);
177     }
178
179     public void doTestJMSTransport(final boolean useAutomaticWorkQueue,
180                         QName JavaDoc serviceName,
181                         String JavaDoc portName,
182                         String JavaDoc jndiDestinationName)
183         throws Exception JavaDoc {
184
185         String JavaDoc address = "jms:ConnectionFactory#" + jndiDestinationName;
186         URL JavaDoc wsdlUrl = getClass().getResource("/wsdl/jms_test.wsdl");
187         assertNotNull(wsdlUrl);
188
189         createConfiguration(wsdlUrl, serviceName, portName);
190         TransportFactory factory = createTransportFactory();
191
192         ServerTransport server = createServerTransport(factory, wsdlUrl, serviceName,
193                                                        portName, address);
194         setupCallbackObject(useAutomaticWorkQueue);
195
196         server.activate(callback);
197
198         ClientTransport client = createClientTransport(factory, wsdlUrl, serviceName, portName);
199         assertTrue("targetEndpoint address mismatch. Expected : " + address
200                    + " received : " + client.getTargetEndpoint(),
201                    address.equals(client.getTargetEndpoint().getAddress().getValue()));
202
203         OutputStreamMessageContext octx = null;
204         byte outBytes[] = "Hello World!!!".getBytes();
205         InputStreamMessageContext ictx = doClientInvoke(client, octx, outBytes, false);
206
207         byte bytes[] = new byte[10000];
208         int len = ictx.getInputStream().read(bytes);
209         assertTrue("Did not read anything " + len, len > 0);
210         assertEquals(new String JavaDoc(outBytes), new String JavaDoc(bytes, 0, len));
211
212         //long request
213
outBytes = new byte[5000];
214         for (int x = 0; x < outBytes.length; x++) {
215             outBytes[x] = (byte)('a' + (x % 26));
216         }
217
218         ictx = doClientInvoke(client, octx, outBytes, false);
219         int total = readBytes(bytes, ictx.getInputStream());
220
221         assertTrue("Did not read anything " + total, total > 0);
222         assertEquals(new String JavaDoc(outBytes), new String JavaDoc(bytes, 0, total));
223
224         outBytes = "Hello World!!!".getBytes();
225
226         server.deactivate();
227
228         try {
229             ictx = doClientInvoke(client, octx, outBytes, true);
230             len = ictx.getInputStream().read(bytes);
231
232             if (len != -1) {
233                 fail("was able to process a message after the servant was deactivated: " + len
234                      + " - " + new String JavaDoc(bytes));
235             }
236         } catch (IOException JavaDoc ex) {
237             //ignore - this is what we want
238
}
239
240         server.activate(callback);
241
242         outBytes = "New String and must match with response".getBytes();
243         ictx = doClientInvoke(client, octx, outBytes, false);
244         len = ictx.getInputStream().read(bytes);
245         assertTrue("Did not read anything " + len, len > 0);
246         assertEquals(new String JavaDoc(outBytes), new String JavaDoc(bytes, 0, len));
247         server.shutdown();
248         client.shutdown();
249     }
250
251     public InputStreamMessageContext doClientInvoke(ClientTransport client,
252                                                     OutputStreamMessageContext octx,
253                                                     byte[] outBytes,
254                                                     boolean insertContextInfo)
255         throws Exception JavaDoc {
256         octx = client.createOutputStreamContext(new GenericMessageContext());
257         client.finalPrepareOutputStreamContext(octx);
258         octx.getOutputStream().write(outBytes);
259         if (insertContextInfo) {
260             insertContextInfo(octx);
261         }
262         return client.invoke(octx);
263     }
264
265     public void insertContextInfo(OutputStreamMessageContext octx) {
266         //Set time to live and default receive timeout so as to timeout the client
267
JMSMessageHeadersType requestHeader = new JMSMessageHeadersType();
268         requestHeader.setTimeToLive(100L);
269         List JavaDoc<JMSPropertyType> props = requestHeader.getProperty();
270         JMSPropertyType skipResponseProperty = new JMSPropertyType();
271         skipResponseProperty.setName(JMSTRANSPORT_SKIP_RESPONSE);
272         skipResponseProperty.setValue("true");
273         props.add(skipResponseProperty);
274         octx.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, requestHeader);
275         octx.put(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT, new Long JavaDoc(10));
276
277     }
278
279     public void setupOneWayCallbackObject(final boolean useAutomaticWorkQueue) {
280         callback1 = new ServerTransportCallback() {
281             public void dispatch(InputStreamMessageContext ctx, ServerTransport transport) {
282                 try {
283                     byte bytes[] = new byte[10000];
284                     readBytes(bytes, ctx.getInputStream());
285
286                     JMSOutputStreamContext octx =
287                         (JMSOutputStreamContext)transport.createOutputStreamContext(ctx);
288                     octx.setOneWay(true);
289                     transport.finalPrepareOutputStreamContext(octx);
290                     serverRcvdInOneWayCall = new String JavaDoc(bytes);
291
292                     MessageContext replyCtx = new GenericMessageContext();
293                     ctx.put("ObjectMessageContext.MESSAGE_INPUT", Boolean.TRUE);
294                     replyCtx.putAll(ctx);
295                     replyCtx.put("ObjectMessageContext.MESSAGE_INPUT", Boolean.TRUE);
296
297                     ((JMSServerTransport)transport).postDispatch(replyCtx, octx);
298                     octx.getOutputStream().close();
299                 } catch (Exception JavaDoc ex) {
300                     ex.printStackTrace();
301                 }
302             }
303             public Executor JavaDoc getExecutor() {
304                 if (useAutomaticWorkQueue) {
305                     if (wqm == null) {
306                         wqm = new WorkQueueManagerImpl(bus);
307                     }
308                     return wqm.getAutomaticWorkQueue();
309                 } else {
310                     return null;
311                 }
312
313             }
314         };
315     }
316
317     public void doOneWayTestJMSTranport(final boolean useAutomaticWorkQueue,
318                                                             QName JavaDoc serviceName,
319                                                              String JavaDoc portName,
320                                                              String JavaDoc jndiDestinationName)
321         throws Exception JavaDoc {
322
323         String JavaDoc address = "jms:ConnectionFactory#" + jndiDestinationName;
324         URL JavaDoc wsdlUrl = getClass().getResource("/wsdl/jms_test.wsdl");
325         assertNotNull(wsdlUrl);
326
327         createConfiguration(wsdlUrl, serviceName, portName);
328         TransportFactory factory = createTransportFactory();
329         setupOneWayCallbackObject(useAutomaticWorkQueue);
330
331         ServerTransport server = createServerTransport(factory, wsdlUrl, serviceName,
332                                                        portName, address);
333
334
335         server.activate(callback1);
336
337         ClientTransport client = createClientTransport(factory, wsdlUrl, serviceName, portName);
338         
339         assertTrue("targetEndpoint address mismatch. Expected : " + address
340                    + " received : " + client.getTargetEndpoint(),
341                    address.equals(client.getTargetEndpoint().getAddress().getValue()));
342         OutputStreamMessageContext octx =
343             client.createOutputStreamContext(new GenericMessageContext());
344         client.finalPrepareOutputStreamContext(octx);
345         byte outBytes[] = "Hello World!!!".getBytes();
346         octx.getOutputStream().write(outBytes);
347         client.invokeOneway(octx);
348         Thread.sleep(500L);
349         assertEquals(new String JavaDoc(outBytes),
350                           serverRcvdInOneWayCall.substring(0, outBytes.length));
351
352         server.shutdown();
353         client.shutdown();
354     }
355
356     private TransportFactory createTransportFactory() throws BusException {
357         String JavaDoc transportId = "http://celtix.objectweb.org/transports/jms";
358         ObjectFactory of = new ObjectFactory();
359         ClassNamespaceMappingListType mappings = of.createClassNamespaceMappingListType();
360         ClassNamespaceMappingType mapping = of.createClassNamespaceMappingType();
361         mapping.setClassname("org.objectweb.celtix.bus.transports.jms.JMSTransportFactory");
362         mapping.getNamespace().add(transportId);
363         mappings.getMap().add(mapping);
364         TransportFactoryManager tfm = new TransportFactoryManagerImpl(bus);
365         return tfm.getTransportFactory(transportId);
366     }
367
368     private ClientTransport createClientTransport(TransportFactory factory, URL JavaDoc wsdlUrl,
369                                                   QName JavaDoc serviceName, String JavaDoc portName)
370         throws WSDLException, IOException JavaDoc {
371         EndpointReferenceType ref = EndpointReferenceUtils
372             .getEndpointReference(wsdlUrl, serviceName, portName);
373         ClientTransport transport =
374             factory.createClientTransport(ref, new TestClientBinding(bus, ref));
375
376         return transport;
377     }
378
379     private ServerTransport createServerTransport(TransportFactory factory, URL JavaDoc wsdlUrl, QName JavaDoc serviceName,
380                                                   String JavaDoc portName, String JavaDoc address) throws WSDLException,
381         IOException JavaDoc {
382
383         EndpointReferenceType ref = EndpointReferenceUtils.getEndpointReference(wsdlUrl, serviceName,
384                                                                                 portName);
385         EndpointReferenceUtils.setAddress(ref, address);
386         return factory.createServerTransport(ref);
387     }
388
389     // Create bus and populate the configuration for Endpoint, Service and port.
390
// This test uses all the info. either coming from WSDL or default and do not use
391
// any configuration files.
392
//
393

394     private void createConfiguration(URL JavaDoc wsdlUrl, QName JavaDoc serviceName,
395                           String JavaDoc portName) throws WSDLException,
396                           IOException JavaDoc, BusException {
397         assert bus != null;
398         EndpointReferenceType ref = EndpointReferenceUtils.getEndpointReference(wsdlUrl, serviceName,
399                                                                                 portName);
400         Configuration busCfg = bus.getConfiguration();
401         assert null != busCfg;
402         String JavaDoc id = EndpointReferenceUtils.getServiceName(ref).toString();
403         ConfigurationBuilder cb = ConfigurationBuilderFactory.getBuilder(null);
404         cb.buildConfiguration(JMSConstants.ENDPOINT_CONFIGURATION_URI, id, busCfg);
405         cb.buildConfiguration(JMSConstants.PORT_CONFIGURATION_URI,
406                               id + "/" + EndpointReferenceUtils.getPortName(ref).toString(),
407                               busCfg);
408     }
409 }
410
Popular Tags