KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > pm > jdbc3 > PersistenceManager


1 /*
2  * JBossMQ, the OpenSource JMS implementation
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.mq.pm.jdbc3;
8
9 import java.io.ByteArrayInputStream JavaDoc;
10 import java.io.ByteArrayOutputStream JavaDoc;
11 import java.io.IOException JavaDoc;
12 import java.io.ObjectInputStream JavaDoc;
13 import java.io.ObjectOutputStream JavaDoc;
14 import java.sql.Connection JavaDoc;
15 import java.sql.PreparedStatement JavaDoc;
16 import java.sql.ResultSet JavaDoc;
17 import java.sql.SQLException JavaDoc;
18 import java.sql.Types JavaDoc;
19 import java.util.Properties JavaDoc;
20
21 import javax.jms.JMSException JavaDoc;
22 import javax.management.ObjectName JavaDoc;
23 import javax.naming.InitialContext JavaDoc;
24 import javax.sql.DataSource JavaDoc;
25 import javax.transaction.Status JavaDoc;
26 import javax.transaction.Transaction JavaDoc;
27 import javax.transaction.TransactionManager JavaDoc;
28
29 import org.jboss.mq.DurableSubscriptionID;
30 import org.jboss.mq.SpyDestination;
31 import org.jboss.mq.SpyJMSException;
32 import org.jboss.mq.SpyMessage;
33 import org.jboss.mq.SpyTopic;
34 import org.jboss.mq.pm.CacheStore;
35 import org.jboss.mq.pm.NewPersistenceManager;
36 import org.jboss.mq.pm.Tx;
37 import org.jboss.mq.pm.TxManager;
38 import org.jboss.mq.server.JMSDestination;
39 import org.jboss.mq.server.JMSTopic;
40 import org.jboss.mq.server.MessageCache;
41 import org.jboss.mq.server.MessageReference;
42 import org.jboss.system.ServiceMBeanSupport;
43 import org.jboss.tm.TransactionManagerService;
44
45 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
46
47 /**
48  * This class manages all persistence related services for JDBC based
49  * persistence.
50  *
51  * @jmx:mbean extends="org.jboss.system.ServiceMBean, org.jboss.mq.pm.PersistenceManagerMBean, org.jboss.mq.pm.CacheStoreMBean"
52  *
53  * @author Jayesh Parayali (jayeshpk1@yahoo.com)
54  * @author Hiram Chirino (cojonudo14@hotmail.com)
55  * @author Adrian Brock (adrian@jboss.org)
56  *
57  * @version $Revision: 1.6.4.3 $
58  */

59 public class PersistenceManager
60    extends ServiceMBeanSupport
61    implements PersistenceManagerMBean, NewPersistenceManager, CacheStore, Runnable JavaDoc
62 {
63    // Constants --------------------------------------------------------------------
64

65    /** Message is an object */
66    static final int OBJECT_BLOB = 0;
67
68    /** Message is a byte array */
69    static final int BYTES_BLOB = 1;
70
71    /** Message is a binary stream */
72    static final int BINARYSTREAM_BLOB = 2;
73
74    /** Message is a blob */
75    static final int BLOB_BLOB = 3;
76
77    // Attributes -------------------------------------------------------------------
78

79    /** The next transaction id */
80    private SynchronizedLong nextTransactionId = new SynchronizedLong(0l);
81    
82    /** The jbossmq transaction manager */
83    private TxManager txManager;
84
85    /** The data source */
86    private DataSource JavaDoc datasource;
87
88    /** The jta transaction manager */
89    private TransactionManager JavaDoc tm;
90    
91    /** The object name of the connection manager */
92    private ObjectName JavaDoc connectionManagerName;
93    
94    /** The sql properties */
95    private Properties JavaDoc sqlProperties = new Properties JavaDoc();
96
97    String JavaDoc UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE TXOP=?";
98    String JavaDoc UPDATE_MARKED_REFERENCES = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE TXOP=?";
99    String JavaDoc UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
100    String JavaDoc UPDATE_MARKED_REFERENCES_WITH_TX = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
101    String JavaDoc DELETE_MARKED_MESSAGES_WITH_TX = "DELETE FROM JMS_MESSAGE_LOG WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTION_LOG) AND TXOP=?";
102    String JavaDoc DELETE_MARKED_REFERENCES_WITH_TX = "DELETE FROM JMS_REFERENCE_LOG WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTION_LOG) AND TXOP=?";
103    String JavaDoc DELETE_TX = "DELETE FROM JMS_TRANSACTION_LOG WHERE TXID = ?";
104    String JavaDoc DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE TXID=? AND TXOP=?";
105    String JavaDoc DELETE_MARKED_REFERENCES = "DELETE FROM JMS_REFERENCE_LOG WHERE TXID=? AND TXOP=?";
106    String JavaDoc DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE TXOP='T'";
107    String JavaDoc DELETE_TEMPORARY_REFERENCES = "DELETE FROM JMS_REFERENCE_LOG WHERE TXOP='T'";
108    String JavaDoc INSERT_TX = "INSERT INTO JMS_TRANSACTION_LOG (TXID) values(?)";
109    String JavaDoc SELECT_MAX_TX = "SELECT MAX(TXID) FROM JMS_TRANSACTION_LOG";
110    String JavaDoc SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG WHERE DESTINATION=?";
111    String JavaDoc SELECT_REFERENCES_IN_DEST = "SELECT R.MESSAGEID, M.MESSAGEBLOB, R.REDELIVERED, R.REDELIVERS FROM JMS_REFERENCE_LOG AS R, JMS_MESSAGE_LOG AS M" +
112                                       " WHERE R.MESSAGEID = M.MESSAGEID AND R.DESTINATION=?";
113    String JavaDoc SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG WHERE MESSAGEID=? AND DESTINATION=?";
114    String JavaDoc INSERT_MESSAGE = "INSERT INTO JMS_MESSAGE_LOG (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP, LATECLONE) VALUES(?,?,?,?,?,?)";
115    String JavaDoc INSERT_REFERENCE = "INSERT INTO JMS_REFERENCE_LOG (MESSAGEID, DESTINATION, TXID, TXOP, REDELIVERED, REDELIVERS) VALUES(?,?,?,?,?,?)";
116    String JavaDoc MARK_MESSAGE = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
117    String JavaDoc MARK_REFERENCE = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
118    String JavaDoc DELETE_MESSAGE = "DELETE FROM JMS_MESSAGE_LOG WHERE MESSAGEID=? AND DESTINATION=?";
119    String JavaDoc DELETE_REFERENCE = "DELETE FROM JMS_REFERENCE_LOG WHERE MESSAGEID=? AND DESTINATION=?";
120    String JavaDoc UPDATE_MESSAGE = "UPDATE JMS_MESSAGE_LOG SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?";
121    String JavaDoc UPDATE_REFERENCE = "UPDATE JMS_REFERENCE_LOG SET REDELIVERED=?, REDELIVERS=? WHERE MESSAGEID=? AND DESTINATION=?";
122    String JavaDoc DELETE_ORPHANED_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE LATECLONE = '1' AND MESSAGEID NOT IN (SELECT MESSAGEID FROM JMS_REFERENCE_LOG)";
123    String JavaDoc DELETE_ALL_TXS = "DELETE FROM JMS_TRANSACTION_LOG";
124    String JavaDoc CREATE_REFERENCE_TABLE =
125       "CREATE TABLE JMS_REFERENCE_LOG ( MESSAGEID INTEGER NOT NULL, "
126          + "DESTINATION VARCHAR(256) NOT NULL, TXID INTEGER, TXOP CHAR(1), "
127          + "REDELIVERED CHAR(1), REDELIVERS INTEGER, "
128          + "PRIMARY KEY (MESSAGEID, DESTINATION) )";
129    String JavaDoc CREATE_MESSAGE_TABLE =
130       "CREATE TABLE JMS_MESSAGE_LOG ( MESSAGEID INTEGER NOT NULL, "
131          + "DESTINATION VARCHAR(256), TXID INTEGER, TXOP CHAR(1), LATECLONE CHAR(1), "
132          + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )";
133    String JavaDoc CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTION_LOG ( TXID INTEGER )";
134
135    /** The blob type */
136    int blobType = OBJECT_BLOB;
137
138    /** Whether to create tables */
139    boolean createTables = true;
140    
141    /** Number of retry attempts to connect to the db */
142    private int connectionRetryAttempts = 5;
143    
144    /** The garbage collection period in millis */
145    private long gcPeriod = 60000;
146
147    /** The background gc thread */
148    private Thread JavaDoc gcThread;
149
150    // Constructors -----------------------------------------------------------------
151

