KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > messagemgr > TopicDestinationCache


/**
 * Redistribution and use of this software and associated documentation
 * ("Software"), with or without modification, are permitted provided
 * that the following conditions are met:
 *
 * 1. Redistributions of source code must retain copyright
 *    statements and notices. Redistributions must also contain a
 *    copy of this document.
 *
 * 2. Redistributions in binary form must reproduce the
 *    above copyright notice, this list of conditions and the
 *    following disclaimer in the documentation and/or other
 *    materials provided with the distribution.
 *
 * 3. The name "Exolab" must not be used to endorse or promote
 *    products derived from this Software without prior written
 *    permission of Exoffice Technologies. For written permission,
 *    please contact info@exolab.org.
 *
 * 4. Products derived from this Software may not be called "Exolab"
 *    nor may "Exolab" appear in their names without prior written
 *    permission of Exoffice Technologies. Exolab is a registered
 *    trademark of Exoffice Technologies.
 *
 * 5. Due credit should be given to the Exolab Project
 *    (http://www.exolab.org/).
 *
 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
 *
 * $Id: TopicDestinationCache.java,v 1.3 2005/05/13 12:57:02 tanderson Exp $
 */

45 package org.exolab.jms.messagemgr;
46
47 import java.sql.Connection JavaDoc;
48 import java.util.Iterator JavaDoc;
49 import java.util.List JavaDoc;
50 import java.util.Vector JavaDoc;
51 import java.util.ArrayList JavaDoc;
52 import javax.jms.JMSException JavaDoc;
53
54 import org.exolab.jms.client.JmsDestination;
55 import org.exolab.jms.client.JmsTopic;
56 import org.exolab.jms.message.MessageImpl;
57 import org.exolab.jms.persistence.PersistenceException;
58 import org.exolab.jms.persistence.DatabaseService;
59
60
61 /**
62  * A {@link DestinationCache} for topics.
63  *
64  * @author <a HREF="mailto:jima@comware.com.au">Jim Alateras</a>
65  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
66  * @version $Revision: 1.3 $ $Date: 2005/05/13 12:57:02 $
67  */

