KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > pm > jdbc3 > PersistenceManager


1 /*
2  * JBossMQ, the OpenSource JMS implementation
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.mq.pm.jdbc3;
8
9 import java.io.ByteArrayInputStream JavaDoc;
10 import java.io.ByteArrayOutputStream JavaDoc;
11 import java.io.IOException JavaDoc;
12 import java.io.ObjectInputStream JavaDoc;
13 import java.io.ObjectOutputStream JavaDoc;
14 import java.sql.Connection JavaDoc;
15 import java.sql.PreparedStatement JavaDoc;
16 import java.sql.ResultSet JavaDoc;
17 import java.sql.SQLException JavaDoc;
18 import java.sql.Types JavaDoc;
19 import java.util.Properties JavaDoc;
20
21 import javax.jms.JMSException JavaDoc;
22 import javax.management.ObjectName JavaDoc;
23 import javax.naming.InitialContext JavaDoc;
24 import javax.sql.DataSource JavaDoc;
25 import javax.transaction.Status JavaDoc;
26 import javax.transaction.Transaction JavaDoc;
27 import javax.transaction.TransactionManager JavaDoc;
28
29 import org.jboss.mq.DurableSubscriptionID;
30 import org.jboss.mq.SpyDestination;
31 import org.jboss.mq.SpyJMSException;
32 import org.jboss.mq.SpyMessage;
33 import org.jboss.mq.SpyTopic;
34 import org.jboss.mq.pm.CacheStore;
35 import org.jboss.mq.pm.NewPersistenceManager;
36 import org.jboss.mq.pm.Tx;
37 import org.jboss.mq.pm.TxManager;
38 import org.jboss.mq.server.JMSDestination;
39 import org.jboss.mq.server.JMSTopic;
40 import org.jboss.mq.server.MessageCache;
41 import org.jboss.mq.server.MessageReference;
42 import org.jboss.system.ServiceMBeanSupport;
43 import org.jboss.tm.TransactionManagerService;
44
45 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
46
47 /**
48  * This class manages all persistence related services for JDBC based
49  * persistence.
50  *
51  * @jmx:mbean extends="org.jboss.system.ServiceMBean, org.jboss.mq.pm.PersistenceManagerMBean, org.jboss.mq.pm.CacheStoreMBean"
52  *
53  * @author Jayesh Parayali (jayeshpk1@yahoo.com)
54  * @author Hiram Chirino (cojonudo14@hotmail.com)
55  * @author Adrian Brock (adrian@jboss.org)
56  *
57  * @version $Revision: 1.6.4.3 $
58  */