/**
 * Creates the persistence manager and the JBossMQ transaction manager
 * that is bound to it.
 *
 * @throws JMSException for any error
 */
public PersistenceManager() throws JMSException
{
   this.txManager = new TxManager(this);
}
161
162    // Public -----------------------------------------------------------------------
163

164    /**
165     * Retrieve the connection manager object name
166     *
167     * @jmx:managed-attribute
168     */

169    public ObjectName JavaDoc getConnectionManager()
170    {
171       return connectionManagerName;
172    }
173
174    /**
175     * Set the connection manager object name
176     *
177     * @jmx:managed-attribute
178     */

179    public void setConnectionManager(ObjectName JavaDoc connectionManagerName)
180    {
181       this.connectionManagerName = connectionManagerName;
182    }
183
184    /**
185     * Set the garbage collection period
186     *
187     * @jmx:managed-attribute
188     */

189    public int getGCPeriodSecs()
190    {
191       return (int) gcPeriod / 1000;
192    }
193
194    /**
195     * Set the garbage collection period in seconds
196     *
197     * @jmx:managed-attribute
198     */

199    public void setGCPeriodSecs(int gcPeriodSecs)
200    {
201       this.gcPeriod = gcPeriodSecs * 1000;
202    }
203    
204    /**
205     * Gets the ConnectionRetryAttempts.
206     *
207     * @jmx:managed-attribute
208     * @return the number of retry events
209     */

210    public int getConnectionRetryAttempts()
211    {
212       return this.connectionRetryAttempts;
213    }
214    
215    /**
216     * Sets the ConnectionRetryAttempts.
217     *
218     * @jmx:managed-attribute
219     * @param value the number of retry attempts
220     */

221    public void setConnectionRetryAttempts(int value)
222    {
223       this.connectionRetryAttempts = value;
224    }
225
226    /**
227     * Gets the sqlProperties.
228     *
229     * @jmx:managed-attribute
230     * @return Returns the Properties
231     */

232    public String JavaDoc getSqlProperties()
233    {
234       try
235       {
236          ByteArrayOutputStream JavaDoc boa = new ByteArrayOutputStream JavaDoc();
237          sqlProperties.store(boa, "");
238          return new String JavaDoc(boa.toByteArray());
239       }
240       catch (IOException JavaDoc shouldnothappen)
241       {
242          return "";
243       }
244    }
245
246    /**
247     * Sets the sqlProperties.
248     *
249     * @jmx:managed-attribute
250     * @param sqlProperties The sqlProperties to set
251     */

252    public void setSqlProperties(String JavaDoc value)
253    {
254       try
255       {
256          ByteArrayInputStream JavaDoc is = new ByteArrayInputStream JavaDoc(value.getBytes());
257          sqlProperties = new Properties JavaDoc();
258          sqlProperties.load(is);
259       }
260       catch (IOException JavaDoc shouldnothappen)
261       {
262       }
263    }
264
265    // PersistenceManager implementation --------------------------------------------
266

