1 8 9 package com.sleepycat.je.log; 10 11 import java.io.IOException ; 12 import java.nio.Buffer ; 13 import java.nio.ByteBuffer ; 14 15 import com.sleepycat.je.DatabaseException; 16 import com.sleepycat.je.config.EnvironmentParams; 17 import com.sleepycat.je.dbi.DbConfigManager; 18 import com.sleepycat.je.dbi.EnvironmentImpl; 19 import com.sleepycat.je.utilint.DbLsn; 20 import com.sleepycat.je.utilint.Tracer; 21 22 27 public abstract class FileReader { 28 29 protected EnvironmentImpl env; 30 protected FileManager fileManager; 31 32 33 private ByteBuffer readBuffer; private ByteBuffer saveBuffer; private int maxReadBufferSize; 37 38 private boolean singleFile; protected boolean eof; private boolean forward; 43 52 protected long readBufferFileNum; protected long readBufferFileStart; protected long readBufferFileEnd; 56 57 private int nRead; 59 63 private long nRepeatIteratorReads; 64 65 66 private int nReadOperations; 67 68 69 protected byte currentEntryTypeNum; 70 protected byte currentEntryTypeVersion; 71 protected long currentEntryPrevOffset; 72 protected int currentEntrySize; 73 protected long currentEntryChecksum; 74 75 79 protected long currentEntryOffset; 80 protected long nextEntryOffset; 81 protected long startLsn; private long finishLsn; 84 85 protected ChecksumValidator cksumValidator; 86 private boolean doValidateChecksum; private boolean alwaysValidateChecksum; 89 90 protected boolean anticipateChecksumErrors; 91 92 96 public FileReader(EnvironmentImpl env, 97 int readBufferSize, 98 boolean forward, 99 long startLsn, 100 Long singleFileNumber, 101 long endOfFileLsn, 102 long finishLsn) 103 throws IOException , DatabaseException { 104 105 this.env = env; 106 this.fileManager = env.getFileManager(); 107 this.doValidateChecksum = env.getLogManager().getChecksumOnRead(); 108 109 110 this.singleFile = (singleFileNumber != null); 111 this.forward = forward; 112 113 readBuffer = ByteBuffer.allocate(readBufferSize); 114 threadSafeBufferFlip(readBuffer); 115 saveBuffer = ByteBuffer.allocate(readBufferSize); 116 117 DbConfigManager configManager = env.getConfigManager(); 118 maxReadBufferSize = 119 configManager.getInt(EnvironmentParams. LOG_ITERATOR_MAX_SIZE); 120 121 122 this.startLsn = startLsn; 123 this.finishLsn = finishLsn; 124 initStartingPosition(endOfFileLsn, singleFileNumber); 125 126 127 nRead = 0; 128 if (doValidateChecksum) { 129 cksumValidator = new ChecksumValidator(); 130 } 131 anticipateChecksumErrors = false; 132 } 133 134 138 protected void initStartingPosition(long endOfFileLsn, 139 Long ignoreSingleFileNumber) 140 throws IOException , DatabaseException { 141 142 eof = false; 143 if (forward) { 144 145 149 if (startLsn != DbLsn.NULL_LSN) { 150 readBufferFileNum = DbLsn.getFileNumber(startLsn); 151 readBufferFileEnd = DbLsn.getFileOffset(startLsn); 152 } else { 153 Long firstNum = fileManager.getFirstFileNum(); 154 if (firstNum == null) { 155 eof = true; 156 } else { 157 readBufferFileNum = firstNum.longValue(); 158 readBufferFileEnd = 0; 159 } 160 } 161 162 165 nextEntryOffset = readBufferFileEnd; 166 } else { 167 168 173 assert startLsn != DbLsn.NULL_LSN; 174 readBufferFileNum = DbLsn.getFileNumber(endOfFileLsn); 175 readBufferFileStart = DbLsn.getFileOffset(endOfFileLsn); 176 readBufferFileEnd = readBufferFileStart; 177 178 183 if (DbLsn.getFileNumber(startLsn) == 184 DbLsn.getFileNumber(endOfFileLsn)) { 185 currentEntryPrevOffset = DbLsn.getFileOffset(startLsn); 186 } else { 187 currentEntryPrevOffset = 0; 188 } 189 currentEntryOffset = DbLsn.getFileOffset(endOfFileLsn); 190 } 191 } 192 193 196 public void setAlwaysValidateChecksum(boolean validate) { 197 alwaysValidateChecksum = validate; 198 } 199 200 203 public int getNumRead() { 204 return nRead; 205 } 206 207 public long getNRepeatIteratorReads() { 208 return nRepeatIteratorReads; 209 } 210 211 214 public long getLastLsn() { 215 return DbLsn.makeLsn(readBufferFileNum, currentEntryOffset); 216 } 217 218 221 public int getLastEntrySize() { 222 return LogManager.HEADER_BYTES + currentEntrySize; 223 } 224 225 231 public boolean readNextEntry() 232 throws DatabaseException, IOException { 233 234 boolean foundEntry = false; 235 try { 236 while ((!eof) && (!foundEntry)) { 237 238 239 getLogEntryInReadBuffer(); 240 ByteBuffer dataBuffer = 241 readData(LogManager.HEADER_BYTES, true); 242 243 readHeader(dataBuffer); 244 245 boolean isTargetEntry = isTargetEntry(currentEntryTypeNum, 246 currentEntryTypeVersion); 247 boolean doValidate = doValidateChecksum && 248 (isTargetEntry || alwaysValidateChecksum); 249 boolean collectData = doValidate || isTargetEntry; 250 251 252 if (doValidate) { 253 startChecksum(dataBuffer); 254 } 255 256 261 dataBuffer = readData(currentEntrySize, collectData); 262 263 268 if (forward) { 269 currentEntryOffset = nextEntryOffset; 270 nextEntryOffset += 271 LogManager.HEADER_BYTES + currentEntrySize; 272 } 273 274 275 if (doValidate) { 276 validateChecksum(dataBuffer); 277 } 278 279 if (isTargetEntry) { 280 281 288 if (processEntry(dataBuffer)) { 289 foundEntry = true; 290 nRead++; 291 } 292 } else if (collectData) { 293 294 298 threadSafeBufferPosition 299 (dataBuffer, 300 threadSafeBufferPosition(dataBuffer) + 301 currentEntrySize); 302 } 303 } 304 } catch (EOFException e) { 305 eof = true; 306 } catch (DatabaseException e) { 307 eof = true; 308 309 LogEntryType problemType = 310 LogEntryType.findType(currentEntryTypeNum, 311 currentEntryTypeVersion); 312 Tracer.trace(env, "FileReader", "readNextEntry", 313 "Halted log file reading at file 0x" + 314 Long.toHexString(readBufferFileNum) + 315 " offset 0x" + 316 Long.toHexString(nextEntryOffset) + 317 " offset(decimal)=" + nextEntryOffset + 318 ":\nentry="+ problemType + 319 "(typeNum=" + currentEntryTypeNum + 320 ",version=" + currentEntryTypeVersion + 321 ")\nprev=0x" + 322 Long.toHexString(currentEntryPrevOffset) + 323 "\nsize=" + currentEntrySize + 324 "\nNext entry should be at 0x" + 325 Long.toHexString((nextEntryOffset + 326 LogManager.HEADER_BYTES + 327 currentEntrySize)) + 328 "\n:", e); 329 throw e; 330 } 331 return foundEntry; 332 } 333 334 protected boolean resyncReader(long nextGoodRecordPostCorruption, 335 boolean dumpCorruptedBounds) 336 throws DatabaseException, IOException { 337 338 339 return false; 340 } 341 342 346 private void getLogEntryInReadBuffer() 347 throws IOException , DatabaseException, EOFException { 348 349 354 if (!forward) { 355 356 361 if ((currentEntryPrevOffset != 0) && 362 (currentEntryPrevOffset >= readBufferFileStart)) { 363 364 365 long nextLsn = DbLsn.makeLsn(readBufferFileNum, 366 currentEntryPrevOffset); 367 if (finishLsn != DbLsn.NULL_LSN) { 368 if (DbLsn.compareTo(nextLsn, finishLsn) == -1) { 369 throw new EOFException(); 370 } 371 } 372 373 374 threadSafeBufferPosition(readBuffer, 375 (int) (currentEntryPrevOffset - 376 readBufferFileStart)); 377 } else { 378 379 390 if (currentEntryPrevOffset == 0) { 391 392 currentEntryPrevOffset = 393 fileManager.getFileHeaderPrevOffset(readBufferFileNum); 394 Long prevFileNum = 395 fileManager.getFollowingFileNum(readBufferFileNum, 396 false); 397 if (prevFileNum == null) { 398 throw new EOFException(); 399 } 400 if (readBufferFileNum - prevFileNum.longValue() != 1) { 401 402 if (!resyncReader(DbLsn.makeLsn 403 (prevFileNum.longValue(), 404 DbLsn.MAX_FILE_OFFSET), 405 false)) { 406 407 throw new DatabaseException 408 ("Cannot read backward over cleaned file" + 409 " from " + readBufferFileNum + 410 " to " + prevFileNum); 411 } 412 } 413 readBufferFileNum = prevFileNum.longValue(); 414 readBufferFileStart = currentEntryPrevOffset; 415 } else if ((currentEntryOffset - currentEntryPrevOffset) > 416 readBuffer.capacity()) { 417 418 422 readBufferFileStart = currentEntryPrevOffset; 423 } else { 424 425 426 long newPosition = currentEntryOffset - 427 readBuffer.capacity(); 428 readBufferFileStart = (newPosition < 0) ? 0 : newPosition; 429 } 430 431 432 long nextLsn = DbLsn.makeLsn(readBufferFileNum, 433 currentEntryPrevOffset); 434 if (finishLsn != DbLsn.NULL_LSN) { 435 if (DbLsn.compareTo(nextLsn, finishLsn) == -1) { 436 throw new EOFException(); 437 } 438 } 439 440 444 FileHandle fileHandle = 445 fileManager.getFileHandle(readBufferFileNum); 446 try { 447 readBuffer.clear(); 448 fileManager.readFromFile(fileHandle.getFile(), readBuffer, 449 readBufferFileStart); 450 nReadOperations += 1; 451 452 assert EnvironmentImpl.maybeForceYield(); 453 } finally { 454 fileHandle.release(); 455 } 456 readBufferFileEnd = readBufferFileStart + 457 threadSafeBufferPosition(readBuffer); 458 threadSafeBufferFlip(readBuffer); 459 threadSafeBufferPosition(readBuffer, 460 (int) (currentEntryPrevOffset - 461 readBufferFileStart)); 462 } 463 464 465 currentEntryOffset = currentEntryPrevOffset; 466 } else { 467 468 472 if (finishLsn != DbLsn.NULL_LSN) { 473 474 long nextLsn = DbLsn.makeLsn(readBufferFileNum, 475 nextEntryOffset); 476 if (DbLsn.compareTo(nextLsn, finishLsn) >= 0) { 477 throw new EOFException(); 478 } 479 } 480 } 481 } 482 483 487 private void readHeader(ByteBuffer dataBuffer) 488 throws DatabaseException { 489 490 491 currentEntryChecksum = LogUtils.getUnsignedInt(dataBuffer); 492 dataBuffer.mark(); 493 494 495 currentEntryTypeNum = dataBuffer.get(); 496 497 502 if (!LogEntryType.isValidType(currentEntryTypeNum)) 503 throw new DbChecksumException 504 ((anticipateChecksumErrors ? null : env), 505 "FileReader read invalid log entry type: " + 506 currentEntryTypeNum); 507 508 currentEntryTypeVersion = dataBuffer.get(); 509 currentEntryPrevOffset = LogUtils.getUnsignedInt(dataBuffer); 510 currentEntrySize = LogUtils.readInt(dataBuffer); 511 } 512 513 517 private void startChecksum(ByteBuffer dataBuffer) 518 throws DatabaseException { 519 520 521 cksumValidator.reset(); 522 int entryStart = threadSafeBufferPosition(dataBuffer); 523 dataBuffer.reset(); 524 cksumValidator.update(env, dataBuffer, 525 LogManager.HEADER_CONTENT_BYTES, 526 anticipateChecksumErrors); 527 528 529 threadSafeBufferPosition(dataBuffer, entryStart); 530 } 531 532 536 private void validateChecksum(ByteBuffer entryBuffer) 537 throws DatabaseException { 538 539 cksumValidator.update(env, entryBuffer, currentEntrySize, 540 anticipateChecksumErrors); 541 cksumValidator.validate(env, currentEntryChecksum, 542 readBufferFileNum, currentEntryOffset, 543 anticipateChecksumErrors); 544 } 545 546 555 private ByteBuffer readData(int amountToRead, boolean collectData) 556 throws IOException , DatabaseException, EOFException { 557 558 int alreadyRead = 0; 559 ByteBuffer completeBuffer = null; 560 saveBuffer.clear(); 561 562 while ((alreadyRead < amountToRead) && !eof) { 563 564 int bytesNeeded = amountToRead - alreadyRead; 565 if (readBuffer.hasRemaining()) { 566 567 568 if (collectData) { 569 572 if ((alreadyRead > 0) || 573 (readBuffer.remaining() < bytesNeeded)) { 574 575 576 577 copyToSaveBuffer(bytesNeeded); 578 alreadyRead = threadSafeBufferPosition(saveBuffer); 579 completeBuffer = saveBuffer; 580 } else { 581 582 583 584 completeBuffer = readBuffer; 585 alreadyRead = amountToRead; 586 } 587 } else { 588 591 int positionIncrement = 592 (readBuffer.remaining() > bytesNeeded) ? 593 bytesNeeded : readBuffer.remaining(); 594 595 alreadyRead += positionIncrement; 596 threadSafeBufferPosition 597 (readBuffer, 598 threadSafeBufferPosition(readBuffer) + 599 positionIncrement); 600 completeBuffer = readBuffer; 601 } 602 } else { 603 606 fillReadBuffer(bytesNeeded); 607 } 608 } 609 610 611 threadSafeBufferFlip(saveBuffer); 612 613 return completeBuffer; 614 } 615 616 621 private void adjustReadBufferSize(int amountToRead) { 622 int readBufferSize = readBuffer.capacity(); 623 624 if (amountToRead > readBufferSize) { 625 626 if (readBufferSize < maxReadBufferSize) { 627 628 632 if (amountToRead < maxReadBufferSize) { 633 readBufferSize = amountToRead; 634 635 int remainder = readBufferSize % 1024; 636 readBufferSize += 1024 - remainder; 637 readBufferSize = Math.min(readBufferSize, 638 maxReadBufferSize); 639 } else { 640 readBufferSize = maxReadBufferSize; 641 } 642 readBuffer = ByteBuffer.allocate(readBufferSize); 643 } 644 645 if (amountToRead > readBuffer.capacity()) { 646 nRepeatIteratorReads++; 647 } 648 } 649 } 650 651 654 private void copyToSaveBuffer(int bytesNeeded) { 655 656 int bytesFromThisBuffer; 657 658 if (bytesNeeded <= readBuffer.remaining()) { 659 bytesFromThisBuffer = bytesNeeded; 660 } else { 661 bytesFromThisBuffer = readBuffer.remaining(); 662 } 663 664 665 ByteBuffer temp; 666 667 668 if (saveBuffer.capacity() - threadSafeBufferPosition(saveBuffer) < 669 bytesFromThisBuffer) { 670 671 temp = ByteBuffer.allocate(saveBuffer.capacity() + 672 bytesFromThisBuffer); 673 threadSafeBufferFlip(saveBuffer); 674 temp.put(saveBuffer); 675 saveBuffer = temp; 676 } 677 678 683 temp = readBuffer.slice(); 684 temp.limit(bytesFromThisBuffer); 685 saveBuffer.put(temp); 686 threadSafeBufferPosition(readBuffer, 687 threadSafeBufferPosition(readBuffer) + 688 bytesFromThisBuffer); 689 } 690 691 694 private void fillReadBuffer(int bytesNeeded) 695 throws DatabaseException, EOFException { 696 697 FileHandle fileHandle = null; 698 try { 699 adjustReadBufferSize(bytesNeeded); 700 701 702 fileHandle = fileManager.getFileHandle(readBufferFileNum); 703 boolean fileOk = false; 704 705 709 if (readBufferFileEnd < fileHandle.getFile().length()) { 710 fileOk = true; 711 } else { 712 713 if (!singleFile) { 714 Long nextFile = 715 fileManager.getFollowingFileNum(readBufferFileNum, 716 forward); 717 if (nextFile != null) { 718 readBufferFileNum = nextFile.longValue(); 719 fileHandle.release(); 720 fileHandle = 721 fileManager.getFileHandle(readBufferFileNum); 722 fileOk = true; 723 readBufferFileEnd = 0; 724 nextEntryOffset = 0; 725 } 726 } 727 } 728 729 if (fileOk) { 730 readBuffer.clear(); 731 fileManager.readFromFile(fileHandle.getFile(), readBuffer, 732 readBufferFileEnd); 733 nReadOperations += 1; 734 735 assert EnvironmentImpl.maybeForceYield(); 736 737 readBufferFileStart = readBufferFileEnd; 738 readBufferFileEnd = 739 readBufferFileStart + threadSafeBufferPosition(readBuffer); 740 threadSafeBufferFlip(readBuffer); 741 } else { 742 throw new EOFException(); 743 } 744 } catch (IOException e) { 745 e.printStackTrace(); 746 throw new DatabaseException 747 ("Problem in fillReadBuffer, readBufferFileNum = " + 748 readBufferFileNum + ": " + e.getMessage()); 749 750 } finally { 751 if (fileHandle != null) { 752 fileHandle.release(); 753 } 754 } 755 } 756 757 760 public int getAndResetNReads() { 761 int tmp = nReadOperations; 762 nReadOperations = 0; 763 return tmp; 764 } 765 766 770 protected boolean isTargetEntry(byte logEntryTypeNumber, 771 byte logEntryTypeVersion) 772 throws DatabaseException { 773 774 return true; 775 } 776 777 783 protected abstract boolean processEntry(ByteBuffer entryBuffer) 784 throws DatabaseException; 785 786 private static class EOFException extends Exception { 787 } 788 789 799 private Buffer threadSafeBufferFlip(ByteBuffer buffer) { 800 while (true) { 801 try { 802 return buffer.flip(); 803 } catch (IllegalArgumentException IAE) { 804 continue; 805 } 806 } 807 } 808 809 private int threadSafeBufferPosition(ByteBuffer buffer) { 810 while (true) { 811 try { 812 return buffer.position(); 813 } catch (IllegalArgumentException IAE) { 814 continue; 815 } 816 } 817 } 818 819 private Buffer threadSafeBufferPosition(ByteBuffer buffer, 820 int newPosition) { 821 while (true) { 822 try { 823 return buffer.position(newPosition); 824 } catch (IllegalArgumentException IAE) { 825 continue; 826 } 827 } 828 } 829 } 830 | Popular Tags |