| 1 22 package org.jboss.mq.pm.jdbc2; 23 24 import java.io.ByteArrayInputStream ; 25 import java.io.ByteArrayOutputStream ; 26 import java.io.IOException ; 27 import java.io.ObjectInputStream ; 28 import java.io.ObjectOutputStream ; 29 import java.io.StreamCorruptedException ; 30 import java.sql.Connection ; 31 import java.sql.PreparedStatement ; 32 import java.sql.ResultSet ; 33 import java.sql.SQLException ; 34 import java.util.HashMap ; 35 import java.util.Iterator ; 36 import java.util.Map ; 37 import java.util.Properties ; 38 39 import javax.jms.JMSException ; 40 import javax.management.AttributeNotFoundException ; 41 import javax.management.InstanceNotFoundException ; 42 import javax.management.MBeanException ; 43 import javax.management.ObjectName ; 44 import javax.management.ReflectionException ; 45 import javax.naming.InitialContext ; 46 import javax.naming.NamingException ; 47 import javax.sql.DataSource ; 48 import javax.transaction.Status ; 49 import javax.transaction.Transaction ; 50 import javax.transaction.TransactionManager ; 51 import javax.transaction.xa.Xid ; 52 53 import org.jboss.mq.SpyDestination; 54 import org.jboss.mq.SpyJMSException; 55 import org.jboss.mq.SpyMessage; 56 import org.jboss.mq.SpyTopic; 57 import org.jboss.mq.pm.CacheStore; 58 import org.jboss.mq.pm.Tx; 59 import org.jboss.mq.pm.TxManager; 60 import org.jboss.mq.server.JMSDestination; 61 import org.jboss.mq.server.MessageCache; 62 import org.jboss.mq.server.MessageReference; 63 import org.jboss.system.ServiceMBeanSupport; 64 import org.jboss.tm.TransactionManagerService; 65 import org.jboss.tm.TransactionTimeoutConfiguration; 66 67 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; 68 69 78 public class PersistenceManager extends ServiceMBeanSupport 79 implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager, CacheStore 80 { 81 82 88 89 protected SynchronizedLong nextTransactionId = new SynchronizedLong(0l); 90 91 92 protected TxManager txManager; 93 94 95 protected DataSource datasource; 96 97 98 protected TransactionManager tm; 99 100 101 private int recoveryTimeout = 0; 102 103 104 private int recoveryRetries = 0; 105 106 107 private int recoverMessagesChunk = 0; 108 109 110 private int statementRetries = 5; 111 112 118 protected String UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?"; 119 protected String UPDATE_MARKED_MESSAGES_XARECOVERY = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID NOT IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID IS NOT NULL)"; 120 protected String UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?"; 121 protected String DELETE_MARKED_MESSAGES_WITH_TX = 122 "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS) AND TXOP=?"; 123 protected String DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY = 124 "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID = NULL) AND TXOP=?"; 125 protected String DELETE_TX = "DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?"; 126 protected String DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?"; 127 protected String DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXOP = 'T'"; 128 protected String INSERT_TX = "INSERT INTO JMS_TRANSACTIONS (TXID) values(?)"; 129 protected String INSERT_TX_XARECOVERY = "INSERT INTO JMS_TRANSACTIONS (TXID, XID) values(?, ?)"; 130 protected String DELETE_ALL_TX = "DELETE FROM JMS_TRANSACTIONS"; 131 protected String DELETE_ALL_TX_XARECOVERY = "DELETE FROM JMS_TRANSACTIONS WHERE XID = NULL"; 132 protected String SELECT_MAX_TX = "SELECT MAX(TXID) FROM (SELECT MAX(TXID) FROM JMS_TRANSACTIONS UNION SELECT MAX(TXID) FROM JMS_MESSAGES)"; 133 protected String SELECT_ALL_TX_XARECOVERY = "SELECT TXID, XID FROM JMS_TRANSACTIONS"; 134 protected String SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=?"; 135 protected String SELECT_MESSAGES_IN_DEST_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE DESTINATION=?"; 136 protected String SELECT_MESSAGE_KEYS_IN_DEST = "SELECT MESSAGEID FROM JMS_MESSAGES WHERE DESTINATION=?"; 137 protected String SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?"; 138 protected String SELECT_MESSAGE_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?"; 139 protected String INSERT_MESSAGE = 140 "INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)"; 141 protected String MARK_MESSAGE = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?"; 142 protected String DELETE_MESSAGE = "DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?"; 143 protected String UPDATE_MESSAGE = "UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?"; 144 protected String CREATE_MESSAGE_TABLE = 145 "CREATE TABLE JMS_MESSAGES ( MESSAGEID INTEGER NOT NULL, " 146 + "DESTINATION VARCHAR(32) NOT NULL, TXID INTEGER, TXOP CHAR(1)," 147 + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )"; 148 protected String CREATE_IDX_MESSAGE_TXOP_TXID = "CREATE INDEX JMS_MESSAGES_TXOP_TXID ON JMS_MESSAGES (TXOP, TXID)"; 149 protected String CREATE_IDX_MESSAGE_DESTINATION = "CREATE INDEX JMS_MESSAGES_DESTINATION ON JMS_MESSAGES (DESTINATION)"; 150 protected String CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, PRIMARY KEY (TXID) )"; 151 protected String CREATE_TX_TABLE_XARECOVERY = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, XID OBJECT, PRIMARY KEY (TXID) )"; 152 153 protected static final int OBJECT_BLOB = 0; 154 protected static final int BYTES_BLOB = 1; 155 protected static final int BINARYSTREAM_BLOB = 2; 156 protected static final int BLOB_BLOB = 3; 157 158 protected int blobType = OBJECT_BLOB; 159 protected boolean createTables; 160 161 protected int connectionRetryAttempts = 5; 162 163 protected boolean xaRecovery = false; 164 165 public PersistenceManager() throws javax.jms.JMSException  171 { 172 txManager = new TxManager(this); 173 } 174 175 179 protected class TransactionManagerStrategy 180 { 181 182 Transaction threadTx; 183 184 void startTX() throws JMSException  185 { 186 try 187 { 188 threadTx = tm.suspend(); 193 194 tm.begin(); 196 } 197 catch (Exception e) 198 { 199 try 200 { 201 if (threadTx != null) 202 tm.resume(threadTx); 203 } 204 catch (Exception ignore) 205 { 206 } 207 throw new SpyJMSException("Could not start a transaction with the transaction manager.", e); 208 } 209 } 210 211 void setRollbackOnly() throws JMSException  212 { 213 try 214 { 215 tm.setRollbackOnly(); 216 } 217 catch (Exception e) 218 { 219 throw new SpyJMSException("Could not start a mark the transaction for rollback .", e); 220 } 221 } 222 223 void endTX() throws JMSException  224 { 225 try 226 { 227 if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK) 228 { 229 tm.rollback(); 230 } 231 else 232 { 233 tm.commit(); 234 } 235 } 236 catch (Exception e) 237 { 238 throw new SpyJMSException("Could not start a transaction with the transaction manager.", e); 239 } 240 finally 241 { 242 try 243 { 244 if (threadTx != null) 245 tm.resume(threadTx); 246 } 247 catch (Exception ignore) 248 { 249 } 250 } 251 } 252 } 253 254 260 synchronized protected void createSchema() throws JMSException  261 { 262 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 263 tms.startTX(); 264 Connection c = null; 265 PreparedStatement stmt = null; 266 boolean threadWasInterrupted = Thread.interrupted(); 267 try 268 { 269 if (createTables) 270 { 271 c = this.getConnection(); 272 273 boolean createdMessageTable = false; 274 try 275 { 276 stmt = c.prepareStatement(CREATE_MESSAGE_TABLE); 277 stmt.executeUpdate(); 278 createdMessageTable = true; 279 } 280 catch (SQLException e) 281 { 282 log.debug("Could not create table with SQL: " + CREATE_MESSAGE_TABLE, e); 283 } 284 finally 285 { 286 try 287 { 288 if (stmt != null) 289 stmt.close(); 290 } 291 catch (Throwable ignored) 292 { 293 log.trace("Ignored: " + ignored); 294 } 295 stmt = null; 296 } 297 298 if (createdMessageTable) 299 { 300 try 301 { 302 stmt = c.prepareStatement(CREATE_IDX_MESSAGE_TXOP_TXID); 303 stmt.executeUpdate(); 304 } 305 catch (SQLException e) 306 { 307 log.debug("Could not create index with SQL: " + CREATE_IDX_MESSAGE_TXOP_TXID, e); 308 } 309 finally 310 { 311 try 312 { 313 if (stmt != null) 314 stmt.close(); 315 } 316 catch (Throwable ignored) 317 { 318 log.trace("Ignored: " + ignored); 319 } 320 stmt = null; 321 } 322 try 323 { 324 stmt = c.prepareStatement(CREATE_IDX_MESSAGE_DESTINATION); 325 stmt.executeUpdate(); 326 } 327 catch (SQLException e) 328 { 329 log.debug("Could not create index with SQL: " + CREATE_IDX_MESSAGE_DESTINATION, e); 330 } 331 finally 332 { 333 try 334 { 335 if (stmt != null) 336 stmt.close(); 337 } 338 catch (Throwable ignored) 339 { 340 log.trace("Ignored: " + ignored); 341 } 342 stmt = null; 343 } 344 } 345 346 String createTxTable = CREATE_TX_TABLE; 347 if (xaRecovery) 348 createTxTable = CREATE_TX_TABLE_XARECOVERY; 349 try 350 { 351 stmt = c.prepareStatement(createTxTable); 352 stmt.executeUpdate(); 353 } 354 catch (SQLException e) 355 { 356 log.debug("Could not create table with SQL: " + createTxTable, e); 357 } 358 finally 359 { 360 try 361 { 362 if (stmt != null) 363 stmt.close(); 364 } 365 catch (Throwable ignored) 366 { 367 log.trace("Ignored: " + ignored); 368 } 369 stmt = null; 370 } 371 } 372 } 373 catch (SQLException e) 374 { 375 tms.setRollbackOnly(); 376 throw new SpyJMSException("Could not get a connection for jdbc2 table construction ", e); 377 } 378 finally 379 { 380 try 381 { 382 if (stmt != null) 383 stmt.close(); 384 } 385 catch (Throwable ignore) 386 { 387 } 388 stmt = null; 389 try 390 { 391 if (c != null) 392 c.close(); 393 } 394 catch (Throwable ignore) 395 { 396 } 397 c = null; 398 tms.endTX(); 399 400 if (threadWasInterrupted) 402 Thread.currentThread().interrupt(); 403 } 404 } 405 406 synchronized protected void resolveAllUncommitedTXs() throws JMSException  407 { 408 412 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 413 tms.startTX(); 414 Connection c = null; 415 PreparedStatement stmt = null; 416 ResultSet rs = null; 417 boolean threadWasInterrupted = Thread.interrupted(); 418 try 419 { 420 c = this.getConnection(); 421 422 stmt = c.prepareStatement(SELECT_MAX_TX); 424 rs = stmt.executeQuery(); 425 if (rs.next()) 426 nextTransactionId.set(rs.getLong(1) + 1); 427 rs.close(); 428 rs = null; 429 stmt.close(); 430 stmt = null; 431 432 stmt = c.prepareStatement(DELETE_TEMPORARY_MESSAGES); 434 stmt.executeUpdate(); 435 stmt.close(); 436 stmt = null; 437 438 String deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX; 440 if (xaRecovery) 441 deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY; 442 stmt = c.prepareStatement(deleteMarkedMessagesWithTx); 443 stmt.setString(1, "A"); 444 stmt.executeUpdate(); 445 stmt.close(); 446 stmt = null; 447 448 String updateMarkedMessages = UPDATE_MARKED_MESSAGES; 450 if (xaRecovery) 451 updateMarkedMessages = UPDATE_MARKED_MESSAGES_XARECOVERY; 452 stmt = c.prepareStatement(updateMarkedMessages); 453 stmt.setNull(1, java.sql.Types.BIGINT); 454 stmt.setString(2, "A"); 455 stmt.setString(3, "D"); 456 stmt.executeUpdate(); 457 stmt.close(); 458 stmt = null; 459 460 String deleteAllTx = DELETE_ALL_TX; 462 if (xaRecovery) 463 deleteAllTx = DELETE_ALL_TX_XARECOVERY; 464 stmt = c.prepareStatement(deleteAllTx); 465 stmt.execute(); 466 stmt.close(); 467 stmt = null; 468 469 if (xaRecovery) 471 { 472 stmt = c.prepareStatement(SELECT_ALL_TX_XARECOVERY); 473 rs = stmt.executeQuery(); 474 while (rs.next()) 475 { 476 long txid = rs.getLong(1); 477 Xid xid = extractXid(rs, 2); 478 Tx tx = new Tx(txid); 479 tx.setXid(xid); 480 tx.checkPersisted(); 481 txManager.restoreTx(tx); 482 } 483 rs.close(); 484 rs = null; 485 stmt.close(); 486 stmt = null; 487 } 488 } 489 catch (Exception e) 490 { 491 tms.setRollbackOnly(); 492 throw new SpyJMSException("Could not resolve uncommited transactions. Message recovery may not be accurate", e); 493 } 494 finally 495 { 496 try 497 { 498 if (rs != null) 499 rs.close(); 500 } 501 catch (Throwable ignore) 502 { 503 } 504 try 505 { 506 if (stmt != null) 507 stmt.close(); 508 } 509 catch (Throwable ignore) 510 { 511 } 512 try 513 { 514 if (c != null) 515 c.close(); 516 } 517 catch (Throwable ignore) 518 { 519 } 520 tms.endTX(); 521 522 if (threadWasInterrupted) 524 Thread.currentThread().interrupt(); 525 } 526 } 527 528 534 synchronized public void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException  535 { 536 if (jmsDest == null) 537 throw new IllegalArgumentException ("Must supply non null JMSDestination to restoreQueue"); 538 if (dest == null) 539 throw new IllegalArgumentException ("Must supply non null SpyDestination to restoreQueue"); 540 541 boolean canOverrideTimeout = (tm instanceof TransactionTimeoutConfiguration); 542 int previousTimeout = 0; 543 try 544 { 545 if (recoveryTimeout != 0) 547 { 548 if (canOverrideTimeout) 549 { 550 previousTimeout = ((TransactionTimeoutConfiguration) tm).getTransactionTimeout(); 551 tm.setTransactionTimeout(recoveryTimeout); 552 } 553 else 554 { 555 log.debug("Cannot override recovery timeout, TransactionManager does implement " + TransactionTimeoutConfiguration.class.getName()); 556 } 557 } 558 559 try 561 { 562 internalRestoreQueue(jmsDest, dest); 563 } 564 finally 565 { 566 if (recoveryTimeout != 0 && canOverrideTimeout) 568 tm.setTransactionTimeout(previousTimeout); 569 } 570 } 571 catch (Exception e) 572 { 573 SpyJMSException.rethrowAsJMSException("Unexpected error in recovery", e); 574 } 575 } 576 577 synchronized protected void internalRestoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException  578 { 579 Map prepared = null; 581 if (xaRecovery) 582 { 583 prepared = new HashMap (); 584 Map map = txManager.getPreparedTransactions(); 585 for (Iterator i = map.values().iterator(); i.hasNext();) 586 { 587 TxManager.PreparedInfo info = (TxManager.PreparedInfo) i.next(); 588 for (Iterator j = info.getTxids().iterator(); j.hasNext();) 589 { 590 Tx tx = (Tx) j.next(); 591 prepared.put(new Long (tx.longValue()), tx); 592 } 593 } 594 } 595 596 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 597 tms.startTX(); 598 Connection c = null; 599 PreparedStatement stmt = null; 600 PreparedStatement stmt2 = null; 601 ResultSet rs = null; 602 boolean threadWasInterrupted = Thread.interrupted(); 603 try 604 { 605 String selectMessagesInDest = SELECT_MESSAGES_IN_DEST; 606 String selectMessage = SELECT_MESSAGE; 607 if (xaRecovery) 608 { 609 selectMessagesInDest = SELECT_MESSAGES_IN_DEST_XARECOVERY; 610 selectMessage = SELECT_MESSAGE_XARECOVERY; 611 } 612 c = this.getConnection(); 613 if (recoverMessagesChunk == 0) 614 stmt = c.prepareStatement(selectMessagesInDest); 615 else 616 { 617 stmt = c.prepareStatement(SELECT_MESSAGE_KEYS_IN_DEST); 618 stmt2 = c.prepareStatement(selectMessage); 619 } 620 stmt.setString(1, dest.toString()); 621 622 long txid = 0; 623 String txop = null; 624 rs = stmt.executeQuery(); 625 int counter = 0; 626 int recovery = 0; 627 while (rs.next()) 628 { 629 long msgid = rs.getLong(1); 630 SpyMessage message = null; 631 if (recoverMessagesChunk == 0) 632 { 633 message = extractMessage(rs); 634 if (xaRecovery) 635 { 636 txid = rs.getLong(3); 637 txop = rs.getString(4); 638 } 639 } 640 else 641 { 642 ResultSet rs2 = null; 643 try 644 { 645 stmt2.setLong(1, msgid); 646 stmt2.setString(2, dest.toString()); 647 rs2 = stmt2.executeQuery(); 648 if (rs2.next()) 649 { 650 message = extractMessage(rs2); 651 if (xaRecovery) 652 { 653 txid = rs.getLong(3); 654 txop = rs.getString(4); 655 } 656 } 657 else 658 log.warn("Failed to find message msgid=" + msgid +" dest=" + dest); 659 } 660 finally 661 { 662 if (rs2 != null) 663 { 664 try 665 { 666 rs2.close(); 667 } 668 catch (Exception ignored) 669 { 670 } 671 } 672 } 673 } 674 if (dest instanceof SpyTopic) 676 message.header.durableSubscriberID = ((SpyTopic) dest).getDurableSubscriptionID(); 677 678 if (xaRecovery == false || txid == 0 || txop == null) 679 jmsDest.restoreMessage(message); 680 else 681 { 682 Tx tx = (Tx) prepared.get(new Long (txid)); 683 if (tx == null) 684 jmsDest.restoreMessage(message); 685 else if ("A".equals(txop)) 686 { 687 jmsDest.restoreMessage(message, tx, Tx.ADD); 688 recovery++; 689 } 690 else if ("D".equals(txop)) 691 { 692 jmsDest.restoreMessage(message, tx, Tx.REMOVE); 693 recovery++; 694 } 695 else 696 throw new IllegalStateException ("Unknown txop=" + txop + " for msg=" + msgid + " dest=" + dest); 697 } 698 counter++; 699 } 700 701 log.debug("Restored " + counter + " message(s) to: " + dest + " " + recovery + " need recovery."); 702 } 703 catch (IOException e) 704 { 705 tms.setRollbackOnly(); 706 throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e); 707 } 708 catch (SQLException e) 709 { 710 tms.setRollbackOnly(); 711 throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e); 712 } 713 finally 714 { 715 try 716 { 717 if (rs != null) 718 rs.close(); 719 } 720 catch (Throwable ignore) 721 { 722 } 723 try 724 { 725 if (stmt != null) 726 stmt.close(); 727 } 728 catch (Throwable ignore) 729 { 730 } 731 try 732 { 733 if (c != null) 734 c.close(); 735 } 736 catch (Throwable ignore) 737 { 738 } 739 tms.endTX(); 740 741 if (threadWasInterrupted) 743 Thread.currentThread().interrupt(); 744 } 745 746 } 747 748 SpyMessage extractMessage(ResultSet rs) throws SQLException , IOException  749 { 750 try 751 &
|