267    public Tx createPersistentTx() throws JMSException JavaDoc
268    {
269       Tx id = new Tx(nextTransactionId.increment());
270       TransactionManagerStrategy tms = new TransactionManagerStrategy();
271       tms.startTX();
272       Connection JavaDoc c = null;
273       PreparedStatement JavaDoc stmt = null;
274       boolean threadWasInterrupted = Thread.interrupted();
275       try
276       {
277          c = this.getConnection();
278          stmt = c.prepareStatement(INSERT_TX);
279          stmt.setLong(1, id.longValue());
280          stmt.executeUpdate();
281
282       }
283       catch (SQLException JavaDoc e)
284       {
285          tms.setRollbackOnly();
286          throw new SpyJMSException("Could not crate tx: " + id, e);
287       }
288       finally
289       {
290          try
291          {
292             stmt.close();
293          }
294          catch (Throwable JavaDoc ignore)
295          {
296          }
297          try
298          {
299             c.close();
300          }
301          catch (Throwable JavaDoc ignore)
302          {
303          }
304          tms.endTX();
305
306          // Restore the interrupted state of the thread
307
if( threadWasInterrupted )
308             Thread.currentThread().interrupt();
309       }
310
311       return id;
312    }
313
314    public void commitPersistentTx(Tx txId) throws JMSException JavaDoc
315    {
316       TransactionManagerStrategy tms = new TransactionManagerStrategy();
317       tms.startTX();
318       Connection JavaDoc c = null;
319       boolean threadWasInterrupted = Thread.interrupted();
320       try
321       {
322          c = this.getConnection();
323          removeMarkedMessages(c, txId, "D");
324          removeMarkedReferences(c, txId, "D");
325          removeTXRecord(c, txId.longValue());
326       }
327       catch (SQLException JavaDoc e)
328       {
329          tms.setRollbackOnly();
330          throw new SpyJMSException("Could not commit tx: " + txId, e);
331       }
332       finally
333       {
334          try
335          {
336             c.close();
337          }
338          catch (Throwable JavaDoc ignore)
339          {
340          }
341          tms.endTX();
342
343          // Restore the interrupted state of the thread
344
if( threadWasInterrupted )
345             Thread.currentThread().interrupt();
346       }
347    }
348
349    public void rollbackPersistentTx(Tx txId) throws JMSException JavaDoc
350    {
351
352       TransactionManagerStrategy tms = new TransactionManagerStrategy();
353       tms.startTX();
354       Connection JavaDoc c = null;
355       PreparedStatement JavaDoc stmt = null;
356       boolean threadWasInterrupted = Thread.interrupted();
357       try
358       {
359          c = this.getConnection();
360          removeMarkedMessages(c, txId, "A");
361          removeMarkedReferences(c, txId, "A");
362          removeTXRecord(c, txId.longValue());
363
364          // Restore all the messages that were logically removed.
365
stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES_WITH_TX);
366          stmt.setNull(1, Types.BIGINT);
367          stmt.setString(2, "A");
368          stmt.setString(3, "D");
369          stmt.setLong(4, txId.longValue());
370          stmt.executeUpdate();
371          stmt.close();
372
373          // Restore all the references that were logically removed.
374
stmt = c.prepareStatement(UPDATE_MARKED_REFERENCES_WITH_TX);
375          stmt.setNull(1, Types.BIGINT);
376          stmt.setString(2, "A");
377          stmt.setString(3, "D");
378          stmt.setLong(4, txId.longValue());
379          stmt.executeUpdate();
380          stmt.close();
381       }
382       catch (SQLException JavaDoc e)
383       {
384          tms.setRollbackOnly();
385          throw new SpyJMSException("Could not rollback tx: " + txId, e);
386       }
387       finally
388       {
389          try
390          {
391             if (stmt != null)
392                stmt.close();
393             if (c != null)
394                c.close();
395          }
396          catch (Throwable JavaDoc ignore)
397          {
398          }
399          tms.endTX();
400
401          // Restore the interrupted state of the thread
402
if( threadWasInterrupted )
403             Thread.currentThread().interrupt();
404       }
405    }
406
407    public void add(MessageReference messageRef, Tx txId) throws JMSException JavaDoc
408    {
409       boolean trace = log.isTraceEnabled();
410       if (trace)
411          log.trace("About to add message " + messageRef + " transaction=" + txId);
412
413       TransactionManagerStrategy tms = new TransactionManagerStrategy();
414       tms.startTX();
415       Connection JavaDoc c = null;
416       boolean threadWasInterrupted = Thread.interrupted();
417       try
418       {
419          c = this.getConnection();
420          // Synchronize on the message to avoid a race with the softener
421
synchronized(messageRef)
422          {
423             if (trace)
424                log.trace("Inserting message " + messageRef + " transaction=" + txId);
425
426             if (messageRef.isLateClone())
427             {
428                addReference(c, messageRef.getPersistentKey(), messageRef, txId, "A");
429             }
430             else
431             {
432                SpyMessage message = messageRef.getMessage();
433                addMessage(c, messageRef.getPersistentKey(), message, txId, "A", "0");
434             }
435             messageRef.setStored(MessageReference.STORED);
436
437             if (trace)
438                log.trace("Added message " + messageRef + " transaction=" + txId);
439          }
440       }
441       catch (IOException JavaDoc e)
442       {
443          tms.setRollbackOnly();
444          throw new SpyJMSException("Could not store message: " + messageRef, e);
445       }
446       catch (SQLException JavaDoc e)
447       {
448          tms.setRollbackOnly();
449          throw new SpyJMSException("Could not store message: " + messageRef, e);
450       }
451       finally
452       {
453          try
454          {
455             c.close();
456          }
457          catch (Throwable JavaDoc ignore)
458          {
459          }
460          tms.endTX();
461
462          // Restore the interrupted state of the thread
463
if( threadWasInterrupted )
464             Thread.currentThread().interrupt();
465       }
466    }
467
468    public void update(MessageReference messageRef, Tx txId) throws JMSException JavaDoc
469    {
470       boolean trace = log.isTraceEnabled();
471       if (trace)
472          log.trace("Updating message " + messageRef + " transaction=" + txId);
473
474       TransactionManagerStrategy tms = new TransactionManagerStrategy();
475       tms.startTX();
476       Connection JavaDoc c = null;
477       PreparedStatement JavaDoc stmt = null;
478       boolean threadWasInterrupted = Thread.interrupted();
479       try
480       {
481          c = this.getConnection();
482          if (txId == null)
483          {
484             if (messageRef.isLateClone())
485             {
486                stmt = c.prepareStatement(UPDATE_REFERENCE);
487                if (messageRef.redelivered)
488                   stmt.setString(1, "1");
489                else
490                   stmt.setString(1, "0");
491                stmt.setLong(2, messageRef.redeliveryCount);
492                stmt.setLong(3, messageRef.messageId);
493                stmt.setString(4, messageRef.getPersistentKey());
494             }
495             else
496             {
497                stmt = c.prepareStatement(UPDATE_MESSAGE);
498                setBlob(stmt, 1, messageRef.getMessage());
499                stmt.setLong(2, messageRef.messageId);
500                stmt.setString(3, messageRef.getPersistentKey());
501             }
502             int rc = stmt.executeUpdate();
503             if( rc != 1 )
504                throw new SpyJMSException("Could not update the message in the database: update affected "+rc+" rows");
505          }
506          else
507          {
508             throw new SpyJMSException("NYI: Updating a message in a transaction is not currently used");
509          }
510          if (trace)
511             log.trace("Updated message " + messageRef + " transaction=" + txId);
512
513       }
514       catch (IOException JavaDoc e)
515       {
516          tms.setRollbackOnly();
517          throw new SpyJMSException("Could not update message: " + messageRef, e);
518       }
519       catch (SQLException JavaDoc e)
520       {
521          tms.setRollbackOnly();
522          throw new SpyJMSException("Could not update message: " + messageRef, e);
523       }
524       finally
525       {
526          try
527          {
528             stmt.close();
529          }
530          catch (Throwable JavaDoc ignore)
531          {
532          }
533          try
534          {
535             c.close();
536          }
537          catch (Throwable JavaDoc ignore)
538          {
539          }
540          tms.endTX();
541
542          // Restore the interrupted state of the thread
543
if( threadWasInterrupted )
544             Thread.currentThread().interrupt();
545       }
546    }
547
548    public void remove(MessageReference messageRef, Tx txId) throws JMSException JavaDoc
549    {
550       boolean trace = log.isTraceEnabled();
551       if (trace)
552          log.trace("Removing message " + messageRef + " transaction=" + txId);
553
554       TransactionManagerStrategy tms = new TransactionManagerStrategy();
555       tms.startTX();
556       Connection JavaDoc c = null;
557       PreparedStatement JavaDoc stmt = null;
558       boolean threadWasInterrupted = Thread.interrupted();
559       try
560       {
561          c = this.getConnection();
562          // Synchronize on the message to avoid a race with the softener
563
synchronized(messageRef)
564          {
565             if (txId == null)
566             {
567                if (messageRef.isLateClone())
568                   stmt = c.prepareStatement(DELETE_REFERENCE);
569                else
570                   stmt = c.prepareStatement(DELETE_MESSAGE);
571                stmt.setLong(1, messageRef.messageId);
572                stmt.setString(2, messageRef.getPersistentKey());
573                int rc = stmt.executeUpdate();
574                if( rc != 1 )
575                   throw new SpyJMSException("Could not delete the message from the database: delete affected "+rc+" rows");
576
577                // Adrian Brock:
578
// Remove the message from the cache, but don't
579
// return it to the pool just yet. The queue still holds
580
// a reference to the message and will return it
581
// to the pool once it gets enough time slice.
582
// The alternative is to remove the validation
583
// for double removal from the cache,
584
// which I don't want to do because it is useful
585
// for spotting errors
586
messageRef.setStored(MessageReference.NOT_STORED);
587                messageRef.removeDelayed();
588             }
589             else
590             {
591                if (messageRef.isLateClone())
592                {
593                   stmt = c.prepareStatement(MARK_REFERENCE);
594                   stmt.setLong(1, txId.longValue());
595                   stmt.setString(2, "D");
596                   stmt.setLong(3, messageRef.messageId);
597                   stmt.setString(4, messageRef.getPersistentKey());
598                }
599                else
600                {
601                   stmt = c.prepareStatement(MARK_MESSAGE);
602                   stmt.setLong(1, txId.longValue());
603                   stmt.setString(2, "D");
604                   stmt.setLong(3, messageRef.messageId);
605                   stmt.setString(4, messageRef.getPersistentKey());
606                }
607                int rc = stmt.executeUpdate();
608                if( rc != 1 )
609                   throw new SpyJMSException("Could not mark the message as deleted in the database: update affected "+rc+" rows");
610             }
611             if (trace)
612                log.trace("Removed message " + messageRef + " transaction=" + txId);
613          }
614       }
615       catch (SQLException JavaDoc e)
616       {
617          tms.setRollbackOnly();
618          throw new SpyJMSException("Could not remove message: " + messageRef, e);
619       }
620       finally
621       {
622          try
623          {
624             stmt.close();
625          }
626          catch (Throwable JavaDoc ignore)
627          {
628          }
629          try
630          {
631             c.close();
632          }
633          catch (Throwable JavaDoc ignore)
634          {
635          }
636          tms.endTX();
637
638          // Restore the interrupted state of the thread
639
if( threadWasInterrupted )
640             Thread.currentThread().interrupt();
641       }
642    }
643
644    public synchronized void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws javax.jms.JMSException JavaDoc
645    {
646       if (jmsDest == null)
647          throw new IllegalArgumentException JavaDoc("Must supply non null JMSDestination to restoreQueue");
648       if (dest == null)
649          throw new IllegalArgumentException JavaDoc("Must supply non null SpyDestination to restoreQueue");
650
651       TransactionManagerStrategy tms = new TransactionManagerStrategy();
652       tms.startTX();
653       Connection JavaDoc c = null;
654       PreparedStatement JavaDoc stmt = null;
655       ResultSet JavaDoc rs = null;
656       boolean threadWasInterrupted = Thread.interrupted();
657       try
658       {
659          c = this.getConnection();
660          int counter=0;
661          if (jmsDest.parameters.lateClone)
662          {
663             JMSTopic topic = (JMSTopic) jmsDest;
664             // The durable subscription is not serialized
665
DurableSubscriptionID id = ((SpyTopic) dest).getDurableSubscriptionID();
666
667             stmt = c.prepareStatement(SELECT_REFERENCES_IN_DEST);
668             stmt.setString(1, dest.toString());
669
670             rs = stmt.executeQuery();
671             while (rs.next())
672             {
673                SpyMessage message = extractMessage(rs, 2);
674                boolean redelivered = false;
675                if (rs.getString(3).equals("1"))
676                   redelivered = true;
677                message.header.jmsRedelivered = redelivered;
678                message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer JavaDoc(rs.getInt(4)));
679                topic.restoreMessage(message, id);
680                counter++;
681             }
682          }
683          else
684          {
685             stmt = c.prepareStatement(SELECT_MESSAGES_IN_DEST);
686             stmt.setString(1, dest.toString());
687
688             rs = stmt.executeQuery();
689             while (rs.next())
690             {
691                SpyMessage message = extractMessage(rs, 2);
692                // The durable subscription is not serialized
693
if (dest instanceof SpyTopic)
694                   message.header.durableSubscriberID = ((SpyTopic)dest).getDurableSubscriptionID();
695                jmsDest.restoreMessage(message);
696                counter++;
697             }
698          }
699          
700          log.debug("Restored "+counter+" message(s) to: "+dest);
701       }
702       catch (IOException JavaDoc e)
703       {
704          tms.setRollbackOnly();
705          throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
706       }
707       catch (SQLException JavaDoc e)
708       {
709          tms.setRollbackOnly();
710          throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
711       }
712       finally
713       {
714          try
715          {
716             rs.close();
717          }
718          catch (Throwable JavaDoc ignore)
719          {
720          }
721          try
722          {
723             stmt.close();
724          }
725          catch (Throwable JavaDoc ignore)
726          {
727          }
728          try
729          {
730             c.close();
731          }
732          catch (Throwable JavaDoc ignore)
733          {
734          }
735          tms.endTX();
736
737          // Restore the interrupted state of the thread
738
if( threadWasInterrupted )
739             Thread.currentThread().interrupt();
740       }
741
742    }
743
744    public TxManager getTxManager()
745    {
746       return txManager;
747    }
748
749    public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException JavaDoc
750    {
751       // Nothing to clean up, all the state is in the db.
752
}
753
754    /**
755     * Unsupported operation
756     */

