package org.continuent.sequoia.controller.recoverylog;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.LinkedList;

import org.continuent.sequoia.common.i18n.Translate;
import org.continuent.sequoia.common.log.Trace;
import org.continuent.sequoia.controller.recoverylog.events.LogEvent;
import org.continuent.sequoia.controller.recoverylog.events.LogRequestEvent;

/**
 * Thread that drains a queue of {@link LogEvent}s, executing them one at a
 * time, and that offers helper methods issuing SQL statements against the
 * recovery log and checkpoint tables through the connection supplied by the
 * owning {@link RecoveryLog}.
 * <p>
 * Queue access is guarded by this object's monitor. The event being executed
 * stays at the head of the queue until its execution is over (see
 * {@link #run()}).
 */
public class LoggerThread extends Thread
{
  /** Set by {@link #shutdown()}; makes {@link #run()} leave its loop. */
  private boolean           killed       = false;
  /** Pending events; the head is the event currently being executed. */
  private LinkedList        logQueue;
  private Trace             logger;
  /** Lazily prepared INSERT statement, see getLogPreparedStatement(). */
  private PreparedStatement logStmt;
  /** Lazily prepared UPDATE statement, see getUpdatePreparedStatement(). */
  private PreparedStatement updateStmt;
  private RecoveryLog       recoveryLog;
  private LogEvent          currentEvent = null;
  /** Last event handed to putBackAtHeadOfQueue(), used to retry only once. */
  private LogEvent          lastFailed;

  /**
   * Creates a new <code>LoggerThread</code> named "LoggerThread".
   * 
   * @param log the recovery log providing the database connection and the
   *          table names used by this thread
   */
  public LoggerThread(RecoveryLog log)
  {
    super("LoggerThread");
    this.recoveryLog = log;
    this.logger = RecoveryLog.logger;
    logStmt = null;
    updateStmt = null;
    logQueue = new LinkedList();
  }

  /**
   * Returns the logger used by this thread.
   * 
   * @return the <code>Trace</code> taken from <code>RecoveryLog.logger</code>
   */
  public Trace getLogger()
  {
    return logger;
  }

  /**
   * Tells whether the log queue is empty. When it is, <code>notify()</code>
   * is also called on this object's monitor (presumably to wake up a caller
   * waiting for the queue to drain -- confirm against callers).
   * 
   * @return <code>true</code> if the queue holds no event
   */
  public synchronized boolean getLogQueueIsEmpty()
  {
    if (logQueue.isEmpty())
    {
      notify();
      return true;
    }
    else
    {
      return false;
    }
  }

  /**
   * Returns the cached INSERT statement for the log table, preparing it on
   * first use (or after {@link #invalidateLogStatements()}).
   * 
   * @return an 11-parameter INSERT <code>PreparedStatement</code>
   * @throws SQLException if the statement cannot be prepared
   */
  public PreparedStatement getLogPreparedStatement() throws SQLException
  {
    if (logStmt == null)
    {
      logStmt = recoveryLog.getDatabaseConnection().prepareStatement(
          "INSERT INTO " + recoveryLog.getLogTableName()
              + " VALUES(?,?,?,?,?,?,?,?,?,?,?)");
    }
    return logStmt;
  }

  /**
   * Returns the cached UPDATE statement for the log table, preparing it on
   * first use (or after {@link #invalidateLogStatements()}).
   * 
   * @return an UPDATE <code>PreparedStatement</code> setting exec_status,
   *         update_count, exec_time and completion_log_id for a given log_id
   * @throws SQLException if the statement cannot be prepared
   */
  public PreparedStatement getUpdatePreparedStatement() throws SQLException
  {
    if (updateStmt == null)
    {
      updateStmt = recoveryLog
          .getDatabaseConnection()
          .prepareStatement(
              "UPDATE "
                  + recoveryLog.getLogTableName()
                  + " SET exec_status=?,update_count=?,exec_time=?,completion_log_id=? WHERE log_id=?");
    }
    return updateStmt;
  }

  /**
   * Returns the recovery log this thread works for.
   * 
   * @return the <code>RecoveryLog</code> given at construction time
   */
  public RecoveryLog getRecoveryLog()
  {
    return recoveryLog;
  }

