KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > components > jms > JmsServiceComponent


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17 package org.apache.servicemix.components.jms;
18
19 import javax.jbi.JBIException;
20 import javax.jbi.messaging.ExchangeStatus;
21 import javax.jbi.messaging.InOut;
22 import javax.jbi.messaging.MessageExchange;
23 import javax.jbi.messaging.MessagingException;
24 import javax.jbi.messaging.NormalizedMessage;
25 import javax.jms.Connection JavaDoc;
26 import javax.jms.ConnectionFactory JavaDoc;
27 import javax.jms.Destination JavaDoc;
28 import javax.jms.JMSException JavaDoc;
29 import javax.jms.Message JavaDoc;
30 import javax.jms.MessageConsumer JavaDoc;
31 import javax.jms.MessageListener JavaDoc;
32 import javax.jms.Session JavaDoc;
33 import javax.resource.spi.work.Work JavaDoc;
34 import javax.resource.spi.work.WorkException JavaDoc;
35 import javax.resource.spi.work.WorkManager JavaDoc;
36 import javax.xml.transform.TransformerException JavaDoc;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.servicemix.components.util.ComponentSupport;
41 import org.apache.servicemix.jbi.framework.ComponentContextImpl;
42 import org.springframework.beans.factory.InitializingBean;
43 import org.springframework.jms.JmsException;
44 import org.springframework.jms.core.JmsTemplate;
45 import org.springframework.jms.core.MessageCreator;
46
47 /**
48  * A component which uses a {@link JmsTemplate} to consume messages from a destination, forward then intot the JBI
49  * container for processing, and send back the result to the JMS requestor - used for the TopipcRequestor and
50  * QueueRequestor pattern
51  *
52  * @version $Revision: 441456 $
53  */

