1 8 9 package com.sleepycat.je.incomp; 10 11 import java.util.ArrayList ; 12 import java.util.Collection ; 13 import java.util.HashMap ; 14 import java.util.Iterator ; 15 import java.util.List ; 16 import java.util.Map ; 17 import java.util.logging.Level ; 18 19 import com.sleepycat.je.DatabaseException; 20 import com.sleepycat.je.EnvironmentStats; 21 import com.sleepycat.je.StatsConfig; 22 import com.sleepycat.je.cleaner.TrackedFileSummary; 23 import com.sleepycat.je.cleaner.UtilizationTracker; 24 import com.sleepycat.je.config.EnvironmentParams; 25 import com.sleepycat.je.dbi.DatabaseImpl; 26 import com.sleepycat.je.dbi.DbTree; 27 import com.sleepycat.je.dbi.EnvironmentImpl; 28 import com.sleepycat.je.latch.LatchSupport; 29 import com.sleepycat.je.tree.BIN; 30 import com.sleepycat.je.tree.BINReference; 31 import com.sleepycat.je.tree.CursorsExistException; 32 import com.sleepycat.je.tree.DBIN; 33 import com.sleepycat.je.tree.DIN; 34 import com.sleepycat.je.tree.IN; 35 import com.sleepycat.je.tree.Key; 36 import com.sleepycat.je.tree.Node; 37 import com.sleepycat.je.tree.NodeNotEmptyException; 38 import com.sleepycat.je.tree.Tree; 39 import com.sleepycat.je.tree.Tree.SearchType; 40 import com.sleepycat.je.utilint.DaemonThread; 41 import com.sleepycat.je.utilint.PropUtil; 42 import com.sleepycat.je.utilint.Tracer; 43 44 50 public class INCompressor extends DaemonThread { 51 private static final String TRACE_COMPRESS = "INCompress:"; 52 private static final boolean DEBUG = false; 53 54 private EnvironmentImpl env; 55 private long lockTimeout; 56 57 58 private int splitBins = 0; 59 private int dbClosedBins = 0; 60 private int cursorsBins = 0; 61 private int nonEmptyBins = 0; 62 private int processedBins = 0; 63 64 65 private int splitBinsThisRun = 0; 66 private int dbClosedBinsThisRun = 0; 67 private int cursorsBinsThisRun = 0; 68 private int nonEmptyBinsThisRun = 0; 69 private int processedBinsThisRun = 0; 70 71 76 private int lazyProcessed = 0; 77 private int lazyEmpty = 0; 78 private int lazySplit = 0; 79 private int wokenUp = 0; 80 81 85 private Map binRefQueue; 86 private Object binRefQueueSync; 87 88 public INCompressor(EnvironmentImpl env, long waitTime, String name) 89 throws DatabaseException { 90 91 super(waitTime, name, env); 92 this.env = env; 93 lockTimeout = PropUtil.microsToMillis( 94 env.getConfigManager().getLong 95 (EnvironmentParams.COMPRESSOR_LOCK_TIMEOUT)); 96 binRefQueue = new HashMap (); 97 binRefQueueSync = new Object (); 98 } 99 100 public String toString() { 101 StringBuffer sb = new StringBuffer (); 102 sb.append("<INCompressor name=\"").append(name).append("\"/>"); 103 return sb.toString(); 104 } 105 106 synchronized public void clearEnv() { 107 env = null; 108 } 109 110 public synchronized void verifyCursors() 111 throws DatabaseException { 112 113 116 if (env.isClosed()) { 117 return; 118 } 119 120 124 List queueSnapshot = null; 125 synchronized (binRefQueueSync) { 126 queueSnapshot = new ArrayList (binRefQueue.values()); 127 } 128 129 130 Map dbCache = new HashMap (); 131 132 Iterator it = queueSnapshot.iterator(); 133 while (it.hasNext()) { 134 BINReference binRef = (BINReference) it.next(); 135 DatabaseImpl db = env.getDbMapTree().getDb 136 (binRef.getDatabaseId(), lockTimeout, dbCache); 137 BIN bin = searchForBIN(db, binRef); 138 if (bin != null) { 139 bin.verifyCursors(); 140 bin.releaseLatch(); 141 } 142 } 143 } 144 145 149 public void addToQueue(Object o) 150 throws DatabaseException { 151 152 throw new DatabaseException 153 ("INCompressor.addToQueue should never be called."); 154 } 155 156 public int getBinRefQueueSize() 157 throws DatabaseException { 158 159 int size = 0; 160 synchronized (binRefQueueSync) { 161 size = binRefQueue.size(); 162 } 163 164 return size; 165 } 166 167 173 174 178 public void addBinKeyToQueue(BIN bin, Key deletedKey, boolean doWakeup) 179 throws DatabaseException { 180 181 synchronized (binRefQueueSync) { 182 addBinKeyToQueueAlreadyLatched(bin, deletedKey); 183 } 184 if (doWakeup) { 185 wakeup(); 186 } 187 } 188 189 193 public void addBinRefToQueue(BINReference binRef, boolean doWakeup) 194 throws DatabaseException { 195 196 synchronized (binRefQueueSync) { 197 addBinRefToQueueAlreadyLatched(binRef); 198 } 199 200 if (doWakeup) { 201 wakeup(); 202 } 203 } 204 205 209 public void addMultipleBinRefsToQueue(Collection binRefs, 210 boolean doWakeup) 211 throws DatabaseException { 212 213 synchronized (binRefQueueSync) { 214 Iterator it = binRefs.iterator(); 215 while (it.hasNext()) { 216 BINReference binRef = (BINReference) it.next(); 217 addBinRefToQueueAlreadyLatched(binRef); 218 } 219 } 220 221 if (doWakeup) { 222 wakeup(); 223 } 224 } 225 226 229 private void addBinRefToQueueAlreadyLatched(BINReference binRef) { 230 231 Long node = new Long (binRef.getNodeId()); 232 BINReference existingRef = (BINReference) binRefQueue.get(node); 233 if (existingRef != null) { 234 existingRef.addDeletedKeys(binRef); 235 } else { 236 binRefQueue.put(node, binRef); 237 } 238 } 239 240 243 private void addBinKeyToQueueAlreadyLatched(BIN bin, Key deletedKey) { 244 245 Long node = new Long (bin.getNodeId()); 246 BINReference existingRef = (BINReference) binRefQueue.get(node); 247 if (existingRef != null) { 248 if (deletedKey != null) { 249 existingRef.addDeletedKey(deletedKey); 250 } 251 } else { 252 BINReference binRef = bin.createReference(); 253 if (deletedKey != null) { 254 binRef.addDeletedKey(deletedKey); 255 } 256 binRefQueue.put(node, binRef); 257 } 258 } 259 260 public boolean exists(long nodeId) { 261 Long node = new Long (nodeId); 262 synchronized (binRefQueueSync) { 263 return (binRefQueue.get(node) != null); 264 } 265 } 266 267 271 private BINReference removeCompressibleBinReference(long nodeId) { 272 Long node = new Long (nodeId); 273 BINReference foundRef = null; 274 synchronized (binRefQueueSync) { 275 BINReference target = (BINReference) binRefQueue.remove(node); 276 if (target != null) { 277 if (target.deletedKeysExist()) { 278 foundRef = target; 279 } else { 280 281 285 binRefQueue.put(node, target); 286 } 287 } 288 } 289 return foundRef; 290 } 291 292 295 public void loadStats(StatsConfig config, EnvironmentStats stat) 296 throws DatabaseException { 297 298 stat.setSplitBins(splitBins); 299 stat.setDbClosedBins(dbClosedBins); 300 stat.setCursorsBins(cursorsBins); 301 stat.setNonEmptyBins(nonEmptyBins); 302 stat.setProcessedBins(processedBins); 303 stat.setInCompQueueSize(getBinRefQueueSize()); 304 305 if (DEBUG) { 306 System.out.println("lazyProcessed = " + lazyProcessed); 307 System.out.println("lazyEmpty = " + lazyEmpty); 308 System.out.println("lazySplit = " + lazySplit); 309 System.out.println("wokenUp=" + wokenUp); 310 } 311 312 if (config.getClear()) { 313 splitBins = 0; 314 dbClosedBins = 0; 315 cursorsBins = 0; 316 nonEmptyBins = 0; 317 processedBins = 0; 318 lazyProcessed = 0; 319 lazyEmpty = 0; 320 lazySplit = 0; 321 wokenUp = 0; 322 } 323 } 324 325 328 protected int nDeadlockRetries() 329 throws DatabaseException { 330 331 return env.getConfigManager().getInt 332 (EnvironmentParams.COMPRESSOR_RETRY); 333 } 334 335 public synchronized void onWakeup() 336 throws DatabaseException { 337 338 if (env.isClosed()) { 339 return; 340 } 341 wokenUp++; 342 doCompress(); 343 } 344 345 349 public synchronized void doCompress() 350 throws DatabaseException { 351 352 if (!isRunnable()) { 353 return; 354 } 355 356 362 Map queueSnapshot = null; 363 int binQueueSize = 0; 364 synchronized (binRefQueueSync) { 365 binQueueSize = binRefQueue.size(); 366 if (binQueueSize > 0) { 367 queueSnapshot = binRefQueue; 368 binRefQueue = new HashMap (); 369 } 370 } 371 372 373 if (binQueueSize > 0) { 374 resetPerRunCounters(); 375 Tracer.trace(Level.FINE, env, 376 "InCompress.doCompress called, queue size: " + 377 binQueueSize); 378 assert LatchSupport.countLatchesHeld() == 0; 379 380 388 UtilizationTracker tracker = new UtilizationTracker(env); 389 390 391 Map dbCache = new HashMap (); 392 393 DbTree dbTree = env.getDbMapTree(); 394 BINSearch binSearch = new BINSearch(); 395 try { 396 Iterator it = queueSnapshot.values().iterator(); 397 while (it.hasNext()) { 398 if (env.isClosed()) { 399 return; 400 } 401 402 BINReference binRef = (BINReference) it.next(); 403 if (!findDBAndBIN(binSearch, binRef, dbTree, dbCache)) { 404 405 409 continue; 410 } 411 412 if (binRef.deletedKeysExist()) { 413 414 boolean requeued = compressBin 415 (binSearch.db, binSearch.bin, binRef, tracker); 416 417 if (!requeued) { 418 419 425 checkForRelocatedSlots 426 (binSearch.db, binRef, tracker); 427 } 428 } else { 429 430 435 BIN foundBin = binSearch.bin; 436 437 byte[] idKey = foundBin.getIdentifierKey(); 438 boolean isDBIN = foundBin.containsDuplicates(); 439 byte[] dupKey = null; 440 if (isDBIN) { 441 dupKey = ((DBIN) foundBin).getDupKey(); 442 } 443 444 449 foundBin.releaseLatch(); 450 451 pruneBIN(binSearch.db, binRef, idKey, isDBIN, 452 dupKey, tracker); 453 } 454 } 455 456 460 TrackedFileSummary[] summaries = tracker.getTrackedFiles(); 461 if (summaries.length > 0) { 462 env.getUtilizationProfile().countAndLogSummaries 463 (summaries); 464 } 465 466 } finally { 467 assert LatchSupport.countLatchesHeld() == 0; 468 accumulatePerRunCounters(); 469 } 470 } 471 } 472 473 479 private boolean compressBin(DatabaseImpl db, 480 BIN bin, 481 BINReference binRef, 482 UtilizationTracker tracker) 483 throws DatabaseException { 484 485 486 boolean empty = false; 487 boolean requeued = false; 488 byte[] idKey = bin.getIdentifierKey(); 489 byte[] dupKey = null; 490 boolean isDBIN = bin.containsDuplicates(); 491 492 try { 493 int nCursors = bin.nCursors(); 494 if (nCursors > 0) { 495 496 499 addBinRefToQueue(binRef, false); 500 requeued = true; 501 cursorsBinsThisRun++; 502 } else { 503 requeued = bin.compress(binRef, true ); 504 if (!requeued) { 505 506 510 empty = (bin.getNEntries() == 0); 511 512 if (empty) { 513 514 518 if (isDBIN) { 519 dupKey = ((DBIN) bin).getDupKey(); 520 } 521 } 522 } 523 } 524 } finally { 525 bin.releaseLatch(); 526 } 527 528 529 if (empty) { 530 requeued = pruneBIN(db, binRef, idKey, isDBIN, dupKey, tracker); 531 } 532 533 return requeued; 534 } 535 536 542 private boolean pruneBIN(DatabaseImpl dbImpl, 543 BINReference binRef, 544 byte[] idKey, 545 boolean containsDups, 546 byte[] dupKey, 547 UtilizationTracker tracker) 548 throws DatabaseException { 549 550 boolean requeued = false; 551 try { 552 Tree tree = dbImpl.getTree(); 553 554 if (containsDups) { 555 tree.deleteDup(idKey, dupKey, tracker); 556 } else { 557 tree.delete(idKey, tracker); 558 } 559 processedBinsThisRun++; 560 } catch (NodeNotEmptyException NNEE) { 561 562 567 nonEmptyBinsThisRun++; 568 } catch (CursorsExistException e) { 569 570 580 addBinRefToQueue(binRef, false); 581 cursorsBinsThisRun++; 582 requeued = true; 583 } 584 return requeued; 585 } 586 587 592 private void checkForRelocatedSlots(DatabaseImpl db, 593 BINReference binRef, 594 UtilizationTracker tracker) 595 throws DatabaseException { 596 597 Iterator iter = binRef.getDeletedKeyIterator(); 598 if (iter != null) { 599 600 601 byte[] mainKey = binRef.getKey(); 602 boolean isDup = (binRef.getData() != null); 603 604 while (iter.hasNext()) { 605 Key key = (Key) iter.next(); 606 607 611 BIN splitBin = isDup ? 612 searchForBIN(db, mainKey, key.getKey()) : 613 searchForBIN(db, key.getKey(), null); 614 if (splitBin != null) { 615 BINReference splitBinRef = splitBin.createReference(); 616 splitBinRef.addDeletedKey(key); 617 compressBin(db, splitBin, splitBinRef, tracker); 618 } 619 } 620 } 621 } 622 623 private boolean isRunnable() 624 throws DatabaseException { 625 626 return true; 627 } 628 629 637 public BIN searchForBIN(DatabaseImpl db, BINReference binRef) 638 throws DatabaseException { 639 640 return searchForBIN(db, binRef.getKey(), binRef.getData()); 641 } 642 643 private BIN searchForBIN(DatabaseImpl db, byte[] mainKey, byte[] dupKey) 644 throws DatabaseException { 645 646 647 Tree tree = db.getTree(); 648 IN in = tree.search 649 (mainKey, SearchType.NORMAL, -1, null, false ); 650 651 652 if (in == null) { 653 return null; 654 } 655 656 657 if (dupKey == null) { 658 return (BIN) in; 659 } 660 661 662 DIN duplicateRoot = null; 663 DBIN duplicateBin = null; 664 BIN bin = (BIN) in; 665 try { 666 int index = bin.findEntry(mainKey, false, true); 667 if (index >= 0) { 668 Node node = null; 669 if (!bin.isEntryKnownDeleted(index)) { 670 671 node = bin.fetchTarget(index); 672 } 673 if (node == null) { 674 bin.releaseLatch(); 675 return null; 676 } 677 if (node.containsDuplicates()) { 678 679 duplicateRoot = (DIN) node; 680 duplicateRoot.latch(); 681 bin.releaseLatch(); 682 duplicateBin = (DBIN) tree.searchSubTree 683 (duplicateRoot, dupKey, SearchType.NORMAL, -1, null, 684 false ); 685 686 return duplicateBin; 687 } else { 688 690 return bin; 691 } 692 } else { 693 bin.releaseLatch(); 694 return null; 695 } 696 } catch (DatabaseException DBE) { 697 if (bin != null) { 698 bin.releaseLatchIfOwner(); 699 } 700 if (duplicateRoot != null) { 701 duplicateRoot.releaseLatchIfOwner(); 702 } 703 704 708 if (duplicateBin != null) { 709 duplicateBin.releaseLatchIfOwner(); 710 } 711 throw DBE; 712 } 713 } 714 715 718 private void resetPerRunCounters() { 719 splitBinsThisRun = 0; 720 dbClosedBinsThisRun = 0; 721 cursorsBinsThisRun = 0; 722 nonEmptyBinsThisRun = 0; 723 processedBinsThisRun = 0; 724 } 725 726 private void accumulatePerRunCounters() { 727 splitBins += splitBinsThisRun; 728 dbClosedBins += dbClosedBinsThisRun; 729 cursorsBins += cursorsBinsThisRun; 730 nonEmptyBins += nonEmptyBinsThisRun; 731 processedBins += processedBinsThisRun; 732 } 733 734 738 public void lazyCompress(IN in) 739 throws DatabaseException { 740 741 if (!in.isCompressible()) { 742 return; 743 } 744 745 assert in.isLatchOwnerForWrite(); 746 747 748 BIN bin = (BIN) in; 749 int nCursors = bin.nCursors(); 750 if (nCursors > 0) { 751 752 return; 753 } else { 754 BINReference binRef = 755 removeCompressibleBinReference(bin.getNodeId()); 756 if ((binRef == null) || (!binRef.deletedKeysExist())) { 757 return; 758 } else { 759 760 boolean requeued = bin.compress(binRef, false ); 761 lazyProcessed++; 762 763 769 if (!requeued && binRef.deletedKeysExist()) { 770 addBinRefToQueue(binRef, false); 771 lazySplit++; 772 } else { 773 if (bin.getNEntries() == 0) { 774 addBinRefToQueue(binRef, false); 775 lazyEmpty++; 776 } 777 } 778 } 779 } 780 } 781 782 786 private boolean findDBAndBIN(BINSearch binSearch, 787 BINReference binRef, 788 DbTree dbTree, 789 Map dbCache) 790 throws DatabaseException { 791 792 793 binSearch.db = dbTree.getDb 794 (binRef.getDatabaseId(), lockTimeout, dbCache); 795 if ((binSearch.db == null) ||(binSearch.db.isDeleted())) { 796 797 dbClosedBinsThisRun++; 798 return false; 799 } 800 801 802 env.getEvictor().doCriticalEviction(true); 804 805 binSearch.bin = searchForBIN(binSearch.db, binRef); 806 if ((binSearch.bin == null) || 807 binSearch.bin.getNodeId() != binRef.getNodeId()) { 808 809 if (binSearch.bin != null) { 810 binSearch.bin.releaseLatch(); 811 } 812 splitBinsThisRun++; 813 return false; 814 } 815 816 return true; 817 } 818 819 820 private static class BINSearch { 821 public DatabaseImpl db; 822 public BIN bin; 823 } 824 } 825 | Popular Tags |