757    public MessageCache getMessageCacheInstance()
758    {
759       throw new UnsupportedOperationException JavaDoc("This is now set on the destination manager");
760    }
761
762    // NewPersistenceManager implementation -----------------------------------------
763

764    public void addMessage(SpyMessage message) throws JMSException JavaDoc
765    {
766
767       TransactionManagerStrategy tms = new TransactionManagerStrategy();
768       tms.startTX();
769       Connection JavaDoc c = null;
770       boolean threadWasInterrupted = Thread.interrupted();
771       try
772       {
773          c = datasource.getConnection();
774          addMessage(c, "*", message, null, null, "1");
775       }
776       catch (IOException JavaDoc e)
777       {
778          tms.setRollbackOnly();
779          throw new SpyJMSException("Could not add message:", e);
780       }
781       catch (SQLException JavaDoc e)
782       {
783          tms.setRollbackOnly();
784          throw new SpyJMSException("Could not add message:", e);
785       }
786       finally
787       {
788          try
789          {
790             if (c != null)
791                c.close();
792          }
793          catch (Throwable JavaDoc ignore)
794          {
795          }
796          tms.endTX();
797
798          // Restore the interrupted state of the thread
799
if( threadWasInterrupted )
800             Thread.currentThread().interrupt();
801       }
802    }
803
804    // PersistenceManagerMBean implementation ---------------------------------------
805

806    public Object JavaDoc getInstance()
807    {
808       return this;
809    }
810
811    /**
812     * Unsupported operation
813     */

814    public ObjectName JavaDoc getMessageCache()
815    {
816       throw new UnsupportedOperationException JavaDoc("This is now set on the destination manager");
817    }
818
819    /**
820     * Unsupported operation
821     */

822    public void setMessageCache(ObjectName JavaDoc messageCache)
823    {
824       throw new UnsupportedOperationException JavaDoc("This is now set on the destination manager");
825    }
826
827    // CacheStore implementation ----------------------------------------------------
828

