KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > messagemgr > QueueConsumerEndpoint


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: QueueConsumerEndpoint.java,v 1.2 2005/03/18 03:58:39 tanderson Exp $
44  */

45 package org.exolab.jms.messagemgr;
46
47 import java.rmi.RemoteException JavaDoc;
48 import java.sql.Connection JavaDoc;
49 import javax.jms.InvalidSelectorException JavaDoc;
50 import javax.jms.JMSException JavaDoc;
51
52 import org.apache.commons.logging.Log;
53 import org.apache.commons.logging.LogFactory;
54
55 import org.exolab.jms.client.JmsQueue;
56 import org.exolab.jms.message.MessageImpl;
57 import org.exolab.jms.scheduler.Scheduler;
58 import org.exolab.jms.server.JmsServerSession;
59
60
61 /**
62  * A {@link ConsumerEndpoint} for queues. This object shares access to a
63  * particular queue with other QueueConsumerEndpoint instances.
64  *
65  * @author <a HREF="mailto:jima@comware.com.au">Jim Alateras</a>
66  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
67  * @version $Revision: 1.2 $ $Date: 2005/03/18 03:58:39 $
68  */

69 public class QueueConsumerEndpoint
70         extends AbstractConsumerEndpoint {
71
72     /**
73      * The destination that this consumer subscribes to.
74      */

75     private QueueDestinationCache _cache = null;
76
77     /**
78      * The maximum number of messages that a dispatch can deliver at any one
79      * time.
80      */

81     private final int MAX_MESSAGES = 200;
82
83     /**
84      * The logger.
85      */

86     private static final Log _log =
87             LogFactory.getLog(QueueConsumerEndpoint.class);
88
89
90     /**
91      * Construct a new <code>QueueConsumerEndpoint</code>.
92      *
93      * @param consumerId the identity of this consumer
94      * @param session the owning session
95      * @param queue the queue to access
96      * @param scheduler used to schedule asynchronous message delivery.
97      * @throws InvalidSelectorException if the selector is not well formed
98      * @throws JMSException if the destination cache can't be
99      * created
100      */

101     public QueueConsumerEndpoint(long consumerId, JmsServerSession session,
102                                  JmsQueue queue, String JavaDoc selector,
103                                  Scheduler scheduler)
104             throws InvalidSelectorException JavaDoc, JMSException JavaDoc {
105         super(consumerId, session, queue, selector, false, scheduler);
106
107         // register the endpoint with the destination cache. If
108
// the destination cache does not exist then create it.
109
_cache = (QueueDestinationCache)
110                 DestinationManager.instance().getDestinationCache(queue);
111         _cache.addConsumer(this);
112     }
113
114     /**
115      * Return the number of unsent messages.
116      *
117      * @return the number of unsent messages
118      */

119     public int getMessageCount() {
120         return _cache.getMessageCount();
121     }
122
123     // override ConsumerEndpoint.setMessageListener
124
public void setMessageListener(ConsumerEndpointListener listener) {
125         // if the listener is null we unregister from the destination. If
126
// the listener is not null then we register with it. When we
127
// unregister we need to determine what happens to any messages
128
// in the sent and
129
if (listener == null) {
130             _cache.removeConsumer(this);
131         } else {
132             _cache.addConsumer(this);
133         }
134
135         super.setMessageListener(listener);
136     }
137
138     /**
139      * Return the next available message to the client.
140      * <p/>
141      * The <code>wait</code> parameter indicates how many milliseconds to wait
142      * for a message before returning. If <code>wait</code> is <code>0</code>
143      * then do not wait. If <code>wait</code> is <code>-1</code> then wait
144      * indefinitely for the next message.
145      *
146      * @param wait number of milliseconds to wait
147      * @return the next message or <code>null</code>
148      * @throws JMSException for any error
149      */

150     public MessageHandle receive(long wait) throws JMSException JavaDoc {
151         MessageHandle handle = getMessageFromCache();
152         if (handle == null && wait >= 0) {
153             // set a flag indicating that we are waiting
154
// for a message
155
setWaitingForMessage();
156
157             // perform a double check and we receive a
158
// message then clear the previous set flag
159
handle = getMessageFromCache();
160             if (handle != null) {
161                 clearWaitingForMessage();
162             }
163         }
164
165         return handle;
166     }
167
168     /**
169      * Check whether a listener has been registered with this endpoint to
170      * support async message delivery
171      *
172      * @return boolean - true if it has
173      */

174     public boolean hasMessageListener() {
175         return _listener != null;
176     }
177
178     /**
179      * This event is called when a non-persistent message is added to a
180      * <code>DestinationCache</code>.
181      *
182      * @param handle a handle to the added message
183      * @param message the added message
184      * @return <code>true</code> if the listener accepted the message; otherwise
185      * <code>false</ode>
186      */

187     public boolean messageAdded(MessageHandle handle, MessageImpl message) {
188         if (_listener != null) {
189             schedule();
190         } else {
191             // notify the consumer
192
notifyMessageAvailable();
193         }
194
195         return true;
196     }
197
198     /**
199      * This event is called when a persistent message is added to the
200      * <code>DestinationCache</code>.
201      *
202      * @param handle a handle to the added message
203      * @param message the added message
204      * @param connection the database connection
205      * @return <code>true</code>
206      */

207     public boolean persistentMessageAdded(MessageHandle handle,
208                                           MessageImpl message,
209                                           Connection JavaDoc connection) {
210         return messageAdded(handle, message);
211     }
212
213     /**
214      * This event is called when a message is removed from the
215      * <code>DestinationCache</code>.
216      *
217      * @param messageId the identifier of the removed message
218      */

219     public void messageRemoved(String JavaDoc messageId) {
220         // no-op
221
}
222
223     /**
224      * This event is called when a message is removed from the
225      * <code>DestinationCache</code>
226      *
227      * @param messageId a handle to the removed message
228      * @param connection the database connection
229      */

230     public void persistentMessageRemoved(String JavaDoc messageId,
231                                          Connection JavaDoc connection) {
232         // no-op
233
}
234
235     /**
236      * Deliver messages in the cache to the consumer
237      *
238      * @return <code>true</code> if the endpoint should be rescheduled
239      */

240     protected boolean deliverMessages() {
241         boolean reschedule = true;
242
243         for (int index = 0; index < MAX_MESSAGES; index++) {
244             // check if we should exit the loop
245
if (stopDelivery()) {
246                 reschedule = false;
247                 break;
248             }
249
250             MessageHandle handle = null;
251             try {
252                 handle = getMessageFromCache();
253             } catch (Exception JavaDoc exception) {
254                 _log.error(exception, exception);
255             }
256
257             // if the handle is null, then there are no more messages
258
// to deliver so break
259
if (handle == null) {
260                 reschedule = false;
261                 break;
262             }
263
264             try {
265                 // deliver the message to the client
266
_listener.onMessage(handle);
267             } catch (RemoteException JavaDoc exception) {
268                 // failed to deliver due to network error
269
_log.error(exception, exception);
270                 if (handle != null) {
271                     _cache.returnMessageHandle(handle);
272                 }
273                 _listener = null;
274             } catch (Exception JavaDoc exception) {
275                 _log.error(exception, exception);
276                 if (handle != null) {
277                     _cache.returnMessageHandle(handle);
278                 }
279             }
280         }
281         return reschedule;
282     }
283
284     /**
285      * Closes this endpoint
286      */

287     protected void doClose() {
288         // unregister from the DestinationCache
289
_cache.removeConsumer(this);
290     }
291
292     /**
293      * Return a message from the corresponding cache for this consumer or null
294      * if one is not available
295      *
296      * @return MessageHandle - the handle or null
297      * @throws JMSException for any error
298      */

299     private MessageHandle getMessageFromCache() throws JMSException JavaDoc {
300         MessageHandle handle = _cache.getMessage(getSelector());
301         if (handle instanceof QueueConsumerMessageHandle) {
302             // associate the handle with the consumer
303
((QueueConsumerMessageHandle) handle).setConsumerId(getId());
304         }
305
306         return handle;
307     }
308
309 }
310
311
Popular Tags