KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > persistence > Consumers


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  */

43
44 package org.exolab.jms.persistence;
45
46 import java.sql.Connection JavaDoc;
47 import java.sql.PreparedStatement JavaDoc;
48 import java.sql.ResultSet JavaDoc;
49 import java.sql.SQLException JavaDoc;
50 import java.util.ArrayList JavaDoc;
51 import java.util.Collections JavaDoc;
52 import java.util.Date JavaDoc;
53 import java.util.HashMap JavaDoc;
54 import java.util.Iterator JavaDoc;
55 import java.util.List JavaDoc;
56 import java.util.Vector JavaDoc;
57
58 import javax.jms.JMSException JavaDoc;
59 import javax.sql.DataSource JavaDoc;
60
61 import org.apache.commons.logging.Log;
62 import org.apache.commons.logging.LogFactory;
63
64 import org.exolab.jms.client.JmsDestination;
65 import org.exolab.jms.client.JmsTopic;
66
67
68 /**
69  * This class provides persistence for ConsumerState objects
70  * in a relational database (RDBMS).
71  *
72  * @version $Revision: 1.2 $ $Date: 2005/06/09 14:39:51 $
73  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
74  */

75 class Consumers {
76
77     /**
78      * A cache for all durable consumers
79      */

80     private HashMap JavaDoc _consumers;
81
82     /**
83      * A refernce to the singleton instance of this class
84      */

85     private static Consumers _instance;
86
87     /**
88      * Monitor used to synchronize access to the initialization of
89      * the singleton
90      */

91     private static final Boolean JavaDoc block = new Boolean JavaDoc(true);
92
93     /**
94      * The name of the column that uniquely identifies the consumer
95      */

96     private static final String JavaDoc CONSUMER_ID_SEED = "consumerId";
97
98     /**
99      * The name of the table that maintains a list of message handles
100      * per consumer
101      */

102     private static final String JavaDoc CONSUMER_MESSAGE = "message_handles";
103
104     /**
105      * The logger
106      */

107     private static final Log _log = LogFactory.getLog(Consumers.class);
108
109
110     /**
111      * Returns the singleton instance.
112      *
113      * Note that initialise() must have been invoked first for this
114      * to return a valid instance.
115      *
116      * @return Consumers the singleton instance
117      */

118     public static Consumers instance() {
119         return _instance;
120     }
121
122     /**
123      * Initialise the singleton instance
124      *
125      * @param Connection - the connection to use
126      * @return Consumers - singleton instance
127      * @throws PersistenceException - if the call cannot complete
128      */

129     public static Consumers initialise(Connection JavaDoc connection)
130         throws PersistenceException {
131
132         if (_instance == null) {
133             synchronized (block) {
134                 if (_instance == null) {
135                     _instance = new Consumers();
136                     _instance.load(connection);
137                 }
138             }
139         }
140         return _instance;
141     }
142
143     /**
144      * Add a new durable consumer to the database if it does not already
145      * exist. A durable consumer is specified by a destination name and
146      * a consumer name.
147      * <p>
148      * The destination must resolve to a valid JmsDestination object
149      *
150      * @param connection - the database connection to use
151      * @param destination - the name of the destination
152      * @param consumer - the name of the consumer
153      * @throws PersistenceException - if the consumer cannot be added
154      */

155     public synchronized void add(Connection JavaDoc connection, String JavaDoc dest,
156                                  String JavaDoc consumer)
157         throws PersistenceException {
158
159         JmsDestination destination = null;
160         Destinations singleton = Destinations.instance();
161         long destinationId = 0;
162
163         synchronized (singleton) {
164             destination = singleton.get(dest);
165             if (destination == null) {
166                 raise("add", consumer, dest, "destination is invalid");
167             }
168             destinationId = singleton.getId(dest);
169         }
170
171         // check that for a topic the consumer name is not the same as the
172
// destination name
173
if ((destination instanceof JmsTopic) &&
174             (consumer.equals(dest))) {
175             raise("add", consumer, dest,
176                 "The consumer name and destination name cannot be the same");
177         }
178
179         // get the next id from the seed table
180
long consumerId = SeedGenerator.instance().next(connection,
181             CONSUMER_ID_SEED);
182
183         PreparedStatement JavaDoc insert = null;
184         try {
185             insert = connection.prepareStatement(
186                 "insert into consumers values (?,?,?,?)");
187
188             long created = (new Date JavaDoc()).getTime();
189             insert.setString(1, consumer);
190             insert.setLong(2, destinationId);
191             insert.setLong(3, consumerId);
192             insert.setLong(4, created);
193             insert.executeUpdate();
194
195             Consumer map = new Consumer(consumer, consumerId,
196                 destinationId, created);
197
198             // check to see if the durable consumer already exists. If it
199
// does then do not add it but signal and error
200
if (!_consumers.containsKey(consumer)) {
201                 _consumers.put(consumer, map);
202             } else {
203                 _log.error("Durable consumer with name " + consumer
204                     + " already exists.");
205             }
206         } catch (Exception JavaDoc exception) {
207             throw new PersistenceException(
208                 "Failed to add consumer, destination=" + dest +
209                 ", name=" + consumer, exception);
210         } finally {
211             SQLHelper.close(insert);
212         }
213     }
214
215     /**
216      * Remove a consumer from the database. If the destination is of
217      * type queue then the destination name and the consumer name are
218      * identical.
219      *
220      * @param connection - the connection to use
221      * @param name - the consumer name
222      * @throws PersistenceException - if the consumer cannot be removed
223      */

224     public synchronized void remove(Connection JavaDoc connection, String JavaDoc name)
225         throws PersistenceException {
226
227         PreparedStatement JavaDoc delete = null;
228
229         // locate the consumer
230
Consumer map = (Consumer) _consumers.get(name);
231         if (map == null) {
232             raise("remove", name, "consumer does not exist");
233         }
234
235         try {
236             delete = connection.prepareStatement(
237                 "delete from consumers where name=?");
238             delete.setString(1, name);
239             delete.executeUpdate();
240
241             // now delete all the corresponding handles in the consumer
242
// message table
243
remove(CONSUMER_MESSAGE, map.consumerId, connection);
244
245             // remove the consumer from the local cache
246
_consumers.remove(name);
247         } catch (SQLException JavaDoc exception) {
248             throw new PersistenceException(
249                 "Failed to remove consumer=" + name, exception);
250         } finally {
251             SQLHelper.close(delete);
252         }
253     }
254
255     /**
256      * Return the id of the durable consumer.
257      *
258      * @param connection - the database connection to use
259      * @param name - consumer name
260      * @return the consumer identity
261      */

262     public synchronized long getConsumerId(String JavaDoc name) {
263         Consumer map = (Consumer) _consumers.get(name);
264         return (map != null) ? map.consumerId : 0;
265     }
266
267     /**
268      * Return true if a consumer exists
269      *
270      * @param name - the consumer name
271      */

272     public synchronized boolean exists(String JavaDoc name) {
273         return (_consumers.get(name) != null);
274     }
275
276     /**
277      * Returns a list of consumer names associated with a topic
278      *
279      * @param topic - the topic to query
280      */

281     public synchronized Vector JavaDoc getDurableConsumers(String JavaDoc destination) {
282         Vector JavaDoc result = new Vector JavaDoc();
283         long destinationId = Destinations.instance().getId(destination);
284         if (destinationId != 0) {
285             Iterator JavaDoc iter = _consumers.values().iterator();
286             while (iter.hasNext()) {
287                 Consumer map = (Consumer) iter.next();
288                 if (map.destinationId == destinationId) {
289                     result.add(map.name);
290                 }
291             }
292         }
293
294         return result;
295     }
296
297     /**
298      * Return a map of consumer names to destinations names.
299      *
300      * @return HashMap - list of all durable consumers
301      */

302     public synchronized HashMap JavaDoc getAllDurableConsumers() {
303         HashMap JavaDoc result = new HashMap JavaDoc();
304
305         Iterator JavaDoc iter = _consumers.values().iterator();
306         while (iter.hasNext()) {
307             Consumer map = (Consumer) iter.next();
308             JmsDestination dest = Destinations.instance().get(
309                 map.destinationId);
310
311             if (dest instanceof JmsTopic) {
312                 result.put(map.name, dest.getName());
313             }
314         }
315
316         return result;
317     }
318
319     /**
320      * Return the consumer name corresponding to the specified identity
321      *
322      * @param id - the consumer identity
323      */

324     public synchronized String JavaDoc getConsumerName(long id) {
325         String JavaDoc name = null;
326         Iterator JavaDoc iter = _consumers.values().iterator();
327
328         while (iter.hasNext()) {
329             Consumer map = (Consumer) iter.next();
330             if (map.consumerId == id) {
331                 name = map.name;
332                 break;
333             }
334         }
335
336         return name;
337     }
338
339     /**
340      * Deallocates resources owned or referenced by the instance
341      */

342     public synchronized void close() {
343         _consumers.clear();
344         _consumers = null;
345
346         _instance = null;
347     }
348
349     /**
350      * Removes all cached consumer details for a given destination
351      *
352      * @param destinationId the Id of the destination
353      */

354     protected synchronized void removeCached(long destinationId) {
355         Object JavaDoc[] list = _consumers.values().toArray();
356         for (int i = 0; i < list.length; i++) {
357             Consumer map = (Consumer) list[i];
358             if (map.destinationId == destinationId) {
359                 _consumers.remove(map.name);
360             }
361         }
362     }
363
364     /**
365      * Constructor
366      */

367     private Consumers() {
368         _consumers = new HashMap JavaDoc();
369     }
370
371     /**
372      * Load the cache during init time. It needs to get access to the
373      * TransactionService and the DatabaseService so that it can get
374      * get access to a transaction and a database connection. This method
375      * reads <b>all the consumers</b> into memory.
376      * <p>
377      * If there is any problem completing this operation then the method
378      * will throw a PersistenceException
379      *
380      * @param connection - the connection to use
381      * @throws PersistenceException - if the load fails
382      */

383     private void load(Connection JavaDoc connection)
384         throws PersistenceException {
385
386         PreparedStatement JavaDoc select = null;
387         ResultSet JavaDoc set = null;
388         try {
389             select = connection.prepareStatement(
390                 "select name, consumerid, destinationid, created "
391                 + "from consumers");
392             set = select.executeQuery();
393             String JavaDoc name = null;
394             long consumerId = 0;
395             long destinationId = 0;
396             long created = 0;
397             Consumer map = null;
398             while (set.next()) {
399                 name = set.getString(1);
400                 consumerId = set.getLong(2);
401                 destinationId = set.getLong(3);
402                 created = set.getLong(4);
403                 map = new Consumer(name, consumerId, destinationId,
404                     created);
405                 _consumers.put(name, map);
406             }
407         } catch (SQLException JavaDoc exception) {
408             throw new PersistenceException("Failed to retrieve consumers",
409                 exception);
410         } finally {
411             SQLHelper.close(set);
412             SQLHelper.close(select);
413         }
414     }
415
416     /**
417      * Remove all the rows in the specified table with the corresponding
418      * consumer identity.
419      *
420      * @param table - the table to destroy
421      * @param consumerId - the target consumerId
422      * @param connection - the database connection to use
423      * @throws SQLException - thrown on any error
424      */

425     private void remove(String JavaDoc table, long consumerId, Connection JavaDoc connection)
426         throws SQLException JavaDoc {
427
428         PreparedStatement JavaDoc delete = null;
429         try {
430             delete = connection.prepareStatement(
431                 "delete from " + table + " where consumerId=?");
432             delete.setLong(1, consumerId);
433             delete.executeUpdate();
434         } finally {
435             SQLHelper.close(delete);
436         }
437     }
438
439     /**
440      * Raise a PersistenceException with the specified parameters
441      *
442      * @param operation - operation that failed
443      * @param name - corresponding consumert name
444      * @param destination - corresponding destination
445      * @param reason - the reason for the exception
446      */

447     private void raise(String JavaDoc operation, String JavaDoc name, String JavaDoc destination,
448                        String JavaDoc reason)
449         throws PersistenceException {
450         throw new PersistenceException("Cannot " + operation + " consumer=" +
451             name + ", destination=" + destination + ": " + reason);
452     }
453
454     /**
455      * Raise a PersistenceException with the specified parameters
456      *
457      * @param operation - operation that failed
458      * @param name - corresponding consumert name
459      * @param reason - the reasone for the exception
460      */

461     private void raise(String JavaDoc operation, String JavaDoc name, String JavaDoc reason)
462         throws PersistenceException {
463         throw new PersistenceException("Cannot " + operation + " consumer=" +
464             name + ": " + reason);
465     }
466
467     /**
468      * This is an internal class that is used to store consumer entries
469      */

470     private class Consumer {
471
472         /**
473          * The name of the consumer
474          */

475         public String JavaDoc name;
476
477         /**
478          * The unique consumer identity
479          */

480         public long consumerId;
481
482         /**
483          * The identity of the destination that this durable consumer is
484          * subscribed too
485          */

486         public long destinationId;
487
488         /**
489          * The time that this durable consumer was created
490          */

491         public long created;
492
493
494         public Consumer(String JavaDoc name, long consumerId, long destinationId,
495                         long created) {
496
497             this.name = name;
498             this.consumerId = consumerId;
499             this.destinationId = destinationId;
500             this.created = created;
501         }
502
503         public String JavaDoc getKey() {
504             return name;
505         }
506     }
507 }
508
Popular Tags