| 1 7 package org.jboss.mq.pm.jdbc3; 8 9 import java.io.ByteArrayInputStream ; 10 import java.io.ByteArrayOutputStream ; 11 import java.io.IOException ; 12 import java.io.ObjectInputStream ; 13 import java.io.ObjectOutputStream ; 14 import java.sql.Connection ; 15 import java.sql.PreparedStatement ; 16 import java.sql.ResultSet ; 17 import java.sql.SQLException ; 18 import java.sql.Types ; 19 import java.util.Properties ; 20 21 import javax.jms.JMSException ; 22 import javax.management.ObjectName ; 23 import javax.naming.InitialContext ; 24 import javax.sql.DataSource ; 25 import javax.transaction.Status ; 26 import javax.transaction.Transaction ; 27 import javax.transaction.TransactionManager ; 28 29 import org.jboss.mq.DurableSubscriptionID; 30 import org.jboss.mq.SpyDestination; 31 import org.jboss.mq.SpyJMSException; 32 import org.jboss.mq.SpyMessage; 33 import org.jboss.mq.SpyTopic; 34 import org.jboss.mq.pm.CacheStore; 35 import org.jboss.mq.pm.NewPersistenceManager; 36 import org.jboss.mq.pm.Tx; 37 import org.jboss.mq.pm.TxManager; 38 import org.jboss.mq.server.JMSDestination; 39 import org.jboss.mq.server.JMSTopic; 40 import org.jboss.mq.server.MessageCache; 41 import org.jboss.mq.server.MessageReference; 42 import org.jboss.system.ServiceMBeanSupport; 43 import org.jboss.tm.TransactionManagerService; 44 45 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; 46 47 59 public class PersistenceManager 60 extends ServiceMBeanSupport 61 implements PersistenceManagerMBean, NewPersistenceManager, CacheStore, Runnable  62 { 63 65 66 static final int OBJECT_BLOB = 0; 67 68 69 static final int BYTES_BLOB = 1; 70 71 72 static final int BINARYSTREAM_BLOB = 2; 73 74 75 static final int BLOB_BLOB = 3; 76 77 79 80 private SynchronizedLong nextTransactionId = new SynchronizedLong(0l); 81 82 83 private TxManager txManager; 84 85 86 private DataSource datasource; 87 88 89 private TransactionManager tm; 90 91 92 private ObjectName connectionManagerName; 93 94 95 private Properties sqlProperties = new Properties (); 96 97 String UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE TXOP=?"; 98 String UPDATE_MARKED_REFERENCES = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE TXOP=?"; 99 String UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?"; 100 String UPDATE_MARKED_REFERENCES_WITH_TX = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?"; 101 String DELETE_MARKED_MESSAGES_WITH_TX = "DELETE FROM JMS_MESSAGE_LOG WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTION_LOG) AND TXOP=?"; 102 String DELETE_MARKED_REFERENCES_WITH_TX = "DELETE FROM JMS_REFERENCE_LOG WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTION_LOG) AND TXOP=?"; 103 String DELETE_TX = "DELETE FROM JMS_TRANSACTION_LOG WHERE TXID = ?"; 104 String DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE TXID=? AND TXOP=?"; 105 String DELETE_MARKED_REFERENCES = "DELETE FROM JMS_REFERENCE_LOG WHERE TXID=? AND TXOP=?"; 106 String DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE TXOP='T'"; 107 String DELETE_TEMPORARY_REFERENCES = "DELETE FROM JMS_REFERENCE_LOG WHERE TXOP='T'"; 108 String INSERT_TX = "INSERT INTO JMS_TRANSACTION_LOG (TXID) values(?)"; 109 String SELECT_MAX_TX = "SELECT MAX(TXID) FROM JMS_TRANSACTION_LOG"; 110 String SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG WHERE DESTINATION=?"; 111 String SELECT_REFERENCES_IN_DEST = "SELECT R.MESSAGEID, M.MESSAGEBLOB, R.REDELIVERED, R.REDELIVERS FROM JMS_REFERENCE_LOG AS R, JMS_MESSAGE_LOG AS M" + 112 " WHERE R.MESSAGEID = M.MESSAGEID AND R.DESTINATION=?"; 113 String SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG WHERE MESSAGEID=? AND DESTINATION=?"; 114 String INSERT_MESSAGE = "INSERT INTO JMS_MESSAGE_LOG (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP, LATECLONE) VALUES(?,?,?,?,?,?)"; 115 String INSERT_REFERENCE = "INSERT INTO JMS_REFERENCE_LOG (MESSAGEID, DESTINATION, TXID, TXOP, REDELIVERED, REDELIVERS) VALUES(?,?,?,?,?,?)"; 116 String MARK_MESSAGE = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?"; 117 String MARK_REFERENCE = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?"; 118 String DELETE_MESSAGE = "DELETE FROM JMS_MESSAGE_LOG WHERE MESSAGEID=? AND DESTINATION=?"; 119 String DELETE_REFERENCE = "DELETE FROM JMS_REFERENCE_LOG WHERE MESSAGEID=? AND DESTINATION=?"; 120 String UPDATE_MESSAGE = "UPDATE JMS_MESSAGE_LOG SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?"; 121 String UPDATE_REFERENCE = "UPDATE JMS_REFERENCE_LOG SET REDELIVERED=?, REDELIVERS=? WHERE MESSAGEID=? AND DESTINATION=?"; 122 String DELETE_ORPHANED_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE LATECLONE = '1' AND MESSAGEID NOT IN (SELECT MESSAGEID FROM JMS_REFERENCE_LOG)"; 123 String DELETE_ALL_TXS = "DELETE FROM JMS_TRANSACTION_LOG"; 124 String CREATE_REFERENCE_TABLE = 125 "CREATE TABLE JMS_REFERENCE_LOG ( MESSAGEID INTEGER NOT NULL, " 126 + "DESTINATION VARCHAR(256) NOT NULL, TXID INTEGER, TXOP CHAR(1), " 127 + "REDELIVERED CHAR(1), REDELIVERS INTEGER, " 128 + "PRIMARY KEY (MESSAGEID, DESTINATION) )"; 129 String CREATE_MESSAGE_TABLE = 130 "CREATE TABLE JMS_MESSAGE_LOG ( MESSAGEID INTEGER NOT NULL, " 131 + "DESTINATION VARCHAR(256), TXID INTEGER, TXOP CHAR(1), LATECLONE CHAR(1), " 132 + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )"; 133 String CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTION_LOG ( TXID INTEGER )"; 134 135 136 int blobType = OBJECT_BLOB; 137 138 139 boolean createTables = true; 140 141 142 private int connectionRetryAttempts = 5; 143 144 145 private long gcPeriod = 60000; 146 147 148 private Thread gcThread; 149 150 152 157 public PersistenceManager() throws javax.jms.JMSException  158 { 159 txManager = new TxManager(this); 160 } 161 162 164 169 public ObjectName getConnectionManager() 170 { 171 return connectionManagerName; 172 } 173 174 179 public void setConnectionManager(ObjectName connectionManagerName) 180 { 181 this.connectionManagerName = connectionManagerName; 182 } 183 184 189 public int getGCPeriodSecs() 190 { 191 return (int) gcPeriod / 1000; 192 } 193 194 199 public void setGCPeriodSecs(int gcPeriodSecs) 200 { 201 this.gcPeriod = gcPeriodSecs * 1000; 202 } 203 204 210 public int getConnectionRetryAttempts() 211 { 212 return this.connectionRetryAttempts; 213 } 214 215 221 public void setConnectionRetryAttempts(int value) 222 { 223 this.connectionRetryAttempts = value; 224 } 225 226 232 public String getSqlProperties() 233 { 234 try 235 { 236 ByteArrayOutputStream boa = new ByteArrayOutputStream (); 237 sqlProperties.store(boa, ""); 238 return new String (boa.toByteArray()); 239 } 240 catch (IOException shouldnothappen) 241 { 242 return ""; 243 } 244 } 245 246 252 public void setSqlProperties(String value) 253 { 254 try 255 { 256 ByteArrayInputStream is = new ByteArrayInputStream (value.getBytes()); 257 sqlProperties = new Properties (); 258 sqlProperties.load(is); 259 } 260 catch (IOException shouldnothappen) 261 { 262 } 263 } 264 265 267 public Tx createPersistentTx() throws JMSException  268 { 269 Tx id = new Tx(nextTransactionId.increment()); 270 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 271 tms.startTX(); 272 Connection c = null; 273 PreparedStatement stmt = null; 274 boolean threadWasInterrupted = Thread.interrupted(); 275 try 276 { 277 c = this.getConnection(); 278 stmt = c.prepareStatement(INSERT_TX); 279 stmt.setLong(1, id.longValue()); 280 stmt.executeUpdate(); 281 282 } 283 catch (SQLException e) 284 { 285 tms.setRollbackOnly(); 286 throw new SpyJMSException("Could not crate tx: " + id, e); 287 } 288 finally 289 { 290 try 291 { 292 stmt.close(); 293 } 294 catch (Throwable ignore) 295 { 296 } 297 try 298 { 299 c.close(); 300 } 301 catch (Throwable ignore) 302 { 303 } 304 tms.endTX(); 305 306 if( threadWasInterrupted ) 308 Thread.currentThread().interrupt(); 309 } 310 311 return id; 312 } 313 314 public void commitPersistentTx(Tx txId) throws JMSException  315 { 316 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 317 tms.startTX(); 318 Connection c = null; 319 boolean threadWasInterrupted = Thread.interrupted(); 320 try 321 { 322 c = this.getConnection(); 323 removeMarkedMessages(c, txId, "D"); 324 removeMarkedReferences(c, txId, "D"); 325 removeTXRecord(c, txId.longValue()); 326 } 327 catch (SQLException e) 328 { 329 tms.setRollbackOnly(); 330 throw new SpyJMSException("Could not commit tx: " + txId, e); 331 } 332 finally 333 { 334 try 335 { 336 c.close(); 337 } 338 catch (Throwable ignore) 339 { 340 } 341 tms.endTX(); 342 343 if( threadWasInterrupted ) 345 Thread.currentThread().interrupt(); 346 } 347 } 348 349 public void rollbackPersistentTx(Tx txId) throws JMSException  350 { 351 352 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 353 tms.startTX(); 354 Connection c = null; 355 PreparedStatement stmt = null; 356 boolean threadWasInterrupted = Thread.interrupted(); 357 try 358 { 359 c = this.getConnection(); 360 removeMarkedMessages(c, txId, "A"); 361 removeMarkedReferences(c, txId, "A"); 362 removeTXRecord(c, txId.longValue()); 363 364 stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES_WITH_TX); 366 stmt.setNull(1, Types.BIGINT); 367 stmt.setString(2, "A"); 368 stmt.setString(3, "D"); 369 stmt.setLong(4, txId.longValue()); 370 stmt.executeUpdate(); 371 stmt.close(); 372 373 stmt = c.prepareStatement(UPDATE_MARKED_REFERENCES_WITH_TX); 375 stmt.setNull(1, Types.BIGINT); 376 stmt.setString(2, "A"); 377 stmt.setString(3, "D"); 378 stmt.setLong(4, txId.longValue()); 379 stmt.executeUpdate(); 380 stmt.close(); 381 } 382 catch (SQLException e) 383 { 384 tms.setRollbackOnly(); 385 throw new SpyJMSException("Could not rollback tx: " + txId, e); 386 } 387 finally 388 { 389 try 390 { 391 if (stmt != null) 392 stmt.close(); 393 if (c != null) 394 c.close(); 395 } 396 catch (Throwable ignore) 397 { 398 } 399 tms.endTX(); 400 401 if( threadWasInterrupted ) 403 Thread.currentThread().interrupt(); 404 } 405 } 406 407 public void add(MessageReference messageRef, Tx txId) throws JMSException  408 { 409 boolean trace = log.isTraceEnabled(); 410 if (trace) 411 log.trace("About to add message " + messageRef + " transaction=" + txId); 412 413 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 414 tms.startTX(); 415 Connection c = null; 416 boolean threadWasInterrupted = Thread.interrupted(); 417 try 418 { 419 c = this.getConnection(); 420 synchronized(messageRef) 422 { 423 if (trace) 424 log.trace("Inserting message " + messageRef + " transaction=" + txId); 425 426 if (messageRef.isLateClone()) 427 { 428 addReference(c, messageRef.getPersistentKey(), messageRef, txId, "A"); 429 } 430 else 431 { 432 SpyMessage message = messageRef.getMessage(); 433 addMessage(c, messageRef.getPersistentKey(), message, txId, "A", "0"); 434 } 435 messageRef.setStored(MessageReference.STORED); 436 437 if (trace) 438 log.trace("Added message " + messageRef + " transaction=" + txId); 439 } 440 } 441 catch (IOException e) 442 { 443 tms.setRollbackOnly(); 444 throw new SpyJMSException("Could not store message: " + messageRef, e); 445 } 446 catch (SQLException e) 447 { 448 tms.setRollbackOnly(); 449 throw new SpyJMSException("Could not store message: " + messageRef, e); 450 } 451 finally 452 { 453 try 454 { 455 c.close(); 456 } 457 catch (Throwable ignore) 458 { 459 } 460 tms.endTX(); 461 462 if( threadWasInterrupted ) 464 Thread.currentThread().interrupt(); 465 } 466 } 467 468 public void update(MessageReference messageRef, Tx txId) throws JMSException  469 { 470 boolean trace = log.isTraceEnabled(); 471 if (trace) 472 log.trace("Updating message " + messageRef + " transaction=" + txId); 473 474 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 475 tms.startTX(); 476 Connection c = null; 477 PreparedStatement stmt = null; 478 boolean threadWasInterrupted = Thread.interrupted(); 479 try 480 { 481 c = this.getConnection(); 482 if (txId == null) 483 { 484 if (messageRef.isLateClone()) 485 { 486 stmt = c.prepareStatement(UPDATE_REFERENCE); 487 if (messageRef.redelivered) 488 stmt.setString(1, "1"); 489 else 490 stmt.setString(1, "0"); 491 stmt.setLong(2, messageRef.redeliveryCount); 492 stmt.setLong(3, messageRef.messageId); 493 stmt.setString(4, messageRef.getPersistentKey()); 494 } 495 else 496 { 497 stmt = c.prepareStatement(UPDATE_MESSAGE); 498 setBlob(stmt, 1, messageRef.getMessage()); 499 stmt.setLong(2, messageRef.messageId); 500 stmt.setString(3, messageRef.getPersistentKey()); 501 } 502 int rc = stmt.executeUpdate(); 503 if( rc != 1 ) 504 throw new SpyJMSException("Could not update the message in the database: update affected "+rc+" rows"); 505 } 506 else 507 { 508 throw new SpyJMSException("NYI: Updating a message in a transaction is not currently used"); 509 } 510 if (trace) 511 log.trace("Updated message " + messageRef + " transaction=" + txId); 512 513 } 514 catch (IOException e) 515 { 516 tms.setRollbackOnly(); 517 throw new SpyJMSException("Could not update message: " + messageRef, e); 518 } 519 catch (SQLException e) 520 { 521 tms.setRollbackOnly(); 522 throw new SpyJMSException("Could not update message: " + messageRef, e); 523 } 524 finally 525 { 526 try 527 { 528 stmt.close(); 529 } 530 catch (Throwable ignore) 531 { 532 } 533 try 534 { 535 c.close(); 536 } 537 catch (Throwable ignore) 538 { 539 } 540 tms.endTX(); 541 542 if( threadWasInterrupted ) 544 Thread.currentThread().interrupt(); 545 } 546 } 547 548 public void remove(MessageReference messageRef, Tx txId) throws JMSException  549 { 550 boolean trace = log.isTraceEnabled(); 551 if (trace) 552 log.trace("Removing message " + messageRef + " transaction=" + txId); 553 554 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 555 tms.startTX(); 556 Connection c = null; 557 PreparedStatement stmt = null; 558 boolean threadWasInterrupted = Thread.interrupted(); 559 try 560 { 561 c = this.getConnection(); 562 synchronized(messageRef) 564 { 565 if (txId == null) 566 { 567 if (messageRef.isLateClone()) 568 stmt = c.prepareStatement(DELETE_REFERENCE); 569 else 570 stmt = c.prepareStatement(DELETE_MESSAGE); 571 stmt.setLong(1, messageRef.messageId); 572 stmt.setString(2, messageRef.getPersistentKey()); 573 int rc = stmt.executeUpdate(); 574 if( rc != 1 ) 575 throw new SpyJMSException("Could not delete the message from the database: delete affected "+rc+" rows"); 576 577 messageRef.setStored(MessageReference.NOT_STORED); 587 messageRef.removeDelayed(); 588 } 589 else 590 { 591 if (messageRef.isLateClone()) 592 { 593 stmt = c.prepareStatement(MARK_REFERENCE); 594 stmt.setLong(1, txId.longValue()); 595 stmt.setString(2, "D"); 596 stmt.setLong(3, messageRef.messageId); 597 stmt.setString(4, messageRef.getPersistentKey()); 598 } 599 else 600 { 601 stmt = c.prepareStatement(MARK_MESSAGE); 602 stmt.setLong(1, txId.longValue()); 603 stmt.setString(2, "D"); 604 stmt.setLong(3, messageRef.messageId); 605 stmt.setString(4, messageRef.getPersistentKey()); 606 } 607 int rc = stmt.executeUpdate(); 608 if( rc != 1 ) 609 throw new SpyJMSException("Could not mark the message as deleted in the database: update affected "+rc+" rows"); 610 } 611 if (trace) 612 log.trace("Removed message " + messageRef + " transaction=" + txId); 613 } 614 } 615 catch (SQLException e) 616 { 617 tms.setRollbackOnly(); 618 throw new SpyJMSException("Could not remove message: " + messageRef, e); 619 } 620 finally 621 { 622 try 623 { 624 stmt.close(); 625 } 626 catch (Throwable ignore) 627 { 628 } 629 try 630 { 631 c.close(); 632 } 633 catch (Throwable ignore) 634 { 635 } 636 tms.endTX(); 637 638 if( threadWasInterrupted ) 640 Thread.currentThread().interrupt(); 641 } 642 } 643 644 public synchronized void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws javax.jms.JMSException  645 { 646 if (jmsDest == null) 647 throw new IllegalArgumentException ("Must supply non null JMSDestination to restoreQueue"); 648 if (dest == null) 649 throw new IllegalArgumentException ("Must supply non null SpyDestination to restoreQueue"); 650 651 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 652 tms.startTX(); 653 Connection c = null; 654 PreparedStatement stmt = null; 655 ResultSet rs = null; 656 boolean threadWasInterrupted = Thread.interrupted(); 657 try 658 { 659 c = this.getConnection(); 660 int counter=0; 661 if (jmsDest.parameters.lateClone) 662 { 663 JMSTopic topic = (JMSTopic) jmsDest; 664 DurableSubscriptionID id = ((SpyTopic) dest).getDurableSubscriptionID(); 666 667 stmt = c.prepareStatement(SELECT_REFERENCES_IN_DEST); 668 stmt.setString(1, dest.toString()); 669 670 rs = stmt.executeQuery(); 671 while (rs.next()) 672 { 673 SpyMessage message = extractMessage(rs, 2); 674 boolean redelivered = false; 675 if (rs.getString(3).equals("1")) 676 redelivered = true; 677 message.header.jmsRedelivered = redelivered; 678 message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer (rs.getInt(4))); 679 topic.restoreMessage(message, id); 680 counter++; 681 } 682 } 683 else 684 { 685 stmt = c.prepareStatement(SELECT_MESSAGES_IN_DEST); 686 stmt.setString(1, dest.toString()); 687 688 rs = stmt.executeQuery(); 689 while (rs.next()) 690 { 691 SpyMessage message = extractMessage(rs, 2); 692 if (dest instanceof SpyTopic) 694 message.header.durableSubscriberID = ((SpyTopic)dest).getDurableSubscriptionID(); 695 jmsDest.restoreMessage(message); 696 counter++; 697 } 698 } 699 700 log.debug("Restored "+counter+" message(s) to: "+dest); 701 } 702 catch (IOException e) 703 { 704 tms.setRollbackOnly(); 705 throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e); 706 } 707 catch (SQLException e) 708 { 709 tms.setRollbackOnly(); 710 throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e); 711 } 712 finally 713 { 714 try 715 { 716 rs.close(); 717 } 718 catch (Throwable ignore) 719 { 720 } 721 try 722 { 723 stmt.close(); 724 } 725 catch (Throwable ignore) 726 { 727 } 728 try 729 { 730 c.close(); 731 } 732 catch (Throwable ignore) 733 { 734 } 735 tms.endTX(); 736 737 if( threadWasInterrupted ) 739 Thread.currentThread().interrupt(); 740 } 741 742 } 743 744 public TxManager getTxManager() 745 { 746 return txManager; 747 } 748 749 public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException  750 { 751 } 753 754 757 public MessageCache getMessageCacheInstance() 758 { 759 throw new UnsupportedOperationException ("This is now set on the destination manager"); 760 } 761 762 764 public void addMessage(SpyMessage message) throws JMSException  765 { 766 767 TransactionManagerStrategy tms = new TransactionManagerStrategy(); 768 tms.startTX(); 769 Connection c = null; 770 boolean threadWasInterrupted = Thread.interrupted(); 771 try 772 { 773 c = datasource.getConnection(); 774 addMessage(c, "*", message, null, null, "1"); 775 } 776 catch (IOException e) 777 { 778 tms.setRollbackOnly(); 779 throw new SpyJMSException("Could not add message:", e); 780 } 781 catch (SQLException e) 782 { 783 tms.setRollbackOnly(); 784 throw new SpyJMSException("Could not add message:", e); 785 } 786 finally 787 { 788 try 789 { 790 if (c != null) 791 c.close(); 792 } 793 catch (Throwable ignore) 794 { 795 } 796 tms.endTX(); 797 798 if( threadWasInterrupted ) 800 Thread.currentThread().interrupt(); 801 } 802 } 803 804 806 public Object getInstance() 807 { 808 return this; 809 } 810 811 814 public |