1 package org.objectweb.celtix.bus.transports.jms; 2 3 import java.util.Enumeration ; 4 import java.util.List ; 5 import java.util.logging.Level ; 6 import java.util.logging.Logger ; 7 8 import javax.jms.Destination ; 9 import javax.jms.JMSException ; 10 import javax.jms.Message ; 11 import javax.jms.ObjectMessage ; 12 import javax.jms.Session ; 13 import javax.jms.TextMessage ; 14 import javax.wsdl.Port; 15 import javax.wsdl.WSDLException; 16 import javax.xml.ws.handler.MessageContext; 17 18 19 20 import org.objectweb.celtix.Bus; 21 import org.objectweb.celtix.bus.configuration.wsdl.WsdlJMSConfigurationProvider; 22 import org.objectweb.celtix.common.logging.LogUtils; 23 import org.objectweb.celtix.configuration.Configuration; 24 import org.objectweb.celtix.configuration.ConfigurationBuilder; 25 import org.objectweb.celtix.configuration.ConfigurationBuilderFactory; 26 import org.objectweb.celtix.transports.jms.JMSAddressPolicyType; 27 import org.objectweb.celtix.transports.jms.context.JMSMessageHeadersType; 28 import org.objectweb.celtix.transports.jms.context.JMSPropertyType; 29 import org.objectweb.celtix.ws.addressing.EndpointReferenceType; 30 import org.objectweb.celtix.wsdl.EndpointReferenceUtils; 31 32 public class JMSTransportBase { 33 private static final Logger LOG = LogUtils.getL7dLogger(JMSTransportBase.class); 35 protected JMSAddressPolicyType jmsAddressPolicy; 36 protected boolean queueDestinationStyle; 37 protected Destination targetDestination; 38 protected Destination replyDestination; 39 protected JMSSessionFactory sessionFactory; 40 protected Bus bus; 41 protected EndpointReferenceType targetEndpoint; 42 protected Port port; 43 protected Configuration configuration; 44 45 public JMSTransportBase(Bus b, EndpointReferenceType epr, boolean isServer) throws WSDLException { 47 bus = b; 48 49 port = EndpointReferenceUtils.getPort(bus.getWSDLManager(), epr); 50 51 configuration = createConfiguration(bus, epr, isServer); 52 jmsAddressPolicy = getAddressPolicy(configuration); 53 targetEndpoint = epr; 54 queueDestinationStyle = 55 JMSConstants.JMS_QUEUE.equals(jmsAddressPolicy.getDestinationStyle().value()); 56 } 57 58 private JMSAddressPolicyType getAddressPolicy(Configuration conf) { 59 JMSAddressPolicyType pol = conf.getObject(JMSAddressPolicyType.class, "jmsAddress"); 60 if (pol == null) { 61 pol = new JMSAddressPolicyType(); 62 } 63 return pol; 64 } 65 66 67 private Configuration createConfiguration(Bus b, 68 EndpointReferenceType ref, 69 boolean isServer) { 70 ConfigurationBuilder cb = ConfigurationBuilderFactory.getBuilder(null); 71 72 Configuration busConfiguration = b.getConfiguration(); 73 Configuration parent = null; 74 75 String configURI; 76 String configID; 77 78 if (isServer) { 79 configURI = JMSConstants.JMS_SERVER_CONFIGURATION_URI; 80 configID = JMSConstants.JMS_SERVER_CONFIG_ID; 81 parent = busConfiguration 82 .getChild(JMSConstants.ENDPOINT_CONFIGURATION_URI, 83 EndpointReferenceUtils.getServiceName(ref).toString()); 84 } else { 85 configURI = JMSConstants.JMS_CLIENT_CONFIGURATION_URI; 86 configID = JMSConstants.JMS_CLIENT_CONFIG_ID; 87 String id = EndpointReferenceUtils.getServiceName(ref).toString() 88 + "/" + EndpointReferenceUtils.getPortName(ref); 89 parent = busConfiguration 90 .getChild(JMSConstants.PORT_CONFIGURATION_URI, id); 91 } 92 93 assert null != parent; 94 95 Configuration cfg = cb.getConfiguration(configURI, configID, parent); 96 if (null == cfg) { 97 cfg = cb.buildConfiguration(configURI, configID, parent); 98 } 99 if (null != port) { 101 cfg.getProviders().add(new WsdlJMSConfigurationProvider(port, false)); 102 } 103 return cfg; 104 } 105 106 108 public final JMSAddressPolicyType getJmsAddressDetails() { 109 return jmsAddressPolicy; 110 } 111 118 protected void connected(Destination target, Destination reply, JMSSessionFactory factory) { 119 targetDestination = target; 120 replyDestination = reply; 121 sessionFactory = factory; 122 } 123 124 125 134 protected Message marshal(Object payload, Session session, Destination replyTo, 135 String messageType) throws JMSException { 136 Message message = null; 137 138 if (JMSConstants.TEXT_MESSAGE_TYPE.equals(messageType)) { 139 message = session.createTextMessage((String )payload); 140 } else { 141 message = session.createObjectMessage(); 142 ((ObjectMessage )message).setObject((byte[])payload); 143 } 144 145 if (replyTo != null) { 146 message.setJMSReplyTo(replyTo); 147 } 148 149 return message; 150 } 151 152 153 160 protected Object unmarshal(Message message, String messageType) throws JMSException { 161 Object ret = null; 162 163 if (JMSConstants.TEXT_MESSAGE_TYPE.equals(messageType)) { 164 ret = ((TextMessage )message).getText(); 165 } else { 166 ret = (byte[])((ObjectMessage )message).getObject(); 167 } 168 169 return ret; 170 } 171 172 173 protected final void entry(String trace) { 174 LOG.log(Level.FINE, trace); 175 } 176 177 protected JMSMessageHeadersType populateIncomingContext(Message message, 178 MessageContext context, 179 String headerType) throws JMSException { 180 JMSMessageHeadersType headers = null; 181 182 headers = (JMSMessageHeadersType)context.get(headerType); 183 184 if (headers == null) { 185 headers = new JMSMessageHeadersType(); 186 context.put(headerType, headers); 187 } 188 189 headers.setJMSCorrelationID(message.getJMSCorrelationID()); 190 headers.setJMSDeliveryMode(new Integer (message.getJMSDeliveryMode())); 191 headers.setJMSExpiration(new Long (message.getJMSExpiration())); 192 headers.setJMSMessageID(message.getJMSMessageID()); 193 headers.setJMSPriority(new Integer (message.getJMSPriority())); 194 headers.setJMSRedelivered(Boolean.valueOf(message.getJMSRedelivered())); 195 headers.setJMSTimeStamp(new Long (message.getJMSTimestamp())); 196 headers.setJMSType(message.getJMSType()); 197 198 List <JMSPropertyType> props = headers.getProperty(); 199 Enumeration enm = message.getPropertyNames(); 200 while (enm.hasMoreElements()) { 201 String name = (String )enm.nextElement(); 202 String val = message.getStringProperty(name); 203 JMSPropertyType prop = new JMSPropertyType(); 204 prop.setName(name); 205 prop.setValue(val); 206 props.add(prop); 207 } 208 209 return headers; 210 } 211 212 protected int getJMSDeliveryMode(JMSMessageHeadersType headers) { 213 int deliveryMode = Message.DEFAULT_DELIVERY_MODE; 214 215 if (headers != null && headers.isSetJMSDeliveryMode()) { 216 deliveryMode = headers.getJMSDeliveryMode(); 217 } 218 return deliveryMode; 219 } 220 221 protected int getJMSPriority(JMSMessageHeadersType headers) { 222 int priority = Message.DEFAULT_PRIORITY; 223 if (headers != null && headers.isSetJMSPriority()) { 224 priority = headers.getJMSPriority(); 225 } 226 return priority; 227 } 228 229 protected long getTimeToLive(JMSMessageHeadersType headers) { 230 long ttl = -1; 231 if (headers != null && headers.isSetTimeToLive()) { 232 ttl = headers.getTimeToLive(); 233 } 234 return ttl; 235 } 236 237 protected String getCorrelationId(JMSMessageHeadersType headers) { 238 String correlationId = null; 239 if (headers != null 240 && headers.isSetJMSCorrelationID()) { 241 correlationId = headers.getJMSCorrelationID(); 242 } 243 return correlationId; 244 } 245 246 protected void setMessageProperties(JMSMessageHeadersType headers, Message message) 247 throws JMSException { 248 249 if (headers != null 250 && headers.isSetProperty()) { 251 List <JMSPropertyType> props = headers.getProperty(); 252 for (int x = 0; x < props.size(); x++) { 253 message.setStringProperty(props.get(x).getName(), props.get(x).getValue()); 254 } 255 } 256 } 257 258 protected String getAddrUriFromJMSAddrPolicy() { 259 return "jms:" 260 + jmsAddressPolicy.getJndiConnectionFactoryName() 261 + "#" 262 + jmsAddressPolicy.getJndiDestinationName(); 263 } 264 265 protected String getReplyTotAddrUriFromJMSAddrPolicy() { 266 return "jms:" 267 + jmsAddressPolicy.getJndiConnectionFactoryName() 268 + "#" 269 + jmsAddressPolicy.getJndiReplyDestinationName(); 270 } 271 } 272 | Popular Tags |