KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > messagemgr > AbstractTopicConsumerEndpoint


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: AbstractTopicConsumerEndpoint.java,v 1.1 2005/03/18 03:58:39 tanderson Exp $
44  */

45
46 package org.exolab.jms.messagemgr;
47
48 import java.rmi.RemoteException JavaDoc;
49 import java.sql.Connection JavaDoc;
50 import java.util.Collections JavaDoc;
51 import java.util.HashMap JavaDoc;
52 import java.util.Iterator JavaDoc;
53 import java.util.Map JavaDoc;
54 import javax.jms.InvalidSelectorException JavaDoc;
55 import javax.jms.JMSException JavaDoc;
56
57 import org.apache.commons.logging.Log;
58 import org.apache.commons.logging.LogFactory;
59
60 import org.exolab.jms.client.JmsDestination;
61 import org.exolab.jms.client.JmsTopic;
62 import org.exolab.jms.message.MessageImpl;
63 import org.exolab.jms.persistence.PersistenceException;
64 import org.exolab.jms.scheduler.Scheduler;
65 import org.exolab.jms.selector.Selector;
66 import org.exolab.jms.server.JmsServerSession;
67
68
69 /**
70  * A {@link ConsumerEndpoint} for topics.
71  *
72  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
73  * @version $Revision: 1.1 $ $Date: 2005/03/18 03:58:39 $
74  */

