1 package org.objectweb.celtix.bus.transports.jms; 2 3 import java.io.IOException ; 4 import java.io.InputStream ; 5 import java.net.URL ; 6 import java.util.concurrent.Executor ; 7 8 import javax.jms.DeliveryMode ; 9 import javax.wsdl.Port; 10 import javax.wsdl.WSDLException; 11 import javax.xml.namespace.QName ; 12 import javax.xml.ws.handler.MessageContext; 13 14 import junit.framework.Test; 15 import junit.framework.TestCase; 16 import junit.framework.TestSuite; 17 18 import org.objectweb.celtix.Bus; 19 import org.objectweb.celtix.BusException; 20 import org.objectweb.celtix.bindings.ClientBinding; 21 import org.objectweb.celtix.bus.workqueue.WorkQueueManagerImpl; 22 23 import org.objectweb.celtix.configuration.Configuration; 24 import org.objectweb.celtix.configuration.ConfigurationBuilder; 25 import org.objectweb.celtix.configuration.ConfigurationBuilderFactory; 26 27 import org.objectweb.celtix.context.GenericMessageContext; 28 import org.objectweb.celtix.context.InputStreamMessageContext; 29 import org.objectweb.celtix.context.OutputStreamMessageContext; 30 import org.objectweb.celtix.transports.ServerTransport; 31 import org.objectweb.celtix.transports.ServerTransportCallback; 32 33 import org.objectweb.celtix.transports.jms.JMSAddressPolicyType; 34 import org.objectweb.celtix.transports.jms.JMSClientBehaviorPolicyType; 35 import org.objectweb.celtix.transports.jms.JMSServerBehaviorPolicyType; 36 import org.objectweb.celtix.transports.jms.context.JMSMessageHeadersType; 37 38 import org.objectweb.celtix.ws.addressing.EndpointReferenceType; 39 import org.objectweb.celtix.wsdl.EndpointReferenceUtils; 40 import org.objectweb.celtix.wsdl.JAXBExtensionHelper; 41 42 public class JMSContextTest extends TestCase { 43 44 public static final String TEST_CORRELATION_ID = "TestCorrelationId"; 45 private ServerTransportCallback callback; 46 private Bus bus; 47 private WorkQueueManagerImpl wqm; 48 49 public JMSContextTest(String arg0) { 50 super(arg0); 51 } 52 53 public static void main(String [] args) { 54 junit.textui.TestRunner.run(JMSContextTest.suite()); 55 } 56 57 public static Test suite() { 58 TestSuite suite = new TestSuite(JMSContextTest.class); 59 return new JMSBrokerSetup(suite, "tcp://localhost:61500"); 60 } 61 62 public void setUp() throws Exception { 63 bus = Bus.init(); 64 JAXBExtensionHelper.addExtensions(bus.getWSDLManager().getExtenstionRegistry(), 65 Port.class, 66 JMSAddressPolicyType.class); 67 JAXBExtensionHelper.addExtensions(bus.getWSDLManager().getExtenstionRegistry(), 68 Port.class, 69 JMSServerBehaviorPolicyType.class); 70 JAXBExtensionHelper.addExtensions(bus.getWSDLManager().getExtenstionRegistry(), 71 Port.class, 72 JMSClientBehaviorPolicyType.class); 73 } 74 75 public void tearDown() throws Exception { 76 if (wqm != null) { 77 wqm.shutdown(true); 78 } 79 if (bus != null) { 80 bus.shutdown(true); 81 } 82 } 83 84 public void testTwoWayTextQueueJMSTransport() throws Exception { 85 QName serviceName = new QName ("http://celtix.objectweb.org/hello_world_jms", "HelloWorldService"); 86 doTestJMSTransport(false, serviceName, "HelloWorldPort", "/wsdl/jms_test.wsdl"); 87 } 88 89 private int readBytes(byte bytes[], InputStream ins) throws IOException { 90 int len = ins.read(bytes); 91 int total = 0; 92 while (len != -1) { 93 total += len; 94 len = ins.read(bytes, total, bytes.length - total); 95 } 96 return total; 97 } 98 99 public class TestServerTransportCallback implements ServerTransportCallback { 100 boolean useAutomaticWorkQueue; 101 102 public TestServerTransportCallback(boolean useAutoWQ) { 103 useAutomaticWorkQueue = useAutoWQ; 104 } 105 106 public void dispatch(InputStreamMessageContext ctx, ServerTransport transport) { 107 108 try { 109 byte bytes[] = new byte[10000]; 110 111 int total = readBytes(bytes, ctx.getInputStream()); 112 113 JMSOutputStreamContext octx = 114 (JMSOutputStreamContext)transport.createOutputStreamContext(ctx); 115 octx.setOneWay(false); 116 transport.finalPrepareOutputStreamContext(octx); 117 octx.getOutputStream().write(bytes, 0, total); 118 octx.getOutputStream().flush(); 119 120 MessageContext replyCtx = new GenericMessageContext(); 121 ctx.put("ObjectMessageContext.MESSAGE_INPUT", Boolean.TRUE); 122 replyCtx.putAll(ctx); 123 replyCtx.put("ObjectMessageContext.MESSAGE_INPUT", Boolean.TRUE); 124 125 ((JMSServerTransport)transport).postDispatch(replyCtx, octx); 126 octx.getOutputStream().close(); 127 } catch (Exception ex) { 128 } 130 } 131 132 public Executor getExecutor() { 133 if (useAutomaticWorkQueue) { 134 if (wqm == null) { 135 wqm = new WorkQueueManagerImpl(bus); 136 } 137 return wqm.getAutomaticWorkQueue(); 138 } else { 139 return null; 140 } 141 } 142 } 143 144 public void setupCallbackObject(final boolean useAutomaticWorkQueue) { 145 callback = new TestServerTransportCallback(useAutomaticWorkQueue); 146 } 147 148 149 public void doTestJMSTransport(final boolean useAutomaticWorkQueue, 150 QName serviceName, 151 String portName, 152 String testWsdlFileName) 153 throws Exception { 154 155 String address = "http://localhost:9000/SoapContext/SoapPort"; 156 URL wsdlUrl = getClass().getResource(testWsdlFileName); 157 assertNotNull(wsdlUrl); 158 159 createConfiguration(wsdlUrl, serviceName, portName); 160 161 ServerTransport server = createServerTransport(wsdlUrl, serviceName, 162 portName, address); 163 setupCallbackObject(useAutomaticWorkQueue); 164 165 server.activate(callback); 166 167 TestJMSClientTransport client = createClientTransport(wsdlUrl, serviceName, portName, address); 168 OutputStreamMessageContext octx = 169 client.createOutputStreamContext(new GenericMessageContext()); 170 171 setRequestContextHeader(octx); 172 173 client.finalPrepareOutputStreamContext(octx); 174 175 byte outBytes[] = "Hello World!!!".getBytes(); 176 octx.getOutputStream().write(outBytes); 177 178 checkContextHeader(client.getContext(), JMSConstants.JMS_CLIENT_REQUEST_HEADERS); 180 InputStreamMessageContext ictx = client.invoke(octx); 181 byte bytes[] = new byte[10000]; 182 int len = ictx.getInputStream().read(bytes); 183 assertTrue("Did not read anything " + len, len > 0); 184 assertEquals(new String (outBytes), new String (bytes, 0, len)); 185 186 checkResponseContextHeader(ictx); 187 188 server.shutdown(); 189 client.shutdown(); 190 } 191 192 public void checkResponseContextHeader(MessageContext ctx) { 193 assertTrue("JMSContext should contain the property " + JMSConstants.JMS_CLIENT_RESPONSE_HEADERS, 194 null != ctx.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS)); 195 196 JMSMessageHeadersType responseHeader = (JMSMessageHeadersType) 197 ctx.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); 198 assertTrue("JMSHeader correlation id mismatch: expected " + TEST_CORRELATION_ID, 199 TEST_CORRELATION_ID.equals(responseHeader.getJMSCorrelationID())); 200 assertTrue("JMSRedelivered should be false", 201 !responseHeader.isJMSRedelivered()); 202 } 203 204 public void setRequestContextHeader(OutputStreamMessageContext octx) { 205 JMSMessageHeadersType requestHeader = new JMSMessageHeadersType(); 206 requestHeader.setJMSCorrelationID(TEST_CORRELATION_ID); 207 requestHeader.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); 208 requestHeader.setJMSExpiration(3600000L); 209 requestHeader.setJMSPriority(6); 210 requestHeader.setTimeToLive(3600000L); 211 212 octx.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, requestHeader); 213 } 214 215 public void checkContextHeader(MessageContext ctx, String headerName) { 216 217 assertTrue("JMSContext should contain the property " + headerName, 218 null != ctx.get(headerName)); 219 220 JMSMessageHeadersType reqHdr = (JMSMessageHeadersType) 221 ctx.get(headerName); 222 assertTrue("JMSHeader correlation id mismatch: expected " + TEST_CORRELATION_ID, 223 TEST_CORRELATION_ID.equals(reqHdr.getJMSCorrelationID())); 224 assertTrue("JMSRedelivered should be false", 225 !reqHdr.isSetJMSRedelivered()); 226 assertTrue("JMS priority should be 6", reqHdr.getJMSPriority() == 6); 227 assertTrue("JMS timetolive should be greater than 0 ", reqHdr.getTimeToLive() > 0); 228 } 229 230 private TestJMSClientTransport createClientTransport(URL wsdlUrl, 231 QName serviceName, String portName, 232 String address) throws WSDLException, IOException { 233 EndpointReferenceType ref = EndpointReferenceUtils 234 .getEndpointReference(wsdlUrl, serviceName, portName); 235 return new TestJMSClientTransport(bus, ref, null); 236 } 237 238 private ServerTransport createServerTransport(URL wsdlUrl, QName serviceName, 239 String portName, String address) throws WSDLException, 240 IOException { 241 242 EndpointReferenceType ref = EndpointReferenceUtils.getEndpointReference(wsdlUrl, serviceName, 243 portName); 244 EndpointReferenceUtils.setAddress(ref, address); 245 return new JMSServerTransport(bus, ref); 246 } 247 248 253 private void createConfiguration(URL wsdlUrl, QName serviceName, 254 String portName) throws WSDLException, 255 IOException , BusException { 256 assert bus != null; 257 EndpointReferenceType ref = EndpointReferenceUtils.getEndpointReference(wsdlUrl, serviceName, 258 portName); 259 Configuration busCfg = bus.getConfiguration(); 260 assert null != busCfg; 261 262 String id = EndpointReferenceUtils.getServiceName(ref).toString(); 263 ConfigurationBuilder cb = ConfigurationBuilderFactory.getBuilder(null); 264 cb.buildConfiguration(JMSConstants.ENDPOINT_CONFIGURATION_URI, id, busCfg); 265 cb.buildConfiguration(JMSConstants.PORT_CONFIGURATION_URI, 266 id + "/" + EndpointReferenceUtils.getPortName(ref).toString(), 267 busCfg); 268 } 269 270 public class TestJMSClientTransport extends JMSClientTransport { 271 272 private MessageContext localContext; 273 274 public TestJMSClientTransport(Bus theBus, 275 EndpointReferenceType address, 276 ClientBinding binding) 277 throws WSDLException, IOException { 278 super(theBus, address, binding); 279 } 280 281 @Override 282 public OutputStreamMessageContext createOutputStreamContext(MessageContext context) 283 throws IOException { 284 localContext = context; 285 return new JMSOutputStreamContext(context); 286 } 287 288 public MessageContext getContext() { 289 return localContext; 290 } 291 292 } 293 294 } 295 | Popular Tags |