1 22 package org.jboss.mq.pm.jdbc2; 23 24 import java.io.ByteArrayInputStream ; 25 import java.io.ByteArrayOutputStream ; 26 import java.io.IOException ; 27 import java.io.ObjectInputStream ; 28 import java.io.ObjectOutputStream ; 29 import java.io.StreamCorruptedException ; 30 import java.sql.Connection ; 31 import java.sql.PreparedStatement ; 32 import java.sql.ResultSet ; 33 import java.sql.SQLException ; 34 import java.util.HashMap ; 35 import java.util.Iterator ; 36 import java.util.Map ; 37 import java.util.Properties ; 38 39 import javax.jms.JMSException ; 40 import javax.management.AttributeNotFoundException ; 41 import javax.management.InstanceNotFoundException ; 42 import javax.management.MBeanException ; 43 import javax.management.ObjectName ; 44 import javax.management.ReflectionException ; 45 import javax.naming.InitialContext ; 46 import javax.naming.NamingException ; 47 import javax.sql.DataSource ; 48 import javax.transaction.Status ; 49 import javax.transaction.Transaction ; 50 import javax.transaction.TransactionManager ; 51 import javax.transaction.xa.Xid ; 52 53 import org.jboss.mq.SpyDestination; 54 import org.jboss.mq.SpyJMSException; 55 import org.jboss.mq.SpyMessage; 56 import org.jboss.mq.SpyTopic; 57 import org.jboss.mq.pm.CacheStore; 58 import org.jboss.mq.pm.Tx; 59 import org.jboss.mq.pm.TxManager; 60 import org.jboss.mq.server.JMSDestination; 61 import org.jboss.mq.server.MessageCache; 62 import org.jboss.mq.server.MessageReference; 63 import org.jboss.system.ServiceMBeanSupport; 64 import org.jboss.tm.TransactionManagerService; 65 import org.jboss.tm.TransactionTimeoutConfiguration; 66 67 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; 68 69 78 public class PersistenceManager extends ServiceMBeanSupport 79 implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager, CacheStore 80 { 81 82 88 89 protected SynchronizedLong nextTransactionId = new SynchronizedLong(0l); 90 91 92 protected TxManager txManager; 93 94 95 protected DataSource datasource; 96 97 98 protected TransactionManager tm; 99 100 101 private int recoveryTimeout = 0; 102 103 104 private int recoveryRetries = 0; 105 106 107 private int recoverMessagesChunk = 0; 108 109 110 private int statementRetries = 5; 111 112 118 protected String UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?"; 119 protected String UPDATE_MARKED_MESSAGES_XARECOVERY = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID NOT IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID IS NOT NULL)"; 120 protected String UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?"; 121 protected String DELETE_MARKED_MESSAGES_WITH_TX = 122 "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS) AND TXOP=?"; 123 protected String DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY = 124 "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID = NULL) AND TXOP=?"; 125 protected String DELETE_TX = "DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?"; 126 protected String DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?"; 127 protected String DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXOP = 'T'"; 128 protected String INSERT_TX = "INSERT INTO JMS_TRANSACTIONS (TXID) values(?)"; 129 protected String INSERT_TX_XARECOVERY = "INSERT INTO JMS_TRANSACTIONS (TXID, XID) values(?, ?)"; 130 protected String DELETE_ALL_TX = "DELETE FROM JMS_TRANSACTIONS"; 131 protected String DELETE_ALL_TX_XARECOVERY = "DELETE FROM JMS_TRANSACTIONS WHERE XID = NULL"; 132 protected String SELECT_MAX_TX = "SELECT MAX(TXID) FROM (SELECT MAX(TXID) FROM JMS_TRANSACTIONS UNION SELECT MAX(TXID) FROM JMS_MESSAGES)"; 133 protected String SELECT_ALL_TX_XARECOVERY = "SELECT TXID, XID FROM JMS_TRANSACTIONS"; 134 protected String SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=?"; 135 protected String SELECT_MESSAGES_IN_DEST_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE DESTINATION=?"; 136 protected String SELECT_MESSAGE_KEYS_IN_DEST = "SELECT MESSAGEID FROM JMS_MESSAGES WHERE DESTINATION=?"; 137 protected String SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?"; 138 protected String SELECT_MESSAGE_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?"; 139 protected String INSERT_MESSAGE = 140 "INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)"; 141 protected String MARK_MESSAGE = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?"; 142 protected String DELETE_MESSAGE = "DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?"; 143 protected String UPDATE_MESSAGE = "UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?"; 144 protected String CREATE_MESSAGE_TABLE = 145 "CREATE TABLE JMS_MESSAGES ( MESSAGEID INTEGER NOT NULL, " 146 + "DESTINATION VARCHAR(32) NOT NULL, TXID INTEGER, TXOP CHAR(1)," 147 + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )"; 148 protected String CREATE_IDX_MESSAGE_TXOP_TXID = "CREATE INDEX JMS_MESSAGES_TXOP_TXID ON JMS_MESSAGES (TXOP, TXID)"; 149 protected String CREATE_IDX_MESSAGE_DESTINATION = "CREATE INDEX JMS_MESSAGES_DESTINATION ON JMS_MESSAGES (DESTINATION)"; 150 protected String CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, PRIMARY KEY (TXID) )"; 151 protected String CREATE_TX_TABLE_XARECOVERY = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, XID OBJECT, PRIMARY KEY (TXID) )"; 152 153 protected static final int OBJECT_BLOB = 0; 154 protected static final int BYTES_BLOB = 1; 155 protected static final int BINARYSTREAM_BLOB = 2; 156 protected static final int BLOB_BLOB = 3; 157 158 protected int blobType = OBJECT_BLOB; 159 protected boolean createTables; 160 161 protected int connectionRetryAttempts = 5; 162 163 protected boolean xaRecovery = false; 164 165 public PersistenceManager() throws javax.jms.JMSException 171 { 172 txManager = new TxManager(this); 173 } 174 175 179 protected class TransactionManagerStrategy 180 { 181 182 Transaction threadTx; 183 184 void startTX() throws JMSException 185 { 186 try 187 { 188 threadTx = tm.suspend(); 193 194 tm.begin(); 196 } 197 catch (Exception e) 198 { 199 try 200 { 201 if (threadTx != null) 202 tm.resume(threadTx); 203 } 204 catch (Exception ignore) 205 { 206 } 207 throw new SpyJMSException("Could not start a transaction with the transaction manager.", e); 208 } 209 } 210 211 void setRollbackOnly() throws JMSException 212 { 213 try 214 { 215 tm.setRollbackOnly(); 216 } 217 catch (Exception e) 218 { 219 throw new SpyJMSException("Could not start a mark the transaction for rollback .", e); 220 } 221 } 222 223 void endTX() throws JMSException 224 { 225 try 226 { 227 if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK) 228 { 229 tm.rollback(); 230 } 231 else 232 { 233 tm.commit(); 234 } 235 } 236 catch (Exception e) 237 { 238 throw new SpyJMSException("Could not start a transaction with the transaction manager.", e); 239 } 240 finally 241 { 242 try 243 { 244 if (threadTx != null) 245 tm.resume(threadTx); 246 } 247 catch (Exception ignore) 248 { 249 } 250 } 251 } 252 } 253 254 260 synchronized protected void createSchema() throws JMSException 261 { 262 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 263 tms.startTX(); 264 Connection c = null; 265 PreparedStatement stmt = null; 266 boolean threadWasInterrupted = Thread.interrupted(); 267 try 268 { 269 if (createTables) 270 { 271 c = this.getConnection(); 272 273 boolean createdMessageTable = false; 274 try 275 { 276 stmt = c.prepareStatement(CREATE_MESSAGE_TABLE); 277 stmt.executeUpdate(); 278 createdMessageTable = true; 279 } 280 catch (SQLException e) 281 { 282 log.debug("Could not create table with SQL: " + CREATE_MESSAGE_TABLE, e); 283 } 284 finally 285 { 286 try 287 { 288 if (stmt != null) 289 stmt.close(); 290 } 291 catch (Throwable ignored) 292 { 293 log.trace("Ignored: " + ignored); 294 } 295 stmt = null; 296 } 297 298 if (createdMessageTable) 299 { 300 try 301 { 302 stmt = c.prepareStatement(CREATE_IDX_MESSAGE_TXOP_TXID); 303 stmt.executeUpdate(); 304 } 305 catch (SQLException e) 306 { 307 log.debug("Could not create index with SQL: " + CREATE_IDX_MESSAGE_TXOP_TXID, e); 308 } 309 finally 310 { 311 try 312 { 313 if (stmt != null) 314 stmt.close(); 315 } 316 catch (Throwable ignored) 317 { 318 log.trace("Ignored: " + ignored); 319 } 320 stmt = null; 321 } 322 try 323 { 324 stmt = c.prepareStatement(CREATE_IDX_MESSAGE_DESTINATION); 325 stmt.executeUpdate(); 326 } 327 catch (SQLException e) 328 { 329 log.debug("Could not create index with SQL: " + CREATE_IDX_MESSAGE_DESTINATION, e); 330 } 331 finally 332 { 333 try 334 { 335 if (stmt != null) 336 stmt.close(); 337 } 338 catch (Throwable ignored) 339 { 340 log.trace("Ignored: " + ignored); 341 } 342 stmt = null; 343 } 344 } 345 346 String createTxTable = CREATE_TX_TABLE; 347 if (xaRecovery) 348 createTxTable = CREATE_TX_TABLE_XARECOVERY; 349 try 350 { 351 stmt = c.prepareStatement(createTxTable); 352 stmt.executeUpdate(); 353 } 354 catch (SQLException e) 355 { 356 log.debug("Could not create table with SQL: " + createTxTable, e); 357 } 358 finally 359 { 360 try 361 { 362 if (stmt != null) 363 stmt.close(); 364 } 365 catch (Throwable ignored) 366 { 367 log.trace("Ignored: " + ignored); 368 } 369 stmt = null; 370 } 371 } 372 } 373 catch (SQLException e) 374 { 375 tms.setRollbackOnly(); 376 throw new SpyJMSException("Could not get a connection for jdbc2 table construction ", e); 377 } 378 finally 379 { 380 try 381 { 382 if (stmt != null) 383 stmt.close(); 384 } 385 catch (Throwable ignore) 386 { 387 } 388 stmt = null; 389 try 390 { 391 if (c != null) 392 c.close(); 393 } 394 catch (Throwable ignore) 395 { 396 } 397 c = null; 398 tms.endTX(); 399 400 if (threadWasInterrupted) 402 Thread.currentThread().interrupt(); 403 } 404 } 405 406 synchronized protected void resolveAllUncommitedTXs() throws JMSException 407 { 408 412 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 413 tms.startTX(); 414 Connection c = null; 415 PreparedStatement stmt = null; 416 ResultSet rs = null; 417 boolean threadWasInterrupted = Thread.interrupted(); 418 try 419 { 420 c = this.getConnection(); 421 422 stmt = c.prepareStatement(SELECT_MAX_TX); 424 rs = stmt.executeQuery(); 425 if (rs.next()) 426 nextTransactionId.set(rs.getLong(1) + 1); 427 rs.close(); 428 rs = null; 429 stmt.close(); 430 stmt = null; 431 432 stmt = c.prepareStatement(DELETE_TEMPORARY_MESSAGES); 434 stmt.executeUpdate(); 435 stmt.close(); 436 stmt = null; 437 438 String deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX; 440 if (xaRecovery) 441 deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY; 442 stmt = c.prepareStatement(deleteMarkedMessagesWithTx); 443 stmt.setString(1, "A"); 444 stmt.executeUpdate(); 445 stmt.close(); 446 stmt = null; 447 448 String updateMarkedMessages = UPDATE_MARKED_MESSAGES; 450 if (xaRecovery) 451 updateMarkedMessages = UPDATE_MARKED_MESSAGES_XARECOVERY; 452 stmt = c.prepareStatement(updateMarkedMessages); 453 stmt.setNull(1, java.sql.Types.BIGINT); 454 stmt.setString(2, "A"); 455 stmt.setString(3, "D"); 456 stmt.executeUpdate(); 457 stmt.close(); 458 stmt = null; 459 460 String deleteAllTx = DELETE_ALL_TX; 462 if (xaRecovery) 463 deleteAllTx = DELETE_ALL_TX_XARECOVERY; 464 stmt = c.prepareStatement(deleteAllTx); 465 stmt.execute(); 466 stmt.close(); 467 stmt = null; 468 469 if (xaRecovery) 471 { 472 stmt = c.prepareStatement(SELECT_ALL_TX_XARECOVERY); 473 rs = stmt.executeQuery(); 474 while (rs.next()) 475 { 476 long txid = rs.getLong(1); 477 Xid xid = extractXid(rs, 2); 478 Tx tx = new Tx(txid); 479 tx.setXid(xid); 480 tx.checkPersisted(); 481 txManager.restoreTx(tx); 482 } 483 rs.close(); 484 rs = null; 485 stmt.close(); 486 stmt = null; 487 } 488 } 489 catch (Exception e) 490 { 491 tms.setRollbackOnly(); 492 throw new SpyJMSException("Could not resolve uncommited transactions. Message recovery may not be accurate", e); 493 } 494 finally 495 { 496 try 497 { 498 if (rs != null) 499 rs.close(); 500 } 501 catch (Throwable ignore) 502 { 503 } 504 try 505 { 506 if (stmt != null) 507 stmt.close(); 508 } 509 catch (Throwable ignore) 510 { 511 } 512 try 513 { 514 if (c != null) 515 c.close(); 516 } 517 catch (Throwable ignore) 518 { 519 } 520 tms.endTX(); 521 522 if (threadWasInterrupted) 524 Thread.currentThread().interrupt(); 525 } 526 } 527 528 534 synchronized public void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException 535 { 536 if (jmsDest == null) 537 throw new IllegalArgumentException ("Must supply non null JMSDestination to restoreQueue"); 538 if (dest == null) 539 throw new IllegalArgumentException ("Must supply non null SpyDestination to restoreQueue"); 540 541 boolean canOverrideTimeout = (tm instanceof TransactionTimeoutConfiguration); 542 int previousTimeout = 0; 543 try 544 { 545 if (recoveryTimeout != 0) 547 { 548 if (canOverrideTimeout) 549 { 550 previousTimeout = ((TransactionTimeoutConfiguration) tm).getTransactionTimeout(); 551 tm.setTransactionTimeout(recoveryTimeout); 552 } 553 else 554 { 555 log.debug("Cannot override recovery timeout, TransactionManager does implement " + TransactionTimeoutConfiguration.class.getName()); 556 } 557 } 558 559 try 561 { 562 internalRestoreQueue(jmsDest, dest); 563 } 564 finally 565 { 566 if (recoveryTimeout != 0 && canOverrideTimeout) 568 tm.setTransactionTimeout(previousTimeout); 569 } 570 } 571 catch (Exception e) 572 { 573 SpyJMSException.rethrowAsJMSException("Unexpected error in recovery", e); 574 } 575 } 576 577 synchronized protected void internalRestoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException 578 { 579 Map prepared = null; 581 if (xaRecovery) 582 { 583 prepared = new HashMap (); 584 Map map = txManager.getPreparedTransactions(); 585 for (Iterator i = map.values().iterator(); i.hasNext();) 586 { 587 TxManager.PreparedInfo info = (TxManager.PreparedInfo) i.next(); 588 for (Iterator j = info.getTxids().iterator(); j.hasNext();) 589 { 590 Tx tx = (Tx) j.next(); 591 prepared.put(new Long (tx.longValue()), tx); 592 } 593 } 594 } 595 596 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 597 tms.startTX(); 598 Connection c = null; 599 PreparedStatement stmt = null; 600 PreparedStatement stmt2 = null; 601 ResultSet rs = null; 602 boolean threadWasInterrupted = Thread.interrupted(); 603 try 604 { 605 String selectMessagesInDest = SELECT_MESSAGES_IN_DEST; 606 String selectMessage = SELECT_MESSAGE; 607 if (xaRecovery) 608 { 609 selectMessagesInDest = SELECT_MESSAGES_IN_DEST_XARECOVERY; 610 selectMessage = SELECT_MESSAGE_XARECOVERY; 611 } 612 c = this.getConnection(); 613 if (recoverMessagesChunk == 0) 614 stmt = c.prepareStatement(selectMessagesInDest); 615 else 616 { 617 stmt = c.prepareStatement(SELECT_MESSAGE_KEYS_IN_DEST); 618 stmt2 = c.prepareStatement(selectMessage); 619 } 620 stmt.setString(1, dest.toString()); 621 622 long txid = 0; 623 String txop = null; 624 rs = stmt.executeQuery(); 625 int counter = 0; 626 int recovery = 0; 627 while (rs.next()) 628 { 629 long msgid = rs.getLong(1); 630 SpyMessage message = null; 631 if (recoverMessagesChunk == 0) 632 { 633 message = extractMessage(rs); 634 if (xaRecovery) 635 { 636 txid = rs.getLong(3); 637 txop = rs.getString(4); 638 } 639 } 640 else 641 { 642 ResultSet rs2 = null; 643 try 644 { 645 stmt2.setLong(1, msgid); 646 stmt2.setString(2, dest.toString()); 647 rs2 = stmt2.executeQuery(); 648 if (rs2.next()) 649 { 650 message = extractMessage(rs2); 651 if (xaRecovery) 652 { 653 txid = rs.getLong(3); 654 txop = rs.getString(4); 655 } 656 } 657 else 658 log.warn("Failed to find message msgid=" + msgid +" dest=" + dest); 659 } 660 finally 661 { 662 if (rs2 != null) 663 { 664 try 665 { 666 rs2.close(); 667 } 668 catch (Exception ignored) 669 { 670 } 671 } 672 } 673 } 674 if (dest instanceof SpyTopic) 676 message.header.durableSubscriberID = ((SpyTopic) dest).getDurableSubscriptionID(); 677 678 if (xaRecovery == false || txid == 0 || txop == null) 679 jmsDest.restoreMessage(message); 680 else 681 { 682 Tx tx = (Tx) prepared.get(new Long (txid)); 683 if (tx == null) 684 jmsDest.restoreMessage(message); 685 else if ("A".equals(txop)) 686 { 687 jmsDest.restoreMessage(message, tx, Tx.ADD); 688 recovery++; 689 } 690 else if ("D".equals(txop)) 691 { 692 jmsDest.restoreMessage(message, tx, Tx.REMOVE); 693 recovery++; 694 } 695 else 696 throw new IllegalStateException ("Unknown txop=" + txop + " for msg=" + msgid + " dest=" + dest); 697 } 698 counter++; 699 } 700 701 log.debug("Restored " + counter + " message(s) to: " + dest + " " + recovery + " need recovery."); 702 } 703 catch (IOException e) 704 { 705 tms.setRollbackOnly(); 706 throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e); 707 } 708 catch (SQLException e) 709 { 710 tms.setRollbackOnly(); 711 throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e); 712 } 713 finally 714 { 715 try 716 { 717 if (rs != null) 718 rs.close(); 719 } 720 catch (Throwable ignore) 721 { 722 } 723 try 724 { 725 if (stmt != null) 726 stmt.close(); 727 } 728 catch (Throwable ignore) 729 { 730 } 731 try 732 { 733 if (c != null) 734 c.close(); 735 } 736 catch (Throwable ignore) 737 { 738 } 739 tms.endTX(); 740 741 if (threadWasInterrupted) 743 Thread.currentThread().interrupt(); 744 } 745 746 } 747 748 SpyMessage extractMessage(ResultSet rs) throws SQLException , IOException 749 { 750 try 751 { 752 long messageid = rs.getLong(1); 753 754 SpyMessage message = null; 755 756 if (blobType == OBJECT_BLOB) 757 { 758 759 message = (SpyMessage) rs.getObject(2); 760 761 } 762 else if (blobType == BYTES_BLOB) 763 { 764 765 byte[] st = rs.getBytes(2); 766 ByteArrayInputStream baip = new ByteArrayInputStream (st); 767 ObjectInputStream ois = new ObjectInputStream (baip); 768 message = SpyMessage.readMessage(ois); 769 770 } 771 else if (blobType == BINARYSTREAM_BLOB) 772 { 773 774 ObjectInputStream ois = new ObjectInputStream (rs.getBinaryStream(2)); 775 message = SpyMessage.readMessage(ois); 776 777 } 778 else if (blobType == BLOB_BLOB) 779 { 780 781 ObjectInputStream ois = new ObjectInputStream (rs.getBlob(2).getBinaryStream()); 782 message = SpyMessage.readMessage(ois); 783 } 784 785 message.header.messageId = messageid; 786 return message; 787 } 788 catch (StreamCorruptedException e) 789 { 790 throw new IOException ("Could not load the message: " + e); 791 } 792 } 793 794 Xid extractXid(ResultSet rs, int column) throws SQLException , IOException , ClassNotFoundException 795 { 796 try 797 { 798 Xid xid = null; 799 800 if (blobType == OBJECT_BLOB) 801 { 802 xid = (Xid ) rs.getObject(column); 803 } 804 else if (blobType == BYTES_BLOB) 805 { 806 byte[] st = rs.getBytes(column); 807 ByteArrayInputStream baip = new ByteArrayInputStream (st); 808 ObjectInputStream ois = new ObjectInputStream (baip); 809 xid = (Xid ) ois.readObject(); 810 } 811 else if (blobType == BINARYSTREAM_BLOB) 812 { 813 ObjectInputStream ois = new ObjectInputStream (rs.getBinaryStream(column)); 814 xid = (Xid ) ois.readObject(); 815 } 816 else if (blobType == BLOB_BLOB) 817 { 818 ObjectInputStream ois = new ObjectInputStream (rs.getBlob(column).getBinaryStream()); 819 xid = (Xid ) ois.readObject(); 820 } 821 822 return xid; 823 } 824 catch (StreamCorruptedException e) 825 { 826 throw new IOException ("Could not load the message: " + e); 827 } 828 } 829 830 public void commitPersistentTx(Tx txId) throws javax.jms.JMSException 836 { 837 if (txId.wasPersisted() == false) 838 return; 839 840 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 841 tms.startTX(); 842 Connection c = null; 843 boolean threadWasInterrupted = Thread.interrupted(); 844 try 845 { 846 847 c = this.getConnection(); 848 removeMarkedMessages(c, txId, "D"); 849 removeTXRecord(c, txId.longValue()); 850 851 } 852 catch (SQLException e) 853 { 854 tms.setRollbackOnly(); 855 throw new SpyJMSException("Could not commit tx: " + txId, e); 856 } 857 finally 858 { 859 try 860 { 861 if (c != null) 862 c.close(); 863 } 864 catch (Throwable ignore) 865 { 866 } 867 tms.endTX(); 868 869 if (threadWasInterrupted) 871 Thread.currentThread().interrupt(); 872 } 873 } 874 875 public void removeMarkedMessages(Connection c, Tx txid, String mark) throws SQLException 876 { 877 PreparedStatement stmt = null; 878 try 879 { 880 stmt = c.prepareStatement(DELETE_MARKED_MESSAGES); 881 stmt.setLong(1, txid.longValue()); 882 stmt.setString(2, mark); 883 stmt.executeUpdate(); 884 } 885 finally 886 { 887 try 888 { 889 if (stmt != null) 890 stmt.close(); 891 } 892 catch (Throwable e) 893 { 894 } 895 } 896 } 897 898 public void addTXRecord(Connection c, Tx txid) throws SQLException , IOException 899 { 900 PreparedStatement stmt = null; 901 try 902 { 903 String insertTx = INSERT_TX; 904 if (xaRecovery) 905 insertTx = INSERT_TX_XARECOVERY; 906 stmt = c.prepareStatement(insertTx); 907 stmt.setLong(1, txid.longValue()); 908 if (xaRecovery) 909 { 910 Xid xid = txid.getXid(); 911 if (xid != null) 912 setBlob(stmt, 2, xid); 913 else 914 stmt.setNull(2, java.sql.Types.BLOB); 915 } 916 stmt.executeUpdate(); 917 } 918 finally 919 { 920 try 921 { 922 if (stmt != null) 923 stmt.close(); 924 } 925 catch (Throwable e) 926 { 927 } 928 } 929 } 930 931 public void removeTXRecord(Connection c, long txid) throws SQLException 932 { 933 PreparedStatement stmt = null; 934 try 935 { 936 stmt = c.prepareStatement(DELETE_TX); 937 stmt.setLong(1, txid); 938 stmt.executeUpdate(); 939 } 940 finally 941 { 942 try 943 { 944 if (stmt != null) 945 stmt.close(); 946 } 947 catch (Throwable e) 948 { 949 } 950 } 951 } 952 953 public void rollbackPersistentTx(Tx txId) throws JMSException 959 { 960 if (txId.wasPersisted() == false) 961 return; 962 963 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 964 tms.startTX(); 965 Connection c = null; 966 PreparedStatement stmt = null; 967 boolean threadWasInterrupted = Thread.interrupted(); 968 try 969 { 970 971 c = this.getConnection(); 972 removeMarkedMessages(c, txId, "A"); 973 removeTXRecord(c, txId.longValue()); 974 975 stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES_WITH_TX); 977 stmt.setNull(1, java.sql.Types.BIGINT); 978 stmt.setString(2, "A"); 979 stmt.setString(3, "D"); 980 stmt.setLong(4, txId.longValue()); 981 stmt.executeUpdate(); 982 stmt.close(); 983 stmt = null; 984 } 985 catch (SQLException e) 986 { 987 tms.setRollbackOnly(); 988 throw new SpyJMSException("Could not rollback tx: " + txId, e); 989 } 990 finally 991 { 992 try 993 { 994 if (stmt != null) 995 stmt.close(); 996 } 997 catch (Throwable ignore) 998 { 999 } 1000 try 1001 { 1002 if (c != null) 1003 c.close(); 1004 } 1005 catch (Throwable ignore) 1006 { 1007 } 1008 tms.endTX(); 1009 1010 if (threadWasInterrupted) 1012 Thread.currentThread().interrupt(); 1013 } 1014 1015 } 1016 1017 public Tx createPersistentTx() throws JMSException 1023 { 1024 Tx id = new Tx(nextTransactionId.increment()); 1025 return id; 1026 } 1027 1028 public void insertPersistentTx(TransactionManagerStrategy tms, Connection c, Tx tx) throws JMSException 1029 { 1030 try 1031 { 1032 if (tx != null && tx.checkPersisted() == false) 1033 addTXRecord(c, tx); 1034 } 1035 catch (Exception e) 1036 { 1037 tms.setRollbackOnly(); 1038 throw new SpyJMSException("Could not create tx: " + tx.longValue(), e); 1039 } 1040 } 1041 1042 public void add(MessageReference messageRef, Tx txId) throws javax.jms.JMSException 1048 { 1049 boolean trace = log.isTraceEnabled(); 1050 if (trace) 1051 log.trace("About to add message " + messageRef + " transaction=" + txId); 1052 1053 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 1054 tms.startTX(); 1055 Connection c = null; 1056 boolean threadWasInterrupted = Thread.interrupted(); 1057 try 1058 { 1059 c = this.getConnection(); 1060 1061 insertPersistentTx(tms, c, txId); 1063 1064 synchronized (messageRef) 1066 { 1067 SpyMessage message = messageRef.getMessage(); 1068 1069 if (messageRef.stored == MessageReference.STORED) 1071 { 1072 if (trace) 1073 log.trace("Updating message " + messageRef + " transaction=" + txId); 1074 1075 markMessage(c, messageRef.messageId, messageRef.getPersistentKey(), txId, "A"); 1076 } 1077 else 1078 { 1079 if (trace) 1080 log.trace("Inserting message " + messageRef + " transaction=" + txId); 1081 1082 add(c, messageRef.getPersistentKey(), message, txId, "A"); 1083 messageRef.setStored(MessageReference.STORED); 1084 } 1085 if (trace) 1086 log.trace("Added message " + messageRef + " transaction=" + txId); 1087 } 1088 } 1089 catch (IOException e) 1090 { 1091 tms.setRollbackOnly(); 1092 throw new SpyJMSException("Could not store message: " + messageRef, e); 1093 } 1094 catch (SQLException e) 1095 { 1096 tms.setRollbackOnly(); 1097 throw new SpyJMSException("Could not store message: " + messageRef, e); 1098 } 1099 finally 1100 { 1101 try 1102 { 1103 if (c != null) 1104 c.close(); 1105 } 1106 catch (Throwable ignore) 1107 { 1108 } 1109 tms.endTX(); 1110 1111 if (threadWasInterrupted) 1113 Thread.currentThread().interrupt(); 1114 } 1115 } 1116 1117 protected void add(Connection c, String queue, SpyMessage message, Tx txId, String mark) 1118 throws SQLException , IOException 1119 { 1120 PreparedStatement stmt = null; 1121 try 1122 { 1123 1124 stmt = c.prepareStatement(INSERT_MESSAGE); 1125 1126 stmt.setLong(1, message.header.messageId); 1127 stmt.setString(2, queue); 1128 setBlob(stmt, 3, message); 1129 1130 if (txId != null) 1131 stmt.setLong(4, txId.longValue()); 1132 else 1133 stmt.setNull(4, java.sql.Types.BIGINT); 1134 stmt.setString(5, mark); 1135 1136 stmt.executeUpdate(); 1137 } 1138 finally 1139 { 1140 try 1141 { 1142 if (stmt != null) 1143 stmt.close(); 1144 } 1145 catch (Throwable ignore) 1146 { 1147 } 1148 } 1149 } 1150 1151 public void markMessage(Connection c, long messageid, String destination, Tx txId, String mark) 1152 throws SQLException 1153 { 1154 PreparedStatement stmt = null; 1155 try 1156 { 1157 1158 stmt = c.prepareStatement(MARK_MESSAGE); 1159 if (txId == null) 1160 { 1161 stmt.setNull(1, java.sql.Types.BIGINT); 1162 } 1163 else 1164 { 1165 stmt.setLong(1, txId.longValue()); 1166 } 1167 stmt.setString(2, mark); 1168 stmt.setLong(3, messageid); 1169 stmt.setString(4, destination); 1170 stmt.executeUpdate(); 1171 } 1172 finally 1173 { 1174 try 1175 { 1176 if (stmt != null) 1177 stmt.close(); 1178 } 1179 catch (Throwable ignore) 1180 { 1181 } 1182 } 1183 1184 } 1185 1186 public void setBlob(PreparedStatement stmt, int column, SpyMessage message) throws IOException , SQLException 1187 { 1188 if (blobType == OBJECT_BLOB) 1189 { 1190 stmt.setObject(column, message); 1191 } 1192 else if (blobType == BYTES_BLOB) 1193 { 1194 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 1195 ObjectOutputStream oos = new ObjectOutputStream (baos); 1196 SpyMessage.writeMessage(message, oos); 1197 oos.flush(); 1198 byte[] messageAsBytes = baos.toByteArray(); 1199 stmt.setBytes(column, messageAsBytes); 1200 } 1201 else if (blobType == BINARYSTREAM_BLOB) 1202 { 1203 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 1204 ObjectOutputStream oos = new ObjectOutputStream (baos); 1205 SpyMessage.writeMessage(message, oos); 1206 oos.flush(); 1207 byte[] messageAsBytes = baos.toByteArray(); 1208 ByteArrayInputStream bais = new ByteArrayInputStream (messageAsBytes); 1209 stmt.setBinaryStream(column, bais, messageAsBytes.length); 1210 } 1211 else if (blobType == BLOB_BLOB) 1212 { 1213 1214 throw new RuntimeException ("BLOB_TYPE: BLOB_BLOB is not yet implemented."); 1215 1223 } 1224 } 1225 1226 public void setBlob(PreparedStatement stmt, int column, Xid xid) throws IOException , SQLException 1227 { 1228 if (blobType == OBJECT_BLOB) 1229 { 1230 stmt.setObject(column, xid); 1231 } 1232 else if (blobType == BYTES_BLOB) 1233 { 1234 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 1235 ObjectOutputStream oos = new ObjectOutputStream (baos); 1236 oos.writeObject(xid); 1237 oos.flush(); 1238 byte[] messageAsBytes = baos.toByteArray(); 1239 stmt.setBytes(column, messageAsBytes); 1240 } 1241 else if (blobType == BINARYSTREAM_BLOB) 1242 { 1243 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 1244 ObjectOutputStream oos = new ObjectOutputStream (baos); 1245 oos.writeObject(xid); 1246 oos.flush(); 1247 byte[] messageAsBytes = baos.toByteArray(); 1248 ByteArrayInputStream bais = new ByteArrayInputStream (messageAsBytes); 1249 stmt.setBinaryStream(column, bais, messageAsBytes.length); 1250 } 1251 else if (blobType == BLOB_BLOB) 1252 { 1253 1254 throw new RuntimeException ("BLOB_TYPE: BLOB_BLOB is not yet implemented."); 1255 1263 } 1264 } 1265 1266 public void update(MessageReference messageRef, Tx txId) throws javax.jms.JMSException 1272 { 1273 boolean trace = log.isTraceEnabled(); 1274 if (trace) 1275 log.trace("Updating message " + messageRef + " transaction=" + txId); 1276 1277 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 1278 tms.startTX(); 1279 Connection c = null; 1280 PreparedStatement stmt = null; 1281 boolean threadWasInterrupted = Thread.interrupted(); 1282 try 1283 { 1284 1285 c = this.getConnection(); 1286 if (txId == null) 1287 { 1288 1289 stmt = c.prepareStatement(UPDATE_MESSAGE); 1290 setBlob(stmt, 1, messageRef.getMessage()); 1291 stmt.setLong(2, messageRef.messageId); 1292 stmt.setString(3, messageRef.getPersistentKey()); 1293 int rc = stmt.executeUpdate(); 1294 if (rc != 1) 1295 throw new SpyJMSException( 1296 "Could not update the message in the database: update affected " + rc + " rows"); 1297 } 1298 else 1299 { 1300 throw new SpyJMSException("NYI: Updating a message in a transaction is not currently used"); 1301 } 1302 if (trace) 1303 log.trace("Updated message " + messageRef + " transaction=" + txId); 1304 1305 } 1306 catch (IOException e) 1307 { 1308 tms.setRollbackOnly(); 1309 throw new SpyJMSException("Could not update message: " + messageRef, e); 1310 } 1311 catch (SQLException e) 1312 { 1313 tms.setRollbackOnly(); 1314 throw new SpyJMSException("Could not update message: " + messageRef, e); 1315 } 1316 finally 1317 { 1318 try 1319 { 1320 if (stmt != null) 1321 stmt.close(); 1322 } 1323 catch (Throwable ignore) 1324 { 1325 } 1326 try 1327 { 1328 if (c != null) 1329 c.close(); 1330 } 1331 catch (Throwable ignore) 1332 { 1333 } 1334 tms.endTX(); 1335 1336 if (threadWasInterrupted) 1338 Thread.currentThread().interrupt(); 1339 } 1340 1341 } 1342 1343 public void remove(MessageReference messageRef, Tx txId) throws javax.jms.JMSException 1349 { 1350 boolean trace = log.isTraceEnabled(); 1351 if (trace) 1352 log.trace("Removing message " + messageRef + " transaction=" + txId); 1353 1354 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 1355 tms.startTX(); 1356 Connection c = null; 1357 PreparedStatement stmt = null; 1358 boolean threadWasInterrupted = Thread.interrupted(); 1359 try 1360 { 1361 c = this.getConnection(); 1362 1363 insertPersistentTx(tms, c, txId); 1365 1366 synchronized (messageRef) 1368 { 1369 if (txId == null) 1370 { 1371 stmt = c.prepareStatement(DELETE_MESSAGE); 1372 stmt.setLong(1, messageRef.messageId); 1373 stmt.setString(2, messageRef.getPersistentKey()); 1374 1375 messageRef.setStored(MessageReference.NOT_STORED); 1385 messageRef.removeDelayed(); 1386 } 1387 else 1388 { 1389 stmt = c.prepareStatement(MARK_MESSAGE); 1390 stmt.setLong(1, txId.longValue()); 1391 stmt.setString(2, "D"); 1392 stmt.setLong(3, messageRef.messageId); 1393 stmt.setString(4, messageRef.getPersistentKey()); 1394 } 1395 1396 int tries = 0; 1397 while (true) 1398 { 1399 try 1400 { 1401 int rc = stmt.executeUpdate(); 1402 1403 if (tries > 0) 1404 { 1405 if (rc != 1) 1406 throw new SpyJMSException( 1407 "Could not mark the message as deleted in the database: update affected " + rc + " rows"); 1408 1409 log.warn("Remove operation worked after " +tries +" retries"); 1410 } 1411 break; 1412 } 1413 catch (SQLException e) 1414 { 1415 log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e); 1416 tries++; 1417 if (tries >= statementRetries) 1418 { 1419 log.error("Retried " + tries + " times, now giving up"); 1420 throw new IllegalStateException ("Could not remove message after " +tries + "attempts"); 1421 } 1422 log.warn("Trying again after a pause"); 1423 Thread.sleep((long)(Math.random() * 500)); 1425 } 1426 } 1427 1428 if (trace) 1429 log.trace("Removed message " + messageRef + " transaction=" + txId); 1430 } 1431 } 1432 catch (Exception e) 1433 { 1434 tms.setRollbackOnly(); 1435 throw new SpyJMSException("Could not remove message: " + messageRef, e); 1436 } 1437 finally 1438 { 1439 try 1440 { 1441 if (stmt != null) 1442 stmt.close(); 1443 } 1444 catch (Throwable ignore) 1445 { 1446 } 1447 try 1448 { 1449 if (c != null) 1450 c.close(); 1451 } 1452 catch (Throwable ignore) 1453 { 1454 } 1455 tms.endTX(); 1456 1457 if (threadWasInterrupted) 1459 Thread.currentThread().interrupt(); 1460 } 1461 1462 } 1463 1464 1470 public TxManager getTxManager() 1471 { 1472 return txManager; 1473 } 1474 1475 public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException 1476 { 1477 } 1479 1480 public SpyMessage loadFromStorage(MessageReference messageRef) throws JMSException 1481 { 1482 if (log.isTraceEnabled()) 1483 log.trace("Loading message from storage " + messageRef); 1484 1485 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 1486 tms.startTX(); 1487 Connection c = null; 1488 PreparedStatement stmt = null; 1489 ResultSet rs = null; 1490 boolean threadWasInterrupted = Thread.interrupted(); 1491 try 1492 { 1493 1494 c = this.getConnection(); 1495 stmt = c.prepareStatement(SELECT_MESSAGE); 1496 stmt.setLong(1, messageRef.messageId); 1497 stmt.setString(2, messageRef.getPersistentKey()); 1498 1499 rs = stmt.executeQuery(); 1500 if (rs.next()) 1501 return extractMessage(rs); 1502 1503 return null; 1504 1505 } 1506 catch (IOException e) 1507 { 1508 tms.setRollbackOnly(); 1509 throw new SpyJMSException("Could not load message : " + messageRef, e); 1510 } 1511 catch (SQLException e) 1512 { 1513 tms.setRollbackOnly(); 1514 throw new SpyJMSException("Could not load message : " + messageRef, e); 1515 } 1516 finally 1517 { 1518 try 1519 { 1520 if (rs != null) 1521 rs.close(); 1522 } 1523 catch (Throwable ignore) 1524 { 1525 } 1526 try 1527 { 1528 if (stmt != null) 1529 stmt.close(); 1530 } 1531 catch (Throwable ignore) 1532 { 1533 } 1534 try 1535 { 1536 if (c != null) 1537 c.close(); 1538 } 1539 catch (Throwable ignore) 1540 { 1541 } 1542 tms.endTX(); 1543 1544 if (threadWasInterrupted) 1546 Thread.currentThread().interrupt(); 1547 } 1548 } 1549 1550 public void removeFromStorage(MessageReference messageRef) throws JMSException 1556 { 1557 if (messageRef.isPersistent()) 1559 return; 1560 1561 boolean trace = log.isTraceEnabled(); 1562 if (trace) 1563 log.trace("Removing message from storage " + messageRef); 1564 1565 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 1566 tms.startTX(); 1567 Connection c = null; 1568 PreparedStatement stmt = null; 1569 boolean threadWasInterrupted = Thread.interrupted(); 1570 try 1571 { 1572 c = this.getConnection(); 1573 stmt = c.prepareStatement(DELETE_MESSAGE); 1574 stmt.setLong(1, messageRef.messageId); 1575 stmt.setString(2, messageRef.getPersistentKey()); 1576 stmt.executeUpdate(); 1577 messageRef.setStored(MessageReference.NOT_STORED); 1578 1579 if (trace) 1580 log.trace("Removed message from storage " + messageRef); 1581 } 1582 catch (SQLException e) 1583 { 1584 tms.setRollbackOnly(); 1585 throw new SpyJMSException("Could not remove message: " + messageRef, e); 1586 } 1587 finally 1588 { 1589 try 1590 { 1591 if (stmt != null) 1592 stmt.close(); 1593 } 1594 catch (Throwable ignore) 1595 { 1596 } 1597 try 1598 { 1599 if (c != null) 1600 c.close(); 1601 } 1602 catch (Throwable ignore) 1603 { 1604 } 1605 tms.endTX(); 1606 1607 if (threadWasInterrupted) 1609 Thread.currentThread().interrupt(); 1610 } 1611 } 1612 1613 public void saveToStorage(MessageReference messageRef, SpyMessage message) throws JMSException 1614 { 1615 if (messageRef.isPersistent()) 1618 return; 1619 1620 boolean trace = log.isTraceEnabled(); 1621 if (trace) 1622 log.trace("Saving message to storage " + messageRef); 1623 1624 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 1625 tms.startTX(); 1626 Connection c = null; 1627 boolean threadWasInterrupted = Thread.interrupted(); 1628 try 1629 { 1630 1631 c = this.getConnection(); 1632 add(c, messageRef.getPersistentKey(), message, null, "T"); 1633 messageRef.setStored(MessageReference.STORED); 1634 1635 if (trace) 1636 log.trace("Saved message to storage " + messageRef); 1637 } 1638 catch (IOException e) 1639 { 1640 tms.setRollbackOnly(); 1641 throw new SpyJMSException("Could not store message: " + messageRef, e); 1642 } 1643 catch (SQLException e) 1644 { 1645 tms.setRollbackOnly(); 1646 throw new SpyJMSException("Could not store message: " + messageRef, e); 1647 } 1648 finally 1649 { 1650 try 1651 { 1652 if (c != null) 1653 c.close(); 1654 } 1655 catch (Throwable ignore) 1656 { 1657 } 1658 tms.endTX(); 1659 1660 if (threadWasInterrupted) 1662 Thread.currentThread().interrupt(); 1663 } 1664 } 1665 1666 1675 protected Connection getConnection() throws SQLException 1676 { 1677 int attempts = this.connectionRetryAttempts; 1678 int attemptCount = 0; 1679 SQLException sqlException = null; 1680 while (attempts-- > 0) 1681 { 1682 if (++attemptCount > 1) 1683 { 1684 log.debug("Retrying connection: attempt # " + attemptCount); 1685 } 1686 try 1687 { 1688 sqlException = null; 1689 return datasource.getConnection(); 1690 } 1691 catch (SQLException exception) 1692 { 1693 log.debug("Connection attempt # " + attemptCount + " failed with SQLException", exception); 1694 sqlException = exception; 1695 } 1696 finally 1697 { 1698 if (sqlException == null && attemptCount > 1) 1699 { 1700 log.debug("Connection succeeded on attempt # " + attemptCount); 1701 } 1702 } 1703 1704 if (attempts > 0) 1705 { 1706 try 1707 { 1708 Thread.sleep(1500); 1709 } 1710 catch (InterruptedException interruptedException) 1711 { 1712 break; 1713 } 1714 } 1715 } 1716 if (sqlException != null) 1717 { 1718 throw sqlException; 1719 } 1720 throw new SQLException ("connection attempt interrupted"); 1721 } 1722 1723 1729 1730 protected ObjectName connectionManagerName; 1731 1732 1733 protected Properties sqlProperties = new Properties (); 1734 1735 public void startService() throws Exception 1736 { 1737 UPDATE_MARKED_MESSAGES = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES", UPDATE_MARKED_MESSAGES); 1738 UPDATE_MARKED_MESSAGES_XARECOVERY = 1739 sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_XARECOVERY", UPDATE_MARKED_MESSAGES_XARECOVERY); 1740 UPDATE_MARKED_MESSAGES_WITH_TX = 1741 sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_WITH_TX", UPDATE_MARKED_MESSAGES_WITH_TX); 1742 DELETE_MARKED_MESSAGES_WITH_TX = 1743 sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX", DELETE_MARKED_MESSAGES_WITH_TX); 1744 DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY = 1745 sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY", DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY); 1746 DELETE_TX = sqlProperties.getProperty("DELETE_TX", DELETE_TX); 1747 DELETE_MARKED_MESSAGES = sqlProperties.getProperty("DELETE_MARKED_MESSAGES", DELETE_MARKED_MESSAGES); 1748 DELETE_TEMPORARY_MESSAGES = sqlProperties.getProperty("DELETE_TEMPORARY_MESSAGES", DELETE_TEMPORARY_MESSAGES); 1749 INSERT_TX = sqlProperties.getProperty("INSERT_TX", INSERT_TX); 1750 INSERT_TX_XARECOVERY = sqlProperties.getProperty("INSERT_TX_XARECOVERY", INSERT_TX_XARECOVERY); 1751 DELETE_ALL_TX = sqlProperties.getProperty("DELETE_ALL_TX", DELETE_ALL_TX); 1752 DELETE_ALL_TX_XARECOVERY = sqlProperties.getProperty("DELETE_ALL_TX_XARECOVERY", DELETE_ALL_TX_XARECOVERY); 1753 SELECT_ALL_TX_XARECOVERY = sqlProperties.getProperty("SELECT_ALL_TX_XARECOVERY", SELECT_ALL_TX_XARECOVERY); 1754 SELECT_MAX_TX = sqlProperties.getProperty("SELECT_MAX_TX", SELECT_MAX_TX); 1755 SELECT_MESSAGES_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST", SELECT_MESSAGES_IN_DEST); 1756 SELECT_MESSAGES_IN_DEST_XARECOVERY = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST_XARECOVERY", SELECT_MESSAGES_IN_DEST_XARECOVERY); 1757 SELECT_MESSAGE_KEYS_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGE_KEYS_IN_DEST", SELECT_MESSAGE_KEYS_IN_DEST); 1758 SELECT_MESSAGE = sqlProperties.getProperty("SELECT_MESSAGE", SELECT_MESSAGE); 1759 SELECT_MESSAGE_XARECOVERY = sqlProperties.getProperty("SELECT_MESSAGE_XARECOVERY", SELECT_MESSAGE_XARECOVERY); 1760 INSERT_MESSAGE = sqlProperties.getProperty("INSERT_MESSAGE", INSERT_MESSAGE); 1761 MARK_MESSAGE = sqlProperties.getProperty("MARK_MESSAGE", MARK_MESSAGE); 1762 DELETE_MESSAGE = sqlProperties.getProperty("DELETE_MESSAGE", DELETE_MESSAGE); 1763 UPDATE_MESSAGE = sqlProperties.getProperty("UPDATE_MESSAGE", UPDATE_MESSAGE); 1764 CREATE_MESSAGE_TABLE = sqlProperties.getProperty("CREATE_MESSAGE_TABLE", CREATE_MESSAGE_TABLE); 1765 CREATE_IDX_MESSAGE_TXOP_TXID = sqlProperties.getProperty("CREATE_IDX_MESSAGE_TXOP_TXID", CREATE_IDX_MESSAGE_TXOP_TXID); 1766 CREATE_IDX_MESSAGE_DESTINATION = sqlProperties.getProperty("CREATE_IDX_MESSAGE_DESTINATION", CREATE_IDX_MESSAGE_DESTINATION); 1767 CREATE_TX_TABLE = sqlProperties.getProperty("CREATE_TX_TABLE", CREATE_TX_TABLE); 1768 CREATE_TX_TABLE_XARECOVERY = sqlProperties.getProperty("CREATE_TX_TABLE_XARECOVERY", CREATE_TX_TABLE_XARECOVERY); 1769 createTables = sqlProperties.getProperty("CREATE_TABLES_ON_STARTUP", "true").equalsIgnoreCase("true"); 1770 String s = sqlProperties.getProperty("BLOB_TYPE", "OBJECT_BLOB"); 1771 1772 if (s.equals("OBJECT_BLOB")) 1773 { 1774 blobType = OBJECT_BLOB; 1775 } 1776 else if (s.equals("BYTES_BLOB")) 1777 { 1778 blobType = BYTES_BLOB; 1779 } 1780 else if (s.equals("BINARYSTREAM_BLOB")) 1781 { 1782 blobType = BINARYSTREAM_BLOB; 1783 } 1784 else if (s.equals("BLOB_BLOB")) 1785 { 1786 blobType = BLOB_BLOB; 1787 } 1788 1789 1790 initializeFields(); 1792 1793 log.debug("Creating Schema"); 1794 try 1795 { 1796 createSchema(); 1797 } 1798 catch (Exception e) 1799 { 1800 log.warn("Error creating schema", e); 1801 } 1802 1803 log.debug("Resolving uncommited TXS"); 1804 Throwable error = null; 1805 for (int i = 0; i <= recoveryRetries; ++i) 1806 { 1807 try 1808 { 1809 resolveAllUncommitedTXs(); 1810 1811 break; 1813 } 1814 catch (Throwable t) 1815 { 1816 if (i < recoveryRetries) 1817 log.warn("Error resolving transactions retries=" + i + " of " + recoveryRetries, t); 1818 else 1819 error = t; 1820 } 1821 } 1822 1823 if (error != null) 1824 SpyJMSException.rethrowAsJMSException("Unable to resolve transactions retries=" + recoveryRetries, error); 1825 } 1826 1827 protected void initializeFields() 1828 throws MBeanException , AttributeNotFoundException , InstanceNotFoundException , ReflectionException , NamingException 1829 { 1830 String dsName = (String ) getServer().getAttribute(connectionManagerName, "BindName"); 1832 1834 InitialContext ctx = new InitialContext (); 1835 datasource = (DataSource ) ctx.lookup(dsName); 1836 1837 tm = (TransactionManager ) ctx.lookup(TransactionManagerService.JNDI_NAME); 1839 } 1840 1841 public Object getInstance() 1842 { 1843 return this; 1844 } 1845 1846 public ObjectName getMessageCache() 1847 { 1848 throw new UnsupportedOperationException ("This is now set on the destination manager"); 1849 } 1850 1851 public void setMessageCache(ObjectName messageCache) 1852 { 1853 throw new UnsupportedOperationException ("This is now set on the destination manager"); 1854 } 1855 1856 public ObjectName getConnectionManager() 1857 { 1858 return connectionManagerName; 1859 } 1860 1861 public void setConnectionManager(ObjectName connectionManagerName) 1862 { 1863 this.connectionManagerName = connectionManagerName; 1864 } 1865 1866 public MessageCache getMessageCacheInstance() 1867 { 1868 throw new UnsupportedOperationException ("This is now set on the destination manager"); 1869 } 1870 1871 public String getSqlProperties() 1872 { 1873 try 1874 { 1875 ByteArrayOutputStream boa = new ByteArrayOutputStream (); 1876 sqlProperties.store(boa, ""); 1877 return new String (boa.toByteArray()); 1878 } 1879 catch (IOException shouldnothappen) 1880 { 1881 return ""; 1882 } 1883 } 1884 1885 public void setSqlProperties(String value) 1886 { 1887 try 1888 { 1889 ByteArrayInputStream is = new ByteArrayInputStream (value.getBytes()); 1890 sqlProperties = new Properties (); 1891 sqlProperties.load(is); 1892 } 1893 catch (IOException shouldnothappen) 1894 { 1895 } 1896 } 1897 1898 public void setConnectionRetryAttempts(int value) 1899 { 1900 this.connectionRetryAttempts = value; 1901 } 1902 1903 public int getConnectionRetryAttempts() 1904 { 1905 return this.connectionRetryAttempts; 1906 } 1907 1908 public int getRecoveryTimeout() 1909 { 1910 return recoveryTimeout; 1911 } 1912 1913 public void setRecoveryTimeout(int timeout) 1914 { 1915 this.recoveryTimeout = timeout; 1916 } 1917 1918 public int getRecoveryRetries() 1919 { 1920 return recoveryRetries; 1921 } 1922 1923 public void setRecoveryRetries(int retries) 1924 { 1925 this.recoveryRetries = retries; 1926 } 1927 1928 public int getRecoverMessagesChunk() 1929 { 1930 return recoverMessagesChunk; 1931 } 1932 1933 public void setRecoverMessagesChunk(int recoverMessagesChunk) 1934 { 1935 if (recoverMessagesChunk != 0 && recoverMessagesChunk != 1) 1936 { 1937 log.warn("Only the values 0 and 1 are currently support for chunk size, using chunk size=1"); 1938 recoverMessagesChunk = 1; 1939 } 1940 this.recoverMessagesChunk = recoverMessagesChunk; 1941 } 1942 1943 public boolean isXARecovery() 1944 { 1945 return xaRecovery; 1946 } 1947 1948 public void setXARecovery(boolean xaRecovery) 1949 { 1950 this.xaRecovery = xaRecovery; 1951 } 1952 1953 public int getStatementRetries() 1954 { 1955 return statementRetries; 1956 } 1957 1958 public void setStatementRetries(int statementRetries) 1959 { 1960 if (statementRetries < 0) 1961 statementRetries = 0; 1962 this.statementRetries = statementRetries; 1963 } 1964} 1965 | Popular Tags |