KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > pm > jdbc2 > PersistenceManager


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.mq.pm.jdbc2;
23
24 import java.io.ByteArrayInputStream JavaDoc;
25 import java.io.ByteArrayOutputStream JavaDoc;
26 import java.io.IOException JavaDoc;
27 import java.io.ObjectInputStream JavaDoc;
28 import java.io.ObjectOutputStream JavaDoc;
29 import java.io.StreamCorruptedException JavaDoc;
30 import java.sql.Connection JavaDoc;
31 import java.sql.PreparedStatement JavaDoc;
32 import java.sql.ResultSet JavaDoc;
33 import java.sql.SQLException JavaDoc;
34 import java.util.HashMap JavaDoc;
35 import java.util.Iterator JavaDoc;
36 import java.util.Map JavaDoc;
37 import java.util.Properties JavaDoc;
38
39 import javax.jms.JMSException JavaDoc;
40 import javax.management.AttributeNotFoundException JavaDoc;
41 import javax.management.InstanceNotFoundException JavaDoc;
42 import javax.management.MBeanException JavaDoc;
43 import javax.management.ObjectName JavaDoc;
44 import javax.management.ReflectionException JavaDoc;
45 import javax.naming.InitialContext JavaDoc;
46 import javax.naming.NamingException JavaDoc;
47 import javax.sql.DataSource JavaDoc;
48 import javax.transaction.Status JavaDoc;
49 import javax.transaction.Transaction JavaDoc;
50 import javax.transaction.TransactionManager JavaDoc;
51 import javax.transaction.xa.Xid JavaDoc;
52
53 import org.jboss.mq.SpyDestination;
54 import org.jboss.mq.SpyJMSException;
55 import org.jboss.mq.SpyMessage;
56 import org.jboss.mq.SpyTopic;
57 import org.jboss.mq.pm.CacheStore;
58 import org.jboss.mq.pm.Tx;
59 import org.jboss.mq.pm.TxManager;
60 import org.jboss.mq.server.JMSDestination;
61 import org.jboss.mq.server.MessageCache;
62 import org.jboss.mq.server.MessageReference;
63 import org.jboss.system.ServiceMBeanSupport;
64 import org.jboss.tm.TransactionManagerService;
65 import org.jboss.tm.TransactionTimeoutConfiguration;
66
67 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
68
69 /**
70  * This class manages all persistence related services for JDBC based
71  * persistence.
72  *
73  * @author Jayesh Parayali (jayeshpk1@yahoo.com)
74  * @author Hiram Chirino (cojonudo14@hotmail.com)
75  * @author Adrian Brock (adrian@jboss.com)
76  * @version $Revision: 57045 $
77  */

78 public class PersistenceManager extends ServiceMBeanSupport
79    implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager, CacheStore
80 {
81
82    /////////////////////////////////////////////////////////////////////////////////
83
//
84
// TX state attibutes
85
//
86
/////////////////////////////////////////////////////////////////////////////////
87

88    /** The next transaction id */
89    protected SynchronizedLong nextTransactionId = new SynchronizedLong(0l);
90    
91    /** The jta transaction manager */
92    protected TxManager txManager;
93    
94    /** The DataSource */
95    protected DataSource JavaDoc datasource;
96    
97    /** The JBossMQ transaction mananger */
98    protected TransactionManager JavaDoc tm;
99    
100    /** The override recovery timeout */
101    private int recoveryTimeout = 0;
102    
103    /** The recovery retries */
104    private int recoveryRetries = 0;
105    
106    /** The recover messages chunk */
107    private int recoverMessagesChunk = 0;
108
109    /** The statement retries */
110    private int statementRetries = 5;
111    
112    /////////////////////////////////////////////////////////////////////////////////
113
//
114
// JDBC Access Attributes
115
//
116
/////////////////////////////////////////////////////////////////////////////////
117

118    protected String JavaDoc UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?";
119    protected String JavaDoc UPDATE_MARKED_MESSAGES_XARECOVERY = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID NOT IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID IS NOT NULL)";
120    protected String JavaDoc UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
121    protected String JavaDoc DELETE_MARKED_MESSAGES_WITH_TX =
122       "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS) AND TXOP=?";
123    protected String JavaDoc DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY =
124       "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID = NULL) AND TXOP=?";
125    protected String JavaDoc DELETE_TX = "DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?";
126    protected String JavaDoc DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?";
127    protected String JavaDoc DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXOP = 'T'";
128    protected String JavaDoc INSERT_TX = "INSERT INTO JMS_TRANSACTIONS (TXID) values(?)";
129    protected String JavaDoc INSERT_TX_XARECOVERY = "INSERT INTO JMS_TRANSACTIONS (TXID, XID) values(?, ?)";
130    protected String JavaDoc DELETE_ALL_TX = "DELETE FROM JMS_TRANSACTIONS";
131    protected String JavaDoc DELETE_ALL_TX_XARECOVERY = "DELETE FROM JMS_TRANSACTIONS WHERE XID = NULL";
132    protected String JavaDoc SELECT_MAX_TX = "SELECT MAX(TXID) FROM (SELECT MAX(TXID) FROM JMS_TRANSACTIONS UNION SELECT MAX(TXID) FROM JMS_MESSAGES)";
133    protected String JavaDoc SELECT_ALL_TX_XARECOVERY = "SELECT TXID, XID FROM JMS_TRANSACTIONS";
134    protected String JavaDoc SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=?";
135    protected String JavaDoc SELECT_MESSAGES_IN_DEST_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE DESTINATION=?";
136    protected String JavaDoc SELECT_MESSAGE_KEYS_IN_DEST = "SELECT MESSAGEID FROM JMS_MESSAGES WHERE DESTINATION=?";
137    protected String JavaDoc SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
138    protected String JavaDoc SELECT_MESSAGE_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
139    protected String JavaDoc INSERT_MESSAGE =
140       "INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)";
141    protected String JavaDoc MARK_MESSAGE = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
142    protected String JavaDoc DELETE_MESSAGE = "DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
143    protected String JavaDoc UPDATE_MESSAGE = "UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?";
144    protected String JavaDoc CREATE_MESSAGE_TABLE =
145       "CREATE TABLE JMS_MESSAGES ( MESSAGEID INTEGER NOT NULL, "
146          + "DESTINATION VARCHAR(32) NOT NULL, TXID INTEGER, TXOP CHAR(1),"
147          + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )";
148    protected String JavaDoc CREATE_IDX_MESSAGE_TXOP_TXID = "CREATE INDEX JMS_MESSAGES_TXOP_TXID ON JMS_MESSAGES (TXOP, TXID)";
149    protected String JavaDoc CREATE_IDX_MESSAGE_DESTINATION = "CREATE INDEX JMS_MESSAGES_DESTINATION ON JMS_MESSAGES (DESTINATION)";
150    protected String JavaDoc CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, PRIMARY KEY (TXID) )";
151    protected String JavaDoc CREATE_TX_TABLE_XARECOVERY = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, XID OBJECT, PRIMARY KEY (TXID) )";
152
153    protected static final int OBJECT_BLOB = 0;
154    protected static final int BYTES_BLOB = 1;
155    protected static final int BINARYSTREAM_BLOB = 2;
156    protected static final int BLOB_BLOB = 3;
157
158    protected int blobType = OBJECT_BLOB;
159    protected boolean createTables;
160
161    protected int connectionRetryAttempts = 5;
162
163    protected boolean xaRecovery = false;
164    
165    /////////////////////////////////////////////////////////////////////////////////
166
//
167
// Constructor.
168
//
169
/////////////////////////////////////////////////////////////////////////////////
170
public PersistenceManager() throws javax.jms.JMSException JavaDoc
171    {
172       txManager = new TxManager(this);
173    }
174
175    /**
176     * This inner class helps handle the tx management of the jdbc connections.
177     *
178     */