54 public class JmsServiceComponent extends ComponentSupport implements MessageListener JavaDoc, InitializingBean {
55     private static final Log log = LogFactory.getLog(JmsServiceComponent.class);
56     private DestinationChooser destinationChooser;
57     private JmsMarshaler marshaler = new JmsMarshaler();
58     private JmsTemplate template;
59     private String JavaDoc selector;
60     private MessageConsumer JavaDoc consumer;
61     private ConnectionFactory JavaDoc connectionFactory;
62     private Connection JavaDoc connection;
63     private Session JavaDoc session;
64     private WorkManager JavaDoc workManager;
65
66     /**
67      * called by Spring framework after initialization
68      * @throws Exception
69      */

70     public void afterPropertiesSet() throws Exception JavaDoc {
71         if (template == null) {
72             throw new IllegalArgumentException JavaDoc("Must have a template set");
73         }
74     }
75
76     public void start() throws JBIException {
77         // Start receiving messages only when the component has actually been started.
78
super.start();
79         try {
80             connectionFactory = template.getConnectionFactory();
81             /*
82              * Component code did not work for JMS 1.02 compliant provider because uses APIs
83              * that did not exist in JMS 1.02 : ConnectionFactory.createConnection,
84              * Connection.createSession
85              */

86             if (template instanceof org.springframework.jms.core.JmsTemplate102) {
87                 //Note1 - would've preferred to call JmsTemplate102 methods but they are protected.
88
if (template.isPubSubDomain()) {
89                     javax.jms.TopicConnection JavaDoc tc;
90                     connection = tc = ((javax.jms.TopicConnectionFactory JavaDoc)connectionFactory).createTopicConnection();
91                     session = tc.createTopicSession(template.isSessionTransacted(), template.getSessionAcknowledgeMode());
92                 }
93                 else {
94                     javax.jms.QueueConnection JavaDoc qc;
95                     connection = qc = ((javax.jms.QueueConnectionFactory JavaDoc)connectionFactory).createQueueConnection();
96                     session = qc.createQueueSession(template.isSessionTransacted(), template.getSessionAcknowledgeMode());
97                 }
98             } else { // JMS 1.1 style
99
connection = connectionFactory.createConnection();
100                 session = connection.createSession(template.isSessionTransacted(), template.getSessionAcknowledgeMode());
101             }
102
103             Destination JavaDoc defaultDestination = template.getDefaultDestination();
104             if (defaultDestination == null) {
105                 defaultDestination = template.getDestinationResolver().resolveDestinationName(session, template.getDefaultDestinationName(),
106                         template.isPubSubDomain());
107             }
108             
109             /*
110              * Component code did not work for JMS 1.02 compliant provider because uses APIs
111              * that did not exist in JMS 1.02: Session.createConsumer
112              */

113             if (template instanceof org.springframework.jms.core.JmsTemplate102) {
114                 //Note1 - would've preferred to call JmsTemplate102.createConsumer but it is protected. Code below is same.
115
//Note2 - assert that defaultDestination is correct type according to isPubSubDomain()
116
if (template.isPubSubDomain()) {
117                     consumer = ((javax.jms.TopicSession JavaDoc)session).createSubscriber((javax.jms.Topic JavaDoc)defaultDestination, selector, template.isPubSubNoLocal());
118                 } else {
119                     consumer = ((javax.jms.QueueSession JavaDoc)session).createReceiver((javax.jms.Queue JavaDoc)defaultDestination, selector);
120                 }
121             } else { // JMS 1.1 style
122
consumer = session.createConsumer(defaultDestination, selector);
123             }
124             connection.start();
125             consumer.setMessageListener(this);
126         } catch (JMSException JavaDoc e) {
127             throw new JBIException("Unable to start jms component");
128         }
129     }
130
131     public void stop() throws JBIException {
132         try {
133             if (consumer != null) {
134                 consumer.close();
135             }
136             if (session != null) {
137                 session.close();
138             }
139             if (connection != null) {
140                 connection.close();
141             }
142         } catch (JMSException JavaDoc e) {
143             throw new JBIException("Unable to stop jms component");
144         } finally {
145             connection = null;
146             session = null;
147             consumer = null;
148         }
149     }
150
151     protected void init() throws JBIException {
152         if (workManager == null) {
153             ComponentContextImpl context = (ComponentContextImpl) getContext();
154             workManager = context.getWorkManager();
155         }
156         super.init();
157     }
158
159     /**
160      * @return Return the DestinationChooser
161      */

162     public DestinationChooser getDestinationChooser() {
163         return destinationChooser;
164     }
165
166     /**
167      * Set the DestinationChooser
168      *
169      * @param destinationChooser
170      */

171     public void setDestinationChooser(DestinationChooser destinationChooser) {
172         this.destinationChooser = destinationChooser;
173     }
174
175     /**
176      * Get the JMSMarshaler
177      *
178      * @return the marshaler
179      */

180     public JmsMarshaler getMarshaler() {
181         return marshaler;
182     }
183
184     /**
185      * Set the JMSMarshaler
186      *
187      * @param marshaler
188      */

189     public void setMarshaler(JmsMarshaler marshaler) {
190         this.marshaler = marshaler;
191     }
192
193     /**
194      * @return the JmsTemplate
195      */

196     public JmsTemplate getTemplate() {
197         return template;
198     }
199
200     /**
201      * Set the JmsTemplate
202      *
203      * @param template
204      */

205     public void setTemplate(JmsTemplate template) {
206         this.template = template;
207     }
208
209     /**
210      * @return Return the selector
211      */

212     public String JavaDoc getSelector() {
213         return selector;
214     }
215
216     /**
217      * Set the Selector
218      *
219      * @param selector
220      */

221     public void setSelector(String JavaDoc selector) {
222         this.selector = selector;
223     }
224
225      public WorkManager JavaDoc getWorkManager() {
226         return workManager;
227     }
228
229     public void setWorkManager(WorkManager JavaDoc workManager) {
230         this.workManager = workManager;
231     }
232
233     /**
234      * MessageListener implementation
235      * @param jmsMessage
236      */

237     public void onMessage(final Message jmsMessage) {
238         try {
239             workManager.scheduleWork(new Work JavaDoc() {
240                 public void release() {
241                 }
242                 public void run() {
243                     handleMessage(jmsMessage);
244                 }
245             });
246         } catch (WorkException JavaDoc e) {
247             log.error(e);
248         }
249     }
250     
251     protected void handleMessage(final Message jmsMessage) {
252         try {
253             final InOut messageExchange = getDeliveryChannel().createExchangeFactory().createInOutExchange();
254             NormalizedMessage inMessage = messageExchange.createMessage();
255             try {
256                 marshaler.toNMS(inMessage, jmsMessage);
257                 messageExchange.setInMessage(inMessage);
258                 if (getDeliveryChannel().sendSync(messageExchange)) {
259                     Destination JavaDoc destination = getReplyToDestination(jmsMessage, messageExchange);
260                     try {
261                         template.send(destination, new MessageCreator() {
262                             public Message createMessage(Session JavaDoc session) throws JMSException JavaDoc {
263                                 try {
264                                     Message message = marshaler.createMessage(messageExchange.getOutMessage(), session);
265                                     message.setJMSCorrelationID(jmsMessage.getJMSCorrelationID());
266                                     if (log.isTraceEnabled()) {
267                                         log.trace("Sending message to: " + template.getDefaultDestinationName()
268                                                 + " message: " + message);
269                                     }
270                                     return message;
271                                 }
272                                 catch (TransformerException JavaDoc e) {
273                                     JMSException JavaDoc jmsEx = new JMSException JavaDoc("Failed to create a JMS Message: " + e);
274                                     jmsEx.setLinkedException(e);
275                                     throw jmsEx;
276                                 }
277                             }
278                         });
279                         done(messageExchange);
280                     }
281                     catch (JmsException e) {
282                         fail(messageExchange, e);
283                     }
284                 }
285             }
286             catch (JMSException JavaDoc e) {
287                 log.error("Couldn't process " + jmsMessage, e);
288                 messageExchange.setError(e);
289                 messageExchange.setStatus(ExchangeStatus.ERROR);
290             }
291         }
292         catch (MessagingException e) {
293             log.error("Failed to process inbound JMS Message: " + jmsMessage, e);
294         }
295     }
296
297     protected Destination JavaDoc getReplyToDestination(Message jmsMessage, final InOut messageExchange) throws JMSException JavaDoc {
298         if (destinationChooser == null) {
299             return jmsMessage.getJMSReplyTo();
300         }
301         return destinationChooser.chooseDestination(messageExchange);
302     }
303
304     /**
305      * Choose the out bound destination to send the repsonse from JBI too If a DestinatonChooser is set, this is used,
306      * else the replyTo destination on the inbound message is used
307      *
308      * @param exchange
309      * @param inboundMessage
310      * @return the choosen outbound destination or null
311      * @throws JMSException if no destination can be found
312      */

313     protected Destination JavaDoc chooseOutBoundDestination(MessageExchange exchange, Message inboundMessage)
314             throws JMSException JavaDoc {
315         Destination JavaDoc result = null;
316         if (destinationChooser != null) {
317             result = destinationChooser.chooseDestination(exchange);
318         }
319         else if (inboundMessage != null && inboundMessage.getJMSReplyTo() != null) {
320             result = inboundMessage.getJMSReplyTo();
321         }
322         if (result == null) {
323             log.error("Could not find an outbound destination for " + inboundMessage);
324             throw new JMSException JavaDoc("No outbound JMS Destination can be found");
325         }
326         return result;
327     }
328 }
329
Popular Tags