59 public class PersistenceManager
60    extends ServiceMBeanSupport
61    implements PersistenceManagerMBean, NewPersistenceManager, CacheStore, Runnable JavaDoc
62 {
63    // Constants --------------------------------------------------------------------
64

65    /** Message is an object */
66    static final int OBJECT_BLOB = 0;
67
68    /** Message is a byte array */
69    static final int BYTES_BLOB = 1;
70
71    /** Message is a binary stream */
72    static final int BINARYSTREAM_BLOB = 2;
73
74    /** Message is a blob */
75    static final int BLOB_BLOB = 3;
76
77    // Attributes -------------------------------------------------------------------
78

79    /** The next transaction id */
80    private SynchronizedLong nextTransactionId = new SynchronizedLong(0l);
81    
82    /** The jbossmq transaction manager */
83    private TxManager txManager;
84
85    /** The data source */
86    private DataSource JavaDoc datasource;
87
88    /** The jta transaction manager */
89    private TransactionManager JavaDoc tm;
90    
91    /** The object name of the connection manager */
92    private ObjectName JavaDoc connectionManagerName;
93    
94    /** The sql properties */
95    private Properties JavaDoc sqlProperties = new Properties JavaDoc();
96
97    String JavaDoc UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE TXOP=?";
98    String JavaDoc UPDATE_MARKED_REFERENCES = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE TXOP=?";
99    String JavaDoc UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
100    String JavaDoc UPDATE_MARKED_REFERENCES_WITH_TX = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
101    String JavaDoc DELETE_MARKED_MESSAGES_WITH_TX = "DELETE FROM JMS_MESSAGE_LOG WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTION_LOG) AND TXOP=?";
102    String JavaDoc DELETE_MARKED_REFERENCES_WITH_TX = "DELETE FROM JMS_REFERENCE_LOG WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTION_LOG) AND TXOP=?";
103    String JavaDoc DELETE_TX = "DELETE FROM JMS_TRANSACTION_LOG WHERE TXID = ?";
104    String JavaDoc DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE TXID=? AND TXOP=?";
105    String JavaDoc DELETE_MARKED_REFERENCES = "DELETE FROM JMS_REFERENCE_LOG WHERE TXID=? AND TXOP=?";
106    String JavaDoc DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE TXOP='T'";
107    String JavaDoc DELETE_TEMPORARY_REFERENCES = "DELETE FROM JMS_REFERENCE_LOG WHERE TXOP='T'";
108    String JavaDoc INSERT_TX = "INSERT INTO JMS_TRANSACTION_LOG (TXID) values(?)";
109    String JavaDoc SELECT_MAX_TX = "SELECT MAX(TXID) FROM JMS_TRANSACTION_LOG";
110    String JavaDoc SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG WHERE DESTINATION=?";
111    String JavaDoc SELECT_REFERENCES_IN_DEST = "SELECT R.MESSAGEID, M.MESSAGEBLOB, R.REDELIVERED, R.REDELIVERS FROM JMS_REFERENCE_LOG AS R, JMS_MESSAGE_LOG AS M" +
112                                       " WHERE R.MESSAGEID = M.MESSAGEID AND R.DESTINATION=?";
113    String JavaDoc SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG WHERE MESSAGEID=? AND DESTINATION=?";
114    String JavaDoc INSERT_MESSAGE = "INSERT INTO JMS_MESSAGE_LOG (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP, LATECLONE) VALUES(?,?,?,?,?,?)";
115    String JavaDoc INSERT_REFERENCE = "INSERT INTO JMS_REFERENCE_LOG (MESSAGEID, DESTINATION, TXID, TXOP, REDELIVERED, REDELIVERS) VALUES(?,?,?,?,?,?)";
116    String JavaDoc MARK_MESSAGE = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
117    String JavaDoc MARK_REFERENCE = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
118    String JavaDoc DELETE_MESSAGE = "DELETE FROM JMS_MESSAGE_LOG WHERE MESSAGEID=? AND DESTINATION=?";
119    String JavaDoc DELETE_REFERENCE = "DELETE FROM JMS_REFERENCE_LOG WHERE MESSAGEID=? AND DESTINATION=?";
120    String JavaDoc UPDATE_MESSAGE = "UPDATE JMS_MESSAGE_LOG SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?";
121    String JavaDoc UPDATE_REFERENCE = "UPDATE JMS_REFERENCE_LOG SET REDELIVERED=?, REDELIVERS=? WHERE MESSAGEID=? AND DESTINATION=?";
122    String JavaDoc DELETE_ORPHANED_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE LATECLONE = '1' AND MESSAGEID NOT IN (SELECT MESSAGEID FROM JMS_REFERENCE_LOG)";
123    String JavaDoc DELETE_ALL_TXS = "DELETE FROM JMS_TRANSACTION_LOG";
124    String JavaDoc CREATE_REFERENCE_TABLE =
125       "CREATE TABLE JMS_REFERENCE_LOG ( MESSAGEID INTEGER NOT NULL, "
126          + "DESTINATION VARCHAR(256) NOT NULL, TXID INTEGER, TXOP CHAR(1), "
127          + "REDELIVERED CHAR(1), REDELIVERS INTEGER, "
128          + "PRIMARY KEY (MESSAGEID, DESTINATION) )";
129    String JavaDoc CREATE_MESSAGE_TABLE =
130       "CREATE TABLE JMS_MESSAGE_LOG ( MESSAGEID INTEGER NOT NULL, "
131          + "DESTINATION VARCHAR(256), TXID INTEGER, TXOP CHAR(1), LATECLONE CHAR(1), "
132          + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )";
133    String JavaDoc CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTION_LOG ( TXID INTEGER )";
134
135    /** The blob type */
136    int blobType = OBJECT_BLOB;
137
138    /** Whether to create tables */
139    boolean createTables = true;
140    
141    /** Number of retry attempts to connect to the db */
142    private int connectionRetryAttempts = 5;
143    
144    /** The garbage collection period in millis */
145    private long gcPeriod = 60000;
146
147    /** The background gc thread */
148    private Thread JavaDoc gcThread;
149
150    // Constructors -----------------------------------------------------------------
151