  /**
   * Tells whether the queue holds at least one event belonging to the given
   * transaction.
   * 
   * @param tid transaction id to look for
   * @return <code>true</code> if a queued event belongs to the transaction
   */
  public synchronized boolean hasLogEntryForTransaction(long tid)
  {
    for (Iterator iter = logQueue.iterator(); iter.hasNext();)
    {
      LogEvent logEvent = (LogEvent) iter.next();
      if (logEvent.belongToTransaction(tid))
        return true;
    }
    return false;
  }

  /**
   * Closes and forgets the cached log and update statements, then asks the
   * recovery log to invalidate its internal connection. Close failures are
   * ignored; statements that were never prepared are simply skipped.
   */
  public void invalidateLogStatements()
  {
    closeQuietly(logStmt);
    closeQuietly(updateStmt);
    logStmt = null;
    updateStmt = null;
    recoveryLog.invalidateInternalConnection();
  }

  /**
   * Appends an event at the tail of the queue and notifies this object's
   * monitor so that a waiting {@link #run()} loop picks it up.
   * 
   * @param logObject the event to queue
   */
  public synchronized void log(LogEvent logObject)
  {
    logQueue.addLast(logObject);
    notify();
  }

  /**
   * Puts a failed event back at the head of the queue so that it is tried
   * again, unless it is the very event that was already put back last time.
   * In that case the failure is only logged (with an extra corruption warning
   * for <code>LogRequestEvent</code>s) and the event is not re-queued.
   * 
   * @param event the event whose execution failed
   * @param e the exception that made the execution fail
   */
  public synchronized void putBackAtHeadOfQueue(LogEvent event, Exception e)
  {
    if (lastFailed != event)
    {
      logQueue.addFirst(event);
      notify();
      lastFailed = event;
    }
    else
    {
      if (event instanceof LogRequestEvent)
        logger
            .error("WARNING! Your recovery log is probably corrupted, you should perform a restore log operation");
      logger.error("Logger thread was unable to log " + event.toString()
          + " because of " + e, e);
    }
  }

  /**
   * Removes from the queue every event belonging to the given transaction,
   * except the one at the head of the queue.
   * 
   * @param tid id of the transaction whose queued events must be dropped
   */
  public synchronized void removeQueriesOfTransactionFromQueue(long tid)
  {
    if (logger.isDebugEnabled())
      logger.debug(Translate.get("recovery.jdbc.loggerthread.removing", tid));
    Iterator iter = logQueue.iterator();
    // Skip the head: run() leaves the event it is executing at the head of
    // the queue and removes it itself once execution is over.
    if (iter.hasNext())
    {
      iter.next();
    }
    while (iter.hasNext())
    {
      LogEvent event = (LogEvent) iter.next();
      if (event.belongToTransaction(tid))
      {
        iter.remove();
      }
    }
  }

  /**
   * Removes a transaction from the log table if it is empty, i.e. if nothing
   * is queued for it and the log table holds at most a single row for it
   * whose SQL starts with <code>RecoveryLog.BEGIN</code>.
   * 
   * @param transactionId id of the transaction to check
   * @return <code>true</code> if the transaction has no row in the log table
   *         or if its single begin row was deleted, <code>false</code> if the
   *         transaction is not empty (queued events, a first row that is not
   *         a begin, or more than one row)
   * @throws SQLException if a database access fails; the original exception
   *           is attached as the cause
   */
  public boolean removeEmptyTransaction(long transactionId) throws SQLException
  {
    if (hasLogEntryForTransaction(transactionId))
      return false;

    PreparedStatement stmt = null;
    ResultSet rs = null;
    try
    {
      stmt = recoveryLog.getDatabaseConnection().prepareStatement(
          "SELECT * FROM " + recoveryLog.getLogTableName()
              + " WHERE transaction_id=?");
      stmt.setLong(1, transactionId);
      rs = stmt.executeQuery();
      if (!rs.next())
        return true; // nothing logged for this transaction

      String sql = rs.getString(recoveryLog.getLogTableSqlColumnName());
      if ((sql == null) || !sql.startsWith(RecoveryLog.BEGIN))
        return false; // first row is not a begin

      if (rs.next())
        return false; // more than the begin has been logged

      // Release the SELECT before reusing the variables; nulling them avoids
      // a second close in the finally block.
      rs.close();
      rs = null;
      stmt.close();
      stmt = null;

      stmt = recoveryLog.getDatabaseConnection().prepareStatement(
          "DELETE FROM " + recoveryLog.getLogTableName()
              + " WHERE transaction_id=?");
      stmt.setLong(1, transactionId);
      stmt.executeUpdate();
      return true;
    }
    catch (SQLException e)
    {
      throw chain(Translate.get("recovery.jdbc.transaction.remove.failed",
          new String[]{String.valueOf(transactionId), e.getMessage()}), e);
    }
    finally
    {
      closeQuietly(rs);
      closeQuietly(stmt);
    }
  }