829    public SpyMessage loadFromStorage(MessageReference messageRef) throws JMSException JavaDoc
830    {
831       if (log.isTraceEnabled())
832          log.trace("Loading message from storage " + messageRef);
833
834       TransactionManagerStrategy tms = new TransactionManagerStrategy();
835       tms.startTX();
836       Connection JavaDoc c = null;
837       PreparedStatement JavaDoc stmt = null;
838       ResultSet JavaDoc rs = null;
839       boolean threadWasInterrupted = Thread.interrupted();
840       try
841       {
842          c = this.getConnection();
843          stmt = c.prepareStatement(SELECT_MESSAGE);
844          stmt.setLong(1, messageRef.messageId);
845          if (messageRef.isLateClone())
846             stmt.setString(2, "*");
847          else
848             stmt.setString(2, messageRef.getPersistentKey());
849
850          rs = stmt.executeQuery();
851          if (rs.next())
852             return extractMessage(rs, 2);
853
854          return null;
855
856       }
857       catch (IOException JavaDoc e)
858       {
859          tms.setRollbackOnly();
860          throw new SpyJMSException("Could not load message : " + messageRef, e);
861       }
862       catch (SQLException JavaDoc e)
863       {
864          tms.setRollbackOnly();
865          throw new SpyJMSException("Could not load message : " + messageRef, e);
866       }
867       finally
868       {
869          try
870          {
871             rs.close();
872          }
873          catch (Throwable JavaDoc ignore)
874          {
875          }
876          try
877          {
878             stmt.close();
879          }
880          catch (Throwable JavaDoc ignore)
881          {
882          }
883          try
884          {
885             c.close();
886          }
887          catch (Throwable JavaDoc ignore)
888          {
889          }
890          tms.endTX();
891
892          // Restore the interrupted state of the thread
893
if( threadWasInterrupted )
894             Thread.currentThread().interrupt();
895       }
896    }
897
898    public void removeFromStorage(MessageReference messageRef) throws JMSException JavaDoc
899    {
900       // We don't remove persistent messages sent to persistent queues
901
if (messageRef.isPersistent())
902          return;
903
904       boolean trace = log.isTraceEnabled();
905       if (trace)
906          log.trace("Removing message from storage " + messageRef);
907       
908       TransactionManagerStrategy tms = new TransactionManagerStrategy();
909       tms.startTX();
910       Connection JavaDoc c = null;
911       PreparedStatement JavaDoc stmt = null;
912       boolean threadWasInterrupted = Thread.interrupted();
913       try
914       {
915          c = this.getConnection();
916          if (messageRef.isLateClone())
917          {
918             stmt = c.prepareStatement(DELETE_REFERENCE);
919             stmt.setLong(1, messageRef.messageId);
920             stmt.setString(2, messageRef.getPersistentKey());
921             stmt.executeUpdate();
922             messageRef.setStored(MessageReference.NOT_STORED);
923          }
924          else
925          {
926             stmt = c.prepareStatement(DELETE_MESSAGE);
927             stmt.setLong(1, messageRef.messageId);
928             stmt.setString(2, messageRef.getPersistentKey());
929             stmt.executeUpdate();
930             messageRef.setStored(MessageReference.NOT_STORED);
931          }
932
933          if (trace)
934             log.trace("Removed message from storage " + messageRef);
935       }
936       catch (SQLException JavaDoc e)
937       {
938          tms.setRollbackOnly();
939          throw new SpyJMSException("Could not remove message: " + messageRef, e);
940       }
941       finally
942       {
943          try
944          {
945             stmt.close();
946          }
947          catch (Throwable JavaDoc ignore)
948          {
949          }
950          try
951          {
952             c.close();
953          }
954          catch (Throwable JavaDoc ignore)
955          {
956          }
957          tms.endTX();
958
959          // Restore the interrupted state of the thread
960
if( threadWasInterrupted )
961             Thread.currentThread().interrupt();
962       }
963    }
964
965    public void saveToStorage(MessageReference messageRef, SpyMessage message) throws JMSException JavaDoc
966    {
967       // Ignore save operations for persistent messages sent to persistent queues
968
// The queues handle the persistence
969
if (messageRef.isPersistent())
970          return;
971
972       boolean trace = log.isTraceEnabled();
973       if (trace)
974          log.trace("Saving message to storage " + messageRef);
975       
976       TransactionManagerStrategy tms = new TransactionManagerStrategy();
977       tms.startTX();
978       Connection JavaDoc c = null;
979       boolean threadWasInterrupted = Thread.interrupted();
980       try
981       {
982          c = this.getConnection();
983          if (messageRef.isLateClone())
984          {
985             addReference(c, messageRef.getPersistentKey(), messageRef, null, "T");
986             try
987             {
988                addMessage(c, "*", message, null, "T", "1");
989             }
990             catch (SQLException JavaDoc e)
991             {
992                log.trace("TODO: Check this is really a duplicate", e);
993             }
994          }
995          else
996          {
997             addMessage(c, messageRef.getPersistentKey(), message, null, "T", "0");
998          }
999          messageRef.setStored(MessageReference.STORED);
1000
1001         if (trace)
1002            log.trace("Saved message to storage " + messageRef);
1003      }
1004      catch (IOException JavaDoc e)
1005      {
1006         tms.setRollbackOnly();
1007         throw new SpyJMSException("Could not store message: " + messageRef, e);
1008      }
1009      catch (SQLException JavaDoc e)
1010      {
1011         tms.setRollbackOnly();
1012         throw new SpyJMSException("Could not store message: " + messageRef, e);
1013      }
1014      finally
1015      {
1016         try
1017         {
1018            c.close();
1019         }
1020         catch (Throwable JavaDoc ignore)
1021         {
1022         }
1023         tms.endTX();
1024
1025         // Restore the interrupted state of the thread
1026
if( threadWasInterrupted )
1027            Thread.currentThread().interrupt();
1028      }
1029   }
1030
1031   // Runnable implementation ------------------------------------------------------
1032

1033   public void run()
1034   {
1035      Thread JavaDoc current = Thread.currentThread();
1036      while (gcThread == current)
1037      {
1038         try
1039         {
1040            Thread.sleep(gcPeriod);
1041            if (gcThread != current)
1042               return;
1043               
1044            Connection JavaDoc connection = datasource.getConnection();
1045            try
1046            {
1047               PreparedStatement JavaDoc stmt = connection.prepareStatement(DELETE_ORPHANED_MESSAGES);
1048               try
1049               {
1050                  stmt.executeUpdate();
1051               }
1052               finally
1053               {
1054                  try
1055                  {
1056                     stmt.close();
1057                  }
1058                  catch (SQLException JavaDoc ignored)
1059                  {
1060                     log.trace("Error closing statement", ignored);
1061                  }
1062               }
1063            }
1064            finally
1065            {
1066               try
1067               {
1068                  connection.close();
1069               }
1070               catch (SQLException JavaDoc ignored)
1071               {
1072                  log.trace("Error closing connection", ignored);
1073               }
1074            }
1075         }
1076         catch (InterruptedException JavaDoc ignored)
1077         {
1078         }
1079         catch (Throwable JavaDoc t)
1080         {
1081            log.warn("Unhandled throwable in gc thread:", t);
1082         }
1083      }
1084   }
1085
1086   // ServerMBeanSupport overrides -------------------------------------------------
1087

