package com.quadcap.sql.file;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import javax.concurrent.BoundedBuffer;
import javax.concurrent.Channel;
import javax.concurrent.Latch;

import com.quadcap.sql.lock.Transaction;
import com.quadcap.sql.lock.TransactionObserver;

import com.quadcap.util.collections.LongMap;

import com.quadcap.util.ConfigNumber;
import com.quadcap.util.Debug;
import com.quadcap.util.Util;

/**
 * A {@link Log} implementation that queues log operations on a bounded
 * channel and executes them on a single background thread
 * ({@link LogSync}).
 *
 * <p>Public methods such as {@link #addEntry}, {@link #flushLog},
 * {@link #checkpoint} and {@link #close} only enqueue a {@link LogEntry};
 * the corresponding <code>really*</code> methods are invoked from the
 * entry's <code>handle</code> method on the log-sync thread, while that
 * thread holds the datafile's file lock.</p>
 *
 * <p>The class also maintains a "before-images" file in the datafile's
 * scratch directory ({@link #saveBlock}, {@link #restoreBlocks},
 * {@link #resetBlocks}).</p>
 */
public class Log1 implements Log {
    /** The datafile this log serves; set by {@link #init}. */
    private Datafile db;

    /** The datafile's underlying block file (<code>db.file</code>). */
    private BlockFile dbFile;

    /** The logger that actually stores and retrieves log entries. */
    Logger logger;

    /** Root directory of the database (<code>db.dbRootDir</code>). */
    private File dbRootDir;

    /** The background thread that drains {@link #channel}. */
    LogSync logSync;

    /** Released by the log-sync thread when it exits; awaited by close(). */
    Latch closeLatch = new Latch();

    /** Queue of pending log operations, consumed by {@link LogSync}. */
    Channel channel = new BoundedBuffer(2048);

    /** Lock obtained from the datafile; held while entries are handled. */
    Object fileLock;

    /**
     * Mapping from log row ids to file row ids (see {@link #putRowMap}).
     * May be <code>null</code>; it is discarded at several points
     * (rollback, checkpoint, recovery) and lazily re-created.
     */
    LongMap rowIdMap = new LongMap(256);

    /** Output stream on the "before-images" file, or null if not open. */
    FileOutputStream bfo;

    /** True after a before-image write that has not yet been synced. */
    boolean bfoActive = false;

    /** True while {@link #reallyRestart} is running. */
    boolean recovering = false;

    /** Block size of {@link #dbFile}, in bytes. */
    int blockSize;

    /**
     * Checkpoint interval in milliseconds used when there are no active
     * transactions.  Defaults to the <code>qed.minSyncInterval</code>
     * configuration value (15000); may be overridden by the
     * <code>minSyncInterval</code> connection property in {@link #init}.
     */
    long minSyncInterval
        = ConfigNumber.find("qed.minSyncInterval", "15000").longValue();

    /**
     * Checkpoint interval in milliseconds used while transactions are
     * active.  Defaults to the <code>qed.maxSyncInterval</code>
     * configuration value (60000); may be overridden by the
     * <code>maxSyncInterval</code> connection property in {@link #init}.
     */
    long maxSyncInterval
        = ConfigNumber.find("qed.maxSyncInterval", "60000").longValue();

    /**
     * Bitmap consulted by {@link #checksync}: bit <code>x</code> set
     * means sync point <code>x</code> (see {@link #syncStrs}) is enabled.
     * Bits 1, 2, 4, 5, 6 and 7 are set; bits 0 and 3 are clear.
     */
    int syncMap = 0 +
        (1 << 1) + (1 << 2) + (1 << 4) + (1 << 5) + (1 << 6) + (1 << 7) + 0;

    /** Human-readable names of the sync points, indexed by bit number. */
    String[] syncStrs = {
        "Log1.reallyFlush from Log2.flushLog",
        "Log1.reallyFlush: bfo.sync()",
        "Log1.reallyFlush: logger.sync()",
        "Log1.reallyFlush: db.tempFile.flush(!)",
        "Log1.reallyCheckpoint: dbfile.flush(!)",
        "Log1.reallyCheckpoint: logger.sync()",
        "Log1.reallyCheckpoint: dbfile.checkpoint(!)",
        "Log1.saveBlock: bfo.sync"
    };