179    protected class TransactionManagerStrategy
180    {
181
182       Transaction JavaDoc threadTx;
183
184       void startTX() throws JMSException JavaDoc
185       {
186          try
187          {
188             // Thread arriving must be clean (jboss doesn't set the thread
189
// previously). However optimized calls come with associated
190
// thread for example. We suspend the thread association here, and
191
// resume in the finally block of the following try.
192
threadTx = tm.suspend();
193
194             // Always begin a transaction
195
tm.begin();
196          }
197          catch (Exception JavaDoc e)
198          {
199             try
200             {
201                if (threadTx != null)
202                   tm.resume(threadTx);
203             }
204             catch (Exception JavaDoc ignore)
205             {
206             }
207             throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
208          }
209       }
210
211       void setRollbackOnly() throws JMSException JavaDoc
212       {
213          try
214          {
215             tm.setRollbackOnly();
216          }
217          catch (Exception JavaDoc e)
218          {
219             throw new SpyJMSException("Could not start a mark the transaction for rollback .", e);
220          }
221       }
222
223       void endTX() throws JMSException JavaDoc
224       {
225          try
226          {
227             if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK)
228             {
229                tm.rollback();
230             }
231             else
232             {
233                tm.commit();
234             }
235          }
236          catch (Exception JavaDoc e)
237          {
238             throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
239          }
240          finally
241          {
242             try
243             {
244                if (threadTx != null)
245                   tm.resume(threadTx);
246             }
247             catch (Exception JavaDoc ignore)
248             {
249             }
250          }
251       }
252    }
253
254    /////////////////////////////////////////////////////////////////////////////////
255
//
256
// TX Resolution.
257
//
258
/////////////////////////////////////////////////////////////////////////////////
259

260    synchronized protected void createSchema() throws JMSException JavaDoc
261    {
262       TransactionManagerStrategy tms = new TransactionManagerStrategy();
263       tms.startTX();
264       Connection JavaDoc c = null;
265       PreparedStatement JavaDoc stmt = null;
266       boolean threadWasInterrupted = Thread.interrupted();
267       try
268       {
269          if (createTables)
270          {
271             c = this.getConnection();
272
273             boolean createdMessageTable = false;
274             try
275             {
276                stmt = c.prepareStatement(CREATE_MESSAGE_TABLE);
277                stmt.executeUpdate();
278                createdMessageTable = true;
279             }
280             catch (SQLException JavaDoc e)
281             {
282                log.debug("Could not create table with SQL: " + CREATE_MESSAGE_TABLE, e);
283             }
284             finally
285             {
286                try
287                {
288                   if (stmt != null)
289                      stmt.close();
290                }
291                catch (Throwable JavaDoc ignored)
292                {
293                   log.trace("Ignored: " + ignored);
294                }
295                stmt = null;
296             }
297
298             if (createdMessageTable)
299             {
300                try
301                {
302                   stmt = c.prepareStatement(CREATE_IDX_MESSAGE_TXOP_TXID);
303                   stmt.executeUpdate();
304                }
305                catch (SQLException JavaDoc e)
306                {
307                   log.debug("Could not create index with SQL: " + CREATE_IDX_MESSAGE_TXOP_TXID, e);
308                }
309                finally
310                {
311                   try
312                   {
313                      if (stmt != null)
314                         stmt.close();
315                   }
316                   catch (Throwable JavaDoc ignored)
317                   {
318                      log.trace("Ignored: " + ignored);
319                   }
320                   stmt = null;
321                }
322                try
323                {
324                   stmt = c.prepareStatement(CREATE_IDX_MESSAGE_DESTINATION);
325                   stmt.executeUpdate();
326                }
327                catch (SQLException JavaDoc e)
328                {
329                   log.debug("Could not create index with SQL: " + CREATE_IDX_MESSAGE_DESTINATION, e);
330                }
331                finally
332                {
333                   try
334                   {
335                      if (stmt != null)
336                         stmt.close();
337                   }
338                   catch (Throwable JavaDoc ignored)
339                   {
340                      log.trace("Ignored: " + ignored);
341                   }
342                   stmt = null;
343                }
344             }
345
346             String JavaDoc createTxTable = CREATE_TX_TABLE;
347             if (xaRecovery)
348                createTxTable = CREATE_TX_TABLE_XARECOVERY;
349             try
350             {
351                stmt = c.prepareStatement(createTxTable);
352                stmt.executeUpdate();
353             }
354             catch (SQLException JavaDoc e)
355             {
356                log.debug("Could not create table with SQL: " + createTxTable, e);
357             }
358             finally
359             {
360                try
361                {
362                   if (stmt != null)
363                      stmt.close();
364                }
365                catch (Throwable JavaDoc ignored)
366                {
367                   log.trace("Ignored: " + ignored);
368                }
369                stmt = null;
370             }
371          }
372       }
373       catch (SQLException JavaDoc e)
374       {
375          tms.setRollbackOnly();
376          throw new SpyJMSException("Could not get a connection for jdbc2 table construction ", e);
377       }
378       finally
379       {
380          try
381          {
382             if (stmt != null)
383                stmt.close();
384          }
385          catch (Throwable JavaDoc ignore)
386          {
387          }
388          stmt = null;
389          try
390          {
391             if (c != null)
392                c.close();
393          }
394          catch (Throwable JavaDoc ignore)
395          {
396          }
397          c = null;
398          tms.endTX();
399
400          // Restore the interrupted state of the thread
401
if (threadWasInterrupted)
402             Thread.currentThread().interrupt();
403       }
404    }
405
406    synchronized protected void resolveAllUncommitedTXs() throws JMSException JavaDoc
407    {
408       // We perform recovery in a different thread to the table creation
409
// Postgres doesn't like create table failing in the same transaction
410
// as other operations
411

412       TransactionManagerStrategy tms = new TransactionManagerStrategy();
413       tms.startTX();
414       Connection JavaDoc c = null;
415       PreparedStatement JavaDoc stmt = null;
416       ResultSet JavaDoc rs = null;
417       boolean threadWasInterrupted = Thread.interrupted();
418       try
419       {
420          c = this.getConnection();
421
422          // Find out what the next TXID should be
423
stmt = c.prepareStatement(SELECT_MAX_TX);
424          rs = stmt.executeQuery();
425          if (rs.next())
426             nextTransactionId.set(rs.getLong(1) + 1);
427          rs.close();
428          rs = null;
429          stmt.close();
430          stmt = null;
431
432          // Delete all the temporary messages.
433
stmt = c.prepareStatement(DELETE_TEMPORARY_MESSAGES);
434          stmt.executeUpdate();
435          stmt.close();
436          stmt = null;
437
438          // Delete all the messages that were added but thier tx's were not commited.
439
String JavaDoc deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX;
440          if (xaRecovery)
441             deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY;
442          stmt = c.prepareStatement(deleteMarkedMessagesWithTx);
443          stmt.setString(1, "A");
444          stmt.executeUpdate();
445          stmt.close();
446          stmt = null;
447
448          // Restore all the messages that were removed but their tx's were not commited.
449
String JavaDoc updateMarkedMessages = UPDATE_MARKED_MESSAGES;
450          if (xaRecovery)
451             updateMarkedMessages = UPDATE_MARKED_MESSAGES_XARECOVERY;
452          stmt = c.prepareStatement(updateMarkedMessages);
453          stmt.setNull(1, java.sql.Types.BIGINT);
454          stmt.setString(2, "A");
455          stmt.setString(3, "D");
456          stmt.executeUpdate();
457          stmt.close();
458          stmt = null;
459
460          // Now recovery is complete, clear the transaction table.
461
String JavaDoc deleteAllTx = DELETE_ALL_TX;
462          if (xaRecovery)
463             deleteAllTx = DELETE_ALL_TX_XARECOVERY;
464          stmt = c.prepareStatement(deleteAllTx);
465          stmt.execute();
466          stmt.close();
467          stmt = null;
468
469          // If we are doing XARecovery restore the prepared transactions
470
if (xaRecovery)
471          {
472             stmt = c.prepareStatement(SELECT_ALL_TX_XARECOVERY);
473             rs = stmt.executeQuery();
474             while (rs.next())
475             {
476                long txid = rs.getLong(1);
477                Xid JavaDoc xid = extractXid(rs, 2);
478                Tx tx = new Tx(txid);
479                tx.setXid(xid);
480                tx.checkPersisted();
481                txManager.restoreTx(tx);
482             }
483             rs.close();
484             rs = null;
485             stmt.close();
486             stmt = null;
487          }
488       }
489       catch (Exception JavaDoc e)
490       {
491          tms.setRollbackOnly();
492          throw new SpyJMSException("Could not resolve uncommited transactions. Message recovery may not be accurate", e);
493       }
494       finally
495       {
496          try
497          {
498             if (rs != null)
499                rs.close();
500          }
501          catch (Throwable JavaDoc ignore)
502          {
503          }
504          try
505          {
506             if (stmt != null)
507                stmt.close();
508          }
509          catch (Throwable JavaDoc ignore)
510          {
511          }
512          try
513          {
514             if (c != null)
515                c.close();
516          }
517          catch (Throwable JavaDoc ignore)
518          {
519          }
520          tms.endTX();
521
522          // Restore the interrupted state of the thread
523
if (threadWasInterrupted)
524             Thread.currentThread().interrupt();
525       }
526    }
527
528    /////////////////////////////////////////////////////////////////////////////////
529
//
530
// Message Recovery
531
//
532
/////////////////////////////////////////////////////////////////////////////////
533

