KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > client > JmsMessageConsumer


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: JmsMessageConsumer.java,v 1.2 2005/03/18 03:36:37 tanderson Exp $
44  */

45 package org.exolab.jms.client;
46
47 import javax.jms.JMSException JavaDoc;
48 import javax.jms.Message JavaDoc;
49 import javax.jms.MessageConsumer JavaDoc;
50 import javax.jms.MessageListener JavaDoc;
51 import javax.jms.Destination JavaDoc;
52
53 import org.apache.commons.logging.Log;
54 import org.apache.commons.logging.LogFactory;
55
56 import org.exolab.jms.message.MessageImpl;
57
58
59 /**
60  * Client implementation of the <code>javax.jms.MessageConsumer</code>
61  * interface
62  *
63  * @author <a HREF="mailto:jima@comware.com.au">Jim Alateras</a>
64  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
65  * @version $Revision: 1.2 $ $Date: 2005/03/18 03:36:37 $
66  */

67 class JmsMessageConsumer
68         implements MessageListener JavaDoc, MessageConsumer JavaDoc {
69
70     /**
71      * The session which created this
72      */

73     private JmsSession _session = null;
74
75     /**
76      * The consumer's identity, allocated by the server
77      */

78     private final long _consumerId;
79
80     /**
81      * The destination to receive messages from
82      */

83     private final Destination JavaDoc _destination;
84
85     /**
86      * A message listener may be assigned to this session, for asynchronous
87      * message delivery.
88      */

89     private MessageListener JavaDoc _messageListener = null;
90
91     /**
92      * The message selector, for filtering messages. May be <code>null</code>
93      */

94     private String JavaDoc _selector = null;
95
96     /**
97      * Indicates if the session is closed
98      */

99     private volatile boolean _closed = false;
100
101     /**
102      * This is the last time that the listener had been set through {@link
103      * #setMessageListener}
104      */

105     private long _listenerSetTimestamp = 0;
106
107     /**
108      * This is the last message id asynchronously delivered to the listener.
109      */

110     private String JavaDoc _lastMessageDelivered;
111
112     /**
113      * The logger
114      */

115     private static final Log _log =
116             LogFactory.getLog(JmsMessageConsumer.class);
117
118
119     /**
120      * Construct a new <code>JmsMessageProducer</code>.
121      *
122      * @param session the session responsible for the consumer
123      * @param consumerId the identity of this consumer
124      * @param destination the destination to receive messages from
125      * @param selector the message selector. May be <code>null</code
126      */

127     public JmsMessageConsumer(JmsSession session, long consumerId,
128                               Destination JavaDoc destination, String JavaDoc selector) {
129         if (session == null) {
130             throw new IllegalArgumentException JavaDoc("Argument 'session' is null");
131         }
132         if (destination == null) {
133             throw new IllegalArgumentException JavaDoc(
134                     "Argument 'destination' is null");
135         }
136         _session = session;
137         _consumerId = consumerId;
138         _destination = destination;
139         _selector = selector;
140     }
141
142     /**
143      * Return the message consumer's message selector expression
144      *
145      * @return the selector expression, or <code>null</code> if one isn't set
146      */

147     public String JavaDoc getMessageSelector() {
148         return _selector;
149     }
150
151     /**
152      * Return the consumer's listener
153      *
154      * @return the listener for the consumer, or <code>null</code> if there
155      * isn't one set
156      */

157     public MessageListener JavaDoc getMessageListener() {
158         return _messageListener;
159     }
160
161     /**
162      * Set the consumer's listener
163      *
164      * @param listener the message listener, or <code>null</code> to deregister
165      * an existing listener
166      * @throws JMSException if the listener cannot be set
167      */

168     public void setMessageListener(MessageListener JavaDoc listener)
169             throws JMSException JavaDoc {
170         // if listener is not null then enable asynchronous delivery
171
// otherwise disable it
172
if (listener != null) {
173             if (_messageListener == null) {
174                 // previously asynchronouse messaging was disabled
175
_listenerSetTimestamp = System.currentTimeMillis();
176                 _messageListener = listener;
177                 _session.setMessageListener(this);
178             } else {
179                 // asynch message deliver is enabled, just changing the
180
// client side receiving entity.
181
_messageListener = listener;
182             }
183         } else {
184             if (_messageListener != null) {
185                 _session.removeMessageListener(this);
186                 _messageListener = listener;
187             }
188         }
189
190         // reset the lastMessageDelivered regardless what the value
191
// of the listener is.
192
_lastMessageDelivered = null;
193     }
194
195     /**
196      * Receive the next message produced for this consumer. This call blocks
197      * indefinitely until a message is produced or until this message consumer
198      * is closed.
199      *
200      * @return the next message produced for this consumer, or <code>null</code>
201      * if this consumer is concurrently closed
202      * @throws JMSException if the next message can't be received
203      */

204     public Message JavaDoc receive() throws JMSException JavaDoc {
205         return retrieveMessage(0);
206     }
207
208     /**
209      * Receive the next message that arrives within the specified timeout
210      * interval. This call blocks until a message arrives, the timeout expires,
211      * or this message consumer is closed. A timeout of zero never expires and
212      * the call blocks indefinitely.
213      *
214      * @param timeout the timeout interval, in milliseconds
215      * @return the next message produced for this consumer, or <code>null</code>
216      * if the timeout expires or the consumer concurrently closed
217      * @throws JMSException if the next message can't be received
218      */

219     public Message JavaDoc receive(long timeout) throws JMSException JavaDoc {
220         return retrieveMessage(timeout);
221     }
222
223     /**
224      * Receive the next message if one is immediately available
225      *
226      * @return the next message produced for this consumer, or <code>null</code>
227      * if one is not available
228      * @throws JMSException if the next message can't be received
229      */

230     public Message JavaDoc receiveNoWait() throws JMSException JavaDoc {
231         return retrieveMessage(-1);
232     }
233
234     /**
235      * Close the consumer. This call blocks until a receive or message listener
236      * in progress has completed. A blocked consumer receive call returns
237      * <code>null</code> when this consumer is closed.
238      *
239      * @throws JMSException if this consumer can't be closed
240      */

241     public synchronized void close() throws JMSException JavaDoc {
242         if (!_closed) {
243             try {
244                 _closed = true;
245                 _session.removeConsumer(this);
246
247                 // wake up any blocked threads and let them complete
248
notifyAll();
249             } finally {
250                 _messageListener = null;
251                 _session= null;
252                 _selector = null;
253             }
254         }
255     }
256
257     /**
258      * Handles messages received asynchronously via the owning session, passing
259      * them to the registered listener
260      *
261      * @param message the message received
262      */

263     public void onMessage(Message JavaDoc message) {
264         try {
265             if (_messageListener != null) {
266                 // drop all messages if they were received before the listener
267
// had been set.
268
long rcvd = message.getLongProperty("JMSXRcvTimestamp");
269                 if (rcvd < _listenerSetTimestamp) {
270                     return;
271                 }
272
273                 // According to section 4.5.2 Asynchronous Delivery messages
274
// delivered to consumers, through the MessageListener
275
// interface in a transacted session must be treated the same
276
// as synchronous delivery.
277
// Need to set this field before we actually deliver the
278
// message since the client can actually call
279
// setMessageListener in onMessage()
280
_lastMessageDelivered = ((MessageImpl) message).getId();
281                 _messageListener.onMessage(message);
282             }
283         } catch (JMSException JavaDoc exception) {
284             //report the exception
285
_log.error("Error in onMessage", exception);
286         }
287     }
288
289     /**
290      * Retrieve the next message for the consumer.
291      *
292      * @param wait the maximum time to wait for a message, in milliseconds. If
293      * <code>-1</code>, don't wait, if <code>0</code> wait
294      * indefinitely, otherwise wait the specified time.
295      * @return the received message, or <code>null</code>, if no message is
296      * available
297      * @throws JMSException if an error occurs retrieving the message, the
298      * session is closed, or a message listener is set.
299      */

300     protected Message JavaDoc retrieveMessage(long wait) throws JMSException JavaDoc {
301         if (_messageListener != null) {
302             // cannot call this method when a listener is defined
303
throw new JMSException JavaDoc("Can't receive when listener defined");
304         }
305
306         if (_closed) {
307             // cannot call this method when a listener is defined
308
throw new JMSException JavaDoc("Can't receive when session closed");
309         }
310
311         MessageImpl message =
312                 (MessageImpl) _session.retrieveMessage(_consumerId, wait);
313         if (message != null) {
314             _lastMessageDelivered = message.getId();
315         }
316
317         return message;
318     }
319
320     /**
321      * Return the last message asynchronously delivered to the consumer
322      *
323      * @return the last message delivered
324      */

325     protected String JavaDoc getLastMessageDelivered() {
326         return _lastMessageDelivered;
327     }
328
329     /**
330      * Returns the destination to receive messages from
331      *
332      * @return the destination to receive messages from
333      */

334     protected Destination JavaDoc getDestination() {
335         return _destination;
336     }
337
338     /**
339      * Returns the identity of this consumer
340      *
341      * @return the identity of this consumer
342      */

343     protected long getConsumerId() {
344         return _consumerId;
345     }
346
347     /**
348      * Returns the session that created this consumer
349      *
350      * @return the session that created this consumer
351      */

352     protected JmsSession getSession() {
353         return _session;
354     }
355
356 }
357
Popular Tags