75 abstract class AbstractTopicConsumerEndpoint extends AbstractConsumerEndpoint
76         implements DestinationEventListener {
77
78     /**
79      * Cache of all handles for this consumer.
80      */

81     private MessageQueue _handles = new MessageQueue();
82
83     /**
84      * Maintains a map of TopicDestinationCache that this endpoint subscribes
85      * to, keyed on JmsTopic. A wildcard subscription may point to more than
86      * one.
87      */

88     protected Map JavaDoc _caches = Collections.synchronizedMap(new HashMap JavaDoc());
89
90     /**
91      * The maximum number of messages that a dispatch can deliver at any one
92      * time
93      */

94     private final int MAX_MESSAGES = 200;
95
96     /**
97      * The logger
98      */

99     private static final Log _log =
100             LogFactory.getLog(AbstractTopicConsumerEndpoint.class);
101
102     /**
103      * Construct a new <code>TopicConsumerEndpoint</code>.
104      * <p/>
105      * The destination and selector determine where it will be sourcing its
106      * messages from, and scheduler is used to asynchronously deliver messages
107      * to the consumer.
108      *
109      * @param consumerId the identity of this consumer
110      * @param session the owning session
111      * @param topic the topic to access
112      * @param selector the message selector. May be <code>null</code>
113      * @param noLocal if true, inhibits the delivery of messages published by
114      * its own connection.
115      * @param scheduler used to schedule asynchronous message delivery.
116      * @throws InvalidSelectorException if the selector is invalid
117      * @throws JMSException if the destination caches can't be
118      * constructed
119      */

120     public AbstractTopicConsumerEndpoint(long consumerId,
121                                          JmsServerSession session,
122                                          JmsTopic topic,
123                                          String JavaDoc selector, boolean noLocal,
124                                          Scheduler scheduler)
125             throws JMSException JavaDoc {
126         super(consumerId, session, topic, selector, noLocal, scheduler);
127     }
128
129     /**
130      * Return the next available message to the client.
131      * <p/>
132      * The <code>wait</code> parameter indicates how many milliseconds to wait
133      * for a message before returning. If <code>wait</code> is <code>0</code>
134      * then do not wait. If <code>wait</code> is <code>-1</code> then wait
135      * indefinitely for the next message.
136      *
137      * @param wait number of milliseconds to wait
138      * @return the next message or <code>null</code>
139      * @throws JMSException for any error
140      */

141     public MessageHandle receive(long wait) throws JMSException JavaDoc {
142         MessageHandle handle = receiveNoWait();
143         if ((handle == null) && (wait >= 0)) {
144             // no message is currently available for this
145
// consumer. So set the flag to indicate that
146
// the consumer is waiting for a message
147
setWaitingForMessage();
148         }
149
150         return handle;
151     }
152
153     /**
154      * Returns the first available message
155      */

156     public MessageHandle receiveNoWait() throws JMSException JavaDoc {
157         MessageHandle handle = null;
158         while ((handle = _handles.removeFirst()) != null) {
159             // make sure we can still get access to the message
160
MessageImpl message = handle.getMessage();
161             if (message != null) {
162                 if (selects(message)) {
163                     // got a message which is applicable for the endpoint
164
break;
165                 } else {
166                     // this message has been filtered out so we can destroy
167
// the handle.
168
handle.destroy();
169                 }
170             }
171             handle = null;
172         }
173         return handle;
174     }
175
176     /**
177      * Return a delivered, but unacknowledged message to the cache.
178      *
179      * @param handle the handle of the message to return
180      */

181     public void returnMessage(MessageHandle handle) {
182         addMessage(handle);
183
184         // schedule if needed
185
schedule();
186     }
187
188     /**
189      * Return the number of unsent messages in the cache for this consumer.
190      *
191      * @return the number of unsent messages
192      */

193     public int getMessageCount() {
194         return _handles.size();
195     }
196
197     /**
198      * This event is called when a non-persistent message is added to the
199      * <code>DestinationCache</code>.
200      *
201      * @param handle a handle to the message
202      * @param message the added message
203      * @return <code>true</code> if the listener accepted the message; otherwise
204      * <code>false</ode>
205      * @throws JMSException if the listener fails to handle the message
206      */

207     public boolean messageAdded(MessageHandle handle, MessageImpl message)
208             throws JMSException JavaDoc {
209         boolean accepted = true;
210
211         // if the 'noLocal' indicator is set, and the message arrived on
212
// the same connection, ignore the message
213
if (getNoLocal() && message.getConnectionId() == getConnectionId()) {
214             accepted = false;
215         } else {
216             // create a message handle for this consumer
217
handle = new TopicConsumerMessageHandle(handle, this);
218
219             if (!_handles.contains(handle)) {
220                 // if the message is not already in the cache then add it
221
addMessage(handle);
222                 schedule();
223             } else {
224                 accepted = false;
225                 _log.warn("Endpoint=" + this + " already has message cached: " +
226                           handle);
227             }
228         }
229         return accepted;
230     }
231
232     /**
233      * This event is called when a message is removed from the
234      * <code>DestinationCache</code>.
235      *
236      * @param messageId the identifier of the removed message
237      * @throws JMSException if the listener fails to handle the message
238      */

239     public void messageRemoved(String JavaDoc messageId) throws JMSException JavaDoc {
240         MessageHandle handle = _handles.remove(messageId);
241         if (handle != null) {
242             handle.destroy();
243         }
244     }
245
246     /**
247      * This event is called when a persistent message is added to the
248      * <code>DestinationCache</code>.
249      *
250      * @param handle a handle to the added message
251      * @param message the added message
252      * @param connection the database connection
253      * @return <code>true</code> if the listener accepted the message;
254      * @throws JMSException if the listener fails to handle the message
255      * @throws PersistenceException if there is a persistence related problem
256      */

257     public boolean persistentMessageAdded(MessageHandle handle,
258                                           MessageImpl message,
259                                           Connection JavaDoc connection)
260             throws JMSException JavaDoc, PersistenceException {
261         boolean accepted = true;
262
263         // if the 'noLocal' indicator is set, and the message arrived on
264
// the same connection, ignore the message
265
if (getNoLocal() && message.getConnectionId() == getConnectionId()) {
266             accepted = false;
267         } else {
268             // create a message handle for this consumer
269
handle = new TopicConsumerMessageHandle(handle, this);
270             if (isPersistent()) {
271                 // and make it persistent if this is a durable consumer
272
handle.add(connection);
273             }
274
275             accepted = _handles.add(handle);
276             if (accepted) {
277                 addMessage(handle);
278                 schedule();
279             } else {
280                 accepted = false;
281                 _log.warn("Endpoint=" + this + " already has message cached: " +
282                           handle);
283             }
284         }
285         return accepted;
286     }
287
288     /**
289      * This event is called when a message is removed from the
290      * <code>DestinationCache</code>.
291      *
292      * @param messageId the identifier of the removed message
293      * @param connection the database connection
294      * @throws JMSException if the listener fails to handle the message
295      * @throws PersistenceException if there is a persistence related problem
296      */

297     public void persistentMessageRemoved(String JavaDoc messageId,
298                                          Connection JavaDoc connection)
299             throws JMSException JavaDoc, PersistenceException {
300         MessageHandle handle = _handles.remove(messageId);
301         if (handle != null) {
302             handle.destroy(connection);
303         }
304     }
305
306     /**
307      * This method is called when a new destination is added to the {@link
308      * DestinationManager}.
309      *
310      * @param destination the destination that was added
311      * @param cache the corresponding cache
312      */

313     public void destinationAdded(JmsDestination destination,
314                                  DestinationCache cache) {
315         if (destination instanceof JmsTopic) {
316             JmsTopic myTopic = (JmsTopic) getDestination();
317             JmsTopic topic = (JmsTopic) destination;
318             if (myTopic.match(topic) && !_caches.containsKey(topic)) {
319                 _caches.put(topic, cache);
320                 cache.addConsumer(this);
321             }
322         }
323     }
324
325     /**
326      * This method is called when a destination is removed from the {@link
327      * DestinationManager}.
328      *
329      * @param destination the destination that was removed
330      * @param cache the corresponding cache
331      */

332     public void destinationRemoved(JmsDestination destination,
333                                    DestinationCache cache) {
334         if (destination instanceof JmsTopic) {
335             _caches.remove(destination);
336         }
337     }
338
339     /**
340      * Deliver messages in the cache to the consumer.
341      *
342      * @return <code>true</code> if the endpoint should be rescheduled
343      */

344     protected boolean deliverMessages() {
345         boolean reschedule = true;
346
347         for (int index = 0; index < MAX_MESSAGES;) {
348
349             // check if we should exit the loop
350
if (stopDelivery()) {
351                 reschedule = false;
352                 break;
353             }
354
355             // Process the first message on the list.
356
MessageHandle handle = _handles.removeFirst();
357             try {
358                 Selector selector = getSelector();
359                 if (selector != null) {
360                     MessageImpl m = handle.getMessage();
361                     if ((m != null) && selector.selects(m)) {
362                         // this message has been selected by the selector
363
_listener.onMessage(handle);
364                         index++;
365                     } else {
366                         // this message has not been selected
367
handle.destroy();
368                     }
369                 } else {
370                     // send the message to the consumer
371
_listener.onMessage(handle);
372                     index++;
373                 }
374             } catch (RemoteException JavaDoc exception) {
375                 _listener = null;
376                 returnMessage(handle);
377             } catch (JMSException JavaDoc exception) {
378                 _log.error(exception, exception);
379                 returnMessage(handle);
380             } catch (Exception JavaDoc exception) {
381                 _log.error(exception, exception);
382                 returnMessage(handle);
383             }
384         }
385         return reschedule;
386     }
387
388     /**
389      * Registers this with the associated {@link DestinationCache}s The consumer
390      * may receive messages immediately.
391      *
392      * @throws JMSException for any JMS error
393      */

394     protected void init() throws JMSException JavaDoc {
395         JmsTopic topic = (JmsTopic) getDestination();
396
397         // register the endpoint with the destination
398
DestinationManager destmgr = DestinationManager.instance();
399         if (topic.isWildCard()) {
400             // if the topic is a wild card then we need to retrieve a
401
// set of matching destination caches.
402
_caches = destmgr.getTopicDestinationCaches(topic);
403             // for each cache register this endpoint as a consumer of
404
// it's messages. Before doing so register as a destination
405
// event listener with the DestinationManager
406
destmgr.addDestinationEventListener(this);
407             Iterator JavaDoc iterator = _caches.values().iterator();
408             while (iterator.hasNext()) {
409                 DestinationCache cache = (DestinationCache) iterator.next();
410                 cache.addConsumer(this);
411             }
412         } else {
413             // if the topic is not a wildcard then we need to get the
414
// destination cache. If one does not exist then we need to
415
// create it.
416
DestinationCache cache = destmgr.getDestinationCache(topic);
417             _caches.put(topic, cache);
418             cache.addConsumer(this);
419         }
420     }
421
422     /**
423      * Add the handle to the cache.
424      *
425      * @param handle the message handle to add
426      */

427     protected void addMessage(MessageHandle handle) {
428         _handles.add(handle);
429         notifyMessageAvailable();
430     }
431
432     /**
433      * Closes this endpoint.
434      */

435     protected void doClose() {
436         // unregister as a destination event listener
437
DestinationManager.instance().removeDestinationEventListener(this);
438         // unregister from the destination before continuing
439
DestinationCache[] caches = (DestinationCache[])
440                 _caches.values().toArray(new DestinationCache[0]);
441         for (int i = 0; i < caches.length; ++i) {
442             caches[i].removeConsumer(this);
443         }
444         _caches.clear();
445
446         if (!isPersistent()) {
447             // for non-persistent consumers, destroy all outstanding message
448
// handles
449
MessageHandle[] handles = _handles.toArray();
450             for (int i = 0; i < handles.length; ++i) {
451                 MessageHandle handle = handles[i];
452                 try {
453                     handle.destroy();
454                 } catch (JMSException JavaDoc exception) {
455                     _log.error(exception, exception);
456                 }
457             }
458         }
459     }
460 }
461
Popular Tags