534    synchronized public void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException JavaDoc
535    {
536       if (jmsDest == null)
537          throw new IllegalArgumentException JavaDoc("Must supply non null JMSDestination to restoreQueue");
538       if (dest == null)
539          throw new IllegalArgumentException JavaDoc("Must supply non null SpyDestination to restoreQueue");
540
541       boolean canOverrideTimeout = (tm instanceof TransactionTimeoutConfiguration);
542       int previousTimeout = 0;
543       try
544       {
545          // Set our timeout
546
if (recoveryTimeout != 0)
547          {
548             if (canOverrideTimeout)
549             {
550                previousTimeout = ((TransactionTimeoutConfiguration) tm).getTransactionTimeout();
551                tm.setTransactionTimeout(recoveryTimeout);
552             }
553             else
554             {
555                log.debug("Cannot override recovery timeout, TransactionManager does implement " + TransactionTimeoutConfiguration.class.getName());
556             }
557          }
558          
559          // restore the queue
560
try
561          {
562             internalRestoreQueue(jmsDest, dest);
563          }
564          finally
565          {
566             // restore the transaction timeout
567
if (recoveryTimeout != 0 && canOverrideTimeout)
568                tm.setTransactionTimeout(previousTimeout);
569          }
570       }
571       catch (Exception JavaDoc e)
572       {
573          SpyJMSException.rethrowAsJMSException("Unexpected error in recovery", e);
574       }
575    }
576    
577    synchronized protected void internalRestoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException JavaDoc
578    {
579       // Work out the prepared transactions
580
Map JavaDoc prepared = null;
581       if (xaRecovery)
582       {
583          prepared = new HashMap JavaDoc();
584          Map JavaDoc map = txManager.getPreparedTransactions();
585          for (Iterator JavaDoc i = map.values().iterator(); i.hasNext();)
586          {
587             TxManager.PreparedInfo info = (TxManager.PreparedInfo) i.next();
588             for (Iterator JavaDoc j = info.getTxids().iterator(); j.hasNext();)
589             {
590                Tx tx = (Tx) j.next();
591                prepared.put(new Long JavaDoc(tx.longValue()), tx);
592             }
593          }
594       }
595       
596       TransactionManagerStrategy tms = new TransactionManagerStrategy();
597       tms.startTX();
598       Connection JavaDoc c = null;
599       PreparedStatement JavaDoc stmt = null;
600       PreparedStatement JavaDoc stmt2 = null;
601       ResultSet JavaDoc rs = null;
602       boolean threadWasInterrupted = Thread.interrupted();
603       try
604       {
605          String JavaDoc selectMessagesInDest = SELECT_MESSAGES_IN_DEST;
606          String JavaDoc selectMessage = SELECT_MESSAGE;
607          if (xaRecovery)
608          {
609             selectMessagesInDest = SELECT_MESSAGES_IN_DEST_XARECOVERY;
610             selectMessage = SELECT_MESSAGE_XARECOVERY;
611          }
612          c = this.getConnection();
613          if (recoverMessagesChunk == 0)
614             stmt = c.prepareStatement(selectMessagesInDest);
615          else
616          {
617             stmt = c.prepareStatement(SELECT_MESSAGE_KEYS_IN_DEST);
618             stmt2 = c.prepareStatement(selectMessage);
619          }
620          stmt.setString(1, dest.toString());
621
622          long txid = 0;
623          String JavaDoc txop = null;
624          rs = stmt.executeQuery();
625          int counter = 0;
626          int recovery = 0;
627          while (rs.next())
628          {
629             long msgid = rs.getLong(1);
630             SpyMessage message = null;
631             if (recoverMessagesChunk == 0)
632             {
633                message = extractMessage(rs);
634                if (xaRecovery)
635                {
636                   txid = rs.getLong(3);
637                   txop = rs.getString(4);
638                }
639             }
640             else
641             {
642                ResultSet JavaDoc rs2 = null;
643                try
644                {
645                   stmt2.setLong(1, msgid);
646                   stmt2.setString(2, dest.toString());
647                   rs2 = stmt2.executeQuery();
648                   if (rs2.next())
649                   {
650                      message = extractMessage(rs2);
651                      if (xaRecovery)
652                      {
653                         txid = rs.getLong(3);
654                         txop = rs.getString(4);
655                      }
656                   }
657                   else
658                      log.warn("Failed to find message msgid=" + msgid +" dest=" + dest);
659                }
660                finally
661                {
662                   if (rs2 != null)
663                   {
664                      try
665                      {
666                         rs2.close();
667                      }
668                      catch (Exception JavaDoc ignored)
669                      {
670                      }
671                   }
672                }
673             }
674             // The durable subscription is not serialized
675
if (dest instanceof SpyTopic)
676                message.header.durableSubscriberID = ((SpyTopic) dest).getDurableSubscriptionID();
677
678             if (xaRecovery == false || txid == 0 || txop == null)
679                jmsDest.restoreMessage(message);
680             else
681             {
682                Tx tx = (Tx) prepared.get(new Long JavaDoc(txid));
683                if (tx == null)
684                   jmsDest.restoreMessage(message);
685                else if ("A".equals(txop))
686                {
687                   jmsDest.restoreMessage(message, tx, Tx.ADD);
688                   recovery++;
689                }
690                else if ("D".equals(txop))
691                {
692                   jmsDest.restoreMessage(message, tx, Tx.REMOVE);
693                   recovery++;
694                }
695                else
696                   throw new IllegalStateException JavaDoc("Unknown txop=" + txop + " for msg=" + msgid + " dest=" + dest);
697             }
698             counter++;
699          }
700
701          log.debug("Restored " + counter + " message(s) to: " + dest + " " + recovery + " need recovery.");
702       }
703       catch (IOException JavaDoc e)
704       {
705          tms.setRollbackOnly();
706          throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
707       }
708       catch (SQLException JavaDoc e)
709       {
710          tms.setRollbackOnly();
711          throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
712       }
713       finally
714       {
715          try
716          {
717             if (rs != null)
718                rs.close();
719          }
720          catch (Throwable JavaDoc ignore)
721          {
722          }
723          try
724          {
725             if (stmt != null)
726                stmt.close();
727          }
728          catch (Throwable JavaDoc ignore)
729          {
730          }
731          try
732          {
733             if (c != null)
734                c.close();
735          }
736          catch (Throwable JavaDoc ignore)
737          {
738          }
739          tms.endTX();
740
741          // Restore the interrupted state of the thread
742
if (threadWasInterrupted)
743             Thread.currentThread().interrupt();
744       }
745
746    }
747
748    SpyMessage extractMessage(ResultSet JavaDoc rs) throws SQLException JavaDoc, IOException JavaDoc
749    {
750       try
751       {
752          long messageid = rs.getLong(1);
753
754          SpyMessage message = null;
755
756          if (blobType == OBJECT_BLOB)
757          {
758
759             message = (SpyMessage) rs.getObject(2);
760
761          }
762          else if (blobType == BYTES_BLOB)
763          {
764
765             byte[] st = rs.getBytes(2);
766             ByteArrayInputStream JavaDoc baip = new ByteArrayInputStream JavaDoc(st);
767             ObjectInputStream JavaDoc ois = new ObjectInputStream JavaDoc(baip);
768             message = SpyMessage.readMessage(ois);
769
770          }
771          else if (blobType == BINARYSTREAM_BLOB)
772          {
773
774             ObjectInputStream JavaDoc ois = new ObjectInputStream JavaDoc(rs.getBinaryStream(2));
775             message = SpyMessage.readMessage(ois);
776
777          }
778          else if (blobType == BLOB_BLOB)
779          {
780
781             ObjectInputStream JavaDoc ois = new ObjectInputStream JavaDoc(rs.getBlob(2).getBinaryStream());
782             message = SpyMessage.readMessage(ois);
783          }
784
785          message.header.messageId = messageid;
786          return message;
787       }
788       catch (StreamCorruptedException JavaDoc e)
789       {
790          throw new IOException JavaDoc("Could not load the message: " + e);
791       }
792    }
793
794    Xid JavaDoc extractXid(ResultSet JavaDoc rs, int column) throws SQLException JavaDoc, IOException JavaDoc, ClassNotFoundException JavaDoc
795    {
796       try
797       {
798          Xid JavaDoc xid = null;
799
800          if (blobType == OBJECT_BLOB)
801          {
802             xid = (Xid JavaDoc) rs.getObject(column);
803          }
804          else if (blobType == BYTES_BLOB)
805          {
806             byte[] st = rs.getBytes(column);
807             ByteArrayInputStream JavaDoc baip = new ByteArrayInputStream JavaDoc(st);
808             ObjectInputStream JavaDoc ois = new ObjectInputStream JavaDoc(baip);
809             xid = (Xid JavaDoc) ois.readObject();
810          }
811          else if (blobType == BINARYSTREAM_BLOB)
812          {
813             ObjectInputStream JavaDoc ois = new ObjectInputStream JavaDoc(rs.getBinaryStream(column));
814             xid = (Xid JavaDoc) ois.readObject();
815          }
816          else if (blobType == BLOB_BLOB)
817          {
818             ObjectInputStream JavaDoc ois = new ObjectInputStream JavaDoc(rs.getBlob(column).getBinaryStream());
819             xid = (Xid JavaDoc) ois.readObject();
820          }
821
822          return xid;
823       }
824       catch (StreamCorruptedException JavaDoc e)
825       {
826          throw new IOException JavaDoc("Could not load the message: " + e);
827       }
828    }
829
830    /////////////////////////////////////////////////////////////////////////////////
831
//
832
// TX Commit
833
//
834
/////////////////////////////////////////////////////////////////////////////////
835
public void commitPersistentTx(Tx txId) throws javax.jms.JMSException JavaDoc
836    {
837       if (txId.wasPersisted() == false)
838          return;
839       
840       TransactionManagerStrategy tms = new TransactionManagerStrategy();
841       tms.startTX();
842       Connection JavaDoc c = null;
843       boolean threadWasInterrupted = Thread.interrupted();
844       try
845       {
846
847          c = this.getConnection();
848          removeMarkedMessages(c, txId, "D");
849          removeTXRecord(c, txId.longValue());
850
851       }
852       catch (SQLException JavaDoc e)
853       {
854          tms.setRollbackOnly();
855          throw new SpyJMSException("Could not commit tx: " + txId, e);
856       }
857       finally
858       {
859          try
860          {
861             if (c != null)
862                c.close();
863          }
864          catch (Throwable JavaDoc ignore)
865          {
866          }
867          tms.endTX();
868
869          // Restore the interrupted state of the thread
870
if (threadWasInterrupted)
871             Thread.currentThread().interrupt();
872       }
873    }
874
875    public void removeMarkedMessages(Connection JavaDoc c, Tx txid, String JavaDoc mark) throws SQLException JavaDoc
876    {
877       PreparedStatement JavaDoc stmt = null;
878       try
879       {
880          stmt = c.prepareStatement(DELETE_MARKED_MESSAGES);
881          stmt.setLong(1, txid.longValue());
882          stmt.setString(2, mark);
883          stmt.executeUpdate();
884       }
885       finally
886       {
887          try
888          {
889             if (stmt != null)
890                stmt.close();
891          }
892          catch (Throwable JavaDoc e)
893          {
894          }
895       }
896    }
897
898    public void addTXRecord(Connection JavaDoc c, Tx txid) throws SQLException JavaDoc, IOException JavaDoc
899    {
900       PreparedStatement JavaDoc stmt = null;
901       try
902       {
903          String JavaDoc insertTx = INSERT_TX;
904          if (xaRecovery)
905             insertTx = INSERT_TX_XARECOVERY;
906          stmt = c.prepareStatement(insertTx);
907          stmt.setLong(1, txid.longValue());
908          if (xaRecovery)
909          {
910             Xid JavaDoc xid = txid.getXid();
911             if (xid != null)
912                setBlob(stmt, 2, xid);
913             else
914                stmt.setNull(2, java.sql.Types.BLOB);
915          }
916          stmt.executeUpdate();
917       }
918       finally
919       {
920          try
921          {
922             if (stmt != null)
923                stmt.close();
924          }
925          catch (Throwable JavaDoc e)
926          {
927          }
928       }
929    }
930
931    public void removeTXRecord(Connection JavaDoc c, long txid) throws SQLException JavaDoc
932    {
933       PreparedStatement JavaDoc stmt = null;
934       try
935       {
936          stmt = c.prepareStatement(DELETE_TX);
937          stmt.setLong(1, txid);
938          stmt.executeUpdate();
939       }
940       finally
941       {
942          try
943          {
944             if (stmt != null)
945                stmt.close();
946          }
947          catch (Throwable JavaDoc e)
948          {
949          }
950       }
951    }
952
953    /////////////////////////////////////////////////////////////////////////////////
954
//
955
// TX Rollback
956
//
957
/////////////////////////////////////////////////////////////////////////////////
958
public void rollbackPersistentTx(Tx txId) throws JMSException JavaDoc
959    {
960       if (txId.wasPersisted() == false)
961          return;
962
963       TransactionManagerStrategy tms = new TransactionManagerStrategy();
964       tms.startTX();
965       Connection JavaDoc c = null;
966       PreparedStatement JavaDoc stmt = null;
967       boolean threadWasInterrupted = Thread.interrupted();
968       try
969       {
970
971          c = this.getConnection();
972          removeMarkedMessages(c, txId, "A");
973          removeTXRecord(c, txId.longValue());
974
975          // Restore all the messages that were logically removed.
976
stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES_WITH_TX);
977          stmt.setNull(1, java.sql.Types.BIGINT);
978          stmt.setString(2, "A");
979          stmt.setString(3, "D");
980          stmt.setLong(4, txId.longValue());
981          stmt.executeUpdate();
982          stmt.close();
983          stmt = null;
984       }
985       catch (SQLException JavaDoc e)
986       {
987          tms.setRollbackOnly();
988          throw new SpyJMSException("Could not rollback tx: " + txId, e);
989       }
990       finally
991       {
992          try
993          {
994             if (stmt != null)
995                stmt.close();
996          }
997          catch (Throwable JavaDoc ignore)
998          {
999          }
1000         try
1001         {
1002            if (c != null)
1003               c.close();
1004         }
1005         catch (Throwable JavaDoc ignore)
1006         {
1007         }
1008         tms.endTX();
1009
1010         // Restore the interrupted state of the thread
1011
if (threadWasInterrupted)
1012            Thread.currentThread().interrupt();
1013      }
1014
1015   }
1016
1017   /////////////////////////////////////////////////////////////////////////////////
1018
//
1019
// TX Creation
1020
//
1021
/////////////////////////////////////////////////////////////////////////////////
1022
public Tx createPersistentTx() throws JMSException JavaDoc
1023   {
1024      Tx id = new Tx(nextTransactionId.increment());
1025      return id;
1026   }
1027
1028   public void insertPersistentTx(TransactionManagerStrategy tms, Connection JavaDoc c, Tx tx) throws JMSException JavaDoc
1029   {
1030      try
1031      {
1032         if (tx != null && tx.checkPersisted() == false)
1033            addTXRecord(c, tx);
1034      }
1035      catch (Exception JavaDoc e)
1036      {
1037         tms.setRollbackOnly();
1038         throw new SpyJMSException("Could not create tx: " + tx.longValue(), e);
1039      }
1040   }
1041
1042   /////////////////////////////////////////////////////////////////////////////////
1043
//
1044
// Adding a message
1045
//
1046
/////////////////////////////////////////////////////////////////////////////////
1047
public void add(MessageReference messageRef, Tx txId) throws javax.jms.JMSException JavaDoc
1048   {
1049      boolean trace = log.isTraceEnabled();
1050      if (trace)
1051         log.trace("About to add message " + messageRef + " transaction=" + txId);
1052
1053      TransactionManagerStrategy tms = new TransactionManagerStrategy();
1054      tms.startTX();
1055      Connection JavaDoc c = null;
1056      boolean threadWasInterrupted = Thread.interrupted();
1057      try
1058      {
1059         c = this.getConnection();
1060
1061         // Lazily write the peristent transaction
1062
insertPersistentTx(tms, c, txId);
1063         
1064         // Synchronize on the message to avoid a race with the softener
1065
synchronized (messageRef)
1066         {
1067            SpyMessage message = messageRef.getMessage();
1068
1069            // has it allready been stored by the message cache interface??
1070
if (messageRef.stored == MessageReference.STORED)
1071            {
1072               if (trace)
1073                  log.trace("Updating message " + messageRef + " transaction=" + txId);
1074
1075               markMessage(c, messageRef.messageId, messageRef.getPersistentKey(), txId, "A");
1076            }
1077            else
1078            {
1079               if (trace)
1080                  log.trace("Inserting message " + messageRef + " transaction=" + txId);
1081
1082               add(c, messageRef.getPersistentKey(), message, txId, "A");
1083               messageRef.setStored(MessageReference.STORED);
1084            }
1085            if (trace)
1086               log.trace("Added message " + messageRef + " transaction=" + txId);
1087         }
1088      }
1089      catch (IOException JavaDoc e)
1090      {
1091         tms.setRollbackOnly();
1092         throw new SpyJMSException("Could not store message: " + messageRef, e);
1093      }
1094      catch (SQLException JavaDoc e)
1095      {
1096         tms.setRollbackOnly();
1097         throw new SpyJMSException("Could not store message: " + messageRef, e);
1098      }
1099      finally
1100      {
1101         try
1102         {
1103            if (c != null)
1104               c.close();
1105         }
1106         catch (Throwable JavaDoc ignore)
1107         {
1108         }
1109         tms.endTX();
1110
1111         // Restore the interrupted state of the thread
1112
if (threadWasInterrupted)
1113            Thread.currentThread().interrupt();
1114      }
1115   }
1116
1117   protected void add(Connection JavaDoc c, String JavaDoc queue, SpyMessage message, Tx txId, String JavaDoc mark)
1118      throws SQLException JavaDoc, IOException JavaDoc
1119   {
1120      PreparedStatement JavaDoc stmt = null;
1121      try
1122      {
1123
1124         stmt = c.prepareStatement(INSERT_MESSAGE);
1125
1126         stmt.setLong(1, message.header.messageId);
1127         stmt.setString(2, queue);
1128         setBlob(stmt, 3, message);
1129
1130         if (txId != null)
1131            stmt.setLong(4, txId.longValue());
1132         else
1133            stmt.setNull(4, java.sql.Types.BIGINT);
1134         stmt.setString(5, mark);
1135
1136         stmt.executeUpdate();
1137      }
1138      finally
1139      {
1140         try
1141         {
1142            if (stmt != null)
1143               stmt.close();
1144         }
1145         catch (Throwable JavaDoc ignore)
1146         {
1147         }
1148      }
1149   }
1150
1151   public void markMessage(Connection JavaDoc c, long messageid, String JavaDoc destination, Tx txId, String JavaDoc mark)
1152      throws SQLException JavaDoc
1153   {
1154      PreparedStatement JavaDoc stmt = null;
1155      try
1156      {
1157
1158         stmt = c.prepareStatement(MARK_MESSAGE);
1159         if (txId == null)
1160         {
1161            stmt.setNull(1, java.sql.Types.BIGINT);
1162         }
1163         else
1164         {
1165            stmt.setLong(1, txId.longValue());
1166         }
1167         stmt.setString(2, mark);
1168         stmt.setLong(3, messageid);
1169         stmt.setString(4, destination);
1170         stmt.executeUpdate();
1171      }
1172      finally
1173      {
1174         try
1175         {
1176            if (stmt != null)
1177               stmt.close();
1178         }
1179         catch (Throwable JavaDoc ignore)
1180         {
1181         }
1182      }
1183
1184   }
1185
1186   public void setBlob(PreparedStatement JavaDoc stmt, int column, SpyMessage message) throws IOException JavaDoc, SQLException JavaDoc
1187   {
1188      if (blobType == OBJECT_BLOB)
1189      {
1190         stmt.setObject(column, message);
1191      }
1192      else if (blobType == BYTES_BLOB)
1193      {
1194         ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
1195         ObjectOutputStream JavaDoc oos = new ObjectOutputStream JavaDoc(baos);
1196         SpyMessage.writeMessage(message, oos);
1197         oos.flush();
1198         byte[] messageAsBytes = baos.toByteArray();
1199         stmt.setBytes(column, messageAsBytes);
1200      }
1201      else if (blobType == BINARYSTREAM_BLOB)
1202      {
1203         ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
1204         ObjectOutputStream JavaDoc oos = new ObjectOutputStream JavaDoc(baos);
1205         SpyMessage.writeMessage(message, oos);
1206         oos.flush();
1207         byte[] messageAsBytes = baos.toByteArray();
1208         ByteArrayInputStream JavaDoc bais = new ByteArrayInputStream JavaDoc(messageAsBytes);
1209         stmt.setBinaryStream(column, bais, messageAsBytes.length);
1210      }
1211      else if (blobType == BLOB_BLOB)
1212      {
1213
1214         throw new RuntimeException JavaDoc("BLOB_TYPE: BLOB_BLOB is not yet implemented.");
1215         /** TODO:
1216         ByteArrayOutputStream baos= new ByteArrayOutputStream();
1217         ObjectOutputStream oos= new ObjectOutputStream(baos);
1218         oos.writeObject(message);
1219         byte[] messageAsBytes= baos.toByteArray();
1220         ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
1221         stmt.setBsetBinaryStream(column, bais, messageAsBytes.length);
1222         */

1223      }
1224   }
1225
1226   public void setBlob(PreparedStatement JavaDoc stmt, int column, Xid JavaDoc xid) throws IOException JavaDoc, SQLException JavaDoc
1227   {
1228      if (blobType == OBJECT_BLOB)
1229      {
1230         stmt.setObject(column, xid);
1231      }
1232      else if (blobType == BYTES_BLOB)
1233      {
1234         ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
1235         ObjectOutputStream JavaDoc oos = new ObjectOutputStream JavaDoc(baos);
1236         oos.writeObject(xid);
1237         oos.flush();
1238         byte[] messageAsBytes = baos.toByteArray();
1239         stmt.setBytes(column, messageAsBytes);
1240      }
1241      else if (blobType == BINARYSTREAM_BLOB)
1242      {
1243         ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
1244         ObjectOutputStream JavaDoc oos = new ObjectOutputStream JavaDoc(baos);
1245         oos.writeObject(xid);
1246         oos.flush();
1247         byte[] messageAsBytes = baos.toByteArray();
1248         ByteArrayInputStream JavaDoc bais = new ByteArrayInputStream JavaDoc(messageAsBytes);
1249         stmt.setBinaryStream(column, bais, messageAsBytes.length);
1250      }
1251      else if (blobType == BLOB_BLOB)
1252      {
1253
1254         throw new RuntimeException JavaDoc("BLOB_TYPE: BLOB_BLOB is not yet implemented.");
1255         /** TODO:
1256         ByteArrayOutputStream baos= new ByteArrayOutputStream();
1257         ObjectOutputStream oos= new ObjectOutputStream(baos);
1258         oos.writeObject(xid);
1259         byte[] messageAsBytes= baos.toByteArray();
1260         ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
1261         stmt.setBsetBinaryStream(column, bais, messageAsBytes.length);
1262         */

1263      }
1264   }
1265
1266   /////////////////////////////////////////////////////////////////////////////////
1267
//
1268
// Updating a message
1269
//
1270
/////////////////////////////////////////////////////////////////////////////////
1271
public void update(MessageReference messageRef, Tx txId) throws javax.jms.JMSException JavaDoc
1272   {
1273      boolean trace = log.isTraceEnabled();
1274      if (trace)
1275         log.trace("Updating message " + messageRef + " transaction=" + txId);
1276
1277      TransactionManagerStrategy tms = new TransactionManagerStrategy();
1278      tms.startTX();
1279      Connection JavaDoc c = null;
1280      PreparedStatement JavaDoc stmt = null;
1281      boolean threadWasInterrupted = Thread.interrupted();
1282      try
1283      {
1284
1285         c = this.getConnection();
1286         if (txId == null)
1287         {
1288
1289            stmt = c.prepareStatement(UPDATE_MESSAGE);
1290            setBlob(stmt, 1, messageRef.getMessage());
1291            stmt.setLong(2, messageRef.messageId);
1292            stmt.setString(3, messageRef.getPersistentKey());
1293            int rc = stmt.executeUpdate();
1294            if (rc != 1)
1295               throw new SpyJMSException(
1296                  "Could not update the message in the database: update affected " + rc + " rows");
1297         }
1298         else
1299         {
1300            throw new SpyJMSException("NYI: Updating a message in a transaction is not currently used");
1301         }
1302         if (trace)
1303            log.trace("Updated message " + messageRef + " transaction=" + txId);
1304
1305      }
1306      catch (IOException JavaDoc e)
1307      {
1308         tms.setRollbackOnly();
1309         throw new SpyJMSException("Could not update message: " + messageRef, e);
1310      }
1311      catch (SQLException JavaDoc e)
1312      {
1313         tms.setRollbackOnly();
1314         throw new SpyJMSException("Could not update message: " + messageRef, e);
1315      }
1316      finally
1317      {
1318         try
1319         {
1320            if (stmt != null)
1321               stmt.close();
1322         }
1323         catch (Throwable JavaDoc ignore)
1324         {
1325         }
1326         try
1327         {
1328            if (c != null)
1329               c.close();
1330         }
1331         catch (Throwable JavaDoc ignore)
1332         {
1333         }
1334         tms.endTX();
1335
1336         // Restore the interrupted state of the thread
1337
if (threadWasInterrupted)
1338            Thread.currentThread().interrupt();
1339      }
1340
1341   }
1342
1343   /////////////////////////////////////////////////////////////////////////////////
1344
//
1345
// Removing a message
1346
//
1347
/////////////////////////////////////////////////////////////////////////////////
1348
public void remove(MessageReference messageRef, Tx txId) throws javax.jms.JMSException JavaDoc
1349   {
1350      boolean trace = log.isTraceEnabled();
1351      if (trace)
1352         log.trace("Removing message " + messageRef + " transaction=" + txId);
1353
1354      TransactionManagerStrategy tms = new TransactionManagerStrategy();
1355      tms.startTX();
1356      Connection JavaDoc c = null;
1357      PreparedStatement JavaDoc stmt = null;
1358      boolean threadWasInterrupted = Thread.interrupted();
1359      try
1360      {
1361         c = this.getConnection();
1362
1363         // Lazily write the peristent transaction
1364
insertPersistentTx(tms, c, txId);
1365         
1366         // Synchronize on the message to avoid a race with the softener
1367
synchronized (messageRef)
1368         {
1369            if (txId == null)
1370            {
1371               stmt = c.prepareStatement(DELETE_MESSAGE);
1372               stmt.setLong(1, messageRef.messageId);
1373               stmt.setString(2, messageRef.getPersistentKey());
1374
1375               // Adrian Brock:
1376
// Remove the message from the cache, but don't
1377
// return it to the pool just yet. The queue still holds
1378
// a reference to the message and will return it
1379
// to the pool once it gets enough time slice.
1380
// The alternative is to remove the validation
1381
// for double removal from the cache,
1382
// which I don't want to do because it is useful
1383
// for spotting errors
1384
messageRef.setStored(MessageReference.NOT_STORED);
1385               messageRef.removeDelayed();
1386            }
1387            else
1388            {
1389               stmt = c.prepareStatement(MARK_MESSAGE);
1390               stmt.setLong(1, txId.longValue());
1391               stmt.setString(2, "D");
1392               stmt.setLong(3, messageRef.messageId);
1393               stmt.setString(4, messageRef.getPersistentKey());
1394            }
1395
1396             int tries = 0;
1397             while (true)
1398             {
1399                try
1400                {
1401                   int rc = stmt.executeUpdate();
1402
1403                   if (tries > 0)
1404                   {
1405                      if (rc != 1)
1406                         throw new SpyJMSException(
1407                           "Could not mark the message as deleted in the database: update affected " + rc + " rows");
1408
1409                      log.warn("Remove operation worked after " +tries +" retries");
1410                   }
1411                   break;
1412                }
1413                catch (SQLException JavaDoc e)
1414                {
1415                   log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
1416                   tries++;
1417                   if (tries >= statementRetries)
1418                   {
1419                      log.error("Retried " + tries + " times, now giving up");
1420                      throw new IllegalStateException JavaDoc("Could not remove message after " +tries + "attempts");
1421                   }
1422                   log.warn("Trying again after a pause");
1423                   //Now we wait for a random amount of time to minimise risk of deadlock
1424
Thread.sleep((long)(Math.random() * 500));
1425                }
1426             }
1427
1428            if (trace)
1429               log.trace("Removed message " + messageRef + " transaction=" + txId);
1430         }
1431      }
1432      catch (Exception JavaDoc e)
1433      {
1434         tms.setRollbackOnly();
1435         throw new SpyJMSException("Could not remove message: " + messageRef, e);
1436      }
1437      finally
1438      {
1439         try
1440         {
1441            if (stmt != null)
1442               stmt.close();
1443         }
1444         catch (Throwable JavaDoc ignore)
1445         {
1446         }
1447         try
1448         {
1449            if (c != null)
1450               c.close();
1451         }
1452         catch (Throwable JavaDoc ignore)
1453         {
1454         }
1455         tms.endTX();
1456
1457         // Restore the interrupted state of the thread
1458
if (threadWasInterrupted)
1459            Thread.currentThread().interrupt();
1460      }
1461
1462   }
1463
1464   /////////////////////////////////////////////////////////////////////////////////
1465
//
1466
// Misc. PM functions
1467
//
1468
/////////////////////////////////////////////////////////////////////////////////
1469

