KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > pm > jdbc2 > PersistenceManager


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.mq.pm.jdbc2;
23
24 import java.io.ByteArrayInputStream JavaDoc;
25 import java.io.ByteArrayOutputStream JavaDoc;
26 import java.io.IOException JavaDoc;
27 import java.io.ObjectInputStream JavaDoc;
28 import java.io.ObjectOutputStream JavaDoc;
29 import java.io.StreamCorruptedException JavaDoc;
30 import java.sql.Connection JavaDoc;
31 import java.sql.PreparedStatement JavaDoc;
32 import java.sql.ResultSet JavaDoc;
33 import java.sql.SQLException JavaDoc;
34 import java.util.HashMap JavaDoc;
35 import java.util.Iterator JavaDoc;
36 import java.util.Map JavaDoc;
37 import java.util.Properties JavaDoc;
38
39 import javax.jms.JMSException JavaDoc;
40 import javax.management.AttributeNotFoundException JavaDoc;
41 import javax.management.InstanceNotFoundException JavaDoc;
42 import javax.management.MBeanException JavaDoc;
43 import javax.management.ObjectName JavaDoc;
44 import javax.management.ReflectionException JavaDoc;
45 import javax.naming.InitialContext JavaDoc;
46 import javax.naming.NamingException JavaDoc;
47 import javax.sql.DataSource JavaDoc;
48 import javax.transaction.Status JavaDoc;
49 import javax.transaction.Transaction JavaDoc;
50 import javax.transaction.TransactionManager JavaDoc;
51 import javax.transaction.xa.Xid JavaDoc;
52
53 import org.jboss.mq.SpyDestination;
54 import org.jboss.mq.SpyJMSException;
55 import org.jboss.mq.SpyMessage;
56 import org.jboss.mq.SpyTopic;
57 import org.jboss.mq.pm.CacheStore;
58 import org.jboss.mq.pm.Tx;
59 import org.jboss.mq.pm.TxManager;
60 import org.jboss.mq.server.JMSDestination;
61 import org.jboss.mq.server.MessageCache;
62 import org.jboss.mq.server.MessageReference;
63 import org.jboss.system.ServiceMBeanSupport;
64 import org.jboss.tm.TransactionManagerService;
65 import org.jboss.tm.TransactionTimeoutConfiguration;
66
67 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
68
69 /**
70  * This class manages all persistence related services for JDBC based
71  * persistence.
72  *
73  * @author Jayesh Parayali (jayeshpk1@yahoo.com)
74  * @author Hiram Chirino (cojonudo14@hotmail.com)
75  * @author Adrian Brock (adrian@jboss.com)
76  * @version $Revision: 57045 $
77  */

