1 8 9 package com.sleepycat.je.log; 10 11 import java.io.IOException ; 12 import java.io.RandomAccessFile ; 13 import java.nio.BufferOverflowException ; 14 import java.nio.ByteBuffer ; 15 import java.nio.channels.ClosedChannelException ; 16 import java.util.List ; 17 import java.util.zip.Checksum ; 18 19 import com.sleepycat.je.DatabaseException; 20 import com.sleepycat.je.EnvironmentStats; 21 import com.sleepycat.je.RunRecoveryException; 22 import com.sleepycat.je.StatsConfig; 23 import com.sleepycat.je.cleaner.TrackedFileSummary; 24 import com.sleepycat.je.cleaner.UtilizationTracker; 25 import com.sleepycat.je.config.EnvironmentParams; 26 import com.sleepycat.je.dbi.DbConfigManager; 27 import com.sleepycat.je.dbi.EnvironmentImpl; 28 import com.sleepycat.je.dbi.Operation; 29 import com.sleepycat.je.latch.Latch; 30 import com.sleepycat.je.latch.LatchSupport; 31 import com.sleepycat.je.log.entry.LogEntry; 32 import com.sleepycat.je.utilint.Adler32; 33 import com.sleepycat.je.utilint.DbLsn; 34 import com.sleepycat.je.utilint.TestHook; 35 import com.sleepycat.je.utilint.Tracer; 36 37 40 abstract public class LogManager { 41 42 private static final String DEBUG_NAME = LogManager.class.getName(); 44 45 48 public static final int HEADER_BYTES = 14; static final int CHECKSUM_BYTES = 4; static final int PREV_BYTES = 4; static final int HEADER_CONTENT_BYTES = 52 HEADER_BYTES - CHECKSUM_BYTES; 53 static final int HEADER_CHECKSUM_OFFSET = 0; 54 static final int HEADER_ENTRY_TYPE_OFFSET = 4; 55 static final int HEADER_VERSION_OFFSET = 5; 56 static final int HEADER_PREV_OFFSET = 6; 57 public static final int HEADER_SIZE_OFFSET = 10; 58 59 protected LogBufferPool logBufferPool; protected Latch logWriteLatch; private boolean doChecksumOnRead; private FileManager fileManager; protected EnvironmentImpl envImpl; 64 private boolean readOnly; 65 private int readBufferSize; 67 private long lastLsnAtRecovery = DbLsn.NULL_LSN; 68 69 70 71 75 private int nRepeatFaultReads; 76 77 81 private long nTempBufferWrites; 82 83 84 private TestHook readHook; 86 89 public LogManager(EnvironmentImpl envImpl, 90 boolean readOnly) 91 throws DatabaseException { 92 93 this.envImpl = envImpl; 95 this.fileManager = envImpl.getFileManager(); 96 DbConfigManager configManager = envImpl.getConfigManager(); 97 this.readOnly = readOnly; 98 logBufferPool = new LogBufferPool(fileManager, envImpl); 99 100 101 doChecksumOnRead = 102 configManager.getBoolean(EnvironmentParams.LOG_CHECKSUM_READ); 103 104 logWriteLatch = LatchSupport.makeLatch(DEBUG_NAME, envImpl); 105 readBufferSize = 106 configManager.getInt(EnvironmentParams.LOG_FAULT_READ_SIZE); 107 } 108 109 public boolean getChecksumOnRead() { 110 return doChecksumOnRead; 111 } 112 113 public long getLastLsnAtRecovery() { 114 return lastLsnAtRecovery; 115 } 116 117 public void setLastLsnAtRecovery(long lastLsnAtRecovery) { 118 this.lastLsnAtRecovery = lastLsnAtRecovery; 119 } 120 121 125 public void resetPool(DbConfigManager configManager) 126 throws DatabaseException { 127 128 logBufferPool.reset(configManager); 129 } 130 131 134 135 141 public long logForceFlush(LoggableObject item, 142 boolean fsyncRequired) 143 throws DatabaseException { 144 145 return log(item, 146 false, true, fsyncRequired, 149 false, false, DbLsn.NULL_LSN, 0); } 154 155 161 public long logForceFlip(LoggableObject item) 162 throws DatabaseException { 163 164 return log(item, 165 false, true, false, true, false, DbLsn.NULL_LSN, 0); } 173 174 178 public long log(LoggableObject item) 179 throws DatabaseException { 180 181 return log(item, 182 false, false, false, false, false, DbLsn.NULL_LSN, 0); } 190 191 195 public long log(LoggableObject item, 196 boolean isProvisional, 197 boolean backgroundIO, 198 long oldNodeLsn, 199 int oldNodeSize) 200 throws DatabaseException { 201 202 return log(item, 203 isProvisional, 204 false, false, false, backgroundIO, 208 oldNodeLsn, 209 oldNodeSize); 210 } 211 212 231 private long log(LoggableObject item, 232 boolean isProvisional, 233 boolean flushRequired, 234 boolean fsyncRequired, 235 boolean forceNewLogFile, 236 boolean backgroundIO, 237 long oldNodeLsn, 238 int oldNodeSize) 239 throws DatabaseException { 240 241 if (readOnly) { 242 return DbLsn.NULL_LSN; 243 } 244 245 boolean marshallOutsideLatch = item.marshallOutsideWriteLatch(); 246 ByteBuffer marshalledBuffer = null; 247 UtilizationTracker tracker = envImpl.getUtilizationTracker(); 248 LogResult logResult = null; 249 250 try { 251 252 257 if (marshallOutsideLatch) { 258 int itemSize = item.getLogSize(); 259 int entrySize = itemSize + HEADER_BYTES; 260 marshalledBuffer = marshallIntoBuffer(item, 261 itemSize, 262 isProvisional, 263 entrySize); 264 } 265 266 logResult = logItem(item, isProvisional, flushRequired, 267 forceNewLogFile, oldNodeLsn, oldNodeSize, 268 marshallOutsideLatch, marshalledBuffer, 269 tracker); 270 271 } catch (BufferOverflowException e) { 272 273 284 throw new RunRecoveryException(envImpl, e); 285 } catch (IOException e) { 286 287 292 throw new DatabaseException(Tracer.getStackTrace(e), e); 293 } 294 295 298 299 303 if (fsyncRequired) { 304 fileManager.groupSync(); 305 } 306 307 311 envImpl.getCheckpointer().wakeupAfterWrite(); 312 if (logResult.wakeupCleaner) { 313 tracker.activateCleaner(); 314 } 315 316 317 if (backgroundIO) { 318 envImpl.updateBackgroundWrites 319 (logResult.entrySize, logBufferPool.getLogBufferSize()); 320 } 321 322 return logResult.currentLsn; 323 } 324 325 abstract protected LogResult logItem(LoggableObject item, 326 boolean isProvisional, 327 boolean flushRequired, 328 boolean forceNewLogFile, 329 long oldNodeLsn, 330 int oldNodeSize, 331 boolean marshallOutsideLatch, 332 ByteBuffer marshalledBuffer, 333 UtilizationTracker tracker) 334 throws IOException , DatabaseException; 335 336 339 protected LogResult logInternal(LoggableObject item, 340 boolean isProvisional, 341 boolean flushRequired, 342 boolean forceNewLogFile, 343 long oldNodeLsn, 344 int oldNodeSize, 345 boolean marshallOutsideLatch, 346 ByteBuffer marshalledBuffer, 347 UtilizationTracker tracker) 348 throws IOException , DatabaseException { 349 350 356 LogEntryType entryType = item.getLogType(); 357 if (oldNodeLsn != DbLsn.NULL_LSN) { 358 tracker.countObsoleteNode(oldNodeLsn, entryType, oldNodeSize); 359 } 360 361 367 int entrySize; 368 if (marshallOutsideLatch) { 369 entrySize = marshalledBuffer.limit(); 370 } else { 371 entrySize = item.getLogSize() + HEADER_BYTES; 372 } 373 374 381 382 if (forceNewLogFile) { 383 fileManager.forceNewLogFile(); 384 } 385 386 boolean flippedFile = fileManager.bumpLsn(entrySize); 387 long currentLsn = DbLsn.NULL_LSN; 388 boolean wakeupCleaner = false; 389 boolean usedTemporaryBuffer = false; 390 boolean success = false; 391 try { 392 currentLsn = fileManager.getLastUsedLsn(); 393 394 398 wakeupCleaner = 399 tracker.countNewLogEntry(currentLsn, entryType, entrySize); 400 401 406 if (item.countAsObsoleteWhenLogged()) { 407 tracker.countObsoleteNodeInexact 408 (currentLsn, entryType, entrySize); 409 } 410 411 414 if (!marshallOutsideLatch) { 415 marshalledBuffer = marshallIntoBuffer(item, 416 entrySize-HEADER_BYTES, 417 isProvisional, 418 entrySize); 419 } 420 421 422 if (entrySize != marshalledBuffer.limit()) { 423 throw new DatabaseException( 424 "Logged item entrySize= " + entrySize + 425 " but marshalledSize=" + marshalledBuffer.limit() + 426 " type=" + entryType + " currentLsn=" + 427 DbLsn.getNoFormatString(currentLsn)); 428 } 429 430 436 LogBuffer useLogBuffer = 437 logBufferPool.getWriteBuffer(entrySize, flippedFile); 438 439 440 marshalledBuffer = 441 addPrevOffsetAndChecksum(marshalledBuffer, 442 fileManager.getPrevEntryOffset(), 443 entrySize); 444 445 452 useLogBuffer.latchForWrite(); 453 try { 454 ByteBuffer useBuffer = useLogBuffer.getDataBuffer(); 455 if (useBuffer.capacity() - useBuffer.position() < entrySize) { 456 fileManager.writeLogBuffer 457 (new LogBuffer(marshalledBuffer, currentLsn)); 458 usedTemporaryBuffer = true; 459 assert useBuffer.position() == 0; 460 nTempBufferWrites++; 461 } else { 462 463 useBuffer.put(marshalledBuffer); 464 } 465 } finally { 466 useLogBuffer.release(); 467 } 468 469 475 if (envImpl.isReplicated()) { 476 if (item.getLogType().isReplicated()) { 477 envImpl.getReplicator().replicateOperation( 478 Operation.PLACEHOLDER, 479 marshalledBuffer); 480 } 481 } 482 success = true; 483 } finally { 484 if (!success) { 485 486 500 fileManager.restoreLastPosition(); 501 } 502 } 503 504 509 if (!usedTemporaryBuffer) { 510 logBufferPool.writeCompleted(currentLsn, flushRequired); 511 } 512 513 518 item.postLogWork(currentLsn); 519 520 return new LogResult(currentLsn, wakeupCleaner, entrySize); 521 } 522 523 526 private ByteBuffer marshallIntoBuffer(LoggableObject item, 527 int itemSize, 528 boolean isProvisional, 529 int entrySize) 530 throws DatabaseException { 531 532 ByteBuffer destBuffer = ByteBuffer.allocate(entrySize); 533 534 535 destBuffer.position(CHECKSUM_BYTES); 536 537 538 writeHeader(destBuffer, item.getLogType(), itemSize, isProvisional); 539 540 541 item.writeToLog(destBuffer); 542 543 544 destBuffer.flip(); 545 546 return destBuffer; 547 } 548 549 private ByteBuffer addPrevOffsetAndChecksum(ByteBuffer destBuffer, 550 long lastOffset, 551 int entrySize) { 552 553 Checksum checksum = Adler32.makeChecksum(); 554 555 556 destBuffer.position(HEADER_PREV_OFFSET); 557 LogUtils.writeUnsignedInt(destBuffer, lastOffset); 558 559 560 checksum.update(destBuffer.array(), CHECKSUM_BYTES, 561 (entrySize - CHECKSUM_BYTES)); 562 destBuffer.position(0); 563 LogUtils.writeUnsignedInt(destBuffer, checksum.getValue()); 564 565 566 destBuffer.position(0); 567 568 return destBuffer; 569 } 570 571 575 ByteBuffer putIntoBuffer(LoggableObject item, 576 int itemSize, 577 long prevLogEntryOffset, 578 boolean isProvisional, 579 int entrySize) 580 throws DatabaseException { 581 582 ByteBuffer destBuffer = 583 marshallIntoBuffer(item, itemSize, isProvisional, entrySize); 584 return addPrevOffsetAndChecksum(destBuffer, 0, entrySize); 585 } 586 587 594 private void writeHeader(ByteBuffer destBuffer, 595 LogEntryType itemType, 596 int itemSize, 597 boolean isProvisional) { 598 byte typeNum = itemType.getTypeNum(); 600 destBuffer.put(typeNum); 601 602 byte version = itemType.getVersion(); 604 if (isProvisional) 605 version = LogEntryType.setProvisional(version); 606 destBuffer.put(version); 607 608 destBuffer.position(HEADER_SIZE_OFFSET); 610 LogUtils.writeInt(destBuffer, itemSize); 611 } 612 613 616 617 622 public LogEntry getLogEntry(long lsn) 623 throws DatabaseException { 624 625 629 envImpl.checkIfInvalid(); 630 631 637 LogSource logSource = getLogSource(lsn); 638 639 640 return getLogEntryFromLogSource(lsn, logSource); 641 } 642 643 LogEntry getLogEntry(long lsn, RandomAccessFile file) 644 throws DatabaseException { 645 646 return getLogEntryFromLogSource 647 (lsn, new FileSource(file, readBufferSize, fileManager)); 648 } 649 650 657 private LogEntry getLogEntryFromLogSource(long lsn, 658 LogSource logSource) 659 throws DatabaseException { 660 661 try { 662 663 671 long fileOffset = DbLsn.getFileOffset(lsn); 672 ByteBuffer entryBuffer = logSource.getBytes(fileOffset); 673 674 675 ChecksumValidator validator = null; 676 long storedChecksum = LogUtils.getUnsignedInt(entryBuffer); 677 if (doChecksumOnRead) { 678 validator = new ChecksumValidator(); 679 validator.update(envImpl, entryBuffer, 680 HEADER_CONTENT_BYTES, false); 681 } 682 683 684 byte loggableType = entryBuffer.get(); byte version = entryBuffer.get(); 687 entryBuffer.position(entryBuffer.position() + PREV_BYTES); 688 int itemSize = LogUtils.readInt(entryBuffer); 689 690 694 if (entryBuffer.remaining() < itemSize) { 695 entryBuffer = logSource.getBytes(fileOffset + HEADER_BYTES, 696 itemSize); 697 nRepeatFaultReads++; 698 } 699 700 704 if (doChecksumOnRead) { 705 706 validator.update(envImpl, entryBuffer, itemSize, false); 707 validator.validate(envImpl, storedChecksum, lsn); 708 } 709 710 assert LogEntryType.isValidType(loggableType): 711 "Read non-valid log entry type: " + loggableType; 712 713 714 LogEntry logEntry = 715 LogEntryType.findType(loggableType, version).getNewLogEntry(); 716 logEntry.readEntry(entryBuffer, itemSize, version, true); 717 718 719 if (readHook != null) { 720 readHook.doIOHook(); 721 } 722 723 731 return logEntry; 732 } catch (DatabaseException e) { 733 734 738 throw e; 739 } catch (ClosedChannelException e) { 740 741 745 throw new RunRecoveryException(envImpl, 746 "Channel closed, may be "+ 747 "due to thread interrupt", 748 e); 749 } catch (Exception e) { 750 throw new DatabaseException(e); 751 } finally { 752 if (logSource != null) { 753 logSource.release(); 754 } 755 } 756 } 757 758 763 public Object get(long lsn) 764 throws DatabaseException { 765 766 LogEntry entry = getLogEntry(lsn); 767 return entry.getMainItem(); 768 } 769 770 774 public LogSource getLogSource(long lsn) 775 throws DatabaseException { 776 777 780 LogBuffer logBuffer = logBufferPool.getReadBuffer(lsn); 781 782 if (logBuffer == null) { 783 try { 784 785 return new FileHandleSource 786 (fileManager.getFileHandle(DbLsn.getFileNumber(lsn)), 787 readBufferSize, 788 fileManager); 789 } catch (LogFileNotFoundException e) { 790 791 throw new LogFileNotFoundException 792 (DbLsn.getNoFormatString(lsn) + ' ' + e.getMessage()); 793 } 794 } else { 795 return logBuffer; 796 } 797 } 798 799 802 public void flush() 803 throws DatabaseException { 804 805 if (!readOnly) { 806 flushInternal(); 807 fileManager.syncLogEnd(); 808 } 809 } 810 811 814 public void flushNoSync() 815 throws DatabaseException { 816 817 if (!readOnly) { 818 flushInternal(); 819 } 820 } 821 822 abstract protected void flushInternal() 823 throws LogException, DatabaseException; 824 825 826 public void loadStats(StatsConfig config, EnvironmentStats stats) 827 throws DatabaseException { 828 829 stats.setNRepeatFaultReads(nRepeatFaultReads); 830 stats.setNTempBufferWrites(nTempBufferWrites); 831 if (config.getClear()) { 832 nRepeatFaultReads = 0; 833 nTempBufferWrites = 0; 834 } 835 836 logBufferPool.loadStats(config, stats); 837 fileManager.loadStats(config, stats); 838 } 839 840 844 abstract public TrackedFileSummary getUnflushableTrackedSummary(long file) 845 throws DatabaseException; 846 847 protected TrackedFileSummary getUnflushableTrackedSummaryInternal(long file) 848 throws DatabaseException { 849 850 return envImpl.getUtilizationTracker(). 851 getUnflushableTrackedSummary(file); 852 } 853 854 859 abstract public void countObsoleteNode(long lsn, 860 LogEntryType type, 861 int size) 862 throws DatabaseException; 863 864 protected void countObsoleteNodeInternal(UtilizationTracker tracker, 865 long lsn, 866 LogEntryType type, 867 int size) 868 throws DatabaseException { 869 870 tracker.countObsoleteNode(lsn, type, size); 871 } 872 873 876 abstract public void countObsoleteNodes(TrackedFileSummary[] summaries) 877 throws DatabaseException; 878 879 protected void countObsoleteNodesInternal(UtilizationTracker tracker, 880 TrackedFileSummary[] summaries) 881 throws DatabaseException { 882 883 for (int i = 0; i < summaries.length; i += 1) { 884 TrackedFileSummary summary = summaries[i]; 885 tracker.addSummary(summary.getFileNumber(), summary); 886 } 887 } 888 889 892 abstract public void countObsoleteINs(List lsnList) 893 throws DatabaseException; 894 895 protected void countObsoleteINsInternal(List lsnList) 896 throws DatabaseException { 897 898 UtilizationTracker tracker = envImpl.getUtilizationTracker(); 899 900 for (int i = 0; i < lsnList.size(); i += 1) { 901 Long offset = (Long ) lsnList.get(i); 902 tracker.countObsoleteNode 903 (offset.longValue(), LogEntryType.LOG_IN, 0); 904 } 905 } 906 907 908 public void setReadHook(TestHook hook) { 909 readHook = hook; 910 } 911 912 915 static class LogResult { 916 long currentLsn; 917 boolean wakeupCleaner; 918 int entrySize; 919 920 LogResult(long currentLsn, 921 boolean wakeupCleaner, 922 int entrySize) { 923 this.currentLsn = currentLsn; 924 this.wakeupCleaner = wakeupCleaner; 925 this.entrySize = entrySize; 926 } 927 } 928 } 929 | Popular Tags |