68 class TopicDestinationCache extends AbstractDestinationCache {
69
70     /**
71      * Construct a new <code>TopicDestinationCache</code> for a non-persistent
72      * topic.
73      *
74      * @param topic the topic to cache messages for
75      */

76     public TopicDestinationCache(JmsTopic topic) {
77         super(topic);
78     }
79
80     /**
81      * Construct a new <code>TopicDestinationCache</code> for a persistent
82      * topic.
83      *
84      * @param topic the topic to cache messages for
85      * @param connection the database connection
86      * @throws JMSException for any JMS error
87      * @throws PersistenceException for any persistence error
88      */

89     public TopicDestinationCache(JmsTopic topic, Connection JavaDoc connection)
90             throws JMSException JavaDoc, PersistenceException {
91         super(topic, connection);
92     }
93
94     /**
95      * Register a consumer with this cache.
96      *
97      * @param consumer the message consumer for this destination
98      * @return <code>true</code> if registered; otherwise <code>false</code>
99      */

100     public boolean addConsumer(ConsumerEndpoint consumer) {
101
102         boolean result = false;
103
104         // check to see that the consumer can actually subscribe to
105
// this destination
106
JmsTopic cdest = (JmsTopic) consumer.getDestination();
107         JmsTopic ddest = (JmsTopic) getDestination();
108
109         if (cdest.match(ddest)) {
110             result = super.addConsumer(consumer);
111         }
112
113         return result;
114     }
115
116     /**
117      * Invoked when the {@link MessageMgr} receives a non-persistent message.
118      *
119      * @param destination the message's destination
120      * @param message the message
121      * @throws JMSException if the listener fails to handle the message
122      */

123     public void messageAdded(JmsDestination destination, MessageImpl message)
124             throws JMSException JavaDoc {
125         boolean processed = false;
126         MessageRef reference =
127                 new CachedMessageRef(message, false, getMessageCache());
128         addMessage(reference, message);
129         MessageHandle handle = new SharedMessageHandle(reference, message);
130
131         ConsumerEndpoint[] consumers = getConsumerArray();
132         for (int index = 0; index < consumers.length; index++) {
133             ConsumerEndpoint consumer = consumers[index];
134             processed |= consumer.messageAdded(handle, message);
135         }
136
137         // create a lease iff one is required and the message has actually
138
// been accepted by at least one endpoint
139
if (processed) {
140             checkMessageExpiry(reference, message);
141         } else {
142             // no consumer picked up the message, so toss it
143
reference.destroy();
144             // @todo - inefficient. Don't really want to add the message
145
// just to remove it again if there are no consumers for it
146
}
147     }
148
149     /**
150      * Invoked when the {@link MessageMgr} receives a persistent message.
151      *
152      * @param connection the database connection
153      * @param destination the message's destination
154      * @param message the message
155      * @throws JMSException if the listener fails to handle the message
156      * @throws PersistenceException if there is a persistence related problem
157      */

158     public void persistentMessageAdded(Connection JavaDoc connection,
159                                        JmsDestination destination,
160                                        MessageImpl message)
161             throws JMSException JavaDoc, PersistenceException {
162         boolean processed = false;
163         MessageRef reference = new CachedMessageRef(message, true, getMessageCache());
164         addMessage(reference, message);
165         SharedMessageHandle handle = new SharedMessageHandle(reference, message);
166
167         // now send the message to all active consumers
168
ConsumerEndpoint[] consumers = getConsumerArray();
169         for (int index = 0; index < consumers.length; index++) {
170             ConsumerEndpoint consumer = consumers[index];
171             processed |= consumer.persistentMessageAdded(handle, message, connection);
172         }
173
174         // for each inactive durable consumer, add a persistent handle
175
// @todo - possible race condition between inactive subscription
176
// becoming active again - potential for message loss?
177
JmsTopic topic = (JmsTopic) getDestination();
178         List JavaDoc inactive = ConsumerManager.instance().getInactiveSubscriptions(
179                 topic);
180         if (!inactive.isEmpty()) {
181             Iterator JavaDoc iterator = inactive.iterator();
182             while (iterator.hasNext()) {
183                 String JavaDoc name = (String JavaDoc) iterator.next();
184                 TopicConsumerMessageHandle durable
185                         = new TopicConsumerMessageHandle(handle, name);
186                 durable.add(connection);
187             }
188             processed = true;
189         }
190
191         // create a lease iff one is required and the message has actually
192
// been accepted by at least one endpoint
193
if (processed) {
194             checkMessageExpiry(reference, message);
195         } else {
196             // no consumer picked up the message, so toss it
197
handle.destroy(connection);
198             // @todo - inefficient. Don't really want to make the message
199
// persistent, just to remove it again if there are no consumers
200
// for it
201
}
202
203     }
204
205     /**
206      * Return a message handle back to the cache, to recover unsent or
207      * unacknowledged messages.
208      *
209      * @param handle the message handle to return
210      */

211     public void returnMessageHandle(MessageHandle handle) {
212         long consumerId = handle.getConsumerId();
213         AbstractTopicConsumerEndpoint endpoint =
214                 (AbstractTopicConsumerEndpoint) getConsumerEndpoint(consumerId);
215         // if the endpoint is still active then return the message
216
// back to it
217
if (endpoint != null) {
218             endpoint.returnMessage(handle);
219         } else {
220             // @todo - need to destroy the handle?
221
// what about for inactive durable consumers -
222
// could this inadvertently trash the message?
223
}
224
225     }
226
227     /**
228      * Load the state of a durable consumer.
229      *
230      * @param name the durable subscription name
231      * @param connection the database connection to use
232      * @return a list of {@link MessageHandle} instances
233      * @throws JMSException for any JMS error
234      * @throws PersistenceException for any persistence error
235      */

236     public List JavaDoc getDurableMessageHandles(String JavaDoc name, Connection JavaDoc connection)
237             throws JMSException JavaDoc, PersistenceException {
238         Vector JavaDoc handles = DatabaseService.getAdapter().getMessageHandles(
239                 connection, getDestination(), name);
240         List JavaDoc result = new ArrayList JavaDoc(handles.size());
241
242         MessageCache cache = getMessageCache();
243
244         Iterator JavaDoc iterator = handles.iterator();
245         while (iterator.hasNext()) {
246             PersistentMessageHandle handle =
247                     (PersistentMessageHandle) iterator.next();
248             String JavaDoc messageId = handle.getMessageId();
249             MessageRef reference = cache.getMessageRef(messageId);
250             if (reference == null) {
251                 reference = new CachedMessageRef(messageId, true, cache);
252             }
253             cache.addMessageRef(reference);
254             handle.reference(reference);
255             result.add(handle);
256
257             checkMessageExpiry(reference, handle.getExpiryTime());
258         }
259         return result;
260     }
261
262     /**
263      * Initialise the cache from the database.
264      *
265      * @param connection the database connection to use
266      * @throws JMSException for any JMS error
267      * @throws PersistenceException for any persistence error
268      */

269     protected void init(Connection JavaDoc connection) throws JMSException JavaDoc,
270             PersistenceException {
271         // no-op
272
}
273
274     /**
275      * Remove an expired persistent message, and notify any listeners.
276      *
277      * @param reference a handle to the expired message
278      * @param connection the database connection
279      * @throws JMSException if a listener fails to handle the
280      * expiration
281      * @throws PersistenceException if there is a persistence related problem
282      */

283     protected void persistentMessageExpired(MessageRef reference, Connection JavaDoc connection)
284             throws JMSException JavaDoc, PersistenceException {
285         String JavaDoc messageId = reference.getMessageId();
286         ConsumerEndpoint[] consumers = getConsumerArray();
287
288         for (int i = 0; i < consumers.length; ++i) {
289             consumers[i].persistentMessageRemoved(messageId, connection);
290         }
291
292         // since it is a persistent message, need to handle inactive
293
// durable consumers subscribing to the destination
294
List JavaDoc inactive = ConsumerManager.instance().getInactiveSubscriptions(
295                 (JmsTopic) getDestination());
296         Iterator JavaDoc iterator = inactive.iterator();
297         while (iterator.hasNext()) {
298             String JavaDoc name = (String JavaDoc) iterator.next();
299             // @todo
300
/*
301             TopicConsumerMessageHandle sub = new TopicConsumerMessageHandle(handle, name);
302             sub.destroy(connection);
303 */

304         }
305     }
306
307 }
308
309
Popular Tags