78 public class PersistenceManager extends ServiceMBeanSupport
79    implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager, CacheStore
80 {
81
82    /////////////////////////////////////////////////////////////////////////////////
83
//
84
// TX state attibutes
85
//
86
/////////////////////////////////////////////////////////////////////////////////
87

88    /** The next transaction id */
89    protected SynchronizedLong nextTransactionId = new SynchronizedLong(0l);
90    
91    /** The jta transaction manager */
92    protected TxManager txManager;
93    
94    /** The DataSource */
95    protected DataSource JavaDoc datasource;
96    
97    /** The JBossMQ transaction mananger */
98    protected TransactionManager JavaDoc tm;
99    
100    /** The override recovery timeout */
101    private int recoveryTimeout = 0;
102    
103    /** The recovery retries */
104    private int recoveryRetries = 0;
105    
106    /** The recover messages chunk */
107    private int recoverMessagesChunk = 0;
108
109    /** The statement retries */
110    private int statementRetries = 5;
111    
112    /////////////////////////////////////////////////////////////////////////////////
113
//
114
// JDBC Access Attributes
115
//
116
/////////////////////////////////////////////////////////////////////////////////
117

118    protected String JavaDoc UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?";
119    protected String JavaDoc UPDATE_MARKED_MESSAGES_XARECOVERY = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID NOT IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID IS NOT NULL)";
120    protected String JavaDoc UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
121    protected String JavaDoc DELETE_MARKED_MESSAGES_WITH_TX =
122       "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS) AND TXOP=?";
123    protected String JavaDoc DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY =
124       "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID = NULL) AND TXOP=?";
125    protected String JavaDoc DELETE_TX = "DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?";
126    protected String JavaDoc DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?";
127    protected String JavaDoc DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXOP = 'T'";
128    protected String JavaDoc INSERT_TX = "INSERT INTO JMS_TRANSACTIONS (TXID) values(?)";
129    protected String JavaDoc INSERT_TX_XARECOVERY = "INSERT INTO JMS_TRANSACTIONS (TXID, XID) values(?, ?)";
130    protected String JavaDoc DELETE_ALL_TX = "DELETE FROM JMS_TRANSACTIONS";
131    protected String JavaDoc DELETE_ALL_TX_XARECOVERY = "DELETE FROM JMS_TRANSACTIONS WHERE XID = NULL";
132    protected String JavaDoc SELECT_MAX_TX = "SELECT MAX(TXID) FROM (SELECT MAX(TXID) FROM JMS_TRANSACTIONS UNION SELECT MAX(TXID) FROM JMS_MESSAGES)";
133    protected String JavaDoc SELECT_ALL_TX_XARECOVERY = "SELECT TXID, XID FROM JMS_TRANSACTIONS";
134    protected String JavaDoc SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=?";
135    protected String JavaDoc SELECT_MESSAGES_IN_DEST_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE DESTINATION=?";
136    protected String JavaDoc SELECT_MESSAGE_KEYS_IN_DEST = "SELECT MESSAGEID FROM JMS_MESSAGES WHERE DESTINATION=?";
137    protected String JavaDoc SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
138    protected String JavaDoc SELECT_MESSAGE_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
139    protected String JavaDoc INSERT_MESSAGE =
140       "INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)";
141    protected String JavaDoc MARK_MESSAGE = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
142    protected String JavaDoc DELETE_MESSAGE = "DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
143    protected String JavaDoc UPDATE_MESSAGE = "UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?";
144    protected String JavaDoc CREATE_MESSAGE_TABLE =
145       "CREATE TABLE JMS_MESSAGES ( MESSAGEID INTEGER NOT NULL, "
146          + "DESTINATION VARCHAR(32) NOT NULL, TXID INTEGER, TXOP CHAR(1),"
147          + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )";
148    protected String JavaDoc CREATE_IDX_MESSAGE_TXOP_TXID = "CREATE INDEX JMS_MESSAGES_TXOP_TXID ON JMS_MESSAGES (TXOP, TXID)";
149    protected String JavaDoc CREATE_IDX_MESSAGE_DESTINATION = "CREATE INDEX JMS_MESSAGES_DESTINATION ON JMS_MESSAGES (DESTINATION)";
150    protected String JavaDoc CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, PRIMARY KEY (TXID) )";
151    protected String JavaDoc CREATE_TX_TABLE_XARECOVERY = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, XID OBJECT, PRIMARY KEY (TXID) )";
152
153    protected static final int OBJECT_BLOB = 0;
154    protected static final int BYTES_BLOB = 1;
155    protected static final int BINARYSTREAM_BLOB = 2;
156    protected static final int BLOB_BLOB = 3;
157
158    protected int blobType = OBJECT_BLOB;
159    protected boolean createTables;
160
161    protected int connectionRetryAttempts = 5;
162
163    protected boolean xaRecovery = false;
164    
165    /////////////////////////////////////////////////////////////////////////////////
166
//
167
// Constructor.
168
//
169
/////////////////////////////////////////////////////////////////////////////////
170
public PersistenceManager() throws javax.jms.JMSException JavaDoc
171    {
172       txManager = new TxManager(this);
173    }
174
175    /**
176     * This inner class helps handle the tx management of the jdbc connections.
177     *
178     */