1470   public TxManager getTxManager()
1471   {
1472      return txManager;
1473   }
1474
1475   public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException JavaDoc
1476   {
1477      // Nothing to clean up, all the state is in the db.
1478
}
1479
1480   public SpyMessage loadFromStorage(MessageReference messageRef) throws JMSException JavaDoc
1481   {
1482      if (log.isTraceEnabled())
1483         log.trace("Loading message from storage " + messageRef);
1484
1485      TransactionManagerStrategy tms = new TransactionManagerStrategy();
1486      tms.startTX();
1487      Connection JavaDoc c = null;
1488      PreparedStatement JavaDoc stmt = null;
1489      ResultSet JavaDoc rs = null;
1490      boolean threadWasInterrupted = Thread.interrupted();
1491      try
1492      {
1493
1494         c = this.getConnection();
1495         stmt = c.prepareStatement(SELECT_MESSAGE);
1496         stmt.setLong(1, messageRef.messageId);
1497         stmt.setString(2, messageRef.getPersistentKey());
1498
1499         rs = stmt.executeQuery();
1500         if (rs.next())
1501            return extractMessage(rs);
1502
1503         return null;
1504
1505      }
1506      catch (IOException JavaDoc e)
1507      {
1508         tms.setRollbackOnly();
1509         throw new SpyJMSException("Could not load message : " + messageRef, e);
1510      }
1511      catch (SQLException JavaDoc e)
1512      {
1513         tms.setRollbackOnly();
1514         throw new SpyJMSException("Could not load message : " + messageRef, e);
1515      }
1516      finally
1517      {
1518         try
1519         {
1520            if (rs != null)
1521               rs.close();
1522         }
1523         catch (Throwable JavaDoc ignore)
1524         {
1525         }
1526         try
1527         {
1528            if (stmt != null)
1529               stmt.close();
1530         }
1531         catch (Throwable JavaDoc ignore)
1532         {
1533         }
1534         try
1535         {
1536            if (c != null)
1537               c.close();
1538         }
1539         catch (Throwable JavaDoc ignore)
1540         {
1541         }
1542         tms.endTX();
1543
1544         // Restore the interrupted state of the thread
1545
if (threadWasInterrupted)
1546            Thread.currentThread().interrupt();
1547      }
1548   }
1549
1550   /////////////////////////////////////////////////////////////////////////////////
1551
//
1552
// CacheStore Functions
1553
//
1554
/////////////////////////////////////////////////////////////////////////////////
1555
public void removeFromStorage(MessageReference messageRef) throws JMSException JavaDoc
1556   {
1557      // We don't remove persistent messages sent to persistent queues
1558
if (messageRef.isPersistent())
1559         return;
1560
1561      boolean trace = log.isTraceEnabled();
1562      if (trace)
1563         log.trace("Removing message from storage " + messageRef);
1564
1565      TransactionManagerStrategy tms = new TransactionManagerStrategy();
1566      tms.startTX();
1567      Connection JavaDoc c = null;
1568      PreparedStatement JavaDoc stmt = null;
1569      boolean threadWasInterrupted = Thread.interrupted();
1570      try
1571      {
1572         c = this.getConnection();
1573         stmt = c.prepareStatement(DELETE_MESSAGE);
1574         stmt.setLong(1, messageRef.messageId);
1575         stmt.setString(2, messageRef.getPersistentKey());
1576         stmt.executeUpdate();
1577         messageRef.setStored(MessageReference.NOT_STORED);
1578
1579         if (trace)
1580            log.trace("Removed message from storage " + messageRef);
1581      }
1582      catch (SQLException JavaDoc e)
1583      {
1584         tms.setRollbackOnly();
1585         throw new SpyJMSException("Could not remove message: " + messageRef, e);
1586      }
1587      finally
1588      {
1589         try
1590         {
1591            if (stmt != null)
1592               stmt.close();
1593         }
1594         catch (Throwable JavaDoc ignore)
1595         {
1596         }
1597         try
1598         {
1599            if (c != null)
1600               c.close();
1601         }
1602         catch (Throwable JavaDoc ignore)
1603         {
1604         }
1605         tms.endTX();
1606
1607         // Restore the interrupted state of the thread
1608
if (threadWasInterrupted)
1609            Thread.currentThread().interrupt();
1610      }
1611   }
1612
1613   public void saveToStorage(MessageReference messageRef, SpyMessage message) throws JMSException JavaDoc
1614   {
1615      // Ignore save operations for persistent messages sent to persistent queues
1616
// The queues handle the persistence
1617
if (messageRef.isPersistent())
1618         return;
1619
1620      boolean trace = log.isTraceEnabled();
1621      if (trace)
1622         log.trace("Saving message to storage " + messageRef);
1623
1624      TransactionManagerStrategy tms = new TransactionManagerStrategy();
1625      tms.startTX();
1626      Connection JavaDoc c = null;
1627      boolean threadWasInterrupted = Thread.interrupted();
1628      try
1629      {
1630
1631         c = this.getConnection();
1632         add(c, messageRef.getPersistentKey(), message, null, "T");
1633         messageRef.setStored(MessageReference.STORED);
1634
1635         if (trace)
1636            log.trace("Saved message to storage " + messageRef);
1637      }
1638      catch (IOException JavaDoc e)
1639      {
1640         tms.setRollbackOnly();
1641         throw new SpyJMSException("Could not store message: " + messageRef, e);
1642      }
1643      catch (SQLException JavaDoc e)
1644      {
1645         tms.setRollbackOnly();
1646         throw new SpyJMSException("Could not store message: " + messageRef, e);
1647      }
1648      finally
1649      {
1650         try
1651         {
1652            if (c != null)
1653               c.close();
1654         }
1655         catch (Throwable JavaDoc ignore)
1656         {
1657         }
1658         tms.endTX();
1659
1660         // Restore the interrupted state of the thread
1661
if (threadWasInterrupted)
1662            Thread.currentThread().interrupt();
1663      }
1664   }
1665
1666   /**
1667    * Gets a connection from the datasource, retrying as needed. This was
1668    * implemented because in some minimal configurations (i.e. little logging
1669    * and few services) the database wasn't ready when we tried to get a
1670    * connection. We, therefore, implement a retry loop wich is controled
1671    * by the ConnectionRetryAttempts attribute. Submitted by terry@amicas.com
1672    *
1673    * @exception SQLException if an error occurs.
1674    */

