1 7 package org.jboss.mq.pm.jdbc3; 8 9 import java.io.ByteArrayInputStream ; 10 import java.io.ByteArrayOutputStream ; 11 import java.io.IOException ; 12 import java.io.ObjectInputStream ; 13 import java.io.ObjectOutputStream ; 14 import java.sql.Connection ; 15 import java.sql.PreparedStatement ; 16 import java.sql.ResultSet ; 17 import java.sql.SQLException ; 18 import java.sql.Types ; 19 import java.util.Properties ; 20 21 import javax.jms.JMSException ; 22 import javax.management.ObjectName ; 23 import javax.naming.InitialContext ; 24 import javax.sql.DataSource ; 25 import javax.transaction.Status ; 26 import javax.transaction.Transaction ; 27 import javax.transaction.TransactionManager ; 28 29 import org.jboss.mq.DurableSubscriptionID; 30 import org.jboss.mq.SpyDestination; 31 import org.jboss.mq.SpyJMSException; 32 import org.jboss.mq.SpyMessage; 33 import org.jboss.mq.SpyTopic; 34 import org.jboss.mq.pm.CacheStore; 35 import org.jboss.mq.pm.NewPersistenceManager; 36 import org.jboss.mq.pm.Tx; 37 import org.jboss.mq.pm.TxManager; 38 import org.jboss.mq.server.JMSDestination; 39 import org.jboss.mq.server.JMSTopic; 40 import org.jboss.mq.server.MessageCache; 41 import org.jboss.mq.server.MessageReference; 42 import org.jboss.system.ServiceMBeanSupport; 43 import org.jboss.tm.TransactionManagerService; 44 45 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; 46 47 59 public class PersistenceManager 60 extends ServiceMBeanSupport 61 implements PersistenceManagerMBean, NewPersistenceManager, CacheStore, Runnable 62 { 63 65 66 static final int OBJECT_BLOB = 0; 67 68 69 static final int BYTES_BLOB = 1; 70 71 72 static final int BINARYSTREAM_BLOB = 2; 73 74 75 static final int BLOB_BLOB = 3; 76 77 79 80 private SynchronizedLong nextTransactionId = new SynchronizedLong(0l); 81 82 83 private TxManager txManager; 84 85 86 private DataSource datasource; 87 88 89 private TransactionManager tm; 90 91 92 private ObjectName connectionManagerName; 93 94 95 private Properties sqlProperties = new Properties (); 96 97 String UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE TXOP=?"; 98 String UPDATE_MARKED_REFERENCES = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE TXOP=?"; 99 String UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?"; 100 String UPDATE_MARKED_REFERENCES_WITH_TX = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?"; 101 String DELETE_MARKED_MESSAGES_WITH_TX = "DELETE FROM JMS_MESSAGE_LOG WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTION_LOG) AND TXOP=?"; 102 String DELETE_MARKED_REFERENCES_WITH_TX = "DELETE FROM JMS_REFERENCE_LOG WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTION_LOG) AND TXOP=?"; 103 String DELETE_TX = "DELETE FROM JMS_TRANSACTION_LOG WHERE TXID = ?"; 104 String DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE TXID=? AND TXOP=?"; 105 String DELETE_MARKED_REFERENCES = "DELETE FROM JMS_REFERENCE_LOG WHERE TXID=? AND TXOP=?"; 106 String DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE TXOP='T'"; 107 String DELETE_TEMPORARY_REFERENCES = "DELETE FROM JMS_REFERENCE_LOG WHERE TXOP='T'"; 108 String INSERT_TX = "INSERT INTO JMS_TRANSACTION_LOG (TXID) values(?)"; 109 String SELECT_MAX_TX = "SELECT MAX(TXID) FROM JMS_TRANSACTION_LOG"; 110 String SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG WHERE DESTINATION=?"; 111 String SELECT_REFERENCES_IN_DEST = "SELECT R.MESSAGEID, M.MESSAGEBLOB, R.REDELIVERED, R.REDELIVERS FROM JMS_REFERENCE_LOG AS R, JMS_MESSAGE_LOG AS M" + 112 " WHERE R.MESSAGEID = M.MESSAGEID AND R.DESTINATION=?"; 113 String SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG WHERE MESSAGEID=? AND DESTINATION=?"; 114 String INSERT_MESSAGE = "INSERT INTO JMS_MESSAGE_LOG (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP, LATECLONE) VALUES(?,?,?,?,?,?)"; 115 String INSERT_REFERENCE = "INSERT INTO JMS_REFERENCE_LOG (MESSAGEID, DESTINATION, TXID, TXOP, REDELIVERED, REDELIVERS) VALUES(?,?,?,?,?,?)"; 116 String MARK_MESSAGE = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?"; 117 String MARK_REFERENCE = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?"; 118 String DELETE_MESSAGE = "DELETE FROM JMS_MESSAGE_LOG WHERE MESSAGEID=? AND DESTINATION=?"; 119 String DELETE_REFERENCE = "DELETE FROM JMS_REFERENCE_LOG WHERE MESSAGEID=? AND DESTINATION=?"; 120 String UPDATE_MESSAGE = "UPDATE JMS_MESSAGE_LOG SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?"; 121 String UPDATE_REFERENCE = "UPDATE JMS_REFERENCE_LOG SET REDELIVERED=?, REDELIVERS=? WHERE MESSAGEID=? AND DESTINATION=?"; 122 String DELETE_ORPHANED_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE LATECLONE = '1' AND MESSAGEID NOT IN (SELECT MESSAGEID FROM JMS_REFERENCE_LOG)"; 123 String DELETE_ALL_TXS = "DELETE FROM JMS_TRANSACTION_LOG"; 124 String CREATE_REFERENCE_TABLE = 125 "CREATE TABLE JMS_REFERENCE_LOG ( MESSAGEID INTEGER NOT NULL, " 126 + "DESTINATION VARCHAR(256) NOT NULL, TXID INTEGER, TXOP CHAR(1), " 127 + "REDELIVERED CHAR(1), REDELIVERS INTEGER, " 128 + "PRIMARY KEY (MESSAGEID, DESTINATION) )"; 129 String CREATE_MESSAGE_TABLE = 130 "CREATE TABLE JMS_MESSAGE_LOG ( MESSAGEID INTEGER NOT NULL, " 131 + "DESTINATION VARCHAR(256), TXID INTEGER, TXOP CHAR(1), LATECLONE CHAR(1), " 132 + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )"; 133 String CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTION_LOG ( TXID INTEGER )"; 134 135 136 int blobType = OBJECT_BLOB; 137 138 139 boolean createTables = true; 140 141 142 private int connectionRetryAttempts = 5; 143 144 145 private long gcPeriod = 60000; 146 147 148 private Thread gcThread; 149 150 152 157 public PersistenceManager() throws javax.jms.JMSException 158 { 159 txManager = new TxManager(this); 160 } 161 162 164 169 public ObjectName getConnectionManager() 170 { 171 return connectionManagerName; 172 } 173 174 179 public void setConnectionManager(ObjectName connectionManagerName) 180 { 181 this.connectionManagerName = connectionManagerName; 182 } 183 184 189 public int getGCPeriodSecs() 190 { 191 return (int) gcPeriod / 1000; 192 } 193 194 199 public void setGCPeriodSecs(int gcPeriodSecs) 200 { 201 this.gcPeriod = gcPeriodSecs * 1000; 202 } 203 204 210 public int getConnectionRetryAttempts() 211 { 212 return this.connectionRetryAttempts; 213 } 214 215 221 public void setConnectionRetryAttempts(int value) 222 { 223 this.connectionRetryAttempts = value; 224 } 225 226 232 public String getSqlProperties() 233 { 234 try 235 { 236 ByteArrayOutputStream boa = new ByteArrayOutputStream (); 237 sqlProperties.store(boa, ""); 238 return new String (boa.toByteArray()); 239 } 240 catch (IOException shouldnothappen) 241 { 242 return ""; 243 } 244 } 245 246 252 public void setSqlProperties(String value) 253 { 254 try 255 { 256 ByteArrayInputStream is = new ByteArrayInputStream (value.getBytes()); 257 sqlProperties = new Properties (); 258 sqlProperties.load(is); 259 } 260 catch (IOException shouldnothappen) 261 { 262 } 263 } 264 265 267 public Tx createPersistentTx() throws JMSException 268 { 269 Tx id = new Tx(nextTransactionId.increment()); 270 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 271 tms.startTX(); 272 Connection c = null; 273 PreparedStatement stmt = null; 274 boolean threadWasInterrupted = Thread.interrupted(); 275 try 276 { 277 c = this.getConnection(); 278 stmt = c.prepareStatement(INSERT_TX); 279 stmt.setLong(1, id.longValue()); 280 stmt.executeUpdate(); 281 282 } 283 catch (SQLException e) 284 { 285 tms.setRollbackOnly(); 286 throw new SpyJMSException("Could not crate tx: " + id, e); 287 } 288 finally 289 { 290 try 291 { 292 stmt.close(); 293 } 294 catch (Throwable ignore) 295 { 296 } 297 try 298 { 299 c.close(); 300 } 301 catch (Throwable ignore) 302 { 303 } 304 tms.endTX(); 305 306 if( threadWasInterrupted ) 308 Thread.currentThread().interrupt(); 309 } 310 311 return id; 312 } 313 314 public void commitPersistentTx(Tx txId) throws JMSException 315 { 316 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 317 tms.startTX(); 318 Connection c = null; 319 boolean threadWasInterrupted = Thread.interrupted(); 320 try 321 { 322 c = this.getConnection(); 323 removeMarkedMessages(c, txId, "D"); 324 removeMarkedReferences(c, txId, "D"); 325 removeTXRecord(c, txId.longValue()); 326 } 327 catch (SQLException e) 328 { 329 tms.setRollbackOnly(); 330 throw new SpyJMSException("Could not commit tx: " + txId, e); 331 } 332 finally 333 { 334 try 335 { 336 c.close(); 337 } 338 catch (Throwable ignore) 339 { 340 } 341 tms.endTX(); 342 343 if( threadWasInterrupted ) 345 Thread.currentThread().interrupt(); 346 } 347 } 348 349 public void rollbackPersistentTx(Tx txId) throws JMSException 350 { 351 352 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 353 tms.startTX(); 354 Connection c = null; 355 PreparedStatement stmt = null; 356 boolean threadWasInterrupted = Thread.interrupted(); 357 try 358 { 359 c = this.getConnection(); 360 removeMarkedMessages(c, txId, "A"); 361 removeMarkedReferences(c, txId, "A"); 362 removeTXRecord(c, txId.longValue()); 363 364 stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES_WITH_TX); 366 stmt.setNull(1, Types.BIGINT); 367 stmt.setString(2, "A"); 368 stmt.setString(3, "D"); 369 stmt.setLong(4, txId.longValue()); 370 stmt.executeUpdate(); 371 stmt.close(); 372 373 stmt = c.prepareStatement(UPDATE_MARKED_REFERENCES_WITH_TX); 375 stmt.setNull(1, Types.BIGINT); 376 stmt.setString(2, "A"); 377 stmt.setString(3, "D"); 378 stmt.setLong(4, txId.longValue()); 379 stmt.executeUpdate(); 380 stmt.close(); 381 } 382 catch (SQLException e) 383 { 384 tms.setRollbackOnly(); 385 throw new SpyJMSException("Could not rollback tx: " + txId, e); 386 } 387 finally 388 { 389 try 390 { 391 if (stmt != null) 392 stmt.close(); 393 if (c != null) 394 c.close(); 395 } 396 catch (Throwable ignore) 397 { 398 } 399 tms.endTX(); 400 401 if( threadWasInterrupted ) 403 Thread.currentThread().interrupt(); 404 } 405 } 406 407 public void add(MessageReference messageRef, Tx txId) throws JMSException 408 { 409 boolean trace = log.isTraceEnabled(); 410 if (trace) 411 log.trace("About to add message " + messageRef + " transaction=" + txId); 412 413 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 414 tms.startTX(); 415 Connection c = null; 416 boolean threadWasInterrupted = Thread.interrupted(); 417 try 418 { 419 c = this.getConnection(); 420 synchronized(messageRef) 422 { 423 if (trace) 424 log.trace("Inserting message " + messageRef + " transaction=" + txId); 425 426 if (messageRef.isLateClone()) 427 { 428 addReference(c, messageRef.getPersistentKey(), messageRef, txId, "A"); 429 } 430 else 431 { 432 SpyMessage message = messageRef.getMessage(); 433 addMessage(c, messageRef.getPersistentKey(), message, txId, "A", "0"); 434 } 435 messageRef.setStored(MessageReference.STORED); 436 437 if (trace) 438 log.trace("Added message " + messageRef + " transaction=" + txId); 439 } 440 } 441 catch (IOException e) 442 { 443 tms.setRollbackOnly(); 444 throw new SpyJMSException("Could not store message: " + messageRef, e); 445 } 446 catch (SQLException e) 447 { 448 tms.setRollbackOnly(); 449 throw new SpyJMSException("Could not store message: " + messageRef, e); 450 } 451 finally 452 { 453 try 454 { 455 c.close(); 456 } 457 catch (Throwable ignore) 458 { 459 } 460 tms.endTX(); 461 462 if( threadWasInterrupted ) 464 Thread.currentThread().interrupt(); 465 } 466 } 467 468 public void update(MessageReference messageRef, Tx txId) throws JMSException 469 { 470 boolean trace = log.isTraceEnabled(); 471 if (trace) 472 log.trace("Updating message " + messageRef + " transaction=" + txId); 473 474 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 475 tms.startTX(); 476 Connection c = null; 477 PreparedStatement stmt = null; 478 boolean threadWasInterrupted = Thread.interrupted(); 479 try 480 { 481 c = this.getConnection(); 482 if (txId == null) 483 { 484 if (messageRef.isLateClone()) 485 { 486 stmt = c.prepareStatement(UPDATE_REFERENCE); 487 if (messageRef.redelivered) 488 stmt.setString(1, "1"); 489 else 490 stmt.setString(1, "0"); 491 stmt.setLong(2, messageRef.redeliveryCount); 492 stmt.setLong(3, messageRef.messageId); 493 stmt.setString(4, messageRef.getPersistentKey()); 494 } 495 else 496 { 497 stmt = c.prepareStatement(UPDATE_MESSAGE); 498 setBlob(stmt, 1, messageRef.getMessage()); 499 stmt.setLong(2, messageRef.messageId); 500 stmt.setString(3, messageRef.getPersistentKey()); 501 } 502 int rc = stmt.executeUpdate(); 503 if( rc != 1 ) 504 throw new SpyJMSException("Could not update the message in the database: update affected "+rc+" rows"); 505 } 506 else 507 { 508 throw new SpyJMSException("NYI: Updating a message in a transaction is not currently used"); 509 } 510 if (trace) 511 log.trace("Updated message " + messageRef + " transaction=" + txId); 512 513 } 514 catch (IOException e) 515 { 516 tms.setRollbackOnly(); 517 throw new SpyJMSException("Could not update message: " + messageRef, e); 518 } 519 catch (SQLException e) 520 { 521 tms.setRollbackOnly(); 522 throw new SpyJMSException("Could not update message: " + messageRef, e); 523 } 524 finally 525 { 526 try 527 { 528 stmt.close(); 529 } 530 catch (Throwable ignore) 531 { 532 } 533 try 534 { 535 c.close(); 536 } 537 catch (Throwable ignore) 538 { 539 } 540 tms.endTX(); 541 542 if( threadWasInterrupted ) 544 Thread.currentThread().interrupt(); 545 } 546 } 547 548 public void remove(MessageReference messageRef, Tx txId) throws JMSException 549 { 550 boolean trace = log.isTraceEnabled(); 551 if (trace) 552 log.trace("Removing message " + messageRef + " transaction=" + txId); 553 554 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 555 tms.startTX(); 556 Connection c = null; 557 PreparedStatement stmt = null; 558 boolean threadWasInterrupted = Thread.interrupted(); 559 try 560 { 561 c = this.getConnection(); 562 synchronized(messageRef) 564 { 565 if (txId == null) 566 { 567 if (messageRef.isLateClone()) 568 stmt = c.prepareStatement(DELETE_REFERENCE); 569 else 570 stmt = c.prepareStatement(DELETE_MESSAGE); 571 stmt.setLong(1, messageRef.messageId); 572 stmt.setString(2, messageRef.getPersistentKey()); 573 int rc = stmt.executeUpdate(); 574 if( rc != 1 ) 575 throw new SpyJMSException("Could not delete the message from the database: delete affected "+rc+" rows"); 576 577 messageRef.setStored(MessageReference.NOT_STORED); 587 messageRef.removeDelayed(); 588 } 589 else 590 { 591 if (messageRef.isLateClone()) 592 { 593 stmt = c.prepareStatement(MARK_REFERENCE); 594 stmt.setLong(1, txId.longValue()); 595 stmt.setString(2, "D"); 596 stmt.setLong(3, messageRef.messageId); 597 stmt.setString(4, messageRef.getPersistentKey()); 598 } 599 else 600 { 601 stmt = c.prepareStatement(MARK_MESSAGE); 602 stmt.setLong(1, txId.longValue()); 603 stmt.setString(2, "D"); 604 stmt.setLong(3, messageRef.messageId); 605 stmt.setString(4, messageRef.getPersistentKey()); 606 } 607 int rc = stmt.executeUpdate(); 608 if( rc != 1 ) 609 throw new SpyJMSException("Could not mark the message as deleted in the database: update affected "+rc+" rows"); 610 } 611 if (trace) 612 log.trace("Removed message " + messageRef + " transaction=" + txId); 613 } 614 } 615 catch (SQLException e) 616 { 617 tms.setRollbackOnly(); 618 throw new SpyJMSException("Could not remove message: " + messageRef, e); 619 } 620 finally 621 { 622 try 623 { 624 stmt.close(); 625 } 626 catch (Throwable ignore) 627 { 628 } 629 try 630 { 631 c.close(); 632 } 633 catch (Throwable ignore) 634 { 635 } 636 tms.endTX(); 637 638 if( threadWasInterrupted ) 640 Thread.currentThread().interrupt(); 641 } 642 } 643 644 public synchronized void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws javax.jms.JMSException 645 { 646 if (jmsDest == null) 647 throw new IllegalArgumentException ("Must supply non null JMSDestination to restoreQueue"); 648 if (dest == null) 649 throw new IllegalArgumentException ("Must supply non null SpyDestination to restoreQueue"); 650 651 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 652 tms.startTX(); 653 Connection c = null; 654 PreparedStatement stmt = null; 655 ResultSet rs = null; 656 boolean threadWasInterrupted = Thread.interrupted(); 657 try 658 { 659 c = this.getConnection(); 660 int counter=0; 661 if (jmsDest.parameters.lateClone) 662 { 663 JMSTopic topic = (JMSTopic) jmsDest; 664 DurableSubscriptionID id = ((SpyTopic) dest).getDurableSubscriptionID(); 666 667 stmt = c.prepareStatement(SELECT_REFERENCES_IN_DEST); 668 stmt.setString(1, dest.toString()); 669 670 rs = stmt.executeQuery(); 671 while (rs.next()) 672 { 673 SpyMessage message = extractMessage(rs, 2); 674 boolean redelivered = false; 675 if (rs.getString(3).equals("1")) 676 redelivered = true; 677 message.header.jmsRedelivered = redelivered; 678 message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer (rs.getInt(4))); 679 topic.restoreMessage(message, id); 680 counter++; 681 } 682 } 683 else 684 { 685 stmt = c.prepareStatement(SELECT_MESSAGES_IN_DEST); 686 stmt.setString(1, dest.toString()); 687 688 rs = stmt.executeQuery(); 689 while (rs.next()) 690 { 691 SpyMessage message = extractMessage(rs, 2); 692 if (dest instanceof SpyTopic) 694 message.header.durableSubscriberID = ((SpyTopic)dest).getDurableSubscriptionID(); 695 jmsDest.restoreMessage(message); 696 counter++; 697 } 698 } 699 700 log.debug("Restored "+counter+" message(s) to: "+dest); 701 } 702 catch (IOException e) 703 { 704 tms.setRollbackOnly(); 705 throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e); 706 } 707 catch (SQLException e) 708 { 709 tms.setRollbackOnly(); 710 throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e); 711 } 712 finally 713 { 714 try 715 { 716 rs.close(); 717 } 718 catch (Throwable ignore) 719 { 720 } 721 try 722 { 723 stmt.close(); 724 } 725 catch (Throwable ignore) 726 { 727 } 728 try 729 { 730 c.close(); 731 } 732 catch (Throwable ignore) 733 { 734 } 735 tms.endTX(); 736 737 if( threadWasInterrupted ) 739 Thread.currentThread().interrupt(); 740 } 741 742 } 743 744 public TxManager getTxManager() 745 { 746 return txManager; 747 } 748 749 public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException 750 { 751 } 753 754 757 public MessageCache getMessageCacheInstance() 758 { 759 throw new UnsupportedOperationException ("This is now set on the destination manager"); 760 } 761 762 764 public void addMessage(SpyMessage message) throws JMSException 765 { 766 767 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 768 tms.startTX(); 769 Connection c = null; 770 boolean threadWasInterrupted = Thread.interrupted(); 771 try 772 { 773 c = datasource.getConnection(); 774 addMessage(c, "*", message, null, null, "1"); 775 } 776 catch (IOException e) 777 { 778 tms.setRollbackOnly(); 779 throw new SpyJMSException("Could not add message:", e); 780 } 781 catch (SQLException e) 782 { 783 tms.setRollbackOnly(); 784 throw new SpyJMSException("Could not add message:", e); 785 } 786 finally 787 { 788 try 789 { 790 if (c != null) 791 c.close(); 792 } 793 catch (Throwable ignore) 794 { 795 } 796 tms.endTX(); 797 798 if( threadWasInterrupted ) 800 Thread.currentThread().interrupt(); 801 } 802 } 803 804 806 public Object getInstance() 807 { 808 return this; 809 } 810 811 814 public ObjectName getMessageCache() 815 { 816 throw new UnsupportedOperationException ("This is now set on the destination manager"); 817 } 818 819 822 public void setMessageCache(ObjectName messageCache) 823 { 824 throw new UnsupportedOperationException ("This is now set on the destination manager"); 825 } 826 827 829 public SpyMessage loadFromStorage(MessageReference messageRef) throws JMSException 830 { 831 if (log.isTraceEnabled()) 832 log.trace("Loading message from storage " + messageRef); 833 834 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 835 tms.startTX(); 836 Connection c = null; 837 PreparedStatement stmt = null; 838 ResultSet rs = null; 839 boolean threadWasInterrupted = Thread.interrupted(); 840 try 841 { 842 c = this.getConnection(); 843 stmt = c.prepareStatement(SELECT_MESSAGE); 844 stmt.setLong(1, messageRef.messageId); 845 if (messageRef.isLateClone()) 846 stmt.setString(2, "*"); 847 else 848 stmt.setString(2, messageRef.getPersistentKey()); 849 850 rs = stmt.executeQuery(); 851 if (rs.next()) 852 return extractMessage(rs, 2); 853 854 return null; 855 856 } 857 catch (IOException e) 858 { 859 tms.setRollbackOnly(); 860 throw new SpyJMSException("Could not load message : " + messageRef, e); 861 } 862 catch (SQLException e) 863 { 864 tms.setRollbackOnly(); 865 throw new SpyJMSException("Could not load message : " + messageRef, e); 866 } 867 finally 868 { 869 try 870 { 871 rs.close(); 872 } 873 catch (Throwable ignore) 874 { 875 } 876 try 877 { 878 stmt.close(); 879 } 880 catch (Throwable ignore) 881 { 882 } 883 try 884 { 885 c.close(); 886 } 887 catch (Throwable ignore) 888 { 889 } 890 tms.endTX(); 891 892 if( threadWasInterrupted ) 894 Thread.currentThread().interrupt(); 895 } 896 } 897 898 public void removeFromStorage(MessageReference messageRef) throws JMSException 899 { 900 if (messageRef.isPersistent()) 902 return; 903 904 boolean trace = log.isTraceEnabled(); 905 if (trace) 906 log.trace("Removing message from storage " + messageRef); 907 908 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 909 tms.startTX(); 910 Connection c = null; 911 PreparedStatement stmt = null; 912 boolean threadWasInterrupted = Thread.interrupted(); 913 try 914 { 915 c = this.getConnection(); 916 if (messageRef.isLateClone()) 917 { 918 stmt = c.prepareStatement(DELETE_REFERENCE); 919 stmt.setLong(1, messageRef.messageId); 920 stmt.setString(2, messageRef.getPersistentKey()); 921 stmt.executeUpdate(); 922 messageRef.setStored(MessageReference.NOT_STORED); 923 } 924 else 925 { 926 stmt = c.prepareStatement(DELETE_MESSAGE); 927 stmt.setLong(1, messageRef.messageId); 928 stmt.setString(2, messageRef.getPersistentKey()); 929 stmt.executeUpdate(); 930 messageRef.setStored(MessageReference.NOT_STORED); 931 } 932 933 if (trace) 934 log.trace("Removed message from storage " + messageRef); 935 } 936 catch (SQLException e) 937 { 938 tms.setRollbackOnly(); 939 throw new SpyJMSException("Could not remove message: " + messageRef, e); 940 } 941 finally 942 { 943 try 944 { 945 stmt.close(); 946 } 947 catch (Throwable ignore) 948 { 949 } 950 try 951 { 952 c.close(); 953 } 954 catch (Throwable ignore) 955 { 956 } 957 tms.endTX(); 958 959 if( threadWasInterrupted ) 961 Thread.currentThread().interrupt(); 962 } 963 } 964 965 public void saveToStorage(MessageReference messageRef, SpyMessage message) throws JMSException 966 { 967 if (messageRef.isPersistent()) 970 return; 971 972 boolean trace = log.isTraceEnabled(); 973 if (trace) 974 log.trace("Saving message to storage " + messageRef); 975 976 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 977 tms.startTX(); 978 Connection c = null; 979 boolean threadWasInterrupted = Thread.interrupted(); 980 try 981 { 982 c = this.getConnection(); 983 if (messageRef.isLateClone()) 984 { 985 addReference(c, messageRef.getPersistentKey(), messageRef, null, "T"); 986 try 987 { 988 addMessage(c, "*", message, null, "T", "1"); 989 } 990 catch (SQLException e) 991 { 992 log.trace("TODO: Check this is really a duplicate", e); 993 } 994 } 995 else 996 { 997 addMessage(c, messageRef.getPersistentKey(), message, null, "T", "0"); 998 } 999 messageRef.setStored(MessageReference.STORED); 1000 1001 if (trace) 1002 log.trace("Saved message to storage " + messageRef); 1003 } 1004 catch (IOException e) 1005 { 1006 tms.setRollbackOnly(); 1007 throw new SpyJMSException("Could not store message: " + messageRef, e); 1008 } 1009 catch (SQLException e) 1010 { 1011 tms.setRollbackOnly(); 1012 throw new SpyJMSException("Could not store message: " + messageRef, e); 1013 } 1014 finally 1015 { 1016 try 1017 { 1018 c.close(); 1019 } 1020 catch (Throwable ignore) 1021 { 1022 } 1023 tms.endTX(); 1024 1025 if( threadWasInterrupted ) 1027 Thread.currentThread().interrupt(); 1028 } 1029 } 1030 1031 1033 public void run() 1034 { 1035 Thread current = Thread.currentThread(); 1036 while (gcThread == current) 1037 { 1038 try 1039 { 1040 Thread.sleep(gcPeriod); 1041 if (gcThread != current) 1042 return; 1043 1044 Connection connection = datasource.getConnection(); 1045 try 1046 { 1047 PreparedStatement stmt = connection.prepareStatement(DELETE_ORPHANED_MESSAGES); 1048 try 1049 { 1050 stmt.executeUpdate(); 1051 } 1052 finally 1053 { 1054 try 1055 { 1056 stmt.close(); 1057 } 1058 catch (SQLException ignored) 1059 { 1060 log.trace("Error closing statement", ignored); 1061 } 1062 } 1063 } 1064 finally 1065 { 1066 try 1067 { 1068 connection.close(); 1069 } 1070 catch (SQLException ignored) 1071 { 1072 log.trace("Error closing connection", ignored); 1073 } 1074 } 1075 } 1076 catch (InterruptedException ignored) 1077 { 1078 } 1079 catch (Throwable t) 1080 { 1081 log.warn("Unhandled throwable in gc thread:", t); 1082 } 1083 } 1084 } 1085 1086 1088 protected void startService() throws Exception 1089 { 1090 UPDATE_MARKED_MESSAGES = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES", UPDATE_MARKED_MESSAGES); 1091 UPDATE_MARKED_REFERENCES = sqlProperties.getProperty("UPDATE_MARKED_REFERENCES", UPDATE_MARKED_REFERENCES); 1092 UPDATE_MARKED_MESSAGES_WITH_TX = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_WITH_TX", UPDATE_MARKED_MESSAGES_WITH_TX); 1093 UPDATE_MARKED_REFERENCES_WITH_TX = sqlProperties.getProperty("UPDATE_MARKED_REFERENCES_WITH_TX", UPDATE_MARKED_REFERENCES_WITH_TX); 1094 DELETE_MARKED_MESSAGES_WITH_TX = sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX", DELETE_MARKED_MESSAGES_WITH_TX); 1095 DELETE_MARKED_REFERENCES_WITH_TX = sqlProperties.getProperty("DELETE_MARKED_REFERENCES_WITH_TX", DELETE_MARKED_REFERENCES_WITH_TX); 1096 DELETE_TX = sqlProperties.getProperty("DELETE_TX", DELETE_TX); 1097 DELETE_MARKED_MESSAGES = sqlProperties.getProperty("DELETE_MARKED_MESSAGES", DELETE_MARKED_MESSAGES); 1098 DELETE_MARKED_REFERENCES = sqlProperties.getProperty("DELETE_MARKED_REFERENCES", DELETE_MARKED_REFERENCES); 1099 DELETE_TEMPORARY_MESSAGES = sqlProperties.getProperty("DELETE_TEMPORARY_MESSAGES", DELETE_TEMPORARY_MESSAGES); 1100 DELETE_TEMPORARY_REFERENCES = sqlProperties.getProperty("DELETE_TEMPORARY_REFERENCES", DELETE_TEMPORARY_REFERENCES); 1101 INSERT_TX = sqlProperties.getProperty("INSERT_TX", INSERT_TX); 1102 SELECT_MAX_TX = sqlProperties.getProperty("SELECT_MAX_TX", SELECT_MAX_TX); 1103 SELECT_MESSAGES_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST", SELECT_MESSAGES_IN_DEST); 1104 SELECT_REFERENCES_IN_DEST = sqlProperties.getProperty("SELECT_REFERENCES_IN_DEST", SELECT_REFERENCES_IN_DEST); 1105 SELECT_MESSAGE = sqlProperties.getProperty("SELECT_MESSAGE", SELECT_MESSAGE); 1106 INSERT_MESSAGE = sqlProperties.getProperty("INSERT_MESSAGE", INSERT_MESSAGE); 1107 INSERT_REFERENCE = sqlProperties.getProperty("INSERT_REFERENCE", INSERT_REFERENCE); 1108 MARK_MESSAGE = sqlProperties.getProperty("MARK_MESSAGE", MARK_MESSAGE); 1109 MARK_REFERENCE = sqlProperties.getProperty("MARK_REFERENCE", MARK_REFERENCE); 1110 DELETE_MESSAGE = sqlProperties.getProperty("DELETE_MESSAGE", DELETE_MESSAGE); 1111 DELETE_REFERENCE = sqlProperties.getProperty("DELETE_REFERENCE", DELETE_REFERENCE); 1112 UPDATE_MESSAGE = sqlProperties.getProperty("UPDATE_MESSAGE", UPDATE_MESSAGE); 1113 UPDATE_REFERENCE = sqlProperties.getProperty("UPDATE_REFERENCE", UPDATE_REFERENCE); 1114 DELETE_ORPHANED_MESSAGES = sqlProperties.getProperty("DELETE_ORPHANED_MESSAGES", DELETE_ORPHANED_MESSAGES); 1115 DELETE_ALL_TXS = sqlProperties.getProperty("DELETE_ALL_TXS", DELETE_ALL_TXS); 1116 CREATE_REFERENCE_TABLE = sqlProperties.getProperty("CREATE_REFERENCE_TABLE", CREATE_REFERENCE_TABLE); 1117 CREATE_MESSAGE_TABLE = sqlProperties.getProperty("CREATE_MESSAGE_TABLE", CREATE_MESSAGE_TABLE); 1118 CREATE_TX_TABLE = sqlProperties.getProperty("CREATE_TX_TABLE", CREATE_TX_TABLE); 1119 createTables = sqlProperties.getProperty("CREATE_TABLES_ON_STARTUP", "true").equalsIgnoreCase("true"); 1120 String s = sqlProperties.getProperty("BLOB_TYPE", "OBJECT_BLOB"); 1121 1122 if (s.equals("OBJECT_BLOB")) 1123 blobType = OBJECT_BLOB; 1124 else if (s.equals("BYTES_BLOB")) 1125 blobType = BYTES_BLOB; 1126 else if (s.equals("BINARYSTREAM_BLOB")) 1127 blobType = BINARYSTREAM_BLOB; 1128 else if (s.equals("BLOB_BLOB")) 1129 blobType = BLOB_BLOB; 1130 1131 String dsName = (String ) getServer().getAttribute(connectionManagerName, "BindName"); 1133 1134 InitialContext ctx = new InitialContext (); 1136 datasource = (DataSource ) ctx.lookup(dsName); 1137 1138 tm = (TransactionManager ) ctx.lookup(TransactionManagerService.JNDI_NAME); 1140 1141 log.debug("Resolving uncommited TXS"); 1142 resolveAllUncommitedTXs(); 1143 1144 gcThread = new Thread (this, "JBossMQ persistent message garbage collection"); 1145 gcThread.setDaemon(true); 1146 gcThread.start(); 1147 } 1148 1149 protected void stopService() throws Exception 1150 { 1151 if (gcThread != null) 1152 gcThread.interrupt(); 1153 gcThread = null; 1154 } 1155 1156 1158 1163 protected synchronized void resolveAllUncommitedTXs() throws JMSException 1164 { 1165 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 1166 tms.startTX(); 1167 Connection c = null; 1168 PreparedStatement stmt = null; 1169 ResultSet rs = null; 1170 boolean threadWasInterrupted = Thread.interrupted(); 1171 try 1172 { 1173 if (createTables) 1174 { 1175 c = this.getConnection(); 1176 1177 try 1178 { 1179 stmt = c.prepareStatement(CREATE_REFERENCE_TABLE); 1180 stmt.executeUpdate(); 1181 } 1182 catch (SQLException e) 1183 { 1184 log.debug("Could not create table with SQL: " + CREATE_REFERENCE_TABLE + ", got : " + e); 1185 } 1186 finally 1187 { 1188 try 1189 { 1190 if (stmt != null) 1191 stmt.close(); 1192 } 1193 catch (Throwable ignored) 1194 { 1195 log.trace("Ignored: " + ignored); 1196 } 1197 stmt = null; 1198 } 1199 1200 try 1201 { 1202 stmt = c.prepareStatement(CREATE_MESSAGE_TABLE); 1203 stmt.executeUpdate(); 1204 } 1205 catch (SQLException e) 1206 { 1207 log.debug("Could not create table with SQL: " + CREATE_MESSAGE_TABLE + ", got : " + e); 1208 } 1209 finally 1210 { 1211 try 1212 { 1213 if (stmt != null) 1214 stmt.close(); 1215 } 1216 catch (Throwable ignored) 1217 { 1218 log.trace("Ignored: " + ignored); 1219 } 1220 stmt = null; 1221 } 1222 1223 try 1224 { 1225 stmt = c.prepareStatement(CREATE_TX_TABLE); 1226 stmt.executeUpdate(); 1227 } 1228 catch (SQLException e) 1229 { 1230 log.debug("Could not create table with SQL: " + CREATE_TX_TABLE + ", got : " + e); 1231 } 1232 finally 1233 { 1234 try 1235 { 1236 if (stmt != null) 1237 stmt.close(); 1238 } 1239 catch (Throwable ignored) 1240 { 1241 log.trace("Ignored: " + ignored); 1242 } 1243 stmt = null; 1244 } 1245 } 1246 } 1247 catch (SQLException e) 1248 { 1249 tms.setRollbackOnly(); 1250 throw new SpyJMSException("Could not get a connection for jdbc2 table construction ", e); 1251 } 1252 finally 1253 { 1254 try 1255 { 1256 if (stmt != null) 1257 stmt.close(); 1258 } 1259 catch (Throwable ignore) 1260 { 1261 } 1262 stmt = null; 1263 try 1264 { 1265 c.close(); 1266 } 1267 catch (Throwable ignore) 1268 { 1269 } 1270 c = null; 1271 tms.endTX(); 1272 1273 if( threadWasInterrupted ) 1275 Thread.currentThread().interrupt(); 1276 } 1277 1278 1282 tms = new TransactionManagerStrategy(); 1283 tms.startTX(); 1284 threadWasInterrupted = Thread.interrupted(); 1285 try 1286 { 1287 c = this.getConnection(); 1288 1289 stmt = c.prepareStatement(DELETE_TEMPORARY_MESSAGES); 1291 stmt.executeUpdate(); 1292 stmt.close(); 1293 1294 stmt = c.prepareStatement(DELETE_MARKED_MESSAGES_WITH_TX); 1296 stmt.setString(1, "A"); 1297 stmt.executeUpdate(); 1298 stmt.close(); 1299 1300 stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES); 1302 stmt.setNull(1, Types.BIGINT); 1303 stmt.setString(2, "A"); 1304 stmt.setString(3, "D"); 1305 stmt.executeUpdate(); 1306 stmt.close(); 1307 1308 stmt = c.prepareStatement(DELETE_TEMPORARY_REFERENCES); 1310 stmt.executeUpdate(); 1311 stmt.close(); 1312 1313 stmt = c.prepareStatement(DELETE_MARKED_REFERENCES_WITH_TX); 1315 stmt.setString(1, "A"); 1316 stmt.executeUpdate(); 1317 stmt.close(); 1318 1319 stmt = c.prepareStatement(UPDATE_MARKED_REFERENCES); 1321 stmt.setNull(1, Types.BIGINT); 1322 stmt.setString(2, "A"); 1323 stmt.setString(3, "D"); 1324 stmt.executeUpdate(); 1325 stmt.close(); 1326 1327 stmt = c.prepareStatement(DELETE_ORPHANED_MESSAGES); 1329 stmt.executeUpdate(); 1330 stmt.close(); 1331 1332 stmt = c.prepareStatement(SELECT_MAX_TX); 1334 rs = stmt.executeQuery(); 1335 if (rs.next()) 1336 nextTransactionId.set(rs.getLong(1) + 1); 1337 rs.close(); 1338 stmt.close(); 1339 1340 stmt = c.prepareStatement(DELETE_ALL_TXS); 1342 stmt.executeUpdate(); 1343 stmt.close(); 1344 } 1345 catch (SQLException e) 1346 { 1347 tms.setRollbackOnly(); 1348 throw new SpyJMSException("Could not resolve uncommited transactions. Message recovery may not be accurate", e); 1349 } 1350 finally 1351 { 1352 try 1353 { 1354 rs.close(); 1355 } 1356 catch (Throwable ignore) 1357 { 1358 } 1359 try 1360 { 1361 stmt.close(); 1362 } 1363 catch (Throwable ignore) 1364 { 1365 } 1366 try 1367 { 1368 c.close(); 1369 } 1370 catch (Throwable ignore) 1371 { 1372 } 1373 tms.endTX(); 1374 1375 if( threadWasInterrupted ) 1377 Thread.currentThread().interrupt(); 1378 } 1379 } 1380 1381 1388 protected void removeTXRecord(Connection c, long txid) throws SQLException 1389 { 1390 PreparedStatement stmt = null; 1391 try 1392 { 1393 stmt = c.prepareStatement(DELETE_TX); 1394 stmt.setLong(1, txid); 1395 stmt.executeUpdate(); 1396 } 1397 finally 1398 { 1399 try 1400 { 1401 stmt.close(); 1402 } 1403 catch (Throwable e) 1404 { 1405 } 1406 } 1407 } 1408 1409 1420 protected void addMessage(Connection c, String queue, SpyMessage message, Tx txId, String mark, String lateClone) 1421 throws SQLException , IOException 1422 { 1423 PreparedStatement stmt = null; 1424 try 1425 { 1426 stmt = c.prepareStatement(INSERT_MESSAGE); 1427 1428 stmt.setLong(1, message.header.messageId); 1429 String dest = "*"; 1430 if (queue != null) 1431 dest = queue; 1432 stmt.setString(2, dest); 1433 setBlob(stmt, 3, message); 1434 if (txId != null) 1435 stmt.setLong(4, txId.longValue()); 1436 else 1437 stmt.setNull(4, Types.BIGINT); 1438 if (mark == null) 1439 stmt.setNull(5, Types.VARCHAR); 1440 else 1441 stmt.setString(5, mark); 1442 stmt.setString(6, lateClone); 1443 1444 try 1445 { 1446 stmt.executeUpdate(); 1447 } 1448 catch (SQLException e) 1449 { 1450 if (lateClone.equals("1")) 1451 log.trace("Assumed already added to message log: " + message.header.messageId); 1452 else 1453 throw e; 1454 } 1455 } 1456 finally 1457 { 1458 try 1459 { 1460 stmt.close(); 1461 } 1462 catch (Throwable ignore) 1463 { 1464 } 1465 } 1466 } 1467 1468 1479 protected void addReference(Connection c, String queue, MessageReference message, Tx txId, String mark) 1480 throws SQLException , IOException 1481 { 1482 PreparedStatement stmt = null; 1483 try 1484 { 1485 stmt = c.prepareStatement(INSERT_REFERENCE); 1486 1487 stmt.setLong(1, message.messageId); 1488 stmt.setString(2, queue); 1489 1490 if (txId != null) 1491 stmt.setLong(3, txId.longValue()); 1492 else 1493 stmt.setNull(3, Types.BIGINT); 1494 stmt.setString(4, mark); 1495 if (message.redelivered) 1496 stmt.setString(5, "1"); 1497 else 1498 stmt.setString(5, "0"); 1499 stmt.setLong(6, message.redeliveryCount); 1500 1501 stmt.executeUpdate(); 1502 } 1503 finally 1504 { 1505 try 1506 { 1507 stmt.close(); 1508 } 1509 catch (Throwable ignore) 1510 { 1511 } 1512 } 1513 } 1514 1515 1523 protected void removeMarkedMessages(Connection c, Tx txid, String mark) throws SQLException 1524 { 1525 PreparedStatement stmt = null; 1526 try 1527 { 1528 stmt = c.prepareStatement(DELETE_MARKED_MESSAGES); 1529 stmt.setLong(1, txid.longValue()); 1530 stmt.setString(2, mark); 1531 stmt.executeUpdate(); 1532 } 1533 finally 1534 { 1535 try 1536 { 1537 stmt.close(); 1538 } 1539 catch (Throwable e) 1540 { 1541 } 1542 } 1543 } 1544 1545 1553 protected void removeMarkedReferences(Connection c, Tx txid, String mark) throws SQLException 1554 { 1555 PreparedStatement stmt = null; 1556 try 1557 { 1558 stmt = c.prepareStatement(DELETE_MARKED_REFERENCES); 1559 if (txid != null) 1560 stmt.setLong(1, txid.longValue()); 1561 else 1562 stmt.setNull(1, Types.BIGINT); 1563 stmt.setString(2, mark); 1564 stmt.executeUpdate(); 1565 } 1566 finally 1567 { 1568 try 1569 { 1570 stmt.close(); 1571 } 1572 catch (Throwable e) 1573 { 1574 } 1575 } 1576 } 1577 1578 1587 protected void setBlob(PreparedStatement stmt, int column, SpyMessage message) 1588 throws IOException , SQLException 1589 { 1590 if (blobType == OBJECT_BLOB) 1591 stmt.setObject(column, message); 1592 else if (blobType == BYTES_BLOB) 1593 { 1594 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 1595 ObjectOutputStream oos = new ObjectOutputStream (baos); 1596 SpyMessage.writeMessage(message,oos); 1597 oos.flush(); 1598 byte[] messageAsBytes = baos.toByteArray(); 1599 stmt.setBytes(column, messageAsBytes); 1600 } 1601 else if (blobType == BINARYSTREAM_BLOB) 1602 { 1603 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 1604 ObjectOutputStream oos = new ObjectOutputStream (baos); 1605 SpyMessage.writeMessage(message,oos); 1606 oos.flush(); 1607 byte[] messageAsBytes = baos.toByteArray(); 1608 ByteArrayInputStream bais = new ByteArrayInputStream (messageAsBytes); 1609 stmt.setBinaryStream(column, bais, messageAsBytes.length); 1610 } 1611 else if (blobType == BLOB_BLOB) 1612 { 1613 throw new RuntimeException ("BLOB_TYPE: BLOB_BLOB is not yet implemented."); 1614 1622 } 1623 } 1624 1625 1634 protected SpyMessage extractMessage(ResultSet rs, int column) throws SQLException , IOException 1635 { 1636 long messageid = rs.getLong(1); 1637 SpyMessage message = null; 1638 if (blobType == OBJECT_BLOB) 1639 { 1640 message = (SpyMessage) rs.getObject(column); 1641 } 1642 else if (blobType == BYTES_BLOB) 1643 { 1644 byte[] st = rs.getBytes(column); 1645 ByteArrayInputStream baip = new ByteArrayInputStream (st); 1646 ObjectInputStream ois = new ObjectInputStream (baip); 1647 message = SpyMessage.readMessage(ois); 1648 } 1649 else if (blobType == BINARYSTREAM_BLOB) 1650 { 1651 ObjectInputStream ois = new ObjectInputStream (rs.getBinaryStream(column)); 1652 message = SpyMessage.readMessage(ois); 1653 } 1654 else if (blobType == BLOB_BLOB) 1655 { 1656 ObjectInputStream ois = new ObjectInputStream (rs.getBlob(column).getBinaryStream()); 1657 message = SpyMessage.readMessage(ois); 1658 } 1659 else throw new IllegalStateException (); 1660 message.header.messageId = messageid; 1661 return message; 1662 } 1663 1664 1673 protected Connection getConnection() throws SQLException 1674 { 1675 int attempts = this.connectionRetryAttempts; 1676 int attemptCount = 0; 1677 SQLException sqlException = null; 1678 while (attempts-- > 0) 1679 { 1680 if (++attemptCount > 1) 1681 log.debug("Retrying connection: attempt # " + attemptCount); 1682 1683 try 1684 { 1685 sqlException = null; 1686 return datasource.getConnection(); 1687 } 1688 catch (SQLException exception) 1689 { 1690 log.debug("Connection attempt # " + attemptCount + " failed with SQLException", exception); 1691 sqlException = exception; 1692 } 1693 finally 1694 { 1695 if (sqlException == null && attemptCount > 1) 1696 log.debug("Connection succeeded on attempt # " + attemptCount); 1697 } 1698 1699 if (attempts > 0) 1700 { 1701 try 1702 { 1703 Thread.sleep(1500); 1704 } 1705 catch(InterruptedException interruptedException) 1706 { 1707 break; 1708 } 1709 } 1710 } 1711 if (sqlException != null) 1712 throw sqlException; 1713 throw new SQLException ("connection attempt interrupted"); 1714 } 1715 1716 1718 1721 class TransactionManagerStrategy 1722 { 1723 1724 Transaction threadTx; 1725 1726 void startTX() throws JMSException 1727 { 1728 try 1729 { 1730 threadTx = tm.suspend(); 1735 1736 tm.begin(); 1737 } 1738 catch (Exception e) 1739 { 1740 try 1741 { 1742 if (threadTx != null) 1743 tm.resume(threadTx); 1744 } 1745 catch (Exception ignore) 1746 { 1747 } 1748 throw new SpyJMSException("Could not start a transaction with the transaction manager.", e); 1749 } 1750 } 1751 1752 void setRollbackOnly() throws JMSException 1753 { 1754 try 1755 { 1756 tm.setRollbackOnly(); 1757 } 1758 catch (Exception e) 1759 { 1760 throw new SpyJMSException("Could not start a mark the transaction for rollback .", e); 1761 } 1762 } 1763 1764 void endTX() throws JMSException 1765 { 1766 try 1767 { 1768 if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK) 1769 { 1770 tm.rollback(); 1771 } 1772 else 1773 { 1774 tm.commit(); 1775 } 1776 } 1777 catch (Exception e) 1778 { 1779 throw new SpyJMSException("Could not start a transaction with the transaction manager.", e); 1780 } 1781 finally 1782 { 1783 try 1784 { 1785 if (threadTx != null) 1786 tm.resume(threadTx); 1787 } 1788 catch (Exception ignore) 1789 { 1790 } 1791 } 1792 } 1793 } 1794} 1795 | Popular Tags |