  /**
   * Deletes every row of the checkpoint table.
   * 
   * @throws SQLException if the deletion fails; the failure is logged as a
   *           warning and the original exception is attached as the cause
   */
  public void deleteCheckpointTable() throws SQLException
  {
    PreparedStatement stmt = null;
    try
    {
      stmt = recoveryLog.getDatabaseConnection().prepareStatement(
          "DELETE FROM " + recoveryLog.getCheckpointTableName());
      stmt.executeUpdate();
    }
    catch (SQLException e)
    {
      String msg = "Failed to delete checkpoint table";
      logger.warn(msg, e);
      throw chain(msg, e);
    }
    finally
    {
      closeQuietly(stmt);
    }
  }

  /**
   * Inserts a (name, log id) row in the checkpoint table. On failure the
   * cached log statements are invalidated before the exception is thrown.
   * 
   * @param checkpointName name of the checkpoint to store
   * @param checkpointLogId log id the checkpoint points to
   * @throws SQLException if the insertion fails; the original exception is
   *           attached as the cause
   */
  public void storeCheckpointWithLogId(String checkpointName,
      long checkpointLogId) throws SQLException
  {
    PreparedStatement stmt = null;
    try
    {
      if (logger.isDebugEnabled())
        logger.debug("Storing checkpoint " + checkpointName + " at request id "
            + checkpointLogId);
      stmt = recoveryLog.getDatabaseConnection().prepareStatement(
          "INSERT INTO " + recoveryLog.getCheckpointTableName()
              + " VALUES(?,?)");
      stmt.setString(1, checkpointName);
      stmt.setLong(2, checkpointLogId);
      stmt.executeUpdate();
    }
    catch (SQLException e)
    {
      invalidateLogStatements();
      throw chain(Translate.get("recovery.jdbc.checkpoint.store.failed",
          new String[]{checkpointName, e.getMessage()}), e);
    }
    finally
    {
      closeQuietly(stmt);
    }
  }

  /**
   * Deletes from the checkpoint table the rows whose name matches the given
   * value (SQL <code>LIKE</code> comparison). On failure the cached log
   * statements are invalidated before the exception is thrown.
   * 
   * @param checkpointName name (LIKE pattern) of the checkpoint to remove
   * @throws SQLException if the deletion fails; the original exception is
   *           attached as the cause
   */
  public void removeCheckpoint(String checkpointName) throws SQLException
  {
    PreparedStatement stmt = null;
    try
    {
      stmt = recoveryLog.getDatabaseConnection().prepareStatement(
          "DELETE FROM " + recoveryLog.getCheckpointTableName()
              + " WHERE name like ?");
      stmt.setString(1, checkpointName);
      stmt.executeUpdate();
      // The statement is closed once, in the finally block, so that a failing
      // close is not reported as a failed removal.
    }
    catch (SQLException e)
    {
      invalidateLogStatements();
      throw chain(Translate.get("recovery.jdbc.checkpoint.remove.failed",
          new String[]{checkpointName, e.getMessage()}), e);
    }
    finally
    {
      closeQuietly(stmt);
    }
  }

  /**
   * Deletes the log table rows whose log_id is lower than or equal to the
   * given id.
   * 
   * @param oldId highest log id to delete (inclusive)
   * @throws SQLException if the deletion fails; the original exception is
   *           attached as the cause
   */
  public void deleteLogEntriesBeforeId(long oldId) throws SQLException
  {
    PreparedStatement stmt = null;
    try
    {
      stmt = recoveryLog.getDatabaseConnection().prepareStatement(
          "DELETE FROM " + recoveryLog.getLogTableName() + " WHERE log_id<=?");
      stmt.setLong(1, oldId);
      stmt.executeUpdate();
    }
    catch (SQLException e)
    {
      throw chain(Translate.get("recovery.jdbc.transaction.remove.failed",
          new String[]{String.valueOf(oldId), e.getMessage()}), e);
    }
    finally
    {
      closeQuietly(stmt);
    }
  }