1675   protected Connection JavaDoc getConnection() throws SQLException JavaDoc
1676   {
1677      int attempts = this.connectionRetryAttempts;
1678      int attemptCount = 0;
1679      SQLException JavaDoc sqlException = null;
1680      while (attempts-- > 0)
1681      {
1682         if (++attemptCount > 1)
1683         {
1684            log.debug("Retrying connection: attempt # " + attemptCount);
1685         }
1686         try
1687         {
1688            sqlException = null;
1689            return datasource.getConnection();
1690         }
1691         catch (SQLException JavaDoc exception)
1692         {
1693            log.debug("Connection attempt # " + attemptCount + " failed with SQLException", exception);
1694            sqlException = exception;
1695         }
1696         finally
1697         {
1698            if (sqlException == null && attemptCount > 1)
1699            {
1700               log.debug("Connection succeeded on attempt # " + attemptCount);
1701            }
1702         }
1703
1704         if (attempts > 0)
1705         {
1706            try
1707            {
1708               Thread.sleep(1500);
1709            }
1710            catch (InterruptedException JavaDoc interruptedException)
1711            {
1712               break;
1713            }
1714         }
1715      }
1716      if (sqlException != null)
1717      {
1718         throw sqlException;
1719      }
1720      throw new SQLException JavaDoc("connection attempt interrupted");
1721   }
1722
1723   /////////////////////////////////////////////////////////////////////////////////
1724
//
1725
// JMX Interface
1726
//
1727
/////////////////////////////////////////////////////////////////////////////////
1728