1088   protected void startService() throws Exception JavaDoc
1089   {
1090      UPDATE_MARKED_MESSAGES = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES", UPDATE_MARKED_MESSAGES);
1091      UPDATE_MARKED_REFERENCES = sqlProperties.getProperty("UPDATE_MARKED_REFERENCES", UPDATE_MARKED_REFERENCES);
1092      UPDATE_MARKED_MESSAGES_WITH_TX = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_WITH_TX", UPDATE_MARKED_MESSAGES_WITH_TX);
1093      UPDATE_MARKED_REFERENCES_WITH_TX = sqlProperties.getProperty("UPDATE_MARKED_REFERENCES_WITH_TX", UPDATE_MARKED_REFERENCES_WITH_TX);
1094      DELETE_MARKED_MESSAGES_WITH_TX = sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX", DELETE_MARKED_MESSAGES_WITH_TX);
1095      DELETE_MARKED_REFERENCES_WITH_TX = sqlProperties.getProperty("DELETE_MARKED_REFERENCES_WITH_TX", DELETE_MARKED_REFERENCES_WITH_TX);
1096      DELETE_TX = sqlProperties.getProperty("DELETE_TX", DELETE_TX);
1097      DELETE_MARKED_MESSAGES = sqlProperties.getProperty("DELETE_MARKED_MESSAGES", DELETE_MARKED_MESSAGES);
1098      DELETE_MARKED_REFERENCES = sqlProperties.getProperty("DELETE_MARKED_REFERENCES", DELETE_MARKED_REFERENCES);
1099      DELETE_TEMPORARY_MESSAGES = sqlProperties.getProperty("DELETE_TEMPORARY_MESSAGES", DELETE_TEMPORARY_MESSAGES);
1100      DELETE_TEMPORARY_REFERENCES = sqlProperties.getProperty("DELETE_TEMPORARY_REFERENCES", DELETE_TEMPORARY_REFERENCES);
1101      INSERT_TX = sqlProperties.getProperty("INSERT_TX", INSERT_TX);
1102      SELECT_MAX_TX = sqlProperties.getProperty("SELECT_MAX_TX", SELECT_MAX_TX);
1103      SELECT_MESSAGES_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST", SELECT_MESSAGES_IN_DEST);
1104      SELECT_REFERENCES_IN_DEST = sqlProperties.getProperty("SELECT_REFERENCES_IN_DEST", SELECT_REFERENCES_IN_DEST);
1105      SELECT_MESSAGE = sqlProperties.getProperty("SELECT_MESSAGE", SELECT_MESSAGE);
1106      INSERT_MESSAGE = sqlProperties.getProperty("INSERT_MESSAGE", INSERT_MESSAGE);
1107      INSERT_REFERENCE = sqlProperties.getProperty("INSERT_REFERENCE", INSERT_REFERENCE);
1108      MARK_MESSAGE = sqlProperties.getProperty("MARK_MESSAGE", MARK_MESSAGE);
1109      MARK_REFERENCE = sqlProperties.getProperty("MARK_REFERENCE", MARK_REFERENCE);
1110      DELETE_MESSAGE = sqlProperties.getProperty("DELETE_MESSAGE", DELETE_MESSAGE);
1111      DELETE_REFERENCE = sqlProperties.getProperty("DELETE_REFERENCE", DELETE_REFERENCE);
1112      UPDATE_MESSAGE = sqlProperties.getProperty("UPDATE_MESSAGE", UPDATE_MESSAGE);
1113      UPDATE_REFERENCE = sqlProperties.getProperty("UPDATE_REFERENCE", UPDATE_REFERENCE);
1114      DELETE_ORPHANED_MESSAGES = sqlProperties.getProperty("DELETE_ORPHANED_MESSAGES", DELETE_ORPHANED_MESSAGES);
1115      DELETE_ALL_TXS = sqlProperties.getProperty("DELETE_ALL_TXS", DELETE_ALL_TXS);
1116      CREATE_REFERENCE_TABLE = sqlProperties.getProperty("CREATE_REFERENCE_TABLE", CREATE_REFERENCE_TABLE);
1117      CREATE_MESSAGE_TABLE = sqlProperties.getProperty("CREATE_MESSAGE_TABLE", CREATE_MESSAGE_TABLE);
1118      CREATE_TX_TABLE = sqlProperties.getProperty("CREATE_TX_TABLE", CREATE_TX_TABLE);
1119      createTables = sqlProperties.getProperty("CREATE_TABLES_ON_STARTUP", "true").equalsIgnoreCase("true");
1120      String JavaDoc s = sqlProperties.getProperty("BLOB_TYPE", "OBJECT_BLOB");
1121
1122      if (s.equals("OBJECT_BLOB"))
1123         blobType = OBJECT_BLOB;
1124      else if (s.equals("BYTES_BLOB"))
1125         blobType = BYTES_BLOB;
1126      else if (s.equals("BINARYSTREAM_BLOB"))
1127         blobType = BINARYSTREAM_BLOB;
1128      else if (s.equals("BLOB_BLOB"))
1129         blobType = BLOB_BLOB;
1130
1131      //Find the ConnectionFactoryLoader MBean so we can find the datasource
1132
String JavaDoc dsName = (String JavaDoc) getServer().getAttribute(connectionManagerName, "BindName");
1133
1134      //Get an InitialContext
1135
InitialContext JavaDoc ctx = new InitialContext JavaDoc();
1136      datasource = (DataSource JavaDoc) ctx.lookup(dsName);
1137
1138      //Get the Transaction Manager so we can control the jdbc tx
1139
tm = (TransactionManager JavaDoc) ctx.lookup(TransactionManagerService.JNDI_NAME);
1140
1141      log.debug("Resolving uncommited TXS");
1142      resolveAllUncommitedTXs();
1143      
1144      gcThread = new Thread JavaDoc(this, "JBossMQ persistent message garbage collection");
1145      gcThread.setDaemon(true);
1146      gcThread.start();
1147   }
1148
1149   protected void stopService() throws Exception JavaDoc
1150   {
1151      if (gcThread != null)
1152         gcThread.interrupt();
1153      gcThread = null;
1154   }
1155
1156   // Protected --------------------------------------------------------------------
1157

1158   /**
1159    * Resolve uncommitted transactions
1160    *
1161    * @throws JMSException for any error
1162    */

1163   protected synchronized void resolveAllUncommitedTXs() throws JMSException JavaDoc
1164   {
1165      TransactionManagerStrategy tms = new TransactionManagerStrategy();
1166      tms.startTX();
1167      Connection JavaDoc c = null;
1168      PreparedStatement JavaDoc stmt = null;
1169      ResultSet JavaDoc rs = null;
1170      boolean threadWasInterrupted = Thread.interrupted();
1171      try
1172      {
1173         if (createTables)
1174         {
1175            c = this.getConnection();
1176
1177            try
1178            {
1179               stmt = c.prepareStatement(CREATE_REFERENCE_TABLE);
1180               stmt.executeUpdate();
1181            }
1182            catch (SQLException JavaDoc e)
1183            {
1184               log.debug("Could not create table with SQL: " + CREATE_REFERENCE_TABLE + ", got : " + e);
1185            }
1186            finally
1187            {
1188               try
1189               {
1190                  if (stmt != null)
1191                     stmt.close();
1192               }
1193               catch (Throwable JavaDoc ignored)
1194               {
1195                  log.trace("Ignored: " + ignored);
1196               }
1197               stmt = null;
1198            }
1199
1200            try
1201            {
1202               stmt = c.prepareStatement(CREATE_MESSAGE_TABLE);
1203               stmt.executeUpdate();
1204            }
1205            catch (SQLException JavaDoc e)
1206            {
1207               log.debug("Could not create table with SQL: " + CREATE_MESSAGE_TABLE + ", got : " + e);
1208            }
1209            finally
1210            {
1211               try
1212               {
1213                  if (stmt != null)
1214                     stmt.close();
1215               }
1216               catch (Throwable JavaDoc ignored)
1217               {
1218                  log.trace("Ignored: " + ignored);
1219               }
1220               stmt = null;
1221            }
1222
1223            try
1224            {
1225               stmt = c.prepareStatement(CREATE_TX_TABLE);
1226               stmt.executeUpdate();
1227            }
1228            catch (SQLException JavaDoc e)
1229            {
1230               log.debug("Could not create table with SQL: " + CREATE_TX_TABLE + ", got : " + e);
1231            }
1232            finally
1233            {
1234               try
1235               {
1236                  if (stmt != null)
1237                     stmt.close();
1238               }
1239               catch (Throwable JavaDoc ignored)
1240               {
1241                  log.trace("Ignored: " + ignored);
1242               }
1243               stmt = null;
1244            }
1245         }
1246      }
1247      catch (SQLException JavaDoc e)
1248      {
1249         tms.setRollbackOnly();
1250         throw new SpyJMSException("Could not get a connection for jdbc2 table construction ", e);
1251      }
1252      finally
1253      {
1254         try
1255         {
1256            if (stmt != null)
1257               stmt.close();
1258         }
1259         catch (Throwable JavaDoc ignore)
1260         {
1261         }
1262         stmt = null;
1263         try
1264         {
1265            c.close();
1266         }
1267         catch (Throwable JavaDoc ignore)
1268         {
1269         }
1270         c = null;
1271         tms.endTX();
1272
1273         // Restore the interrupted state of the thread
1274
if( threadWasInterrupted )
1275            Thread.currentThread().interrupt();
1276      }
1277
1278      // We perform recovery in a different thread to the table creation
1279
// Postgres doesn't like create table failing in the same transaction
1280
// as other operations
1281

1282      tms = new TransactionManagerStrategy();
1283      tms.startTX();
1284      threadWasInterrupted = Thread.interrupted();
1285      try
1286      {
1287         c = this.getConnection();
1288
1289         // Delete the temporary messages.
1290
stmt = c.prepareStatement(DELETE_TEMPORARY_MESSAGES);
1291         stmt.executeUpdate();
1292         stmt.close();
1293
1294         // Delete all the messages that were added but their tx's were not commited.
1295
stmt = c.prepareStatement(DELETE_MARKED_MESSAGES_WITH_TX);
1296         stmt.setString(1, "A");
1297         stmt.executeUpdate();
1298         stmt.close();
1299
1300         // Restore all the messages that were removed but their tx's were not commited.
1301
stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES);
1302         stmt.setNull(1, Types.BIGINT);
1303         stmt.setString(2, "A");
1304         stmt.setString(3, "D");
1305         stmt.executeUpdate();
1306         stmt.close();
1307
1308         // Delete the temporary references.
1309
stmt = c.prepareStatement(DELETE_TEMPORARY_REFERENCES);
1310         stmt.executeUpdate();
1311         stmt.close();
1312
1313         // Delete all the references that were added but their tx's were not commited.
1314
stmt = c.prepareStatement(DELETE_MARKED_REFERENCES_WITH_TX);
1315         stmt.setString(1, "A");
1316         stmt.executeUpdate();
1317         stmt.close();
1318
1319         // Restore all the references that were removed but their tx's were not commited.
1320
stmt = c.prepareStatement(UPDATE_MARKED_REFERENCES);
1321         stmt.setNull(1, Types.BIGINT);
1322         stmt.setString(2, "A");
1323         stmt.setString(3, "D");
1324         stmt.executeUpdate();
1325         stmt.close();
1326         
1327         // Remove orphaned messages
1328
stmt = c.prepareStatement(DELETE_ORPHANED_MESSAGES);
1329         stmt.executeUpdate();
1330         stmt.close();
1331
1332         // Find out what the next TXID should be
1333
stmt = c.prepareStatement(SELECT_MAX_TX);
1334         rs = stmt.executeQuery();
1335         if (rs.next())
1336            nextTransactionId.set(rs.getLong(1) + 1);
1337         rs.close();
1338         stmt.close();
1339            
1340         // Delete all transactions.
1341
stmt = c.prepareStatement(DELETE_ALL_TXS);
1342         stmt.executeUpdate();
1343         stmt.close();
1344      }
1345      catch (SQLException JavaDoc e)
1346      {
1347         tms.setRollbackOnly();
1348         throw new SpyJMSException("Could not resolve uncommited transactions. Message recovery may not be accurate", e);
1349      }
1350      finally
1351      {
1352         try
1353         {
1354            rs.close();
1355         }
1356         catch (Throwable JavaDoc ignore)
1357         {
1358         }
1359         try
1360         {
1361            stmt.close();
1362         }
1363         catch (Throwable JavaDoc ignore)
1364         {
1365         }
1366         try
1367         {
1368            c.close();
1369         }
1370         catch (Throwable JavaDoc ignore)
1371         {
1372         }
1373         tms.endTX();
1374
1375         // Restore the interrupted state of the thread
1376
if( threadWasInterrupted )
1377            Thread.currentThread().interrupt();
1378      }
1379   }
1380
1381   /**
1382    * Remove a transaction record
1383    *
1384    * @param c the connection
1385    * @param txid the transaction
1386    * @throws SQLException for any error
1387    */