  /**
   * Counts the log table rows whose log_id lies strictly between the two
   * given ids.
   * 
   * @param lowerLogId lower bound (exclusive)
   * @param upperLogId upper bound (exclusive)
   * @return the number of matching rows
   * @throws SQLException if the query fails or returns no row
   */
  public long getNumberOfLogEntries(long lowerLogId, long upperLogId)
      throws SQLException
  {
    ResultSet rs = null;
    PreparedStatement stmt = null;
    try
    {
      stmt = recoveryLog.getDatabaseConnection().prepareStatement(
          "SELECT COUNT(*) FROM " + recoveryLog.getLogTableName()
              + " WHERE log_id>? AND log_id<?");
      stmt.setLong(1, lowerLogId);
      stmt.setLong(2, upperLogId);
      rs = stmt.executeQuery();
      if (!rs.next())
        throw new SQLException(
            "Failed to retrieve number of log entries (no rows returned)");

      return rs.getLong(1);
    }
    finally
    {
      closeQuietly(rs);
      closeQuietly(stmt);
    }
  }

  /**
   * Adds the given value to the log_id of every row of the log table.
   * 
   * @param shiftValue value added to each log_id
   * @throws SQLException if the update fails; the original exception is
   *           attached as the cause
   */
  public void shiftLogEntriesIds(long shiftValue) throws SQLException
  {
    PreparedStatement stmt = null;
    try
    {
      stmt = recoveryLog.getDatabaseConnection().prepareStatement(
          "UPDATE " + recoveryLog.getLogTableName() + " SET log_id=log_id+?");
      stmt.setLong(1, shiftValue);
      stmt.executeUpdate();
    }
    catch (SQLException e)
    {
      throw chain(Translate.get("recovery.jdbc.loggerthread.shift.failed", e
          .getMessage()), e);
    }
    finally
    {
      closeQuietly(stmt);
    }
  }

  /**
   * Adds the given value to the log_id of the log table rows whose log_id is
   * strictly greater than <code>fromId</code>.
   * 
   * @param fromId rows with a log_id above this id are shifted
   * @param shiftValue value added to each selected log_id
   * @throws SQLException if the update fails; the original exception is
   *           attached as the cause
   */
  public void shiftLogEntriesAfterId(long fromId, long shiftValue)
      throws SQLException
  {
    PreparedStatement stmt = null;
    try
    {
      stmt = recoveryLog.getDatabaseConnection().prepareStatement(
          "UPDATE " + recoveryLog.getLogTableName()
              + " SET log_id=log_id+? WHERE log_id>?");
      stmt.setLong(1, shiftValue);
      stmt.setLong(2, fromId);
      stmt.executeUpdate();
    }
    catch (SQLException e)
    {
      throw chain(Translate.get("recovery.jdbc.loggerthread.shift.failed", e
          .getMessage()), e);
    }
    finally
    {
      closeQuietly(stmt);
    }
  }