1729   /** The object name of the DataSource */
1730   protected ObjectName JavaDoc connectionManagerName;
1731   
1732   /** The SQL properties */
1733   protected Properties JavaDoc sqlProperties = new Properties JavaDoc();
1734
1735   public void startService() throws Exception JavaDoc
1736   {
1737      UPDATE_MARKED_MESSAGES = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES", UPDATE_MARKED_MESSAGES);
1738      UPDATE_MARKED_MESSAGES_XARECOVERY =
1739         sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_XARECOVERY", UPDATE_MARKED_MESSAGES_XARECOVERY);
1740      UPDATE_MARKED_MESSAGES_WITH_TX =
1741         sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_WITH_TX", UPDATE_MARKED_MESSAGES_WITH_TX);
1742      DELETE_MARKED_MESSAGES_WITH_TX =
1743         sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX", DELETE_MARKED_MESSAGES_WITH_TX);
1744      DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY =
1745         sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY", DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY);
1746      DELETE_TX = sqlProperties.getProperty("DELETE_TX", DELETE_TX);
1747      DELETE_MARKED_MESSAGES = sqlProperties.getProperty("DELETE_MARKED_MESSAGES", DELETE_MARKED_MESSAGES);
1748      DELETE_TEMPORARY_MESSAGES = sqlProperties.getProperty("DELETE_TEMPORARY_MESSAGES", DELETE_TEMPORARY_MESSAGES);
1749      INSERT_TX = sqlProperties.getProperty("INSERT_TX", INSERT_TX);
1750      INSERT_TX_XARECOVERY = sqlProperties.getProperty("INSERT_TX_XARECOVERY", INSERT_TX_XARECOVERY);
1751      DELETE_ALL_TX = sqlProperties.getProperty("DELETE_ALL_TX", DELETE_ALL_TX);
1752      DELETE_ALL_TX_XARECOVERY = sqlProperties.getProperty("DELETE_ALL_TX_XARECOVERY", DELETE_ALL_TX_XARECOVERY);
1753      SELECT_ALL_TX_XARECOVERY = sqlProperties.getProperty("SELECT_ALL_TX_XARECOVERY", SELECT_ALL_TX_XARECOVERY);
1754      SELECT_MAX_TX = sqlProperties.getProperty("SELECT_MAX_TX", SELECT_MAX_TX);
1755      SELECT_MESSAGES_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST", SELECT_MESSAGES_IN_DEST);
1756      SELECT_MESSAGES_IN_DEST_XARECOVERY = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST_XARECOVERY", SELECT_MESSAGES_IN_DEST_XARECOVERY);
1757      SELECT_MESSAGE_KEYS_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGE_KEYS_IN_DEST", SELECT_MESSAGE_KEYS_IN_DEST);
1758      SELECT_MESSAGE = sqlProperties.getProperty("SELECT_MESSAGE", SELECT_MESSAGE);
1759      SELECT_MESSAGE_XARECOVERY = sqlProperties.getProperty("SELECT_MESSAGE_XARECOVERY", SELECT_MESSAGE_XARECOVERY);
1760      INSERT_MESSAGE = sqlProperties.getProperty("INSERT_MESSAGE", INSERT_MESSAGE);
1761      MARK_MESSAGE = sqlProperties.getProperty("MARK_MESSAGE", MARK_MESSAGE);
1762      DELETE_MESSAGE = sqlProperties.getProperty("DELETE_MESSAGE", DELETE_MESSAGE);
1763      UPDATE_MESSAGE = sqlProperties.getProperty("UPDATE_MESSAGE", UPDATE_MESSAGE);
1764      CREATE_MESSAGE_TABLE = sqlProperties.getProperty("CREATE_MESSAGE_TABLE", CREATE_MESSAGE_TABLE);
1765      CREATE_IDX_MESSAGE_TXOP_TXID = sqlProperties.getProperty("CREATE_IDX_MESSAGE_TXOP_TXID", CREATE_IDX_MESSAGE_TXOP_TXID);
1766      CREATE_IDX_MESSAGE_DESTINATION = sqlProperties.getProperty("CREATE_IDX_MESSAGE_DESTINATION", CREATE_IDX_MESSAGE_DESTINATION);
1767      CREATE_TX_TABLE = sqlProperties.getProperty("CREATE_TX_TABLE", CREATE_TX_TABLE);
1768      CREATE_TX_TABLE_XARECOVERY = sqlProperties.getProperty("CREATE_TX_TABLE_XARECOVERY", CREATE_TX_TABLE_XARECOVERY);
1769      createTables = sqlProperties.getProperty("CREATE_TABLES_ON_STARTUP", "true").equalsIgnoreCase("true");
1770      String JavaDoc s = sqlProperties.getProperty("BLOB_TYPE", "OBJECT_BLOB");
1771
1772      if (s.equals("OBJECT_BLOB"))
1773      {
1774         blobType = OBJECT_BLOB;
1775      }
1776      else if (s.equals("BYTES_BLOB"))
1777      {
1778         blobType = BYTES_BLOB;
1779      }
1780      else if (s.equals("BINARYSTREAM_BLOB"))
1781      {
1782         blobType = BINARYSTREAM_BLOB;
1783      }
1784      else if (s.equals("BLOB_BLOB"))
1785      {
1786         blobType = BLOB_BLOB;
1787      }
1788
1789
1790      // initialize tm and datasource
1791
initializeFields();
1792
1793      log.debug("Creating Schema");
1794      try
1795      {
1796         createSchema();
1797      }
1798      catch (Exception JavaDoc e)
1799      {
1800         log.warn("Error creating schema", e);
1801      }
1802      
1803      log.debug("Resolving uncommited TXS");
1804      Throwable JavaDoc error = null;
1805      for (int i = 0; i <= recoveryRetries; ++i)
1806      {
1807         try
1808         {
1809            resolveAllUncommitedTXs();
1810            
1811            // done
1812
break;
1813         }
1814         catch (Throwable JavaDoc t)
1815         {
1816            if (i < recoveryRetries)
1817               log.warn("Error resolving transactions retries=" + i + " of " + recoveryRetries, t);
1818            else
1819               error = t;
1820         }
1821      }
1822      
1823      if (error != null)
1824         SpyJMSException.rethrowAsJMSException("Unable to resolve transactions retries=" + recoveryRetries, error);
1825   }
1826
1827   protected void initializeFields()
1828           throws MBeanException JavaDoc, AttributeNotFoundException JavaDoc, InstanceNotFoundException JavaDoc, ReflectionException JavaDoc, NamingException JavaDoc
1829   {
1830      //Find the ConnectionFactoryLoader MBean so we can find the datasource
1831
String JavaDoc dsName = (String JavaDoc) getServer().getAttribute(connectionManagerName, "BindName");
1832      //Get an InitialContext
1833

1834      InitialContext JavaDoc ctx = new InitialContext JavaDoc();
1835      datasource = (DataSource JavaDoc) ctx.lookup(dsName);
1836
1837      //Get the Transaction Manager so we can control the jdbc tx
1838
tm = (TransactionManager JavaDoc) ctx.lookup(TransactionManagerService.JNDI_NAME);
1839   }
1840
1841   public Object JavaDoc getInstance()
1842   {
1843      return this;
1844   }
1845
1846   public ObjectName JavaDoc getMessageCache()
1847   {
1848      throw new UnsupportedOperationException JavaDoc("This is now set on the destination manager");
1849   }
1850
1851   public void setMessageCache(ObjectName JavaDoc messageCache)
1852   {
1853      throw new UnsupportedOperationException JavaDoc("This is now set on the destination manager");
1854   }
1855
1856   public ObjectName JavaDoc getConnectionManager()
1857   {
1858      return connectionManagerName;
1859   }
1860
1861   public void setConnectionManager(ObjectName JavaDoc connectionManagerName)
1862   {
1863      this.connectionManagerName = connectionManagerName;
1864   }
1865
1866   public MessageCache getMessageCacheInstance()
1867   {
1868      throw new UnsupportedOperationException JavaDoc("This is now set on the destination manager");
1869   }
1870
1871   public String JavaDoc getSqlProperties()
1872   {
1873      try
1874      {
1875         ByteArrayOutputStream JavaDoc boa = new ByteArrayOutputStream JavaDoc();
1876         sqlProperties.store(boa, "");
1877         return new String JavaDoc(boa.toByteArray());
1878      }
1879      catch (IOException JavaDoc shouldnothappen)
1880      {
1881         return "";
1882      }
1883   }
1884
1885   public void setSqlProperties(String JavaDoc value)
1886   {
1887      try
1888      {
1889         ByteArrayInputStream JavaDoc is = new ByteArrayInputStream JavaDoc(value.getBytes());
1890         sqlProperties = new Properties JavaDoc();
1891         sqlProperties.load(is);
1892      }
1893      catch (IOException JavaDoc shouldnothappen)
1894      {
1895      }
1896   }
1897
1898   public void setConnectionRetryAttempts(int value)
1899   {
1900      this.connectionRetryAttempts = value;
1901   }
1902
1903   public int getConnectionRetryAttempts()
1904   {
1905      return this.connectionRetryAttempts;
1906   }
1907  
1908   public int getRecoveryTimeout()
1909   {
1910      return recoveryTimeout;
1911   }
1912
1913   public void setRecoveryTimeout(int timeout)
1914   {
1915      this.recoveryTimeout = timeout;
1916   }
1917   
1918   public int getRecoveryRetries()
1919   {
1920      return recoveryRetries;
1921   }
1922
1923   public void setRecoveryRetries(int retries)
1924   {
1925      this.recoveryRetries = retries;
1926   }
1927
1928   public int getRecoverMessagesChunk()
1929   {
1930      return recoverMessagesChunk;
1931   }
1932
1933   public void setRecoverMessagesChunk(int recoverMessagesChunk)
1934   {
1935      if (recoverMessagesChunk != 0 && recoverMessagesChunk != 1)
1936      {
1937         log.warn("Only the values 0 and 1 are currently support for chunk size, using chunk size=1");
1938         recoverMessagesChunk = 1;
1939      }
1940      this.recoverMessagesChunk = recoverMessagesChunk;
1941   }
1942
1943   public boolean isXARecovery()
1944   {
1945      return xaRecovery;
1946   }
1947
1948   public void setXARecovery(boolean xaRecovery)
1949   {
1950      this.xaRecovery = xaRecovery;
1951   }
1952
1953   public int getStatementRetries()
1954   {
1955      return statementRetries;
1956   }
1957
1958   public void setStatementRetries(int statementRetries)
1959   {
1960      if (statementRetries < 0)
1961         statementRetries = 0;
1962      this.statementRetries = statementRetries;
1963   }
1964}
1965
Popular Tags