    /** Number of times each sync point has been queried. */
    int[] syncCnts = new int[syncStrs.length];

    /**
     * Count a visit to sync point <code>x</code> and report whether it is
     * enabled in {@link #syncMap}.
     *
     * @param x index of the sync point (an index into {@link #syncStrs})
     * @return true if bit <code>x</code> of {@link #syncMap} is set
     */
    protected boolean checksync(int x) {
        syncCnts[x]++;
        boolean ret = (syncMap & (1 << x)) != 0;
        return ret;
    }

    /**
     * No-argument constructor; the instance is unusable until
     * {@link #init} has been called.
     */
    public Log1() {}

    /**
     * Bind this log to a datafile and create its logger.
     *
     * <p>The logger implementation is named by the
     * <code>loggerClass</code> property.  If the property is absent, or
     * the class cannot be loaded or instantiated, {@link Logger1} is
     * used instead.</p>
     *
     * @param db the datafile being logged
     * @param create passed through to the logger's <code>init</code>
     * @param props connection properties; <code>minSyncInterval</code>,
     *        <code>maxSyncInterval</code> and <code>loggerClass</code>
     *        are read here
     * @throws IOException if the logger's <code>init</code> fails
     */
    public void init(Datafile db, boolean create, Properties props)
        throws IOException
    {
        this.db = db;
        this.dbFile = db.file;
        this.blockSize = dbFile.getBlockSize();
        this.dbRootDir = db.dbRootDir;
        this.fileLock = db.getFileLock();

        this.minSyncInterval =
            Long.parseLong(props.getProperty("minSyncInterval",
                                             String.valueOf(minSyncInterval)));

        this.maxSyncInterval =
            Long.parseLong(props.getProperty("maxSyncInterval",
                                             String.valueOf(maxSyncInterval)));

        Logger chosen = null;
        String loggerClass = props.getProperty("loggerClass");
        if (loggerClass != null) {
            try {
                chosen = (Logger)(Class.forName(loggerClass).newInstance());
            } catch (Throwable t) {
                // A configured logger that cannot be created is a
                // configuration problem worth reporting, but we still
                // fall back to the default so the database can open.
                Debug.println("Log1.init: can't create logger '" +
                              loggerClass + "', using default");
                Debug.print(t);
            }
        }
        if (chosen == null) {
            chosen = new Logger1();
        }
        this.logger = chosen;
        logger.init(this, create, props);
    }

    /**
     * Start the background log-sync thread (as a daemon thread).
     */
    public void start() {
        logSync = new LogSync(this);
        logSync.setDaemon(true);
        logSync.start();
    }

    /**
     * Delete the file named "logfile" in the database root directory.
     * The result of the delete is not checked.
     */
    public void remove() throws IOException {
        new File(dbRootDir, "logfile").delete();
    }

    /**
     * Return the datafile this log was initialized with.
     */
    public Datafile getDatafile() { return db; }

    /**
     * Return the database root directory.
     */
    public File getDbRootDir() { return dbRootDir; }

    /**
     * Number of BEGIN_TRANSACTION entries that have been queued by
     * {@link #addEntry} but not yet written by {@link #reallyAddEntry}.
     * A checkpoint only truncates the log when this is zero.
     *
     * NOTE(review): incremented on caller threads and decremented on the
     * log-sync thread without synchronization -- confirm callers
     * serialize access.
     */
    int pendingBegins = 0;

    /**
     * Queue a log entry for writing by the log-sync thread.
     *
     * @param entry the entry to add
     */
    public void addEntry(LogEntry entry)
        throws IOException
    {
        if (entry.getCode() == LogEntry.BEGIN_TRANSACTION) {
            pendingBegins++;
        }
        put(entry);
    }