152    /**
153     * Create a new persistence manager
154     *
155     * @throws JMSException for any error
156     */

157    public PersistenceManager() throws javax.jms.JMSException JavaDoc
158    {
159       txManager = new TxManager(this);
160    }
161
162    // Public -----------------------------------------------------------------------
163

164    /**
165     * Retrieve the connection manager object name
166     *
167     * @jmx:managed-attribute
168     */

169    public ObjectName JavaDoc getConnectionManager()
170    {
171       return connectionManagerName;
172    }
173
174    /**
175     * Set the connection manager object name
176     *
177     * @jmx:managed-attribute
178     */

179    public void setConnectionManager(ObjectName JavaDoc connectionManagerName)
180    {
181       this.connectionManagerName = connectionManagerName;
182    }
183
184    /**
185     * Set the garbage collection period
186     *
187     * @jmx:managed-attribute
188     */

189    public int getGCPeriodSecs()
190    {
191       return (int) gcPeriod / 1000;
192    }
193
194    /**
195     * Set the garbage collection period in seconds
196     *
197     * @jmx:managed-attribute
198     */

199    public void setGCPeriodSecs(int gcPeriodSecs)
200    {
201       this.gcPeriod = gcPeriodSecs * 1000;
202    }
203    
204    /**
205     * Gets the ConnectionRetryAttempts.
206     *
207     * @jmx:managed-attribute
208     * @return the number of retry events
209     */

210    public int getConnectionRetryAttempts()
211    {
212       return this.connectionRetryAttempts;
213    }
214    
215    /**
216     * Sets the ConnectionRetryAttempts.
217     *
218     * @jmx:managed-attribute
219     * @param value the number of retry attempts
220     */

221    public void setConnectionRetryAttempts(int value)
222    {
223       this.connectionRetryAttempts = value;
224    }
225
226    /**
227     * Gets the sqlProperties.
228     *
229     * @jmx:managed-attribute
230     * @return Returns the Properties
231     */

232    public String JavaDoc getSqlProperties()
233    {
234       try
235       {
236          ByteArrayOutputStream JavaDoc boa = new ByteArrayOutputStream JavaDoc();
237          sqlProperties.store(boa, "");
238          return new String JavaDoc(boa.toByteArray());
239       }
240       catch (IOException JavaDoc shouldnothappen)
241       {
242          return "";
243       }
244    }
245
246    /**
247     * Sets the sqlProperties.
248     *
249     * @jmx:managed-attribute
250     * @param sqlProperties The sqlProperties to set
251     */

252    public void setSqlProperties(String JavaDoc value)
253    {
254       try
255       {
256          ByteArrayInputStream JavaDoc is = new ByteArrayInputStream JavaDoc(value.getBytes());
257          sqlProperties = new Properties JavaDoc();
258          sqlProperties.load(is);
259       }
260       catch (IOException JavaDoc shouldnothappen)
261       {
262       }
263    }
264
265    // PersistenceManager implementation --------------------------------------------
266

