KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > persistence > RDBMSAdapter


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: RDBMSAdapter.java,v 1.4 2005/06/09 14:39:52 tanderson Exp $
44  */

45 package org.exolab.jms.persistence;
46
47 import java.sql.Connection JavaDoc;
48 import java.sql.Date JavaDoc;
49 import java.sql.PreparedStatement JavaDoc;
50 import java.sql.ResultSet JavaDoc;
51 import java.sql.SQLException JavaDoc;
52 import java.util.Enumeration JavaDoc;
53 import java.util.HashMap JavaDoc;
54 import java.util.Vector JavaDoc;
55
56 import EDU.oswego.cs.dl.util.concurrent.FIFOReadWriteLock;
57 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
58
59 import org.apache.commons.logging.Log;
60 import org.apache.commons.logging.LogFactory;
61
62 import org.exolab.jms.authentication.User;
63 import org.exolab.jms.client.JmsDestination;
64 import org.exolab.jms.client.JmsQueue;
65 import org.exolab.jms.client.JmsTopic;
66 import org.exolab.jms.config.ConfigurationManager;
67 import org.exolab.jms.config.DatabaseConfiguration;
68 import org.exolab.jms.config.RdbmsDatabaseConfiguration;
69 import org.exolab.jms.events.EventHandler;
70 import org.exolab.jms.events.BasicEventManager;
71 import org.exolab.jms.message.MessageImpl;
72 import org.exolab.jms.messagemgr.MessageHandle;
73
74
75 /**
76  * This adapter is a wrapper class around the persistency mechanism.
77  * It isolates the client from the working specifics of the database, by
78  * providing a simple straight forward interface. Furure changes to
79  * the database will only require changes to the adapter.
80  *
81  * @version $Revision: 1.4 $ $Date: 2005/06/09 14:39:52 $
82  * @author <a HREF="mailto:mourikis@exolab.org">Jim Mourikis</a>
83  */

