1 package org.objectweb.celtix.bus.transports.jms; 2 3 import java.io.ByteArrayInputStream ; 4 import java.io.ByteArrayOutputStream ; 5 import java.io.IOException ; 6 import java.util.concurrent.Executor ; 7 import java.util.concurrent.Future ; 8 import java.util.logging.Level ; 9 import java.util.logging.Logger ; 10 11 import javax.jms.Destination ; 12 import javax.jms.JMSException ; 13 import javax.jms.Message ; 14 import javax.jms.Queue ; 15 import javax.jms.QueueSender ; 16 import javax.jms.TextMessage ; 17 import javax.jms.Topic ; 18 import javax.jms.TopicPublisher ; 19 import javax.naming.NamingException ; 20 import javax.wsdl.Port; 21 import javax.wsdl.WSDLException; 22 import javax.xml.ws.handler.MessageContext; 23 24 import org.objectweb.celtix.Bus; 25 import org.objectweb.celtix.bindings.ClientBinding; 26 import org.objectweb.celtix.bindings.ResponseCallback; 27 import org.objectweb.celtix.bus.management.counters.TransportClientCounters; 28 import org.objectweb.celtix.common.logging.LogUtils; 29 import org.objectweb.celtix.configuration.Configuration; 30 31 import org.objectweb.celtix.context.InputStreamMessageContext; 32 import org.objectweb.celtix.context.OutputStreamMessageContext; 33 import org.objectweb.celtix.transports.ClientTransport; 34 import org.objectweb.celtix.transports.jms.JMSClientBehaviorPolicyType; 35 import org.objectweb.celtix.transports.jms.context.JMSMessageHeadersType; 36 import org.objectweb.celtix.ws.addressing.EndpointReferenceType; 37 import org.objectweb.celtix.wsdl.EndpointReferenceUtils; 38 39 40 41 public class JMSClientTransport extends JMSTransportBase implements ClientTransport { 42 43 private static final Logger LOG = LogUtils.getL7dLogger(JMSClientTransport.class); 44 private static final long DEFAULT_RECEIVE_TIMEOUT = 0; 45 46 protected boolean textPayload; 47 TransportClientCounters counters; 48 private JMSClientBehaviorPolicyType clientBehaviourPolicy; 49 private ResponseCallback responseCallback; 50 51 public JMSClientTransport(Bus bus, 52 EndpointReferenceType address, 53 ClientBinding binding) 54 throws WSDLException, IOException { 55 56 super(bus, address, false); 57 clientBehaviourPolicy = getClientPolicy(configuration); 58 counters = new TransportClientCounters("JMSClientTransport"); 59 60 EndpointReferenceUtils.setAddress(address, getAddrUriFromJMSAddrPolicy()); 61 targetEndpoint = address; 62 63 textPayload = 64 JMSConstants.TEXT_MESSAGE_TYPE.equals(clientBehaviourPolicy.getMessageType().value()); 65 66 LOG.log(Level.FINE, "TEXT_MESSAGE_TYPE: " , textPayload); 67 LOG.log(Level.FINE, "QUEUE_DESTINATION_STYLE: " , queueDestinationStyle); 68 if (binding != null) { 69 responseCallback = binding.createResponseCallback(); 70 } 71 entry("JMSClientTransport Constructor"); 72 } 73 74 private JMSClientBehaviorPolicyType getClientPolicy(Configuration conf) { 75 JMSClientBehaviorPolicyType pol = conf.getObject(JMSClientBehaviorPolicyType.class, "jmsClient"); 76 if (pol == null) { 77 pol = new JMSClientBehaviorPolicyType(); 78 } 79 return pol; 80 } 81 82 public JMSClientBehaviorPolicyType getJMSClientBehaviourPolicy() { 83 84 return clientBehaviourPolicy; 85 } 86 87 89 public void shutdown() { 90 entry("JMSClientTransport shutdown()"); 91 92 if (sessionFactory != null) { 95 sessionFactory.shutdown(); 96 } 97 } 98 99 public EndpointReferenceType getTargetEndpoint() { 100 return targetEndpoint; 101 } 102 103 public EndpointReferenceType getDecoupledEndpoint() throws IOException { 104 105 if (jmsAddressPolicy.getJndiReplyDestinationName() != null) { 106 EndpointReferenceType epr = new EndpointReferenceType(); 107 EndpointReferenceUtils.setAddress(epr, getReplyTotAddrUriFromJMSAddrPolicy()); 108 return epr; 109 } 110 111 return null; 112 } 113 114 public Port getPort() { 115 return port; 116 } 117 118 public OutputStreamMessageContext createOutputStreamContext(MessageContext context) throws IOException { 119 return new JMSOutputStreamContext(context); 120 } 121 122 123 public void finalPrepareOutputStreamContext(OutputStreamMessageContext context) throws IOException { 124 } 125 126 public InputStreamMessageContext invoke(OutputStreamMessageContext context) 127 throws IOException { 128 129 if (!queueDestinationStyle) { 130 LOG.log(Level.WARNING, "Non-oneway invocations not supported for JMS Topics"); 131 throw new IOException ("Non-oneway invocations not supported for JMS Topics"); 132 } 133 134 try { 135 byte[] responseData = null; 136 if (textPayload) { 137 String responseString = (String )invoke(context, true); 138 responseData = responseString.getBytes(); 139 } else { 140 responseData = (byte[])invoke(context, true); 141 } 142 counters.getInvoke().increase(); 143 JMSInputStreamContext respContext = 144 new JMSInputStreamContext(new ByteArrayInputStream (responseData)); 145 146 if (context.containsKey(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS)) { 147 JMSMessageHeadersType responseHdr = 148 (JMSMessageHeadersType)context.remove(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); 149 respContext.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS, responseHdr); 150 respContext.setScope(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS, 151 MessageContext.Scope.APPLICATION); 152 } 153 154 return respContext; 155 } catch (Exception ex) { 156 counters.getInvokeError().increase(); 158 throw new IOException (ex.getMessage()); 159 } 160 } 161 162 163 168 public void invokeOneway(OutputStreamMessageContext context) throws IOException { 169 try { 170 invoke(context, false); 171 counters.getInvokeOneWay(); 172 } catch (Exception ex) { 173 counters.getInvokeError().increase(); 174 throw new IOException (ex.getMessage()); 175 } 176 } 177 178 public Future <InputStreamMessageContext> invokeAsync(OutputStreamMessageContext context, 179 Executor executor) 180 throws IOException { 181 return null; 182 } 183 184 public ResponseCallback getResponseCallback() { 185 return responseCallback; 186 } 187 188 195 private Object invoke(OutputStreamMessageContext context, boolean responseExpected) 196 throws JMSException , NamingException { 197 entry("JMSClientTransport invoke()"); 198 199 try { 200 if (null == sessionFactory) { 201 JMSProviderHub.connect(this); 202 } 203 } catch (JMSException ex) { 204 LOG.log(Level.FINE, "JMS connect failed with JMSException : ", ex); 205 throw ex; 206 } catch (NamingException e) { 207 LOG.log(Level.FINE, "JMS connect failed with NamingException : ", e); 208 throw e; 209 } 210 211 if (sessionFactory == null) { 212 throw new java.lang.IllegalStateException ("JMSClientTransport not connected"); 213 } 214 215 PooledSession pooledSession = sessionFactory.get(responseExpected); 216 send(pooledSession, context, responseExpected); 217 218 Object response = null; 219 220 if (responseExpected) { 221 response = receive(pooledSession, context); 222 } 223 224 sessionFactory.recycle(pooledSession); 225 226 return response; 227 } 228 229 230 236 private void send(PooledSession pooledSession, 237 OutputStreamMessageContext context, 238 boolean responseExpected) 239 throws JMSException { 240 Object request; 241 242 if (textPayload) { 243 request = context.getOutputStream().toString(); 244 } else { 245 request = ((ByteArrayOutputStream )context.getOutputStream()).toByteArray(); 246 } 247 248 Destination replyTo = pooledSession.destination(); 249 250 if (!responseExpected 253 && (jmsAddressPolicy.getJndiReplyDestinationName() == null)) { 254 replyTo = null; 255 } 256 257 Message message = marshal(request, pooledSession.session(), replyTo, 258 clientBehaviourPolicy.getMessageType().value()); 259 261 JMSMessageHeadersType headers = 262 (JMSMessageHeadersType)context.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); 263 264 265 int deliveryMode = getJMSDeliveryMode(headers); 266 int priority = getJMSPriority(headers); 267 String correlationID = getCorrelationId(headers); 268 long ttl = getTimeToLive(headers); 269 if (ttl <= 0) { 270 ttl = DEFAULT_RECEIVE_TIMEOUT; 271 } 272 273 setMessageProperties(headers, message); 274 if (responseExpected) { 275 String id = pooledSession.getCorrelationID(); 276 277 if (id != null) { 278 if (correlationID != null) { 279 String error = "User cannot set JMSCorrelationID when " 280 + "making a request/reply invocation using " 281 + "a static replyTo Queue."; 282 throw new JMSException (error); 283 } 284 correlationID = id; 285 } 286 } 287 288 if (correlationID != null) { 289 message.setJMSCorrelationID(correlationID); 290 } else { 291 } 294 295 LOG.log(Level.FINE, "client sending request: ", message); 296 297 if (queueDestinationStyle) { 298 QueueSender sender = (QueueSender )pooledSession.producer(); 299 sender.setTimeToLive(ttl); 300 sender.send((Queue )targetDestination, message, deliveryMode, priority, ttl); 301 } else { 302 TopicPublisher publisher = (TopicPublisher )pooledSession.producer(); 303 publisher.setTimeToLive(ttl); 304 publisher.publish((Topic )targetDestination, message, deliveryMode, priority, ttl); 305 } 306 } 307 308 309 315 private Object receive(PooledSession pooledSession, 316 OutputStreamMessageContext context) 317 throws JMSException { 318 Object response = null; 319 320 long timeout = DEFAULT_RECEIVE_TIMEOUT; 321 322 Long receiveTimeout = (Long )context.get(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT); 323 324 if (receiveTimeout != null) { 325 timeout = receiveTimeout.longValue(); 326 } 327 328 Message message = pooledSession.consumer().receive(timeout); 329 LOG.log(Level.FINE, "client received reply: " , message); 330 331 if (message != null) { 332 333 populateIncomingContext(message, context, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); 334 String messageType = message instanceof TextMessage 335 ? JMSConstants.TEXT_MESSAGE_TYPE : JMSConstants.BINARY_MESSAGE_TYPE; 336 response = unmarshal(message, messageType); 337 return response; 338 } else { 339 String error = "JMSClientTransport.receive() timed out. No message available."; 340 LOG.log(Level.SEVERE, error); 341 return null; 344 } 345 } 346 } 347 | Popular Tags |