267    public Tx createPersistentTx() throws JMSException JavaDoc
268    {
269       Tx id = new Tx(nextTransactionId.increment());
270       TransactionManagerStrategy tms = new TransactionManagerStrategy();
271       tms.startTX();
272       Connection JavaDoc c = null;
273       PreparedStatement JavaDoc stmt = null;
274       boolean threadWasInterrupted = Thread.interrupted();
275       try
276       {
277          c = this.getConnection();
278          stmt = c.prepareStatement(INSERT_TX);
279          stmt.setLong(1, id.longValue());
280          stmt.executeUpdate();
281
282       }
283       catch (SQLException JavaDoc e)
284       {
285          tms.setRollbackOnly();
286          throw new SpyJMSException("Could not crate tx: " + id, e);
287       }
288       finally
289       {
290          try
291          {
292             stmt.close();
293          }
294          catch (Throwable JavaDoc ignore)
295          {
296          }
297          try
298          {
299             c.close();
300          }
301          catch (Throwable JavaDoc ignore)
302          {
303          }
304          tms.endTX();
305
306          // Restore the interrupted state of the thread
307
if( threadWasInterrupted )
308             Thread.currentThread().interrupt();
309       }
310
311       return id;
312    }
313
314    public void commitPersistentTx(Tx txId) throws JMSException JavaDoc
315    {
316       TransactionManagerStrategy tms = new TransactionManagerStrategy();
317       tms.startTX();
318       Connection JavaDoc c = null;
319       boolean threadWasInterrupted = Thread.interrupted();
320       try
321       {
322          c = this.getConnection();
323          removeMarkedMessages(c, txId, "D");
324          removeMarkedReferences(c, txId, "D");
325          removeTXRecord(c, txId.longValue());
326       }
327       catch (SQLException JavaDoc e)
328       {
329          tms.setRollbackOnly();
330          throw new SpyJMSException("Could not commit tx: " + txId, e);
331       }
332       finally
333       {
334          try
335          {
336             c.close();
337          }
338          catch (Throwable JavaDoc ignore)
339          {
340          }
341          tms.endTX();
342
343          // Restore the interrupted state of the thread
344
if( threadWasInterrupted )
345             Thread.currentThread().interrupt();
346       }
347    }
348
349    public void rollbackPersistentTx(Tx txId) throws JMSException JavaDoc
350    {
351
352       TransactionManagerStrategy tms = new TransactionManagerStrategy();
353       tms.startTX();
354       Connection JavaDoc c = null;
355       PreparedStatement JavaDoc stmt = null;
356       boolean threadWasInterrupted = Thread.interrupted();
357       try
358       {
359          c = this.getConnection();
360          removeMarkedMessages(c, txId, "A");
361          removeMarkedReferences(c, txId, "A");
362          removeTXRecord(c, txId.longValue());
363
364          // Restore all the messages that were logically removed.
365
stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES_WITH_TX);
366          stmt.setNull(1, Types.BIGINT);
367          stmt.setString(2, "A");
368          stmt.setString(3, "D");
369          stmt.setLong(4, txId.longValue());
370          stmt.executeUpdate();
371          stmt.close();
372
373          // Restore all the references that were logically removed.
374
stmt = c.prepareStatement(UPDATE_MARKED_REFERENCES_WITH_TX);
375          stmt.setNull(1, Types.BIGINT);
376          stmt.setString(2, "A");
377          stmt.setString(3, "D");
378          stmt.setLong(4, txId.longValue());
379          stmt.executeUpdate();
380          stmt.close();
381       }
382       catch (SQLException JavaDoc e)
383       {
384          tms.setRollbackOnly();
385          throw new SpyJMSException("Could not rollback tx: " + txId, e);
386       }
387       finally
388       {
389          try
390          {
391             if (stmt != null)
392                stmt.close();
393             if (c != null)
394                c.close();
395          }
396          catch (Throwable JavaDoc ignore)
397          {
398          }
399          tms.endTX();
400
401          // Restore the interrupted state of the thread
402
if( threadWasInterrupted )
403             Thread.currentThread().interrupt();
404       }
405    }
406
407    public void add(MessageReference messageRef, Tx txId) throws JMSException JavaDoc
408    {
409       boolean trace = log.isTraceEnabled();
410       if (trace)
411          log.trace("About to add message " + messageRef + " transaction=" + txId);
412
413       TransactionManagerStrategy tms = new TransactionManagerStrategy();
414       tms.startTX();
415       Connection JavaDoc c = null;
416       boolean threadWasInterrupted = Thread.interrupted();
417       try
418       {
419          c = this.getConnection();
420          // Synchronize on the message to avoid a race with the softener
421
synchronized(messageRef)
422          {
423             if (trace)
424                log.trace("Inserting message " + messageRef + " transaction=" + txId);
425
426             if (messageRef.isLateClone())
427             {
428                addReference(c, messageRef.getPersistentKey(), messageRef, txId, "A");
429             }
430             else
431             {
432                SpyMessage message = messageRef.getMessage();
433                addMessage(c, messageRef.getPersistentKey(), message, txId, "A", "0");
434             }
435             messageRef.setStored(MessageReference.STORED);
436
437             if (trace)
438                log.trace("Added message " + messageRef + " transaction=" + txId);
439          }
440       }
441       catch (IOException JavaDoc e)
442       {
443          tms.setRollbackOnly();
444          throw new SpyJMSException("Could not store message: " + messageRef, e);
445       }
446       catch (SQLException JavaDoc e)
447       {
448          tms.setRollbackOnly();
449          throw new SpyJMSException("Could not store message: " + messageRef, e);
450       }
451       finally
452       {
453          try
454          {
455             c.close();
456          }
457          catch (Throwable JavaDoc ignore)
458          {
459          }
460          tms.endTX();
461
462          // Restore the interrupted state of the thread
463
if( threadWasInterrupted )
464             Thread.currentThread().interrupt();
465       }
466    }
467
468    public void update(MessageReference messageRef, Tx txId) throws JMSException JavaDoc
469    {
470       boolean trace = log.isTraceEnabled();
471       if (trace)
472          log.trace("Updating message " + messageRef + " transaction=" + txId);
473
474       TransactionManagerStrategy tms = new TransactionManagerStrategy();
475       tms.startTX();
476       Connection JavaDoc c = null;
477       PreparedStatement JavaDoc stmt = null;
478       boolean threadWasInterrupted = Thread.interrupted();
479       try
480       {
481          c = this.getConnection();
482          if (txId == null)
483          {
484             if (messageRef.isLateClone())
485             {
486                stmt = c.prepareStatement(UPDATE_REFERENCE);
487                if (messageRef.redelivered)
488                   stmt.setString(1, "1");
489                else
490                   stmt.setString(1, "0");
491                stmt.setLong(2, messageRef.redeliveryCount);
492                stmt.setLong(3, messageRef.messageId);
493                stmt.setString(4, messageRef.getPersistentKey());
494             }
495             else
496             {
497                stmt = c.prepareStatement(UPDATE_MESSAGE);
498                setBlob(stmt, 1, messageRef.getMessage());
499                stmt.setLong(2, messageRef.messageId);
500                stmt.setString(3, messageRef.getPersistentKey());
501             }
502             int rc = stmt.executeUpdate();
503             if( rc != 1 )
504                throw new SpyJMSException("Could not update the message in the database: update affected "+rc+" rows");
505          }
506          else
507          {
508             throw new SpyJMSException("NYI: Updating a message in a transaction is not currently used");
509          }
510          if (trace)
511             log.trace("Updated message " + messageRef + " transaction=" + txId);
512
513       }
514       catch (IOException JavaDoc e)
515       {
516          tms.setRollbackOnly();
517          throw new SpyJMSException("Could not update message: " + messageRef, e);
518       }
519       catch (SQLException JavaDoc e)
520       {
521          tms.setRollbackOnly();
522          throw new SpyJMSException("Could not update message: " + messageRef, e);
523       }
524       finally
525       {
526          try
527          {
528             stmt.close();
529          }
530          catch (Throwable JavaDoc ignore)
531          {
532          }
533          try
534          {
535             c.close();
536          }
537          catch (Throwable JavaDoc ignore)
538          {
539          }
540          tms.endTX();
541
542          // Restore the interrupted state of the thread
543
if( threadWasInterrupted )
544             Thread.currentThread().interrupt();
545       }
546    }
547
548    public void remove(MessageReference messageRef, Tx txId) throws JMSException JavaDoc
549    {
550       boolean trace = log.isTraceEnabled();
551       if (trace)
552          log.trace("Removing message " + messageRef + " transaction=" + txId);
553
554       TransactionManagerStrategy tms = new TransactionManagerStrategy();
555       tms.startTX();
556       Connection JavaDoc c = null;
557       PreparedStatement JavaDoc stmt = null;
558       boolean threadWasInterrupted = Thread.interrupted();
559       try
560       {
561          c = this.getConnection();
562          // Synchronize on the message to avoid a race with the softener
563
synchronized(messageRef)
564          {
565             if (txId == null)
566             {
567                if (messageRef.isLateClone())
568                   stmt = c.prepareStatement(DELETE_REFERENCE);
569                else
570                   stmt = c.prepareStatement(DELETE_MESSAGE);
571                stmt.setLong(1, messageRef.messageId);
572                stmt.setString(2, messageRef.getPersistentKey());
573                int rc = stmt.executeUpdate();
574                if( rc != 1 )
575                   throw new SpyJMSException("Could not delete the message from the database: delete affected "+rc+" rows");
576
577                // Adrian Brock:
578
// Remove the message from the cache, but don't
579
// return it to the pool just yet. The queue still holds
580
// a reference to the message and will return it
581
// to the pool once it gets enough time slice.
582
// The alternative is to remove the validation
583
// for double removal from the cache,
584
// which I don't want to do because it is useful
585
// for spotting errors
586
messageRef.setStored(MessageReference.NOT_STORED);
587                messageRef.removeDelayed();
588             }
589             else
590             {
591                if (messageRef.isLateClone())
592                {
593                   stmt = c.prepareStatement(MARK_REFERENCE);
594                   stmt.setLong(1, txId.longValue());
595                   stmt.setString(2, "D");
596                   stmt.setLong(3, messageRef.messageId);
597                   stmt.setString(4, messageRef.getPersistentKey());
598                }
599                else
600                {
601                   stmt = c.prepareStatement(MARK_MESSAGE);
602                   stmt.setLong(1, txId.longValue());
603                   stmt.setString(2, "D");
604                   stmt.setLong(3, messageRef.messageId);
605                   stmt.setString(4, messageRef.getPersistentKey());
606                }
607                int rc = stmt.executeUpdate();
608                if( rc != 1 )
609                   throw new SpyJMSException("Could not mark the message as deleted in the database: update affected "+rc+" rows");
610             }
611             if (trace)
612                log.trace("Removed message " + messageRef + " transaction=" + txId);
613          }
614       }
615       catch (SQLException JavaDoc e)
616       {
617          tms.setRollbackOnly();
618          throw new SpyJMSException("Could not remove message: " + messageRef, e);
619       }
620       finally
621       {
622          try
623          {
624             stmt.close();
625          }
626          catch (Throwable JavaDoc ignore)
627          {
628          }
629          try
630          {
631             c.close();
632          }
633          catch (Throwable JavaDoc ignore)
634          {
635          }
636          tms.endTX();
637
638          // Restore the interrupted state of the thread
639
if( threadWasInterrupted )
640             Thread.currentThread().interrupt();
641       }
642    }
643
644    public synchronized void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws javax.jms.JMSException JavaDoc
645    {
646       if (jmsDest == null)
647          throw new IllegalArgumentException JavaDoc("Must supply non null JMSDestination to restoreQueue");
648       if (dest == null)
649          throw new IllegalArgumentException JavaDoc("Must supply non null SpyDestination to restoreQueue");
650
651       TransactionManagerStrategy tms = new TransactionManagerStrategy();
652       tms.startTX();
653       Connection JavaDoc c = null;
654       PreparedStatement JavaDoc stmt = null;
655       ResultSet JavaDoc rs = null;
656       boolean threadWasInterrupted = Thread.interrupted();
657       try
658       {
659          c = this.getConnection();
660          int counter=0;
661          if (jmsDest.parameters.lateClone)
662          {
663             JMSTopic topic = (JMSTopic) jmsDest;
664             // The durable subscription is not serialized
665
DurableSubscriptionID id = ((SpyTopic) dest).getDurableSubscriptionID();
666
667             stmt = c.prepareStatement(SELECT_REFERENCES_IN_DEST);
668             stmt.setString(1, dest.toString());
669
670             rs = stmt.executeQuery();
671             while (rs.next())
672             {
673                SpyMessage message = extractMessage(rs, 2);
674                boolean redelivered = false;
675                if (rs.getString(3).equals("1"))
676                   redelivered = true;
677                message.header.jmsRedelivered = redelivered;
678                message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer JavaDoc(rs.getInt(4)));
679                topic.restoreMessage(message, id);
680                counter++;
681             }
682          }
683          else
684          {
685             stmt = c.prepareStatement(SELECT_MESSAGES_IN_DEST);
686             stmt.setString(1, dest.toString());
687
688             rs = stmt.executeQuery();
689             while (rs.next())
690             {
691                SpyMessage message = extractMessage(rs, 2);
692                // The durable subscription is not serialized
693
if (dest instanceof SpyTopic)
694                   message.header.durableSubscriberID = ((SpyTopic)dest).getDurableSubscriptionID();
695                jmsDest.restoreMessage(message);
696                counter++;
697             }
698          }
699          
700          log.debug("Restored "+counter+" message(s) to: "+dest);
701       }
702       catch (IOException JavaDoc e)
703       {
704          tms.setRollbackOnly();
705          throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
706       }
707       catch (SQLException JavaDoc e)
708       {
709          tms.setRollbackOnly();
710          throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
711       }
712       finally
713       {
714          try
715          {
716             rs.close();
717          }
718          catch (Throwable JavaDoc ignore)
719          {
720          }
721          try
722          {
723             stmt.close();
724          }
725          catch (Throwable JavaDoc ignore)
726          {
727          }
728          try
729          {
730             c.close();
731          }
732          catch (Throwable JavaDoc ignore)
733          {
734          }
735          tms.endTX();
736
737          // Restore the interrupted state of the thread
738
if( threadWasInterrupted )
739             Thread.currentThread().interrupt();
740       }
741
742    }
743
744    public TxManager getTxManager()
745    {
746       return txManager;
747    }
748
749    public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException JavaDoc
750    {
751       // Nothing to clean up, all the state is in the db.
752
}
753
754    /**
755     * Unsupported operation
756     */