84
85 public class RDBMSAdapter
86     extends PersistenceAdapter
87     implements EventHandler {
88
89     /**
90      * The schema version number. Note this must be incremented whenever
91      * The schema changes.
92      */

93     public static final String JavaDoc SCHEMA_VERSION = "V0.7.6";
94
95     /**
96      * The JDBC ConnectionManager
97      */

98     private static DBConnectionManager _connectionManager = null;
99
100     /**
101      * This is the interval that the automatic garbage collector will
102      * execute, if specified. It is specified in seconds.
103      */

104     private int _gcInterval = 600;
105
106     /**
107      * This is the block size that is used during purging.
108      */

109     private int _gcBlockSize = 500;
110
111     /**
112      * This is used to track the incremental garbage collection. It
113      * tracks the last blocks examined.
114      */

115     private long _lastTime = 0;
116
117     /**
118      * This is the thread priority for the GC Thread
119      */

120     private int _gcThreadPriority = Thread.NORM_PRIORITY;
121
122     /**
123      * Lock to help prevent deadlocks when administratively removing
124      * destinations, while producers and consumers are actively sending
125      * and receiving messages. It ensures that when a destination is in the
126      * process of being removed, no other changes are occuring on the
127      * messages and message_handles tables.
128      */

129     private ReadWriteLock _destinationLock = new FIFOReadWriteLock();
130
131     /**
132      * This is the event that is fired to initiate garbage collection
133      * in the database
134      */

135     private static final int COLLECT_DATABASE_GARBAGE_EVENT = 1;
136
137     /**
138      * The logger
139      */

140     private static final Log _log = LogFactory.getLog(RDBMSAdapter.class);
141
142
143     /**
144      * Connects to the given db.
145      *
146      * @throws PersistenceException if a connection cannot be establised to
147      * the database
148      */

149     RDBMSAdapter(String JavaDoc driver, String JavaDoc url, String JavaDoc userName, String JavaDoc password)
150         throws PersistenceException {
151
152         DatabaseConfiguration dbConfig =
153             ConfigurationManager.getConfig().getDatabaseConfiguration();
154         RdbmsDatabaseConfiguration config =
155             dbConfig.getRdbmsDatabaseConfiguration();
156
157         // create the connection manager, and configure it
158
_connectionManager = getConnectionManager(config.getClazz());
159         _connectionManager.setUser(userName);
160         _connectionManager.setPassword(password);
161         _connectionManager.setDriver(driver);
162         _connectionManager.setURL(url);
163         _connectionManager.setMaxActive(config.getMaxActive());
164         _connectionManager.setMaxIdle(config.getMaxIdle());
165         _connectionManager.setMinIdleTime(config.getMinIdleTime());
166         _connectionManager.setEvictionInterval(config.getEvictionInterval());
167         _connectionManager.setTestQuery(config.getTestQuery());
168         _connectionManager.setTestBeforeUse(config.getTestBeforeUse());
169
170         // initialisze the connection manager
171
_connectionManager.init();
172
173         Connection JavaDoc connection = null;
174         try {
175             // initialize the various caches and helper classes used to
176
// execute the various SQL.
177
connection = getConnection();
178
179             String JavaDoc version = getSchemaVersion(connection);
180             if (version == null) {
181                 initSchemaVersion(connection);
182             } else if (!version.equals(SCHEMA_VERSION)) {
183                 throw new PersistenceException(
184                     "Schema needs to be converted from version=" + version
185                     + " to version=" + SCHEMA_VERSION
186                     + "\nBack up your database, and run 'dbtool -migrate'"
187                     + "to convert the schema");
188             }
189
190             SeedGenerator.initialise();
191             Destinations.initialise(connection);
192             Consumers.initialise(connection);
193             Messages.initialise();
194             MessageHandles.initialise();
195             connection.commit();
196             Users.initialise();
197         } catch (PersistenceException exception) {
198             SQLHelper.rollback(connection);
199             throw exception;
200         } catch (Exception JavaDoc exception) {
201             throw new PersistenceException(
202                 "Failed to initialise database adapter", exception);
203         } finally {
204             SQLHelper.close(connection);
205             
206         }
207
208 /*
209         // check whether we should initiate automatic garbage collection
210         if (dbConfig.hasGarbageCollectionInterval()) {
211             _gcInterval = dbConfig.getGarbageCollectionInterval() * 1000;
212             registerEvent();
213         }
214
215         if (dbConfig.hasGarbageCollectionBlockSize()) {
216             _gcBlockSize = dbConfig.getGarbageCollectionBlockSize();
217         }
218
219         if (dbConfig.hasGarbageCollectionThreadPriority()) {
220             _gcThreadPriority = dbConfig.getGarbageCollectionBlockSize();
221             if (_gcThreadPriority < Thread.MIN_PRIORITY) {
222                 _gcThreadPriority = Thread.MIN_PRIORITY;
223             } else if (_gcThreadPriority > Thread.MAX_PRIORITY) {
224                 _gcThreadPriority = Thread.MAX_PRIORITY;
225             }
226         }
227 */

228     }
229
230     /**
231      * Close the database if open.
232      */

233     public void close() {
234         if (SeedGenerator.instance() != null) {
235             SeedGenerator.instance().close();
236         }
237
238         if (Destinations.instance() != null) {
239             Destinations.instance().close();
240         }
241
242         if (Consumers.instance() != null) {
243             Consumers.instance().close();
244         }
245
246         if (Messages.instance() != null) {
247             Messages.instance().close();
248         }
249
250         if (MessageHandles.instance() != null) {
251             MessageHandles.instance().close();
252         }
253
254         if (Users.instance() != null) {
255             Users.instance().close();
256         }
257     }
258
259     // implementation of PersistenceAdapter.getLastId
260
public long getLastId(Connection JavaDoc connection)
261         throws PersistenceException {
262
263         long lastId = -1;
264         boolean successful = false;
265         PreparedStatement JavaDoc query = null;
266         ResultSet JavaDoc result = null;
267         PreparedStatement JavaDoc insert = null;
268         try {
269             query = connection.prepareStatement(
270                 "select maxid from message_id where id = 1");
271             result = query.executeQuery();
272
273             if (result.next()) {
274                 lastId = result.getInt(1);
275             } else {
276                 // first entry create.
277
insert = connection.prepareStatement(
278                     "insert into message_id values (?,?)");
279                 insert.setInt(1, 1);
280                 insert.setLong(2, 0);
281                 insert.executeUpdate();
282                 lastId = 0;
283             }
284         } catch (Exception JavaDoc exception) {
285             throw new PersistenceException("Failed to get last message id",
286                 exception);
287         } finally {
288             SQLHelper.close(result);
289             SQLHelper.close(insert);
290             SQLHelper.close(query);
291         }
292
293         return lastId;
294     }
295
296     // implementation of PersistenceAdapter.updateIds
297
public void updateIds(Connection JavaDoc connection, long id)
298         throws PersistenceException {
299         PreparedStatement JavaDoc insert = null;
300         try {
301             insert = connection.prepareStatement(
302                 "update message_id set maxId = ? where id = 1");
303
304             insert.setLong(1, id);
305             insert.executeUpdate();
306         } catch (Exception JavaDoc exception) {
307             throw new PersistenceException("Failed to update message id",
308                                            exception);
309         } finally {
310             SQLHelper.close(insert);
311         }
312     }
313
314     // implementation of PersistenceMessage.addMessage
315
public void addMessage(Connection JavaDoc connection, MessageImpl message)
316         throws PersistenceException {
317
318         long start = 0;
319
320         if (_log.isDebugEnabled()) {
321             start = System.currentTimeMillis();
322         }
323
324         try {
325             _destinationLock.readLock().acquire();
326             Messages.instance().add(connection, message);
327         } catch (InterruptedException JavaDoc exception) {
328             throw new PersistenceException("Failed to acquire lock",
329                 exception);
330         } finally {
331             _destinationLock.readLock().release();
332
333             if (_log.isDebugEnabled()) {
334                 _log.debug("addMessage," +
335                     (System.currentTimeMillis() - start));
336             }
337         }
338     }
339
340     // implementation of PersistenceMessage.addMessage
341
public void updateMessage(Connection JavaDoc connection, MessageImpl message)
342         throws PersistenceException {
343         long start = 0;
344         if (_log.isDebugEnabled()) {
345             start = System.currentTimeMillis();
346         }
347
348         try {
349             _destinationLock.readLock().acquire();
350             Messages.instance().update(connection, message);
351         } catch (InterruptedException JavaDoc exception) {
352             throw new PersistenceException("Failed to acquire lock",
353                 exception);
354         } finally {
355             _destinationLock.readLock().release();
356             if (_log.isDebugEnabled()) {
357                 _log.debug("updateMessage," +
358                     (System.currentTimeMillis() - start));
359             }
360         }
361     }
362
363     // implementation of PersistenceAdapter.getUnprocessedMessages
364
public Vector JavaDoc getUnprocessedMessages(Connection JavaDoc connection)
365         throws PersistenceException {
366         long start = 0;
367         if (_log.isDebugEnabled()) {
368             start = System.currentTimeMillis();
369         }
370
371         try {
372             return Messages.instance().getUnprocessedMessages(connection);
373         } finally {
374             if (_log.isDebugEnabled()) {
375                 _log.debug("getUnprocessedMessages," + (System.currentTimeMillis() - start));
376             }
377         }
378     }
379
380
381     // implementation of PersistenceAdapter.removeMessage
382
public void removeMessage(Connection JavaDoc connection, String JavaDoc id)
383         throws PersistenceException {
384         long start = 0;
385         if (_log.isDebugEnabled()) {
386             start = System.currentTimeMillis();
387         }
388
389         try {
390             _destinationLock.readLock().acquire();
391             Messages.instance().remove(connection, id);
392         } catch (InterruptedException JavaDoc exception) {
393             throw new PersistenceException("Failed to acquire lock",
394                 exception);
395         } finally {
396             _destinationLock.readLock().release();
397             if (_log.isDebugEnabled()) {
398                 _log.debug("removeMessage," +
399                     (System.currentTimeMillis() - start));
400             }
401         }
402     }
403
404     // implementation of PersistenceAdapter.getMessage
405
public MessageImpl getMessage(Connection JavaDoc connection, String JavaDoc id)
406         throws PersistenceException {
407         long start = 0;
408         if (_log.isDebugEnabled()) {
409             start = System.currentTimeMillis();
410         }
411
412         try {
413             return Messages.instance().get(connection, id);
414         } finally {
415             if (_log.isDebugEnabled()) {
416                 _log.debug("getMessage," + (System.currentTimeMillis() - start));
417             }
418         }
419     }
420
421     // implementation of PersistenceAdapter.getMessages
422
public Vector JavaDoc getMessages(Connection JavaDoc connection, MessageHandle handle)
423         throws PersistenceException {
424         long start = 0;
425         if (_log.isDebugEnabled()) {
426             start = System.currentTimeMillis();
427         }
428
429         try {
430             return Messages.instance().getMessages(connection,
431                 handle.getDestination().getName(), handle.getPriority(),
432                 handle.getAcceptedTime());
433         } finally {
434             if (_log.isDebugEnabled()) {
435                 _log.debug("getMessages," + (System.currentTimeMillis() - start));
436             }
437         }
438     }
439
440     // implementation of PersistenceAdapter.addMessageHandle
441
public void addMessageHandle(Connection JavaDoc connection, MessageHandle handle)
442         throws PersistenceException {
443         long start = 0;
444         if (_log.isDebugEnabled()) {
445             start = System.currentTimeMillis();
446         }
447
448         try {
449             _destinationLock.readLock().acquire();
450             MessageHandles.instance().addMessageHandle(connection, handle);
451         } catch (InterruptedException JavaDoc exception) {
452             throw new PersistenceException("Failed to acquire lock",
453                 exception);
454         } finally {
455             _destinationLock.readLock().release();
456             if (_log.isDebugEnabled()) {
457                 _log.debug("addMessageHandle," + (System.currentTimeMillis() - start));
458             }
459         }
460     }
461
462     // implementation of PersistenceAdapter.updateMessageHandle
463
public void updateMessageHandle(Connection JavaDoc connection, MessageHandle handle)
464         throws PersistenceException {
465         long start = 0;
466         if (_log.isDebugEnabled()) {
467             start = System.currentTimeMillis();
468         }
469
470         try {
471             _destinationLock.readLock().acquire();
472             MessageHandles.instance().updateMessageHandle(connection, handle);
473         } catch (InterruptedException JavaDoc exception) {
474             throw new PersistenceException("Failed to acquire lock",
475                 exception);
476         } finally {
477             _destinationLock.readLock().release();
478             if (_log.isDebugEnabled()) {
479                 _log.debug("updateMessageHandle," + (System.currentTimeMillis() - start));
480             }
481         }
482     }
483
484     // implementation of PersistenceAdapter.removeMessageHandle
485
public void removeMessageHandle(Connection JavaDoc connection, MessageHandle handle)
486         throws PersistenceException {
487         long start = 0;
488         if (_log.isDebugEnabled()) {
489             start = System.currentTimeMillis();
490         }
491
492         try {
493             _destinationLock.readLock().acquire();
494             MessageHandles.instance().removeMessageHandle(connection, handle);
495         } catch (InterruptedException JavaDoc exception) {
496             throw new PersistenceException("Failed to acquire lock",
497                 exception);
498         } finally {
499             _destinationLock.readLock().release();
500             if (_log.isDebugEnabled()) {
501                 _log.debug("removeMessageHandle," + (System.currentTimeMillis() - start));
502             }
503         }
504     }
505
506     // implementation of PersistenceAdapter.getMessageHandles
507
public Vector JavaDoc getMessageHandles(Connection JavaDoc connection,
508                                     JmsDestination destination, String JavaDoc name)
509         throws PersistenceException {
510         long start = 0;
511         if (_log.isDebugEnabled()) {
512             start = System.currentTimeMillis();
513         }
514
515         try {
516             return MessageHandles.instance().getMessageHandles(connection,
517                 destination.getName(), name);
518         } finally {
519             if (_log.isDebugEnabled()) {
520                 _log.debug("getMessageHandles,"
521                           + (System.currentTimeMillis() - start));
522             }
523         }
524     }
525
526     // implementation of PersistenceAdapter.addDurableConsumer
527
public void addDurableConsumer(Connection JavaDoc connection, String JavaDoc topic,
528                                    String JavaDoc consumer)
529         throws PersistenceException {
530
531         try {
532             _destinationLock.readLock().acquire();
533             Consumers.instance().add(connection, topic, consumer);
534         } catch (InterruptedException JavaDoc exception) {
535             throw new PersistenceException("Failed to acquire lock",
536                 exception);
537         } finally {
538             _destinationLock.readLock().release();
539         }
540     }
541
542     // implementation of PersistenceAdapter.removeDurableConsumer
543
public void removeDurableConsumer(Connection JavaDoc connection, String JavaDoc consumer)
544         throws PersistenceException {
545
546         try {
547             _destinationLock.readLock().acquire();
548             Consumers.instance().remove(connection, consumer);
549         } catch (InterruptedException JavaDoc exception) {
550             throw new PersistenceException("Failed to acquire lock",
551                 exception);
552         } finally {
553             _destinationLock.readLock().release();
554         }
555     }
556
557     // implementation of PersistenceAdapter.getDurableConsumers
558
public Enumeration JavaDoc getDurableConsumers(Connection JavaDoc connection, String JavaDoc topic)
559         throws PersistenceException {
560         return Consumers.instance().getDurableConsumers(topic).elements();
561     }
562
563     // implementation of PersistenceAdapter.getAllDurableConsumers
564
public HashMap JavaDoc getAllDurableConsumers(Connection JavaDoc connection)
565         throws PersistenceException {
566
567         return Consumers.instance().getAllDurableConsumers();
568     }
569
570     // implementation of PersistenceAdapter.durableConsumerExists
571
public boolean durableConsumerExists(Connection JavaDoc connection, String JavaDoc name)
572         throws PersistenceException {
573
574         return Consumers.instance().exists(name);
575     }
576
577     // implementation of PersistenceAdapter.addDestination
578
public void addDestination(Connection JavaDoc connection, String JavaDoc name,
579                                boolean queue)
580         throws PersistenceException {
581
582         JmsDestination destination = (queue)
583             ? (JmsDestination) new JmsQueue(name)
584             : (JmsDestination) new JmsTopic(name);
585
586         // create the destination. If the destination is also
587
// a queue create a special consumer for it.
588
try {
589             _destinationLock.readLock().acquire();
590             Destinations.instance().add(connection, destination);
591             if (queue) {
592                 Consumers.instance().add(connection, name, name);
593             }
594         } catch (InterruptedException JavaDoc exception) {
595             throw new PersistenceException("Failed to acquire lock",
596                 exception);
597         } finally {
598             _destinationLock.readLock().release();
599         }
600     }
601
602     // implementation of PersistenceAdapter.removeDestination
603
public void removeDestination(Connection JavaDoc connection, String JavaDoc name)
604         throws PersistenceException {
605
606         JmsDestination destination = Destinations.instance().get(name);
607         if (destination != null) {
608             try {
609                 _destinationLock.writeLock().acquire();
610                 Destinations.instance().remove(connection, destination);
611             } catch (InterruptedException JavaDoc exception) {
612                 throw new PersistenceException("Failed to acquire lock",
613                     exception);
614             } finally {
615                 _destinationLock.writeLock().release();
616             }
617         }
618     }
619
620     // implementation of PersistenceAdapter.getAllDestinations
621
public Enumeration JavaDoc getAllDestinations(Connection JavaDoc connection)
622         throws PersistenceException {
623
624         return Destinations.instance().getDestinations().elements();
625     }
626
627     // implementation of PersistenceAdapter.checkDestination
628
public boolean checkDestination(Connection JavaDoc connection, String JavaDoc name)
629         throws PersistenceException {
630
631         return (Destinations.instance().get(name) != null);
632     }
633
634     // implementation of getQueueMessageCount
635
public int getQueueMessageCount(Connection JavaDoc connection, String JavaDoc name)
636         throws PersistenceException {
637
638         return MessageHandles.instance().getMessageCount(
639             connection, name, name);
640     }
641
642     // implementation of PersistenceAdapter.getQueueMessageCount
643
public int getDurableConsumerMessageCount(Connection JavaDoc connection,
644                                               String JavaDoc destination, String JavaDoc name)
645         throws PersistenceException {
646
647         return MessageHandles.instance().getMessageCount(connection,
648             destination, name);
649     }
650
651     // implementation of PersistenceAdapter.getQueueMessageCount
652
public void removeExpiredMessages(Connection JavaDoc connection)
653         throws PersistenceException {
654
655         Messages.instance().removeExpiredMessages(connection);
656     }
657
658     // implementation of PersistenceAdapter.removeExpiredMessageHandles
659
public void removeExpiredMessageHandles(Connection JavaDoc connection,
660                                             String JavaDoc consumer)
661         throws PersistenceException {
662
663         MessageHandles.instance().removeExpiredMessageHandles(connection,
664             consumer);
665     }
666
667     // implementation of PersistenceAdapter.getNonExpiredMessages
668
public Vector JavaDoc getNonExpiredMessages(Connection JavaDoc connection,
669                                         JmsDestination destination)
670         throws PersistenceException {
671
672         return Messages.instance().getNonExpiredMessages(
673             connection, destination);
674     }
675
676     // implementation of EventHandler.handleEvent
677
public void handleEvent(int event, Object JavaDoc callback, long time) {
678         // disabled, as per bug 816895 - Exception in purgeMessages
679
// if (event == COLLECT_DATABASE_GARBAGE_EVENT) {
680
// // collect garbage now, but before doing so change the thread
681
// // priority to low.
682
// try {
683
// Thread.currentThread().setPriority(_gcThreadPriority);
684
// purgeMessages();
685
// } finally {
686
// Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
687
// registerEvent();
688
// }
689
// }
690
}
691
692     /**
693      * Return a connection to the database from the pool of connections. It
694      * will throw an PersistenceException if it cannot retrieve a connection.
695      * The client should close the connection normally, since the pool is a
696      * connection event listener.
697      *
698      * @return Connection - a pooled connection or null
699      * @exception PersistenceException - if it cannot retrieve a connection
700      */

701     public Connection JavaDoc getConnection()
702         throws PersistenceException {
703         return _connectionManager.getConnection();
704     }
705
706     /**
707      * Return a reference to the DBConnectionManager
708      *
709      * @return DBConnectionManager
710      */

711     public DBConnectionManager getDBConnectionManager() {
712         return _connectionManager;
713     }
714
715     public void addUser(Connection JavaDoc connection, User user)
716         throws PersistenceException {
717         Users.instance().add(connection, user);
718     }
719
720     public Enumeration JavaDoc getAllUsers(Connection JavaDoc connection)
721         throws PersistenceException {
722         return Users.instance().getAllUsers(connection).elements();
723     }
724
725     public User getUser(Connection JavaDoc connection, User user)
726         throws PersistenceException {
727         return Users.instance().get(connection, user);
728     }
729
730     public void removeUser(Connection JavaDoc connection, User user)
731         throws PersistenceException {
732         Users.instance().remove(connection, user);
733     }
734
735     public void updateUser(Connection JavaDoc connection, User user)
736         throws PersistenceException {
737         Users.instance().update(connection, user);
738     }
739
740     /**
741      * Incrementally purge all processed messages from the database.
742      * @todo this needs to be revisited. See bug 816895
743      * - existing expired messages are purged at startup
744      * - messages received that subsequently expire while the server is
745      * running are removed individually.
746      * - not clear how the previous implementation ever worked.
747      * The Messages.getMessageIds() method returns all messages, not
748      * just those processed, nor is it clear that the processed flag
749      * is ever non-zero.
750      * The current implementation (as a fix for bug 816895 - Exception in
751      * purgeMessages) simply delegates to removeExpiredMessages()
752      *
753      * @return the number of messages deleted
754      */

755     public synchronized int purgeMessages() {
756         int deleted = 0;
757
758         Connection JavaDoc connection = null;
759         try {
760             connection = getConnection();
761             removeExpiredMessages(connection);
762             connection.commit();
763         } catch (Exception JavaDoc exception) {
764             _log.error("Exception in purgeMessages", exception);
765         } finally {
766             SQLHelper.close(connection);
767         }
768         return 0;
769
770 // if (connection == null) {
771
// return 0;
772
// }
773

774 // // we have a valid connection so we can proceed
775
// try {
776
// long stime = System.currentTimeMillis();
777
// HashMap msgids = Messages.instance().getMessageIds(
778
// connection, _lastTime, _gcBlockSize);
779

780 // // if there are no messages then reset the last time to
781
// // 0 and break;
782
// if (msgids.size() > 0) {
783
// // find the minimum and maximum..we can improve the way we
784
// // do this.
785
// Iterator iter = msgids.values().iterator();
786
// long min = -1;
787
// long max = -1;
788

789 // while (iter.hasNext()) {
790
// Long id = (Long) iter.next();
791
// if ((min == -1) &&
792
// (max == -1)) {
793
// min = id.longValue();
794
// max = id.longValue();
795
// }
796

797 // if (id.longValue() < min) {
798
// min = id.longValue();
799
// } else if (id.longValue() > max) {
800
// max = id.longValue();
801
// }
802
// }
803

804 // // set the last time for the next iteration unless the
805
// // the size of the msgids is less than the gcBlockSize.
806
// // If the later is the case then reset the last time.
807
// // This is in preparation for the next pass through this
808
// // method.
809
// if (msgids.size() < _gcBlockSize) {
810
// _lastTime = 0;
811
// } else {
812
// _lastTime = max;
813
// }
814

815 // // now iterate through the message list and delete the
816
// // messages that do not have corresponding handles.
817
// Vector hdlids = MessageHandles.instance().getMessageIds(connection, min, max);
818
// iter = msgids.keySet().iterator();
819
// while (iter.hasNext()) {
820
// String id = (String) iter.next();
821
// if (!hdlids.contains(id)) {
822
// // this message is not referenced by anyone so we can
823
// // delete it
824
// Messages.instance().remove(connection, id);
825
// deleted++;
826
// }
827
// }
828
// connection.commit();
829
// } else {
830
// // reset the lastTime
831
// _lastTime = 0;
832
// }
833
// _log.debug("DBGC Deleted " + deleted + " messages and took "
834
// + (System.currentTimeMillis() - stime) +
835
// "ms to complete.");
836
// } catch (Exception exception) {
837
// try {
838
// connection.rollback();
839
// } catch (Exception nested) {
840
// // ignore this exception
841
// }
842
// _log.error("Exception in purgeMessages", exception);
843
// deleted = 0;
844
// } finally {
845
// try {
846
// connection.close();
847
// } catch (Exception nested) {
848
// // ignore
849
// }
850
// }
851
//
852
// return deleted;
853
}
854
855     /**
856      * Get the schema version
857      *
858      * @param connection the connection to use
859      * @return the schema version, or null, if no version has been initialised
860      * @throws PersistenceException for any related persistence exception
861      */

862     private String JavaDoc getSchemaVersion(Connection JavaDoc connection)
863         throws PersistenceException {
864
865         String JavaDoc version = null;
866         PreparedStatement JavaDoc query = null;
867         ResultSet JavaDoc result = null;
868         try {
869             query = connection.prepareStatement(
870                 "select version from system_data where id = 1");
871             result = query.executeQuery();
872             if (result.next()) {
873                 version = result.getString(1);
874             }
875         } catch (SQLException JavaDoc exception) {
876             throw new PersistenceException(
877                 "Failed to get the schema version", exception);
878         } finally {
879             SQLHelper.close(result);
880             SQLHelper.close(query);
881             
882         }
883         return version;
884     }
885
886     /**
887      * Initialise the schema version
888      *
889      * @param connection the connection to use
890      */

891     private void initSchemaVersion(Connection JavaDoc connection)
892         throws PersistenceException {
893
894         _log.info("Initialising schema version " + SCHEMA_VERSION);
895         PreparedStatement JavaDoc insert = null;
896         try {
897             insert = connection.prepareStatement(
898                 "insert into system_data (id, version, creationdate) "
899                 + "values (?,?,?)");
900             insert.setInt(1, 1);
901             insert.setString(2, SCHEMA_VERSION);
902             insert.setDate(3, new Date JavaDoc(System.currentTimeMillis()));
903             insert.executeUpdate();
904             
905         } catch (SQLException JavaDoc exception) {
906             throw new PersistenceException(
907                 "Failed to initialise schema version", exception);
908         } finally{
909             SQLHelper.close(insert);
910         }
911     }
912
913     /**
914      * Register an event to collect and remove processed messages with the
915      * {@link BasicEventManager}
916      */

917     private void registerEvent() {
918 // try {
919
// disabled, as per bug 816895 - Exception in purgeMessages
920
// BasicEventManager.instance().registerEventRelative(
921
// new Event(COLLECT_DATABASE_GARBAGE_EVENT, this, null),
922
// _gcInterval);
923
// } catch (IllegalEventDefinedException exception) {
924
// _log.error("registerEvent failed", exception);
925
// }
926
}
927
928     /**
929      * Creates a {@link DBConnectionManager} using its fully qualified class
930      * name
931      *
932      * @param className the fully qualified class name
933      * @throws PersistenceException if it cannot be created
934      */

935     private DBConnectionManager getConnectionManager(String JavaDoc className)
936         throws PersistenceException {
937
938         DBConnectionManager result = null;
939         Class JavaDoc clazz = null;
940         ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
941         try {
942             if (loader != null) {
943                 clazz = loader.loadClass(className);
944             }
945         } catch (ClassNotFoundException JavaDoc ignore) {
946         }
947         try {
948             if (clazz == null) {
949                 clazz = Class.forName(className);
950             }
951         } catch (ClassNotFoundException JavaDoc exception) {
952             throw new PersistenceException(
953                 "Failed to locate connection manager implementation: "
954                 + className, exception);
955         }
956
957         try {
958             result = (DBConnectionManager) clazz.newInstance();
959         } catch (Exception JavaDoc exception) {
960             throw new PersistenceException(
961                 "Failed to create connection manager", exception);
962         }
963
964         return result;
965     }
966
967 }
968
Popular Tags