179    protected class TransactionManagerStrategy
180    {
181
182       Transaction JavaDoc threadTx;
183
184       void startTX() throws JMSException JavaDoc
185       {
186          try
187          {
188             // Thread arriving must be clean (jboss doesn't set the thread
189
// previously). However optimized calls come with associated
190
// thread for example. We suspend the thread association here, and
191
// resume in the finally block of the following try.
192
threadTx = tm.suspend();
193
194             // Always begin a transaction
195
tm.begin();
196          }
197          catch (Exception JavaDoc e)
198          {
199             try
200             {
201                if (threadTx != null)
202                   tm.resume(threadTx);
203             }
204             catch (Exception JavaDoc ignore)
205             {
206             }
207             throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
208          }
209       }
210
211       void setRollbackOnly() throws JMSException JavaDoc
212       {
213          try
214          {
215             tm.setRollbackOnly();
216          }
217          catch (Exception JavaDoc e)
218          {
219             throw new SpyJMSException("Could not start a mark the transaction for rollback .", e);
220          }
221       }
222
223       void endTX() throws JMSException JavaDoc
224       {
225          try
226          {
227             if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK)
228             {
229                tm.rollback();
230             }
231             else
232             {
233                tm.commit();
234             }
235          }
236          catch (Exception JavaDoc e)
237          {
238             throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
239          }
240          finally
241          {
242             try
243             {
244                if (threadTx != null)
245                   tm.resume(threadTx);
246             }
247             catch (Exception JavaDoc ignore)
248             {
249             }
250          }
251       }
252    }
253
254    /////////////////////////////////////////////////////////////////////////////////
255
//
256
// TX Resolution.
257
//
258
/////////////////////////////////////////////////////////////////////////////////
259