757    public MessageCache getMessageCacheInstance()
758    {
759       throw new UnsupportedOperationException JavaDoc("This is now set on the destination manager");
760    }
761
762    // NewPersistenceManager implementation -----------------------------------------
763

764    public void addMessage(SpyMessage message) throws JMSException JavaDoc
765    {
766
767       TransactionManagerStrategy tms = new TransactionManagerStrategy();
768       tms.startTX();
769       Connection JavaDoc c = null;
770       boolean threadWasInterrupted = Thread.interrupted();
771       try
772       {
773          c = datasource.getConnection();
774          addMessage(c, "*", message, null, null, "1");
775       }
776       catch (IOException JavaDoc e)
777       {
778          tms.setRollbackOnly();
779          throw new SpyJMSException("Could not add message:", e);
780       }
781       catch (SQLException JavaDoc e)
782       {
783          tms.setRollbackOnly();
784          throw new SpyJMSException("Could not add message:", e);
785       }
786       finally
787       {
788          try
789          {
790             if (c != null)
791                c.close();
792          }
793          catch (Throwable JavaDoc ignore)
794          {
795          }
796          tms.endTX();
797
798          // Restore the interrupted state of the thread
799
if( threadWasInterrupted )
800             Thread.currentThread().interrupt();
801       }
802    }
803
804    // PersistenceManagerMBean implementation ---------------------------------------
805

806    public Object JavaDoc getInstance()
807    {
808       return this;
809    }
810
811    /**
812     * Unsupported operation
813     */

814    public