1388   protected void removeTXRecord(Connection JavaDoc c, long txid) throws SQLException JavaDoc
1389   {
1390      PreparedStatement JavaDoc stmt = null;
1391      try
1392      {
1393         stmt = c.prepareStatement(DELETE_TX);
1394         stmt.setLong(1, txid);
1395         stmt.executeUpdate();
1396      }
1397      finally
1398      {
1399         try
1400         {
1401            stmt.close();
1402         }
1403         catch (Throwable JavaDoc e)
1404         {
1405         }
1406      }
1407   }
1408
1409   /**
1410    * Add a message
1411    *
1412    * @param c the connection
1413    * @param queue the queue name
1414    * @param message the message
1415    * @param txid the transaction id
1416    * @param mark the mark to set for the message
1417    * @throws SQLException for an error in the db
1418    * @throws IOException for an error serializing the message
1419    */

1420   protected void addMessage(Connection JavaDoc c, String JavaDoc queue, SpyMessage message, Tx txId, String JavaDoc mark, String JavaDoc lateClone)
1421      throws SQLException JavaDoc, IOException JavaDoc
1422   {
1423      PreparedStatement JavaDoc stmt = null;
1424      try
1425      {
1426         stmt = c.prepareStatement(INSERT_MESSAGE);
1427
1428         stmt.setLong(1, message.header.messageId);
1429         String JavaDoc dest = "*";
1430         if (queue != null)
1431            dest = queue;
1432         stmt.setString(2, dest);
1433         setBlob(stmt, 3, message);
1434         if (txId != null)
1435            stmt.setLong(4, txId.longValue());
1436         else
1437            stmt.setNull(4, Types.BIGINT);
1438         if (mark == null)
1439            stmt.setNull(5, Types.VARCHAR);
1440         else
1441            stmt.setString(5, mark);
1442         stmt.setString(6, lateClone);
1443
1444         try
1445         {
1446            stmt.executeUpdate();
1447         }
1448         catch (SQLException JavaDoc e)
1449         {
1450            if (lateClone.equals("1"))
1451               log.trace("Assumed already added to message log: " + message.header.messageId);
1452            else
1453               throw e;
1454         }
1455      }
1456      finally
1457      {
1458         try
1459         {
1460            stmt.close();
1461         }
1462         catch (Throwable JavaDoc ignore)
1463         {
1464         }
1465      }
1466   }
1467
1468   /**
1469    * Add a reference
1470    *
1471    * @param c the connection
1472    * @param queue the queue name
1473    * @param message the reference
1474    * @param txid the transaction id
1475    * @param mark the mark to set for the message
1476    * @throws SQLException for an error in the db
1477    * @throws IOException for an error serializing the message
1478    */

1479   protected void addReference(Connection JavaDoc c, String JavaDoc queue, MessageReference message, Tx txId, String JavaDoc mark)
1480      throws SQLException JavaDoc, IOException JavaDoc
1481   {
1482      PreparedStatement JavaDoc stmt = null;
1483      try
1484      {
1485         stmt = c.prepareStatement(INSERT_REFERENCE);
1486
1487         stmt.setLong(1, message.messageId);
1488         stmt.setString(2, queue);
1489
1490         if (txId != null)
1491            stmt.setLong(3, txId.longValue());
1492         else
1493            stmt.setNull(3, Types.BIGINT);
1494         stmt.setString(4, mark);
1495         if (message.redelivered)
1496            stmt.setString(5, "1");
1497         else
1498            stmt.setString(5, "0");
1499         stmt.setLong(6, message.redeliveryCount);
1500
1501         stmt.executeUpdate();
1502      }
1503      finally
1504      {
1505         try
1506         {
1507            stmt.close();
1508         }
1509         catch (Throwable JavaDoc ignore)
1510         {
1511         }
1512      }
1513   }
1514
1515   /**
1516    * Remove messages for a given transaction and mark
1517    *
1518    * @param c the connection
1519    * @param txid the transaction id
1520    * @param mark the mark
1521    * @throws SQLException for any error
1522    */

1523   protected void removeMarkedMessages(Connection JavaDoc c, Tx txid, String JavaDoc mark) throws SQLException JavaDoc
1524   {
1525      PreparedStatement JavaDoc stmt = null;
1526      try
1527      {
1528         stmt = c.prepareStatement(DELETE_MARKED_MESSAGES);
1529         stmt.setLong(1, txid.longValue());
1530         stmt.setString(2, mark);
1531         stmt.executeUpdate();
1532      }
1533      finally
1534      {
1535         try
1536         {
1537            stmt.close();
1538         }
1539         catch (Throwable JavaDoc e)
1540         {
1541         }
1542      }
1543   }
1544
1545   /**
1546    * Remove references for a given transaction and mark
1547    *
1548    * @param c the connection
1549    * @param txid the transaction id
1550    * @param mark the mark
1551    * @throws SQLException for any error
1552    */