260    synchronized protected void createSchema() throws JMSException JavaDoc
261    {
262       TransactionManagerStrategy tms = new TransactionManagerStrategy();
263       tms.startTX();
264       Connection JavaDoc c = null;
265       PreparedStatement JavaDoc stmt = null;
266       boolean threadWasInterrupted = Thread.interrupted();
267       try
268       {
269          if (createTables)
270          {
271             c = this.getConnection();
272
273             boolean createdMessageTable = false;
274             try
275             {
276                stmt = c.prepareStatement(CREATE_MESSAGE_TABLE);
277                stmt.executeUpdate();
278                createdMessageTable = true;
279             }
280             catch (SQLException JavaDoc e)
281             {
282                log.debug("Could not create table with SQL: " + CREATE_MESSAGE_TABLE, e);
283             }
284             finally
285             {
286                try
287                {
288                   if (stmt != null)
289                      stmt.close();
290                }
291                catch (Throwable JavaDoc ignored)
292                {
293                   log.trace("Ignored: " + ignored);
294                }
295                stmt = null;
296             }
297
298             if (createdMessageTable)
299             {
300                try
301                {
302                   stmt = c.prepareStatement(CREATE_IDX_MESSAGE_TXOP_TXID);
303                   stmt.executeUpdate();
304                }
305                catch (SQLException JavaDoc e)
306                {
307                   log.debug("Could not create index with SQL: " + CREATE_IDX_MESSAGE_TXOP_TXID, e);
308                }
309                finally
310                {
311                   try
312                   {
313                      if (stmt != null)
314                         stmt.close();
315                   }
316                   catch (Throwable JavaDoc ignored)
317                   {
318                      log.trace("Ignored: " + ignored);
319                   }
320                   stmt = null;
321                }
322                try
323                {
324                   stmt = c.prepareStatement(CREATE_IDX_MESSAGE_DESTINATION);
325                   stmt.executeUpdate();
326                }
327                catch (SQLException JavaDoc e)
328                {
329                   log.debug("Could not create index with SQL: " + CREATE_IDX_MESSAGE_DESTINATION, e);
330                }
331                finally
332                {
333                   try
334                   {
335                      if (stmt != null)
336                         stmt.close();
337                   }
338                   catch (Throwable JavaDoc ignored)
339                   {
340                      log.trace("Ignored: " + ignored);
341                   }
342                   stmt = null;
343                }
344             }
345
346             String JavaDoc createTxTable = CREATE_TX_TABLE;
347             if (xaRecovery)
348                createTxTable = CREATE_TX_TABLE_XARECOVERY;
349             try
350             {
351                stmt = c.prepareStatement(createTxTable);
352                stmt.executeUpdate();
353             }
354             catch (SQLException JavaDoc e)
355             {
356                log.debug("Could not create table with SQL: " + createTxTable, e);
357             }
358             finally
359             {
360                try
361                {
362                   if (stmt != null)
363                      stmt.close();
364                }
365                catch (Throwable JavaDoc ignored)
366                {
367                   log.trace("Ignored: " + ignored);
368                }
369                stmt = null;
370             }
371          }
372       }
373       catch (SQLException JavaDoc e)
374       {
375          tms.setRollbackOnly();
376          throw new SpyJMSException("Could not get a connection for jdbc2 table construction ", e);
377       }
378       finally
379       {
380          try
381          {
382             if (stmt != null)
383                stmt.close();
384          }
385          catch (Throwable JavaDoc ignore)
386          {
387          }
388          stmt = null;
389          try
390          {
391             if (c != null)
392                c.close();
393          }
394          catch (Throwable JavaDoc ignore)
395          {
396          }
397          c = null;
398          tms.endTX();
399
400          // Restore the interrupted state of the thread
401
if (threadWasInterrupted)
402             Thread.currentThread().interrupt();
403       }
404    }
405
406    synchronized protected void resolveAllUncommitedTXs() throws JMSException JavaDoc
407    {
408       // We perform recovery in a different thread to the table creation
409
// Postgres doesn't like create table failing in the same transaction
410
// as other operations
411

412       TransactionManagerStrategy tms = new TransactionManagerStrategy();
413       tms.startTX();
414       Connection JavaDoc c = null;
415       PreparedStatement JavaDoc stmt = null;
416       ResultSet JavaDoc rs = null;
417       boolean threadWasInterrupted = Thread.interrupted();
418       try
419       {
420          c = this.getConnection();
421
422          // Find out what the next TXID should be
423
stmt = c.prepareStatement(SELECT_MAX_TX);
424          rs = stmt.executeQuery();
425          if (rs.next())
426             nextTransactionId.set(rs.getLong(1) + 1);
427          rs.close();
428          rs = null;
429          stmt.close();
430          stmt = null;
431
432          // Delete all the temporary messages.
433
stmt = c.prepareStatement(DELETE_TEMPORARY_MESSAGES);
434          stmt.executeUpdate();
435          stmt.close();
436          stmt = null;
437
438          // Delete all the messages that were added but thier tx's were not commited.
439
String JavaDoc deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX;
440          if (xaRecovery)
441             deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY;
442          stmt = c.prepareStatement(deleteMarkedMessagesWithTx);
443          stmt.setString(1, "A");
444          stmt.executeUpdate();
445          stmt.close();
446          stmt = null;
447
448          // Restore all the messages that were removed but their tx's were not commited.
449
String JavaDoc updateMarkedMessages = UPDATE_MARKED_MESSAGES;
450          if (xaRecovery)
451             updateMarkedMessages = UPDATE_MARKED_MESSAGES_XARECOVERY;
452          stmt = c.prepareStatement(updateMarkedMessages);
453          stmt.setNull(1, java.sql.Types.BIGINT);
454          stmt.setString(2, "A");
455          stmt.setString(3, "D");
456          stmt.executeUpdate();
457          stmt.close();
458          stmt = null;
459
460          // Now recovery is complete, clear the transaction table.
461
String JavaDoc deleteAllTx = DELETE_ALL_TX;
462          if (xaRecovery)
463             deleteAllTx = DELETE_ALL_TX_XARECOVERY;
464          stmt = c.prepareStatement(deleteAllTx);
465          stmt.execute();
466          stmt.close();
467          stmt = null;
468
469          // If we are doing XARecovery restore the prepared transactions
470
if (xaRecovery)
471          {
472             stmt = c.prepareStatement(SELECT_ALL_TX_XARECOVERY);
473             rs = stmt.executeQuery();
474             while (rs.next())
475             {
476                long txid = rs.getLong(1);
477                Xid JavaDoc xid = extractXid(rs, 2);
478                Tx tx = new Tx(txid);
479                tx.setXid(xid);
480                tx.checkPersisted();
481                txManager.restoreTx(tx);
482             }
483             rs.close();
484             rs = null;
485             stmt.close();
486             stmt = null;
487          }
488       }
489       catch (Exception JavaDoc e)
490       {
491          tms.setRollbackOnly();
492          throw new SpyJMSException("Could not resolve uncommited transactions. Message recovery may not be accurate", e);
493       }
494       finally
495       {
496          try
497          {
498             if (rs != null)
499                rs.close();
500          }
501          catch (Throwable JavaDoc ignore)
502          {
503          }
504          try
505          {
506             if (stmt != null)
507                stmt.close();
508          }
509          catch (Throwable JavaDoc ignore)
510          {
511          }
512          try
513          {
514             if (c != null)
515                c.close();
516          }
517          catch (Throwable JavaDoc ignore)
518          {
519          }
520          tms.endTX();
521
522          // Restore the interrupted state of the thread
523
if (threadWasInterrupted)
524             Thread.currentThread().interrupt();
525       }
526    }
527
528    /////////////////////////////////////////////////////////////////////////////////
529
//
530
// Message Recovery
531
//
532
/////////////////////////////////////////////////////////////////////////////////
533