    /**
     * Queue a close request and wait for the log-sync thread to exit.
     * If the calling thread is interrupted while waiting, this method
     * returns early with the thread's interrupt status set.
     */
    public void close() throws IOException {
        put(opClose);
        try {
            closeLatch.acquire();
        } catch (InterruptedException ex) {
            // Don't lose the interrupt; let the caller observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Queue a flush request; does not wait for it to be performed.
     */
    public void flushLog() throws IOException {
        put(opFlush);
    }

    /**
     * Queue a checkpoint request; does not wait for it to be performed.
     */
    public void checkpoint() throws IOException {
        put(opCheckpoint);
    }

    /**
     * Wait until the log-sync thread has processed every operation
     * queued before this call.  Implemented by queueing a {@link Sync}
     * marker and waiting for it to be handled.  If the calling thread is
     * interrupted while waiting, this method returns early with the
     * thread's interrupt status set.
     */
    public void sync() throws IOException {
        Latch latch = new Latch();
        put(new Sync(latch));
        try {
            latch.acquire();
        } catch (InterruptedException ex) {
            // Don't lose the interrupt; let the caller observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Queue a rollback of the whole transaction and wait for it to be
     * processed.
     *
     * @param trans the transaction to roll back
     */
    public void rollbackTransaction(Transaction trans)
        throws IOException
    {
        put(new Rollback(trans));
        sync();
    }

    /**
     * Queue a rollback of a single statement and wait for it to be
     * processed.
     *
     * @param trans the transaction containing the statement
     * @param stmtId the id of the statement to roll back
     */
    public void rollbackStatement(Transaction trans, int stmtId)
        throws IOException
    {
        put(new Rollback(trans, stmtId));
        sync();
    }

    /**
     * Run log recovery on the calling thread (see
     * {@link #reallyRestart}).
     */
    public void restart() throws Exception {
        reallyRestart();
    }

    /**
     * Translate a row id through the row map.
     *
     * @param rowId the row id recorded in the log
     * @return the mapped row id, or <code>rowId</code> itself if there is
     *         no map or no mapping for it
     */
    public final long getRowMap(long rowId) {
        long ret = rowId;
        if (rowIdMap != null) {
            Long mapped = (Long)rowIdMap.get(rowId);
            if (mapped != null) ret = mapped.longValue();
        }
        return ret;
    }

    /**
     * Record a row-id mapping, creating the map if necessary.
     *
     * @param logRow the row id as recorded in the log
     * @param fileRow the row id it now corresponds to
     */
    public final void putRowMap(long logRow, long fileRow) {
        if (rowIdMap == null) rowIdMap = new LongMap(256);
        rowIdMap.put(logRow, new Long(fileRow));
    }

    /**
     * Remove any mapping for the given row id.
     *
     * @param row the log row id whose mapping should be removed
     */
    public final void removeRowMap(long row) {
        if (rowIdMap != null) rowIdMap.remove(row);
    }

    /**
     * Perform a flush: sync the logger if sync point 2 is enabled, then
     * flush the datafile's temp file, if there is one.
     */
    void reallyFlush() throws IOException {
        if (Trace.bit(18)) {
            Debug.println(this + ".reallyFlush()");
        }
        if (checksync(2)) logger.sync();
        if (db.tempFile != null) {
            db.tempFile.flush(!checksync(3));
        }
    }

    /**
     * Perform a close: ask the log-sync thread to stop and close the
     * before-images stream.  Both fields are cleared even if closing
     * throws.
     */
    void reallyClose() throws IOException {
        try {
            if (logSync != null) {
                logSync.close();
            }
        } finally {
            logSync = null;
            if (bfo != null) {
                try {
                    bfo.close();
                } finally {
                    bfo = null;
                }
            }
        }
    }

    /**
     * Undo, newest first, every completed STEP entry of the given
     * transaction, stopping at its BEGIN_TRANSACTION entry or at the
     * start of the entry chain.  Failures of individual undo operations
     * are reported via {@link Debug} and do not stop the rollback; the
     * entry is marked UNDONE either way.  The row map is discarded
     * afterwards.
     *
     * @param tr the transaction to roll back
     */
    void reallyRollbackTransaction(Transaction tr) throws Exception {
        if (Trace.bit(18)) {
            Debug.println("reallyRollbackTransaction(" + tr + ")");
        }
        long t = tr.getTransactionId();
        LogEntry e = logger.getLastOp(t);
        try {
            while (e != null) {
                if (e.getTransactionId() == t) {
                    switch (e.getCode()) {
                    case LogEntry.STEP:
                        if (e.getRedoState() == LogEntry.DONE) {
                            try {
                                e.undo(tr, db);
                            } catch (Throwable th) {
                                Debug.println("Exception in rollback transaction");
                                Debug.print(th);
                            }
                            logger.setRedoState(e, LogEntry.UNDONE);
                        }
                        break;
                    case LogEntry.BEGIN_TRANSACTION:
                        // Reached the start of the transaction: stop.
                        e = null;
                        break;
                    default:
                        break;
                    }
                }
                e = (e == null || e.getPrev() < 0) ? null : logger.getPrevOp(e);
            }
        } finally {
            rowIdMap = null;
        }
    }

    /**
     * Undo, newest first, every completed STEP entry whose statement id
     * is <code>s</code>, stopping at that statement's BEGIN_STATEMENT
     * entry.  Failures of individual undo operations are reported via
     * {@link Debug} and do not stop the rollback.  The row map is
     * discarded before and after.
     *
     * NOTE(review): unlike reallyRollbackTransaction, this loop does not
     * test <code>getPrev() &lt; 0</code> before calling
     * <code>getPrevOp</code> -- confirm the logger returns null at the
     * start of the chain.
     *
     * @param tr the transaction containing the statement
     * @param s the id of the statement to roll back
     */
    void reallyRollbackStatement(Transaction tr, int s) throws Exception {
        long t = tr.getTransactionId();
        LogEntry e = logger.getLastOp(t);
        rowIdMap = null;
        try {
            while (e != null) {
                if (e.getStatementId() == s) {
                    switch (e.getCode()) {
                    case LogEntry.STEP:
                        if (e.getRedoState() == LogEntry.DONE) {
                            try {
                                e.undo(tr, db);
                            } catch (Throwable th) {
                                Debug.println("Exception in statement rollback");
                                Debug.print(th);
                            }
                            logger.setRedoState(e, LogEntry.UNDONE);
                        }
                        break;
                    case LogEntry.BEGIN_STATEMENT:
                        return;
                    default:
                        break;
                    }
                }
                e = logger.getPrevOp(e);
            }
        } finally {
            rowIdMap = null;
        }
    }

    /**
     * Scan the log from the first entry to the last, recording the
     * transaction id of every COMMIT entry as a key in <code>t</code>.
     *
     * @param t receives one key per committed transaction id
     * @return the last entry in the log, or null if the log is empty
     */
    LogEntry scanLog(LongMap t) throws IOException {
        LogEntry op = logger.getFirstOp();
        LogEntry last = op;
        for (; op != null; last = op, op = logger.getNextOp()) {
            if (op.getCode() == LogEntry.COMMIT) {
                t.put(op.getTransactionId(), "");
            }
        }
        return last;
    }

    /**
     * Recover the database from the log.
     *
     * <ol>
     * <li>Scan the log to find the committed transactions.</li>
     * <li>Walking backwards from the last entry, undo completed STEP
     *     entries positioned before the logger's checkpoint position
     *     that belong to transactions which did not commit.</li>
     * <li>Walking forwards up to the logger's end position, redo
     *     completed STEP entries of committed transactions.</li>
     * <li>Reset and sync the logger, and queue a checkpoint if anything
     *     was undone or redone.</li>
     * </ol>
     *
     * {@link #inRecovery} returns true for the duration of steps 2-4.
     */
    void reallyRestart() throws Exception {
        if (Trace.bit(18)) {
            Debug.println(toString() + ".reallyRestart()");
        }
        db.getTempFile(false);
        Transaction t = db.makeTransaction(false);
        LongMap map = new LongMap(32);
        LogEntry last = scanLog(map);
        int q = 0;      // number of undos performed
        int checkpointPosition = logger.getCheckpoint();
        int endPosition = logger.getEnd();
        LogEntry op = last;
        recovering = true;
        try {
            for (; op != null; op = logger.getPrevOp(op)) {
                if (op.getCode() == LogEntry.STEP &&
                    op.getPosition() < checkpointPosition &&
                    map.get(op.getTransactionId()) == null) {
                    if (op.getRedoState() == LogEntry.DONE) {
                        if (Trace.bit(18)) {
                            Debug.println("UNDO[" + op + "]");
                        }
                        if (q++ == 0) {
                            Debug.println("Undoing in-progress transactions...");
                        }
                        op.undo(t, db);
                    }
                }
            }
            rowIdMap = null;

            int p = 0;  // number of redos performed
            for (op = logger.getFirstOp(); op != null; op = logger.getNextOp()) {
                if (op.getPosition() >= endPosition) break;
                if (map.get(op.getTransactionId()) != null) {
                    if (op.getCode() == LogEntry.STEP) {
                        if (op.getRedoState() == LogEntry.DONE) {
                            if (p++ == 0) {
                                Debug.println("Restoring committed transactions: " + map);
                            }
                            if (Trace.bit(18)) {
                                Debug.println("REDO[" + op + "]");
                            }
                            op.redo(t, db);
                        }
                    }
                }
            }
            rowIdMap = null;

            if (p > 0 || q > 0) {
                Debug.println("Recovery complete: " +
                              p + " redos, " +
                              q + " undos");
            }
            logger.reset();
            logger.sync();
            if (p > 0 || q > 0) {
                checkpoint();
            }
        } finally {
            recovering = false;
        }
    }

    /** Time (ms since the epoch) at which the last checkpoint finished. */
    long lastCheckpoint = System.currentTimeMillis();

    /**
     * Checkpoint if enough time has passed since the last one.  The
     * threshold is {@link #minSyncInterval} when the logger reports no
     * active transactions (in which case the row map is also discarded),
     * otherwise {@link #maxSyncInterval}.
     */
    void maybeCheckpoint() throws IOException {
        long now = System.currentTimeMillis();
        boolean idle = logger.getActiveTransactionCount() == 0;
        if (idle) rowIdMap = null;
        long interval = idle ? minSyncInterval : maxSyncInterval;
        if (now - lastCheckpoint >= interval) {
            if (Trace.bit(25)) {
                Debug.println("[" + entryCount + BlockStore.rw() +
                              "] interval " + (now - lastCheckpoint) +
                              " ms," + (idle ? " IDLE" : "") +
                              " checkpoint now");
            }

            reallyCheckpoint();

            if (Trace.bit(25)) {
                Debug.println("checkpoint done");
            }
        }
    }

    /**
     * Perform a checkpoint: flush the datafile root and block file,
     * checkpoint the logger, reset the logger if there are neither
     * pending BEGIN_TRANSACTION entries nor active transactions, sync
     * the logger, and checkpoint the datafile.  Regardless of outcome,
     * the checkpoint timestamp is updated and the row map discarded.
     */
    void reallyCheckpoint() throws IOException {
        try {
            db.flushRoot();
            dbFile.flush(!checksync(4));
            logger.checkpoint();

            int numTrans = logger.getActiveTransactionCount();
            boolean truncate = pendingBegins == 0 && numTrans == 0;
            if (truncate) {
                logger.reset();
            }
            logger.sync();
            db.checkpoint(truncate, !checksync(6));
            db.checkpointHandler(logger.getActiveTransactions());
            dbFile.clearModified();
        } finally {
            lastCheckpoint = System.currentTimeMillis();
            rowIdMap = null;
        }
        if (Trace.bit(21)) {
            Debug.println("AFTER Log1.reallyCheckpoint: [" +
                          logger.getActiveTransactionCount() + "]");
        }
    }

    /** Number of entries passed to {@link #reallyAddEntry}; trace only. */
    int entryCount = 0;

    /**
     * Write an entry to the logger.  If the logger fails with an
     * IOException whose text contains "full", the oldest transaction is
     * aborted (see {@link #abortOldestTransaction}) and the write is
     * retried once; any other IOException is rethrown.
     *
     * NOTE(review): on the retry path <code>pendingBegins</code> is not
     * decremented here for a BEGIN_TRANSACTION entry; whether it is
     * decremented elsewhere depends on what LogEntry.handle does with
     * the saved entries -- verify before changing.
     *
     * @param entry the entry to write
     */
    public void reallyAddEntry(LogEntry entry) throws IOException {
        entryCount++;
        if (Trace.bit(16)) {
            Debug.println(toString() + ".reallyAddEntry(" + entry + ")");
        }
        try {
            logger.put(entry);
            if (entry.getCode() == LogEntry.BEGIN_TRANSACTION) {
                pendingBegins--;
            }
        } catch (IOException ex) {
            if (ex.toString().indexOf("full") > 0) {
                abortOldestTransaction(entry);
                logger.put(entry);
            } else {
                throw ex;
            }
        }
    }

    /**
     * Append <code>e</code> to <code>discard</code> if it belongs to
     * transaction <code>tId</code>, otherwise to <code>save</code>.
     */
    private void sortEntry(long tId, List save, List discard, LogEntry e) {
        if (e.getTransactionId() == tId) {
            discard.add(e);
        } else {
            save.add(e);
        }
    }

    /**
     * Make room in a full log by aborting the oldest transaction.
     *
     * <p>If the oldest transaction id reported by the logger does not
     * correspond to a live transaction, the logger is simply reset.
     * Otherwise the channel is drained; queued entries (and
     * <code>entry</code> itself) belonging to the victim are undone in
     * reverse order, the victim is rolled back, its observer (if any) is
     * notified, a checkpoint is taken, and finally the remaining drained
     * entries are handled in their original order.</p>
     *
     * @param entry the entry whose write found the log full
     * @throws IOException if the rollback or checkpoint fails; other
     *         exceptions from the rollback are wrapped in a
     *         DatafileException
     */
    private final void abortOldestTransaction(LogEntry entry)
        throws IOException
    {
        long tId = logger.getOldestTransaction();
        Transaction t = db.findTransaction(tId);
        if (t == null) {
            Debug.println("Log full, will reset log");
            logger.reset();
        } else {
            ArrayList save = new ArrayList();
            Debug.println("Log full, will abort " + t);
            try {
                ArrayList discard = new ArrayList();
                LogEntry e;
                while ((e = (LogEntry)channel.poll(0)) != null) {
                    sortEntry(tId, save, discard, e);
                }
                sortEntry(tId, save, discard, entry);
                Debug.println("discarding " + discard.size() +
                              " completed log entries");
                for (int i = discard.size() - 1; i >= 0; i--) {
                    e = (LogEntry)discard.get(i);
                    try {
                        e.undo(t, db);
                    } catch (Throwable th) {
                        Debug.println("Error during abort oldest");
                        Debug.print(th);
                    }
                }
                reallyRollbackTransaction(t);
            } catch (IOException ex1) {
                throw ex1;
            } catch (Exception ex2) {
                Debug.print(ex2);
                throw new DatafileException(ex2);
            }
            TransactionObserver obs = t.getObserver();
            if (obs != null) {
                obs.abort(t);
            }
            reallyCheckpoint();
            Debug.println("handling " + save.size() + " saved log entries");
            for (int i = 0; i < save.size(); i++) {
                LogEntry e = (LogEntry)save.get(i);
                try {
                    e.handle(this);
                } catch (Throwable th) {
                    Debug.print(th);
                }
            }
        }
    }

    /**
     * Place an operation on the channel, retrying until it is accepted.
     *
     * <p>Interrupts received while waiting are deliberately absorbed:
     * several callers ({@link #sync}, {@link #close}) follow this call
     * with a latch wait that would return immediately if the interrupt
     * status were restored here.</p>
     *
     * @param h the operation to queue
     */
    public void put(LogEntry h) throws IOException {
        while (true) {
            try {
                channel.put(h);
            } catch (InterruptedException ex) {
                continue;
            }
            return;
        }
    }

    /**
     * Queued operation that performs {@link Log1#reallyFlush}.
     */
    public static class Flush extends LogEntry {
        public Flush() { super(FLUSH); }
        public void handle(Log log) throws IOException {
            ((Log1)log).reallyFlush();
        }
    }

    /** Shared flush operation instance. */
    static Flush opFlush = new Flush();

    /**
     * Queued operation that performs {@link Log1#reallyCheckpoint}.
     */
    public static class Checkpoint extends LogEntry {
        public Checkpoint() { super(CHECKPOINT); }
        public void handle(Log log) throws IOException {
            ((Log1)log).reallyCheckpoint();
        }
    }

    /** Shared checkpoint operation instance. */
    static Checkpoint opCheckpoint = new Checkpoint();

    /**
     * Queued operation that performs {@link Log1#reallyClose}.
     */
    public static class Close extends LogEntry {
        public Close() { super(CLOSE); }
        public void handle(Log log) throws IOException {
            ((Log1)log).reallyClose();
        }
    }

    /** Shared close operation instance. */
    static Close opClose = new Close();

    /**
     * Queued operation that rolls back either a whole transaction or a
     * single statement, depending on which constructor was used.
     */
    public class Rollback extends LogEntry {
        Transaction t;

        /** Roll back the whole transaction. */
        public Rollback(Transaction t) {
            super(t.getTransactionId(), ROLLBACK);
            this.t = t;
        }

        /** Roll back statement <code>s</code> of the transaction. */
        public Rollback(Transaction t0, int s) {
            super(t0.getTransactionId(), s, ROLLBACK);
            this.t = t0;
        }

        public void handle(Log log) throws Exception {
            // A statement id of -1 selects whole-transaction rollback.
            if (stmtId == -1) {
                reallyRollbackTransaction(t);
            } else {
                reallyRollbackStatement(t, stmtId);
            }
        }
    }

    /**
     * Queued marker that releases a latch when it is handled, letting
     * the thread that queued it know the queue has drained up to here.
     */
    public class Sync extends LogEntry {
        Latch latch;

        public Sync(Latch latch) {
            super(SYNC);
            this.latch = latch;
        }

        public void handle(Log log) throws Exception {
            latch.release();
        }
    }

    /**
     * The background thread.  It polls the channel (waiting up to 500 ms
     * at a time); whenever an operation arrives it takes the file lock,
     * handles that operation and everything else already queued, and
     * then gives {@link Log1#maybeCheckpoint} a chance to run.  Failures
     * while handling an individual operation are reported via
     * {@link Debug} and do not stop the thread.  On exit the logger is
     * closed (normal exit only) and the close latch is released.
     */
    class LogSync extends Thread {
        /** Set by {@link #close} to make the run loop exit. */
        boolean closeMe = false;

        /** The log whose operations this thread handles. */
        Log1 log = null;

        LogSync(Log1 log) { super("Log Sync"); this.log = log; }

        /** Ask the run loop to exit once the queue has been drained. */
        public void close() {
            closeMe = true;
        }

        public void run() {
            try {
                while (!closeMe) {
                    Object obj = channel.poll(500);
                    if (obj != null) {
                        synchronized (fileLock) {
                            while (obj != null) {
                                try {
                                    if (Trace.bit(17)) {
                                        Debug.println("PRE [" + obj + "].handle()");
                                    }
                                    ((LogEntry)obj).handle(log);
                                    if (Trace.bit(15)) {
                                        Debug.println("POST [" + obj + "].handle()");
                                    }
                                } catch (Throwable t) {
                                    Debug.println("LogSync: Got exception in [" + obj + "].handle()");
                                    Debug.print(t);
                                }
                                obj = channel.poll(0);
                            }
                            if (!closeMe) {
                                log.maybeCheckpoint();
                            }
                        }
                    }
                }
                log = null;
                if (logger != null) logger.close();
            } catch (InterruptedException ex) {
                Debug.print(ex);
            } catch (Throwable t) {
                Debug.print(t);
            } finally {
                // Always wake up anyone blocked in Log1.close().
                closeLatch.release();
            }
        }
    }

    /**
     * This implementation always logs.
     */
    public boolean isLogging() { return true; }

    /**
     * Return true while {@link #reallyRestart} is in progress.
     */
    public boolean inRecovery() {
        return recovering;
    }

    /** Bytes written to the before-images file since the last reset. */
    int filepos = 0;

    /**
     * Reusable record buffer for {@link #saveBlock}: an 8-byte block
     * number followed by one block of data.
     */
    byte[] sav = null;

    /**
     * Append the current contents of block <code>b</code> to the
     * before-images file, opening the file first if necessary.  Each
     * record is the 8-byte block number followed by the block's bytes.
     * If sync point 7 is enabled the file is then synced to disk; a
     * failed sync is reported via {@link Debug} but is not fatal.
     *
     * @param b the number of the block to save
     */
    public void saveBlock(long b) throws IOException {
        if (sav == null) {
            sav = new byte[blockSize + 8];
        }
        ByteUtil.putLong(sav, 0, b);
        dbFile.store.read(b, sav, 8);
        if (bfo == null) resetBlocks();
        bfo.write(sav, 0, sav.length);
        bfoActive = true;
        if (Trace.bit(19)) Debug.println("saveBlock(" + b + ", " + Block.signature(sav, 8, blockSize) + " @ " + filepos + ")");
        filepos += sav.length;
        if (checksync(7)) {
            try {
                bfo.getFD().sync();
            } catch (Throwable t) {
                // Best effort: report rather than silently ignore.
                Debug.print(t);
            } finally {
                bfoActive = false;
            }
        }
    }

    /**
     * Restore every block recorded in the before-images file, applying
     * the records from last to first so that the oldest saved image of a
     * block is the one that ends up in the file.
     *
     * <p>Records are fixed-size and written sequentially from offset 0,
     * so the scan starts at the last whole-record boundary; an
     * incomplete trailing record (for example from a crash in the middle
     * of a write) is ignored.</p>
     */
    public void restoreBlocks() throws IOException {
        if (Trace.bit(19)) {
            Debug.println(db.toString() +
                          ".restoreBlocks(): size = " + dbFile.getSize());
        }
        File f = new File(db.getScratchDir(), "before-images");
        int siz = blockSize + 8;
        if (f.exists() && f.length() >= siz) {
            RandomAccessFile bf = new RandomAccessFile(f, "r");
            try {
                byte[] buf = new byte[siz];
                // Offset of the last complete record.
                long pos = (bf.length() / siz - 1) * siz;
                while (pos >= 0) {
                    bf.seek(pos);
                    // readFully: never restore from a partially filled
                    // buffer.
                    bf.readFully(buf);
                    long blk = ByteUtil.getLong(buf, 0);
                    if (Trace.bit(19)) {
                        Debug.println(db.toString() + " [RESTORE " + blk + " @ " + pos + "]");
                    }
                    dbFile.restoreBlock(blk, buf, 8);
                    pos -= siz;
                }
            } finally {
                bf.close();
            }
        }
        if (Trace.bit(19)) {
            Debug.println(db.toString() +
                          ".restoreBlocks(): size = " + dbFile.getSize());
        }
    }

    /**
     * Discard all saved before-images: close the current stream (if
     * any) and re-create the before-images file empty.
     */
    public void resetBlocks() throws IOException {
        if (bfo != null) {
            try {
                bfo.close();
            } catch (Throwable t) {
                // Ignored: the file is about to be truncated anyway.
            } finally {
                bfo = null;
            }
        }
        File b = new File(db.getScratchDir(), "before-images");
        bfo = new FileOutputStream(b);
        bfoActive = false;
        if (Trace.bit(19)) Debug.println(db.toString() + ".Log1.resetBlocks() @ " + filepos + ", dbFile.size = " + dbFile.getSize());
        filepos = 0;
    }

    /**
     * Return the unqualified class name, for use in trace output.
     */
    public String toString() {
        String s = getClass().getName();
        int x = s.lastIndexOf('.');
        if (x >= 0) s = s.substring(x + 1);
        return s;
    }
}