  /**
   * Deletes the log table rows, then the checkpoint table rows, whose log_id
   * lies strictly between the two given ids. When info logging is enabled,
   * the number of deleted rows and the checkpoints about to be deleted are
   * logged.
   * 
   * @param commonCheckpointId lower bound (exclusive)
   * @param nowCheckpointId upper bound (exclusive)
   * @throws SQLException if a database access fails; the original exception
   *           is attached as the cause
   */
  public void deleteLogEntriesAndCheckpointBetween(long commonCheckpointId,
      long nowCheckpointId) throws SQLException
  {
    PreparedStatement stmt = null;
    ResultSet rs = null;
    try
    {
      // 1. Delete the log entries
      stmt = recoveryLog.getDatabaseConnection().prepareStatement(
          "DELETE FROM " + recoveryLog.getLogTableName()
              + " WHERE ?<log_id AND log_id<?");
      stmt.setLong(1, commonCheckpointId);
      stmt.setLong(2, nowCheckpointId);
      int rows = stmt.executeUpdate();
      stmt.close();
      stmt = null;

      if (logger.isInfoEnabled())
      {
        logger.info(rows
            + " outdated log entries have been removed from the recovery log");

        // 2. List the checkpoints that step 3 is going to delete
        stmt = recoveryLog.getDatabaseConnection().prepareStatement(
            "SELECT * FROM " + recoveryLog.getCheckpointTableName()
                + " WHERE ?<log_id AND log_id<?");
        stmt.setLong(1, commonCheckpointId);
        stmt.setLong(2, nowCheckpointId);
        rs = stmt.executeQuery();
        while (rs.next())
        {
          logger.info("Checkpoint " + rs.getString(1) + " (" + rs.getLong(2)
              + ") will be deleted.");
        }
        rs.close();
        rs = null;
        stmt.close();
        stmt = null;
      }

      // 3. Delete the checkpoints
      stmt = recoveryLog.getDatabaseConnection().prepareStatement(
          "DELETE FROM " + recoveryLog.getCheckpointTableName()
              + " WHERE ?<log_id AND log_id<?");
      stmt.setLong(1, commonCheckpointId);
      stmt.setLong(2, nowCheckpointId);
      rows = stmt.executeUpdate();

      if (logger.isInfoEnabled())
        logger
            .info(rows
                + " out of sync checkpoints have been removed from the recovery log");
    }
    catch (SQLException e)
    {
      throw chain(Translate.get("recovery.jdbc.entries.remove.failed", e
          .getMessage()), e);
    }
    finally
    {
      // rs is closed here too so that it cannot leak when reading it fails
      closeQuietly(rs);
      closeQuietly(stmt);
    }
  }

  /**
   * Main loop: waits until the queue is not empty, executes the event at the
   * head of the queue, then removes the head, until {@link #shutdown()} is
   * called. Events still queued at that point are not executed; their number
   * is logged as a warning.
   */
  public void run()
  {
    while (!killed)
    {
      synchronized (this)
      {
        while (getLogQueueIsEmpty() && !killed)
        {
          try
          {
            wait();
          }
          catch (InterruptedException e)
          {
            // Only logged: the loop condition decides whether to wait again.
            logger.warn(Translate.get("recovery.jdbc.loggerthread.awaken"), e);
          }
        }
        if (killed)
          break;
        // Peek only: the event stays at the head while it is being executed,
        // which is what removeQueriesOfTransactionFromQueue() and
        // putBackAtHeadOfQueue() rely on.
        currentEvent = (LogEvent) logQueue.getFirst();
      }
      try
      {
        currentEvent.execute(this);
      }
      finally
      {
        // If the event was put back at the head during execute(), this
        // removes one of the two copies and the other one gets retried.
        synchronized (this)
        {
          logQueue.removeFirst();
        }
      }
    }

    int finalLogSize = logQueue.size();
    if (finalLogSize > 0)
    {
      logger.warn("Log queue contains requests following shutdown: "
          + finalLogSize);
    }
    logger.info("Logger thread ending: " + this.getName());
  }

  /**
   * Asks the thread to stop: sets the killed flag and notifies this object's
   * monitor so that a waiting {@link #run()} loop notices it.
   */
  public synchronized void shutdown()
  {
    killed = true;
    logger.info("Log shutdown method has been invoked");
    notify();
  }

  /**
   * Builds an <code>SQLException</code> carrying the given message and
   * keeping the original failure as its cause, so that the root stack trace
   * is not lost when a failure is re-thrown with a translated message.
   */
  private static SQLException chain(String message, SQLException cause)
  {
    SQLException wrapped = new SQLException(message);
    wrapped.initCause(cause);
    return wrapped;
  }

  /** Closes the statement if it is not null, ignoring any failure. */
  private static void closeQuietly(PreparedStatement stmt)
  {
    if (stmt == null)
      return;
    try
    {
      stmt.close();
    }
    catch (Exception ignore)
    {
      // best effort: nothing useful can be done if close fails
    }
  }

  /** Closes the result set if it is not null, ignoring any failure. */
  private static void closeQuietly(ResultSet rs)
  {
    if (rs == null)
      return;
    try
    {
      rs.close();
    }
    catch (Exception ignore)
    {
      // best effort: nothing useful can be done if close fails
    }
  }

}