1553   protected void removeMarkedReferences(Connection JavaDoc c, Tx txid, String JavaDoc mark) throws SQLException JavaDoc
1554   {
1555      PreparedStatement JavaDoc stmt = null;
1556      try
1557      {
1558         stmt = c.prepareStatement(DELETE_MARKED_REFERENCES);
1559         if (txid != null)
1560            stmt.setLong(1, txid.longValue());
1561         else
1562            stmt.setNull(1, Types.BIGINT);
1563         stmt.setString(2, mark);
1564         stmt.executeUpdate();
1565      }
1566      finally
1567      {
1568         try
1569         {
1570            stmt.close();
1571         }
1572         catch (Throwable JavaDoc e)
1573         {
1574         }
1575      }
1576   }
1577
1578   /**
1579    * Store the message in a blob
1580    *
1581    * @param stmt the prepared statement
1582    * @param column the column in the prepared statement
1583    * @param message the message
1584    * @param IOException for an error serializing the message
1585    * @param SQLException for an error accessing the db
1586    */

1587   protected void setBlob(PreparedStatement JavaDoc stmt, int column, SpyMessage message)
1588      throws IOException JavaDoc, SQLException JavaDoc
1589   {
1590      if (blobType == OBJECT_BLOB)
1591         stmt.setObject(column, message);
1592      else if (blobType == BYTES_BLOB)
1593      {
1594         ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
1595         ObjectOutputStream JavaDoc oos = new ObjectOutputStream JavaDoc(baos);
1596         SpyMessage.writeMessage(message,oos);
1597         oos.flush();
1598         byte[] messageAsBytes = baos.toByteArray();
1599         stmt.setBytes(column, messageAsBytes);
1600      }
1601      else if (blobType == BINARYSTREAM_BLOB)
1602      {
1603         ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
1604         ObjectOutputStream JavaDoc oos = new ObjectOutputStream JavaDoc(baos);
1605         SpyMessage.writeMessage(message,oos);
1606         oos.flush();
1607         byte[] messageAsBytes = baos.toByteArray();
1608         ByteArrayInputStream JavaDoc bais = new ByteArrayInputStream JavaDoc(messageAsBytes);
1609         stmt.setBinaryStream(column, bais, messageAsBytes.length);
1610      }
1611      else if (blobType == BLOB_BLOB)
1612      {
1613         throw new RuntimeException JavaDoc("BLOB_TYPE: BLOB_BLOB is not yet implemented.");
1614         /** TODO:
1615         ByteArrayOutputStream baos= new ByteArrayOutputStream();
1616         ObjectOutputStream oos= new ObjectOutputStream(baos);
1617         oos.writeObject(message);
1618         byte[] messageAsBytes= baos.toByteArray();
1619         ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
1620         stmt.setBsetBinaryStream(column, bais, messageAsBytes.length);
1621         */

1622      }
1623   }
1624
1625   /**
1626    * Extract a message from a result
1627    *
1628    * @param rs the result set
1629    * @param column the column number
1630    * @return the message
1631    * @throws SQLException for an error accessing the db
1632    * @throws IOException for an error extracting the message
1633    */

1634   protected SpyMessage extractMessage(ResultSet JavaDoc rs, int column) throws SQLException JavaDoc, IOException JavaDoc
1635   {
1636      long messageid = rs.getLong(1);
1637      SpyMessage message = null;
1638      if (blobType == OBJECT_BLOB)
1639      {
1640         message = (SpyMessage) rs.getObject(column);
1641      }
1642      else if (blobType == BYTES_BLOB)
1643      {
1644         byte[] st = rs.getBytes(column);
1645         ByteArrayInputStream JavaDoc baip = new ByteArrayInputStream JavaDoc(st);
1646         ObjectInputStream JavaDoc ois = new ObjectInputStream JavaDoc(baip);
1647         message = SpyMessage.readMessage(ois);
1648      }
1649      else if (blobType == BINARYSTREAM_BLOB)
1650      {
1651         ObjectInputStream JavaDoc ois = new ObjectInputStream JavaDoc(rs.getBinaryStream(column));
1652         message = SpyMessage.readMessage(ois);
1653      }
1654      else if (blobType == BLOB_BLOB)
1655      {
1656         ObjectInputStream JavaDoc ois = new ObjectInputStream JavaDoc(rs.getBlob(column).getBinaryStream());
1657         message = SpyMessage.readMessage(ois);
1658      }
1659      else throw new IllegalStateException JavaDoc();
1660      message.header.messageId = messageid;
1661      return message;
1662   }
1663   
1664   /**
1665    * Gets a connection from the datasource, retrying as needed. This was
1666    * implemented because in some minimal configurations (i.e. little logging
1667    * and few services) the database wasn't ready when we tried to get a
1668    * connection. We, therefore, implement a retry loop wich is controled
1669    * by the ConnectionRetryAttempts attribute. Submitted by terry@amicas.com
1670    *
1671    * @throws SQLException if an error occurs.
1672    */

1673   protected Connection JavaDoc getConnection() throws SQLException JavaDoc
1674   {
1675      int attempts = this.connectionRetryAttempts;
1676      int attemptCount = 0;
1677      SQLException JavaDoc sqlException = null;
1678      while (attempts-- > 0)
1679      {
1680         if (++attemptCount > 1)
1681            log.debug("Retrying connection: attempt # " + attemptCount);
1682
1683         try
1684         {
1685            sqlException = null;
1686            return datasource.getConnection();
1687         }
1688         catch (SQLException JavaDoc exception)
1689         {
1690            log.debug("Connection attempt # " + attemptCount + " failed with SQLException", exception);
1691            sqlException = exception;
1692         }
1693         finally
1694         {
1695            if (sqlException == null && attemptCount > 1)
1696               log.debug("Connection succeeded on attempt # " + attemptCount);
1697         }
1698           
1699         if (attempts > 0)
1700         {
1701             try
1702             {
1703                 Thread.sleep(1500);
1704             }
1705             catch(InterruptedException JavaDoc interruptedException)
1706             {
1707                 break;
1708             }
1709         }
1710      }
1711      if (sqlException != null)
1712         throw sqlException;
1713      throw new SQLException JavaDoc("connection attempt interrupted");
1714   }
1715
1716   // Inner Classes ----------------------------------------------------------------
1717

1718   /**
1719    * This inner class helps handle the tx management of the jdbc connections.
1720    */

1721   class TransactionManagerStrategy
1722   {
1723
1724      Transaction JavaDoc threadTx;
1725
1726      void startTX() throws JMSException JavaDoc
1727      {
1728         try
1729         {
1730            // Thread arriving must be clean (jboss doesn't set the thread
1731
// previously). However optimized calls come with associated
1732
// thread for example. We suspend the thread association here, and
1733
// resume in the finally block of the following try.
1734
threadTx = tm.suspend();
1735
1736            tm.begin();
1737         }
1738         catch (Exception JavaDoc e)
1739         {
1740            try
1741            {
1742               if (threadTx != null)
1743                  tm.resume(threadTx);
1744            }
1745            catch (Exception JavaDoc ignore)
1746            {
1747            }
1748            throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
1749         }
1750      }
1751
1752      void setRollbackOnly() throws JMSException JavaDoc
1753      {
1754         try
1755         {
1756            tm.setRollbackOnly();
1757         }
1758         catch (Exception JavaDoc e)
1759         {
1760            throw new SpyJMSException("Could not start a mark the transaction for rollback .", e);
1761         }
1762      }
1763
1764      void endTX() throws JMSException JavaDoc
1765      {
1766         try
1767         {
1768            if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK)
1769            {
1770               tm.rollback();
1771            }
1772            else
1773            {
1774               tm.commit();
1775            }
1776         }
1777         catch (Exception JavaDoc e)
1778         {
1779            throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
1780         }
1781         finally
1782         {
1783            try
1784            {
1785               if (threadTx != null)
1786                  tm.resume(threadTx);
1787            }
1788            catch (Exception JavaDoc ignore)
1789            {
1790            }
1791         }
1792      }
1793   }
1794}
1795
Popular Tags