1 17 package org.apache.servicemix.components.jms; 18 19 import javax.jbi.JBIException; 20 import javax.jbi.messaging.ExchangeStatus; 21 import javax.jbi.messaging.InOut; 22 import javax.jbi.messaging.MessageExchange; 23 import javax.jbi.messaging.MessagingException; 24 import javax.jbi.messaging.NormalizedMessage; 25 import javax.jms.Connection ; 26 import javax.jms.ConnectionFactory ; 27 import javax.jms.Destination ; 28 import javax.jms.JMSException ; 29 import javax.jms.Message ; 30 import javax.jms.MessageConsumer ; 31 import javax.jms.MessageListener ; 32 import javax.jms.Session ; 33 import javax.resource.spi.work.Work ; 34 import javax.resource.spi.work.WorkException ; 35 import javax.resource.spi.work.WorkManager ; 36 import javax.xml.transform.TransformerException ; 37 38 import org.apache.commons.logging.Log; 39 import org.apache.commons.logging.LogFactory; 40 import org.apache.servicemix.components.util.ComponentSupport; 41 import org.apache.servicemix.jbi.framework.ComponentContextImpl; 42 import org.springframework.beans.factory.InitializingBean; 43 import org.springframework.jms.JmsException; 44 import org.springframework.jms.core.JmsTemplate; 45 import org.springframework.jms.core.MessageCreator; 46 47 54 public class JmsServiceComponent extends ComponentSupport implements MessageListener , InitializingBean { 55 private static final Log log = LogFactory.getLog(JmsServiceComponent.class); 56 private DestinationChooser destinationChooser; 57 private JmsMarshaler marshaler = new JmsMarshaler(); 58 private JmsTemplate template; 59 private String selector; 60 private MessageConsumer consumer; 61 private ConnectionFactory connectionFactory; 62 private Connection connection; 63 private Session session; 64 private WorkManager workManager; 65 66 70 public void afterPropertiesSet() throws Exception { 71 if (template == null) { 72 throw new IllegalArgumentException ("Must have a template set"); 73 } 74 } 75 76 public void start() throws JBIException { 77 super.start(); 79 try { 80 connectionFactory = template.getConnectionFactory(); 81 86 if (template instanceof org.springframework.jms.core.JmsTemplate102) { 87 if (template.isPubSubDomain()) { 89 javax.jms.TopicConnection tc; 90 connection = tc = ((javax.jms.TopicConnectionFactory )connectionFactory).createTopicConnection(); 91 session = tc.createTopicSession(template.isSessionTransacted(), template.getSessionAcknowledgeMode()); 92 } 93 else { 94 javax.jms.QueueConnection qc; 95 connection = qc = ((javax.jms.QueueConnectionFactory )connectionFactory).createQueueConnection(); 96 session = qc.createQueueSession(template.isSessionTransacted(), template.getSessionAcknowledgeMode()); 97 } 98 } else { connection = connectionFactory.createConnection(); 100 session = connection.createSession(template.isSessionTransacted(), template.getSessionAcknowledgeMode()); 101 } 102 103 Destination defaultDestination = template.getDefaultDestination(); 104 if (defaultDestination == null) { 105 defaultDestination = template.getDestinationResolver().resolveDestinationName(session, template.getDefaultDestinationName(), 106 template.isPubSubDomain()); 107 } 108 109 113 if (template instanceof org.springframework.jms.core.JmsTemplate102) { 114 if (template.isPubSubDomain()) { 117 consumer = ((javax.jms.TopicSession )session).createSubscriber((javax.jms.Topic )defaultDestination, selector, template.isPubSubNoLocal()); 118 } else { 119 consumer = ((javax.jms.QueueSession )session).createReceiver((javax.jms.Queue )defaultDestination, selector); 120 } 121 } else { consumer = session.createConsumer(defaultDestination, selector); 123 } 124 connection.start(); 125 consumer.setMessageListener(this); 126 } catch (JMSException e) { 127 throw new JBIException("Unable to start jms component"); 128 } 129 } 130 131 public void stop() throws JBIException { 132 try { 133 if (consumer != null) { 134 consumer.close(); 135 } 136 if (session != null) { 137 session.close(); 138 } 139 if (connection != null) { 140 connection.close(); 141 } 142 } catch (JMSException e) { 143 throw new JBIException("Unable to stop jms component"); 144 } finally { 145 connection = null; 146 session = null; 147 consumer = null; 148 } 149 } 150 151 protected void init() throws JBIException { 152 if (workManager == null) { 153 ComponentContextImpl context = (ComponentContextImpl) getContext(); 154 workManager = context.getWorkManager(); 155 } 156 super.init(); 157 } 158 159 162 public DestinationChooser getDestinationChooser() { 163 return destinationChooser; 164 } 165 166 171 public void setDestinationChooser(DestinationChooser destinationChooser) { 172 this.destinationChooser = destinationChooser; 173 } 174 175 180 public JmsMarshaler getMarshaler() { 181 return marshaler; 182 } 183 184 189 public void setMarshaler(JmsMarshaler marshaler) { 190 this.marshaler = marshaler; 191 } 192 193 196 public JmsTemplate getTemplate() { 197 return template; 198 } 199 200 205 public void setTemplate(JmsTemplate template) { 206 this.template = template; 207 } 208 209 212 public String getSelector() { 213 return selector; 214 } 215 216 221 public void setSelector(String selector) { 222 this.selector = selector; 223 } 224 225 public WorkManager getWorkManager() { 226 return workManager; 227 } 228 229 public void setWorkManager(WorkManager workManager) { 230 this.workManager = workManager; 231 } 232 233 237 public void onMessage(final Message jmsMessage) { 238 try { 239 workManager.scheduleWork(new Work () { 240 public void release() { 241 } 242 public void run() { 243 handleMessage(jmsMessage); 244 } 245 }); 246 } catch (WorkException e) { 247 log.error(e); 248 } 249 } 250 251 protected void handleMessage(final Message jmsMessage) { 252 try { 253 final InOut messageExchange = getDeliveryChannel().createExchangeFactory().createInOutExchange(); 254 NormalizedMessage inMessage = messageExchange.createMessage(); 255 try { 256 marshaler.toNMS(inMessage, jmsMessage); 257 messageExchange.setInMessage(inMessage); 258 if (getDeliveryChannel().sendSync(messageExchange)) { 259 Destination destination = getReplyToDestination(jmsMessage, messageExchange); 260 try { 261 template.send(destination, new MessageCreator() { 262 public Message createMessage(Session session) throws JMSException { 263 try { 264 Message message = marshaler.createMessage(messageExchange.getOutMessage(), session); 265 message.setJMSCorrelationID(jmsMessage.getJMSCorrelationID()); 266 if (log.isTraceEnabled()) { 267 log.trace("Sending message to: " + template.getDefaultDestinationName() 268 + " message: " + message); 269 } 270 return message; 271 } 272 catch (TransformerException e) { 273 JMSException jmsEx = new JMSException ("Failed to create a JMS Message: " + e); 274 jmsEx.setLinkedException(e); 275 throw jmsEx; 276 } 277 } 278 }); 279 done(messageExchange); 280 } 281 catch (JmsException e) { 282 fail(messageExchange, e); 283 } 284 } 285 } 286 catch (JMSException e) { 287 log.error("Couldn't process " + jmsMessage, e); 288 messageExchange.setError(e); 289 messageExchange.setStatus(ExchangeStatus.ERROR); 290 } 291 } 292 catch (MessagingException e) { 293 log.error("Failed to process inbound JMS Message: " + jmsMessage, e); 294 } 295 } 296 297 protected Destination getReplyToDestination(Message jmsMessage, final InOut messageExchange) throws JMSException { 298 if (destinationChooser == null) { 299 return jmsMessage.getJMSReplyTo(); 300 } 301 return destinationChooser.chooseDestination(messageExchange); 302 } 303 304 313 protected Destination chooseOutBoundDestination(MessageExchange exchange, Message inboundMessage) 314 throws JMSException { 315 Destination result = null; 316 if (destinationChooser != null) { 317 result = destinationChooser.chooseDestination(exchange); 318 } 319 else if (inboundMessage != null && inboundMessage.getJMSReplyTo() != null) { 320 result = inboundMessage.getJMSReplyTo(); 321 } 322 if (result == null) { 323 log.error("Could not find an outbound destination for " + inboundMessage); 324 throw new JMSException ("No outbound JMS Destination can be found"); 325 } 326 return result; 327 } 328 } 329 | Popular Tags |