534    synchronized public void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException JavaDoc
535    {
536       if (jmsDest == null)
537          throw new IllegalArgumentException JavaDoc("Must supply non null JMSDestination to restoreQueue");
538       if (dest == null)
539          throw new IllegalArgumentException JavaDoc("Must supply non null SpyDestination to restoreQueue");
540
541       boolean canOverrideTimeout = (tm instanceof TransactionTimeoutConfiguration);
542       int previousTimeout = 0;
543       try
544       {
545          // Set our timeout
546
if (recoveryTimeout != 0)
547          {
548             if (canOverrideTimeout)
549             {
550                previousTimeout = ((TransactionTimeoutConfiguration) tm).getTransactionTimeout();
551                tm.setTransactionTimeout(recoveryTimeout);
552             }
553             else
554             {
555                log.debug("Cannot override recovery timeout, TransactionManager does implement " + TransactionTimeoutConfiguration.class.getName());
556             }
557          }
558          
559          // restore the queue
560
try
561          {
562             internalRestoreQueue(jmsDest, dest);
563          }
564          finally
565          {
566             // restore the transaction timeout
567
if (recoveryTimeout != 0 && canOverrideTimeout)
568                tm.setTransactionTimeout(previousTimeout);
569          }
570       }
571       catch (Exception JavaDoc e)
572       {
573          SpyJMSException.rethrowAsJMSException("Unexpected error in recovery", e);
574       }
575    }
576    
577    synchronized protected void internalRestoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException JavaDoc
578    {
579       // Work out the prepared transactions
580
Map JavaDoc prepared = null;
581       if (xaRecovery)
582       {
583          prepared = new HashMap JavaDoc();
584          Map JavaDoc map = txManager.getPreparedTransactions();
585          for (Iterator JavaDoc i = map.values().iterator(); i.hasNext();)
586          {
587             TxManager.PreparedInfo info = (TxManager.PreparedInfo) i.next();
588             for (Iterator JavaDoc j = info.getTxids().iterator(); j.hasNext();)
589             {
590                Tx tx = (Tx) j.next();
591                prepared.put(new Long JavaDoc(tx.longValue()), tx);
592             }
593          }
594       }
595       
596       TransactionManagerStrategy tms = new TransactionManagerStrategy();
597       tms.startTX();
598       Connection JavaDoc c = null;
599       PreparedStatement JavaDoc stmt = null;
600       PreparedStatement JavaDoc stmt2 = null;
601       ResultSet JavaDoc rs = null;
602       boolean threadWasInterrupted = Thread.interrupted();
603       try
604       {
605          String JavaDoc selectMessagesInDest = SELECT_MESSAGES_IN_DEST;
606          String JavaDoc selectMessage = SELECT_MESSAGE;
607          if (xaRecovery)
608          {
609             selectMessagesInDest = SELECT_MESSAGES_IN_DEST_XARECOVERY;
610             selectMessage = SELECT_MESSAGE_XARECOVERY;
611          }
612          c = this.getConnection();
613          if (recoverMessagesChunk == 0)
614             stmt = c.prepareStatement(selectMessagesInDest);
615          else
616          {
617             stmt = c.prepareStatement(SELECT_MESSAGE_KEYS_IN_DEST);
618             stmt2 = c.prepareStatement(selectMessage);
619          }
620          stmt.setString(1, dest.toString());
621
622          long txid = 0;
623          String JavaDoc txop = null;
624          rs = stmt.executeQuery();
625          int counter = 0;
626          int recovery = 0;
627          while (rs.next())
628          {
629             long msgid = rs.getLong(1);
630             SpyMessage message = null;
631             if (recoverMessagesChunk == 0)
632             {
633                message = extractMessage(rs);
634                if (xaRecovery)
635                {
636                   txid = rs.getLong(3);
637                   txop = rs.getString(4);
638                }
639             }
640             else
641             {
642                ResultSet JavaDoc rs2 = null;
643                try
644                {
645                   stmt2.setLong(1, msgid);
646                   stmt2.setString(2, dest.toString());
647                   rs2 = stmt2.executeQuery();
648                   if (rs2.next())
649                   {
650                      message = extractMessage(rs2);
651                      if (xaRecovery)
652                      {
653                         txid = rs.getLong(3);
654                         txop = rs.getString(4);
655                      }
656                   }
657                   else
658                      log.warn("Failed to find message msgid=" + msgid +" dest=" + dest);
659                }
660                finally
661                {
662                   if (rs2 != null)
663                   {
664                      try
665                      {
666                         rs2.close();
667                      }
668                      catch (Exception JavaDoc ignored)
669                      {
670                      }
671                   }
672                }
673             }
674             // The durable subscription is not serialized
675
if (dest instanceof SpyTopic)
676                message.header.durableSubscriberID = ((SpyTopic) dest).getDurableSubscriptionID();
677
678             if (xaRecovery == false || txid == 0 || txop == null)
679                jmsDest.restoreMessage(message);
680             else
681             {
682                Tx tx = (Tx) prepared.get(new Long JavaDoc(txid));
683                if (tx == null)
684                   jmsDest.restoreMessage(message);
685                else if ("A".equals(txop))
686                {
687                   jmsDest.restoreMessage(message, tx, Tx.ADD);
688                   recovery++;
689                }
690                else if ("D".equals(txop))
691                {
692                   jmsDest.restoreMessage(message, tx, Tx.REMOVE);
693                   recovery++;
694                }
695                else
696                   throw new IllegalStateException JavaDoc("Unknown txop=" + txop + " for msg=" + msgid + " dest=" + dest);
697             }
698             counter++;
699          }
700
701          log.debug("Restored " + counter + " message(s) to: " + dest + " " + recovery + " need recovery.");
702       }
703       catch (IOException JavaDoc e)
704       {
705          tms.setRollbackOnly();
706          throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
707       }
708       catch (SQLException JavaDoc e)
709       {
710          tms.setRollbackOnly();
711          throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
712       }
713       finally
714       {
715          try
716          {
717             if (rs != null)
718                rs.close();
719          }
720          catch (Throwable JavaDoc ignore)
721          {
722          }
723          try
724          {
725             if (stmt != null)
726                stmt.close();
727          }
728          catch (Throwable JavaDoc ignore)
729          {
730          }
731          try
732          {
733             if (c != null)
734                c.close();
735          }
736          catch (Throwable JavaDoc ignore)
737          {
738          }
739          tms.endTX();
740
741          // Restore the interrupted state of the thread
742
if (threadWasInterrupted)
743             Thread.currentThread().interrupt();
744       }
745
746    }
747
748    SpyMessage extractMessage(ResultSet JavaDoc rs) throws SQLException JavaDoc, IOException JavaDoc
749    {
750       try
751     &