KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > jms > SingleJmsMessageReceiver


1 /*
2  * $Id: SingleJmsMessageReceiver.java 3798 2006-11-04 04:07:14Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.jms;
12
13 import javax.jms.Destination JavaDoc;
14 import javax.jms.JMSException JavaDoc;
15 import javax.jms.Message JavaDoc;
16 import javax.jms.MessageConsumer JavaDoc;
17 import javax.jms.MessageListener JavaDoc;
18 import javax.jms.Session JavaDoc;
19 import javax.jms.Topic JavaDoc;
20
21 import org.apache.commons.collections.MapUtils;
22 import org.mule.impl.MuleMessage;
23 import org.mule.providers.AbstractMessageReceiver;
24 import org.mule.providers.ConnectException;
25 import org.mule.providers.jms.filters.JmsSelectorFilter;
26 import org.mule.umo.UMOComponent;
27 import org.mule.umo.UMOException;
28 import org.mule.umo.endpoint.UMOEndpoint;
29 import org.mule.umo.lifecycle.InitialisationException;
30 import org.mule.umo.lifecycle.LifecycleException;
31 import org.mule.umo.provider.UMOConnector;
32 import org.mule.umo.provider.UMOMessageAdapter;
33
34 /**
35  * Registers a single Jms MessageListener for an endpoint
36  */

37 public class SingleJmsMessageReceiver extends AbstractMessageReceiver implements MessageListener JavaDoc
38 {
39
40     protected JmsConnector connector;
41     protected RedeliveryHandler redeliveryHandler;
42     protected MessageConsumer JavaDoc consumer;
43     protected Session JavaDoc session;
44     protected boolean startOnConnect = false;
45
46     public SingleJmsMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint)
47         throws InitialisationException
48     {
49         super(connector, component, endpoint);
50         this.connector = (JmsConnector)connector;
51
52         try
53         {
54             redeliveryHandler = this.connector.createRedeliveryHandler();
55             redeliveryHandler.setConnector(this.connector);
56         }
57         catch (Exception JavaDoc e)
58         {
59             throw new InitialisationException(e, this);
60         }
61     }
62
63     public void doConnect() throws Exception JavaDoc
64     {
65         createConsumer();
66         if (startOnConnect)
67         {
68             doStart();
69         }
70     }
71
72     public void doDisconnect() throws Exception JavaDoc
73     {
74         closeConsumer();
75     }
76
77     public void onMessage(Message JavaDoc message)
78     {
79         try
80         {
81             if (logger.isDebugEnabled())
82             {
83                 logger.debug("Message received it is of type: " + message.getClass().getName());
84                 if (message.getJMSDestination() != null)
85                 {
86                     logger.debug("Message received on " + message.getJMSDestination() + " ("
87                                  + message.getJMSDestination().getClass().getName() + ")");
88                 }
89                 else
90                 {
91                     logger.debug("Message received on unknown destination");
92                 }
93                 logger.debug("Message CorrelationId is: " + message.getJMSCorrelationID());
94                 logger.debug("Jms Message Id is: " + message.getJMSMessageID());
95             }
96
97             if (message.getJMSRedelivered())
98             {
99                 if (logger.isDebugEnabled())
100                 {
101                     logger.debug("Message with correlationId: " + message.getJMSCorrelationID()
102                                  + " is redelivered. handing off to Exception Handler");
103                 }
104                 redeliveryHandler.handleRedelivery(message);
105             }
106
107             UMOMessageAdapter adapter = connector.getMessageAdapter(message);
108             routeMessage(new MuleMessage(adapter));
109         }
110         catch (Exception JavaDoc e)
111         {
112             handleException(e);
113         }
114     }
115
116     public void doStart() throws UMOException
117     {
118         try
119         {
120             // We ned to register the listener when start is called in order to only
121
// start receiving messages after
122
// start/
123
// If the consumer is null it means that the connection strategy is being
124
// run in a separate thread
125
// And hasn't managed to connect yet.
126
if (consumer == null)
127             {
128                 startOnConnect = true;
129             }
130             else
131             {
132                 startOnConnect = false;
133                 consumer.setMessageListener(this);
134             }
135         }
136         catch (JMSException JavaDoc e)
137         {
138             throw new LifecycleException(e, this);
139         }
140     }
141
142     public void doStop() throws UMOException
143     {
144         try
145         {
146             if (consumer != null)
147             {
148                 consumer.setMessageListener(null);
149             }
150         }
151         catch (JMSException JavaDoc e)
152         {
153             throw new LifecycleException(e, this);
154         }
155     }
156
157     protected void closeConsumer()
158     {
159         connector.closeQuietly(consumer);
160         consumer = null;
161         connector.closeQuietly(session);
162         session = null;
163     }
164
165     /**
166      * Create a consumer for the jms destination
167      *
168      * @throws Exception
169      */

170     protected void createConsumer() throws Exception JavaDoc
171     {
172         try
173         {
174             JmsSupport jmsSupport = this.connector.getJmsSupport();
175             // Create session if none exists
176
if (session == null)
177             {
178                 session = this.connector.getSession(endpoint);
179             }
180
181             // Create destination
182
String JavaDoc resourceInfo = endpoint.getEndpointURI().getResourceInfo();
183             boolean topic = (resourceInfo != null && JmsConstants.TOPIC_PROPERTY.equalsIgnoreCase(resourceInfo));
184
185             // todo MULE20 remove resource Info support
186
if (!topic)
187             {
188                 topic = MapUtils.getBooleanValue(endpoint.getProperties(), JmsConstants.TOPIC_PROPERTY, false);
189             }
190
191             Destination JavaDoc dest = jmsSupport.createDestination(session, endpoint.getEndpointURI().getAddress(),
192                 topic);
193
194             // Extract jms selector
195
String JavaDoc selector = null;
196             if (endpoint.getFilter() != null && endpoint.getFilter() instanceof JmsSelectorFilter)
197             {
198                 selector = ((JmsSelectorFilter)endpoint.getFilter()).getExpression();
199             }
200             else if (endpoint.getProperties() != null)
201             {
202                 // still allow the selector to be set as a property on the endpoint
203
// to be backward compatable
204
selector = (String JavaDoc)endpoint.getProperties().get(JmsConstants.JMS_SELECTOR_PROPERTY);
205             }
206             String JavaDoc tempDurable = (String JavaDoc)endpoint.getProperties().get(JmsConstants.DURABLE_PROPERTY);
207             boolean durable = connector.isDurable();
208             if (tempDurable != null)
209             {
210                 durable = Boolean.valueOf(tempDurable).booleanValue();
211             }
212
213             // Get the durable subscriber name if there is one
214
String JavaDoc durableName = (String JavaDoc)endpoint.getProperties().get(JmsConstants.DURABLE_NAME_PROPERTY);
215             if (durableName == null && durable && dest instanceof Topic JavaDoc)
216             {
217                 durableName = "mule." + connector.getName() + "." + endpoint.getEndpointURI().getAddress();
218                 logger.debug("Jms Connector for this receiver is durable but no durable name has been specified. Defaulting to: "
219                              + durableName);
220             }
221
222             // Create consumer
223
consumer = jmsSupport.createConsumer(session, dest, selector, connector.isNoLocal(), durableName,
224                 topic);
225         }
226         catch (JMSException JavaDoc e)
227         {
228             throw new ConnectException(e, this);
229         }
230     }
231 }
232
Popular Tags