package org.ozoneDB.core.storage.wizardStore;

import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.ozoneDB.DxLib.*;
import org.ozoneDB.Setup;
import org.ozoneDB.io.stream.ResolvingObjectInputStream;
import org.ozoneDB.core.*;
import org.ozoneDB.core.storage.*;
import org.ozoneDB.core.storage.ClusterID;
import org.ozoneDB.core.storage.Cluster;
import org.ozoneDB.util.LogWriter;

/**
 * Manages the on-disk cluster files of the wizard store: loading, caching,
 * growing, committing, aborting and deactivating {@link Cluster} objects.
 * Clusters are serialized to files named by their {@link ClusterID} plus a
 * postfix; lock state is persisted in a companion ".lock" file, shadow and
 * temp files are used for crash recovery.
 */
public final class ClusterStore extends AbstractClusterStore {

    /** Postfix of shadow files left behind by an interrupted commit. */
    public final static String POSTFIX_SHADOW = ".sh";

    /**
     * Rough multiplier used to estimate the in-memory size of a cluster from
     * its compressed on-disk size. NOTE(review): this is a heuristic constant,
     * not a measured ratio.
     */
    protected final static int compressionFactor = 3;

    /** Cache of currently loaded clusters, keyed by ClusterID. */
    protected DxMap cachedClusters;

    /**
     * Soft size limit of one cluster in bytes; overwritten from config in the
     * constructor.
     */
    protected int maxClusterSize = 64 * 1024;

    /**
     * Maps {@link Permissions} to the ClusterID of the cluster that currently
     * receives newly created containers for that permission set.
     */
    protected DxMap growingClusterIDs;

    /** Whether cluster files are GZIP-compressed on disk. */
    private boolean compressClusters;


    ClusterStore(Env _env) {
        super(_env);
        // NOTE(review): a -1 default would make every cluster look "full"
        // (size >= maxClusterSize); presumably the property is always set.
        // Consider defaulting to 64 * 1024 instead — confirm intProperty() semantics.
        maxClusterSize = env.config.intProperty(Setup.WS_CLUSTER_SIZE, -1);
        cachedClusters = new DxHashMap(64);
        compressClusters = env.config.booleanProperty(Setup.WS_COMPRESS_CLUSTERS, true);
    }


    public void startup() throws Exception {
        growingClusterIDs = new DxHashMap(32);
    }


    public void shutdown() {
    }


    /**
     * A shutdown was clean if no shadow or temp files survive in the data
     * directory.
     *
     * @return false if any leftover shadow/temp file indicates a crash.
     */
    public boolean isCleanShutdown() {
        File file = new File(env.getDatabaseDir() + Env.DATA_DIR);
        String[] fileList = file.list();

        // File.list() returns null if the directory is missing or unreadable;
        // treat that as "nothing to recover".
        if (fileList != null) {
            for (int i = 0; i < fileList.length; i++) {
                if (fileList[i].endsWith(POSTFIX_SHADOW) || fileList[i].endsWith(POSTFIX_TEMP)) {
                    return false;
                }
            }
        }
        return true;
    }


    /**
     * Scans the data directory and collects the IDs of all clusters that have
     * a cluster or shadow file on disk.
     *
     * @return set of recovered ClusterIDs (possibly empty).
     */
    public DxSet recoverClusterIDs() {
        File file = new File(env.getDatabaseDir() + Env.DATA_DIR);
        String[] fileList = file.list();

        DxSet result = new DxHashSet();
        // Guard against File.list() returning null (missing/unreadable dir).
        if (fileList != null) {
            for (int i = 0; i < fileList.length; i++) {
                if (fileList[i].endsWith(POSTFIX_CLUSTER) || fileList[i].endsWith(POSTFIX_SHADOW)) {
                    // File names are "<cid>.<postfix>"; the numeric part is the id.
                    String cidString = fileList[i].substring(0, fileList[i].indexOf('.'));
                    long cid = Long.parseLong(cidString);
                    result.add(new ClusterID(cid));
                }
            }
        }
        return result;
    }


    /** @return the summed (estimated) byte size of all cached clusters. */
    public long currentCacheSize() {
        long result = 0;
        DxIterator it = cachedClusters.iterator();
        Cluster cluster;
        while ((cluster = (Cluster) it.next()) != null) {
            result += cluster.size();
        }
        return result;
    }


    /** @return the configured estimated byte cost per container. */
    public int currentBytesPerContainer() {
        return env.config.intProperty(Setup.WS_CLUSTER_SIZE_RATIO, 256);
    }


    /**
     * Returns the cluster that currently accepts new containers for the given
     * permissions. Falls back to (1) the registered growing cluster, (2) any
     * cached, non-full, compatible cluster, (3) a freshly created cluster.
     *
     * @param perms permissions the cluster must carry.
     * @return a usable, non-full cluster; never null.
     */
    protected synchronized Cluster growingCluster(Permissions perms) throws Exception {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "growingCluster() ", LogWriter.DEBUG3);
        }

        Cluster cluster = null;
        ClusterID cid = (ClusterID) growingClusterIDs.elementForKey(perms);

        if (cid != null) {
            cluster = (Cluster) cachedClusters.elementForKey(cid);
            if (cluster == null) {
                cluster = loadCluster(cid, true);
                if (cluster instanceof WizardCluster) {
                    ((WizardCluster) cluster).unpin();
                }
            }
            // The registered growing cluster is unusable if it was deactivated
            // (lock == null), is full, or is locked by a different transaction.
            if (cluster.lock() == null || cluster.size() >= maxClusterSize
                    || (cluster.lock().level(null) > Lock.LEVEL_NONE
                        && !cluster.lock().isAcquiredBy(env.transactionManager.currentTA()))) {

                if (env.logWriter.hasTarget(LogWriter.DEBUG1)) {
                    env.logWriter.newEntry(this,
                            "growingCluster(): growing cluster not usable: cid=" + cluster.clusterID()
                            + " size=" + cluster.size() + " lockLevel="
                            + (cluster.lock() != null ? String.valueOf(cluster.lock().level(null)) : "null"),
                            LogWriter.DEBUG1);
                }

                growingClusterIDs.removeForKey(perms);
                cluster = null;
            }
        }

        if (cluster == null) {
            // Search the cache for any non-full cluster with matching permissions.
            DxIterator it = cachedClusters.iterator();
            Cluster cursor;
            while ((cursor = (Cluster) it.next()) != null) {
                if (cursor.size() < maxClusterSize && cursor.permissions().equals(perms)) {
                    cluster = cursor;

                    trimCache();

                    if (cluster.lock() == null) {
                        // trimCache() may have deactivated exactly this cluster.
                        env.logWriter.newEntry(this,
                                "growingCluster(): loaded cluster was deactivated: " + cluster.clusterID(),
                                LogWriter.DEBUG);
                        cluster = null;
                    } else if (cluster.lock().level(null) > Lock.LEVEL_NONE
                            && !cluster.lock().isAcquiredBy(env.transactionManager.currentTA())) {
                        if (env.logWriter.hasTarget(LogWriter.DEBUG1)) {
                            env.logWriter.newEntry(this,
                                    "growingCluster(): loaded cluster is locked by another transaction: "
                                    + cluster.clusterID(), LogWriter.DEBUG1);
                        }
                        cluster = null;
                    } else {
                        growingClusterIDs.addForKey(cluster.clusterID(), perms);
                        if (env.logWriter.hasTarget(LogWriter.DEBUG1)) {
                            env.logWriter.newEntry(this,
                                    "growingCluster(): loaded cluster is now growing cluster: " + cluster.clusterID()
                                    + " size:" + cluster.size(), LogWriter.DEBUG1);
                        }
                        break;
                    }
                }
            }
        }

        if (cluster == null) {
            cluster = createANewEmptyAndUsableCluster(perms);
        }

        return cluster;
    }


    /**
     * Creates, persists and reloads a brand-new empty cluster and registers it
     * as the growing cluster for the given permissions.
     */
    protected synchronized Cluster createANewEmptyAndUsableCluster(Permissions perms)
            throws IOException, ClassNotFoundException {
        Cluster cluster = new WizardCluster(new ClusterID(env.keyGenerator.nextID()), perms,
                env.transactionManager.newLock(), 256);

        // Persist first so the subsequent loadCluster() finds it on disk.
        storeData(cluster, basename(cluster.clusterID()) + POSTFIX_CLUSTER);

        trimCache();

        cluster = loadCluster(cluster.clusterID(), true);
        if (cluster instanceof WizardCluster) {
            ((WizardCluster) cluster).unpin();
        }

        growingClusterIDs.addForKey(cluster.clusterID(), perms);

        return cluster;
    }


    /**
     * @return a cluster that is guaranteed not to be locked by another
     *         transaction (currently always a freshly created one).
     */
    protected Cluster giveMeAnUnlockedCluster(Permissions perms) throws IOException, ClassNotFoundException {
        return createANewEmptyAndUsableCluster(perms);
    }


    /**
     * Places the container into a growing cluster and acquires the cluster
     * lock for the given transaction. On failure, any partially acquired lock
     * and pin are rolled back.
     *
     * @param container the container to register.
     * @param perms     permissions the target cluster must carry.
     * @param locker    transaction acquiring the cluster lock.
     * @param lockLevel lock level to acquire.
     */
    public void registerContainerAndLock(StorageObjectContainer container, Permissions perms,
            Transaction locker, int lockLevel) throws Exception {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "registerContainer()", LogWriter.DEBUG3);
        }

        Cluster cluster = null;

        boolean pinned = false;
        boolean locked = false;
        boolean alright = false;

        try {
            synchronized (this) {
                cluster = growingCluster(perms);

                Lock clusterLock = cluster.lock();
                int prevLevel = clusterLock.tryAcquire(locker, lockLevel);

                if (prevLevel == Lock.NOT_ACQUIRED) {
                    // Growing cluster is locked by someone else; fall back to a
                    // fresh, guaranteed-unlocked cluster.
                    cluster = giveMeAnUnlockedCluster(perms);

                    clusterLock = cluster.lock();
                    prevLevel = clusterLock.tryAcquire(locker, lockLevel);

                    if (prevLevel == Lock.NOT_ACQUIRED) {
                        throw new Error("BUG! We could not acquire a lock for an unlocked cluster.");
                    }
                }
                locked = true;

                cluster.registerContainer(container);
                container.pin();
                pinned = true;
            }
            cluster.updateLockLevel(locker);

            if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
                env.logWriter.newEntry(this, " cluster: " + cluster.clusterID(), LogWriter.DEBUG3);
            }
            alright = true;
        } finally {
            // Roll back lock/pin if anything above failed.
            if (!alright) {
                if (locked) {
                    cluster.lock().release(locker);
                }
                if (pinned) {
                    container.unpin();
                }
            }
        }
    }


    /** Detaches the container from its cluster. */
    public void invalidateContainer(StorageObjectContainer container) {
        synchronized (container) {
            container.getCluster().removeContainer(container);
            container.setCluster(null);
        }
    }


    /**
     * Crash recovery for one cluster: discards stale lock/temp files, promotes
     * a non-empty shadow file to the cluster file, then loads and activates
     * the cluster.
     */
    protected Cluster restoreCluster(ClusterID cid) throws Exception {
        String basename = basename(cid);
        Cluster cluster;

        new File(basename + POSTFIX_LOCK).delete();
        new File(basename + POSTFIX_TEMP).delete();

        File shadowFile = new File(basename + POSTFIX_SHADOW);
        File clusterFile = new File(basename + POSTFIX_CLUSTER);

        if (shadowFile.exists()) {
            if (shadowFile.length() > 0) {
                if (!shadowFile.renameTo(clusterFile)) {
                    throw new IOException("Unable to rename shadow file.");
                }
            } else {
                // Empty shadow file carries no data; just drop it.
                shadowFile.delete();
            }
        }
        cluster = (Cluster) loadData(basename + POSTFIX_CLUSTER);
        activateCluster(cluster, 0);

        return cluster;
    }


    /**
     * Returns the cluster for the given id, loading it from disk if it is not
     * cached. Handles the race where another thread loads the same cluster
     * concurrently by keeping the first cached copy.
     *
     * @param cid cluster to load.
     * @param pin if true, the returned WizardCluster is pinned against cache
     *            eviction; the caller must unpin it.
     */
    public Cluster loadCluster(ClusterID cid, boolean pin) throws IOException, ClassNotFoundException {
        Cluster cluster = (Cluster) cachedClusters.elementForKey(cid);
        if (cluster == null) {

            if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
                env.logWriter.newEntry(this, "loadCluster(): load cluster from disk: " + cid.toString(), LogWriter.DEBUG);
            }

            String basename = basename(cid);
            String clusterName = basename + POSTFIX_CLUSTER;
            String lockName = basename + POSTFIX_LOCK;

            // Estimate in-memory size from the file size (scaled up when the
            // file is compressed).
            int clusterByteSize = (int) new File(clusterName).length();
            if (compressClusters) {
                clusterByteSize *= compressionFactor;
            }

            trimCache();
            cluster = (Cluster) loadData(clusterName);

            synchronized (this) {
                // Another thread may have loaded the same cluster while we
                // were deserializing; if so, drop our copy and use the cached one.
                Cluster interimCluster = (Cluster) cachedClusters.elementForKey(cid);
                if (interimCluster != null) {
                    env.logWriter.newEntry(this,
                            "loadCluster(): cluster was loaded by another thread too; droping my copy",
                            LogWriter.DEBUG);

                    cluster = interimCluster;

                    if (pin && cluster instanceof WizardCluster) {
                        ((WizardCluster) cluster).pin();
                    }

                } else {
                    synchronized (cluster) {
                        // Restore the persisted lock; if there is none (or it
                        // is unreadable), start with a fresh lock.
                        try {
                            cluster.setLock((Lock) loadData(lockName));
                            new File(lockName).delete();
                            ((MROWLock) cluster.lock()).setDebugInfo("clusterID=" + cluster.clusterID());
                        } catch (Exception e) {
                            if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
                                env.logWriter.newEntry(this,
                                        " Unable to load lock from disk - creating a new lock.", LogWriter.DEBUG3);
                            }
                            cluster.setLock(env.transactionManager.newLock());
                            ((MROWLock) cluster.lock()).setDebugInfo("clusterID=" + cluster.clusterID());
                        }

                        if (pin && cluster instanceof WizardCluster) {
                            ((WizardCluster) cluster).pin();
                        }

                        activateCluster(cluster, clusterByteSize);
                    }

                    if (clusterByteSize > maxClusterSize * 2) {
                        splitCluster(cluster);
                    }

                    cachedClusters.addForKey(cluster, cluster.clusterID());

                    trimCache();
                }
            }
        } else {
            synchronized (cluster) {
                if (pin && cluster instanceof WizardCluster) {
                    ((WizardCluster) cluster).pin();
                }
            }
        }
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "returning WizardCluster: " + cluster, LogWriter.DEBUG3);
        }
        return cluster;
    }


    /** Placeholder: splitting oversized clusters is not implemented. */
    public void splitCluster(Cluster cluster) {
    }


    /**
     * Removes the cluster from the cache and optionally writes its state back
     * to disk.
     */
    public void unloadCluster(ClusterID cid, boolean deactivate) throws IOException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
            env.logWriter.newEntry(this, "unloadCluster(" + cid + "," + deactivate + ").", LogWriter.DEBUG);
        }

        Cluster cluster = (Cluster) cachedClusters.removeForKey(cid);

        // removeForKey() returns null for an uncached cluster; nothing to
        // deactivate then.
        if (deactivate && cluster != null) {
            deactivateCluster(cluster);
        }
    }


    /**
     * Evicts low-priority, unpinned, non-invoked clusters from the cache while
     * the environment reports memory pressure. Each pass tries to free about
     * 20% of the cache.
     */
    protected void trimCache() throws IOException {

        long freeSpace = env.freeMemory();

        boolean tryRemoveCluster = true;
        while (freeSpace <= 0 && tryRemoveCluster) {
            tryRemoveCluster = false;
            synchronized (this) {
                long cacheSize = 0;

                // Order clusters by cache priority (lowest evicted first).
                DxMap priorityQueue = new DxTreeMap();
                DxIterator it = cachedClusters.iterator();
                Cluster cluster;
                while ((cluster = (Cluster) it.next()) != null) {
                    priorityQueue.addForKey(cluster, cluster.cachePriority());
                    cacheSize += cluster.size();
                }

                long cacheSizeToRemove = cacheSize / 5;
                if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
                    env.logWriter.newEntry(this,
                            " cache: " + cacheSize + " to be freed:" + cacheSizeToRemove, LogWriter.DEBUG2);
                }

                it = priorityQueue.iterator();
                // BUGFIX: cast to Cluster, not WizardCluster — a non-WizardCluster
                // element must reach the else branch instead of throwing
                // ClassCastException here.
                while (cacheSizeToRemove > 0 && (cluster = (Cluster) it.next()) != null) {

                    if (cluster instanceof WizardCluster) {
                        WizardCluster wizardCluster = (WizardCluster) cluster;
                        if ((!wizardCluster.isPinned()) && (!wizardCluster.isInvoked())) {
                            if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
                                env.logWriter.newEntry(this,
                                        "DEACTIVATE cluster: " + cluster.clusterID(), LogWriter.DEBUG2);
                            }

                            cluster = (Cluster) it.removeObject();
                            cacheSizeToRemove -= cluster.size();
                            unloadCluster(cluster.clusterID(), true);
                            tryRemoveCluster = true;
                        }
                    } else {
                        env.logWriter.newEntry(this,
                                "the cluster is not a WizardCluster, not sure what to do", LogWriter.WARN);
                    }
                }
                System.gc();
                freeSpace = env.freeMemory();
            }
        }
    }


    /**
     * Wires a freshly loaded cluster into this store and records its estimated
     * size.
     */
    protected void activateCluster(Cluster cluster, int size) {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "activateCluster(): " + cluster.clusterID(), LogWriter.DEBUG3);
        }
        cluster.setEnv(env);
        cluster.setClusterStore(this);
        cluster.touch();
        cluster.setCurrentSize(size);
    }


    /**
     * Writes the cluster's lock (if held) and data (if write-locked) to disk
     * and clears the in-memory lock, making the cluster object inert.
     */
    protected void deactivateCluster(Cluster cluster) throws IOException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
            env.logWriter.newEntry(this,
                    "deactivateCluster(): " + cluster.clusterID() + " priority: " + cluster.cachePriority(),
                    LogWriter.DEBUG);
            env.logWriter.newEntry(this, " lock: " + cluster.lock().level(null), LogWriter.DEBUG);
        }

        String basename = basename(cluster.clusterID());

        synchronized (this) {
            synchronized (cluster) {
                if (cluster.lock().level(null) >= Lock.LEVEL_READ) {
                    if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
                        env.logWriter.newEntry(this,
                                " write lock to disk: " + cluster.clusterID(), LogWriter.DEBUG);
                    }

                    storeData(cluster.lock(), basename + POSTFIX_LOCK);
                } else {
                    File lockFile = new File(basename + POSTFIX_LOCK);
                    if (lockFile.exists()) {
                        lockFile.delete();
                    }
                }

                if (cluster.lock().level(null) > Lock.LEVEL_UPGRADE) {
                    if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
                        env.logWriter.newEntry(this,
                                " write cluster: " + cluster.clusterID(), LogWriter.DEBUG);
                    }

                    storeData(cluster, basename + POSTFIX_CLUSTER);
                }

                // A null lock marks the cluster as deactivated (see growingCluster()).
                cluster.setLock(null);
            }
        }
    }


    /**
     * First commit phase: serializes a write-locked cluster into a temp file
     * and updates the cluster's size estimate from the temp file's length.
     */
    public synchronized void prepareCommitCluster(Transaction ta, ClusterID cid)
            throws IOException, ClassNotFoundException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "prepareCommitCluster(): " + cid, LogWriter.DEBUG3);
        }

        Cluster cluster = loadCluster(cid, true);
        if (cluster instanceof WizardCluster) {
            ((WizardCluster) cluster).unpin();
        }
        cluster.prepareCommit(ta);
        if (cluster.lock().level(null) >= Lock.LEVEL_WRITE) {
            String tempFilename = basename(cid) + POSTFIX_TEMP;

            storeData(cluster, tempFilename);

            long fileSize = new File(tempFilename).length();
            if (fileSize == 0L) {
                throw new IOException("Unable to determine cluster file size.");
            }

            if (compressClusters) {
                fileSize *= compressionFactor;
            }
            cluster.setCurrentSize((int) fileSize);
        }
    }


    /**
     * Second commit phase: atomically replaces the cluster file with the temp
     * file written by {@link #prepareCommitCluster} and updates the persisted
     * lock state.
     */
    public synchronized void commitCluster(Transaction ta, ClusterID cid)
            throws IOException, ClassNotFoundException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "commitCluster(): " + cid, LogWriter.DEBUG3);
        }

        String basename = basename(cid);
        File clusterFile = new File(basename + POSTFIX_CLUSTER);
        File tempFile = new File(basename + POSTFIX_TEMP);

        if (tempFile.exists()) {
            clusterFile.delete();
            if (!tempFile.renameTo(clusterFile)) {
                throw new IOException("Unable to rename temp cluster.");
            }
        }

        Cluster cluster = loadCluster(cid, true);
        if (cluster instanceof WizardCluster) {
            ((WizardCluster) cluster).unpin();
        }
        cluster.commit(ta);

        updateLockOnDisk(cluster, ta);
    }


    /**
     * Aborts a transaction's changes to a cluster: discards the temp file and
     * reloads the last committed state from disk.
     */
    public synchronized void abortCluster(Transaction ta, ClusterID cid)
            throws IOException, ClassNotFoundException {
        File tempFile = new File(basename(cid) + POSTFIX_TEMP);
        if (tempFile.exists()) {
            if (!tempFile.delete()) {
                throw new IOException("Unable to delete temp cluster.");
            }
        }

        Cluster cluster = loadCluster(cid, true);
        boolean isWizardCluster = cluster instanceof WizardCluster;
        if (isWizardCluster) {
            ((WizardCluster) cluster).unpin();
        }
        cluster.abort(ta);

        if (isWizardCluster && ((WizardCluster) cluster).isPinned()) {
            env.logWriter.newEntry(this,
                    "abortCluster(): Unloading pinned cluster " + cid + ". Should we really do that?",
                    LogWriter.DEBUG);

            // Drop the dirty in-memory copy and re-read the committed state.
            unloadCluster(cid, false);
            loadCluster(cid, true);

        } else {
            unloadCluster(cid, false);
        }

        updateLockOnDisk(cluster, ta);
    }


    /**
     * Persists the cluster's current lock state: deletes the lock file when
     * the transaction holds no lock anymore, otherwise rewrites it.
     */
    protected void updateLockOnDisk(Cluster cluster, Transaction ta) throws IOException {
        ClusterID cid = cluster.clusterID();
        if (cluster.lock().level(ta) == Lock.LEVEL_NONE) {
            File lockFile = new File(basename(cid) + POSTFIX_LOCK);
            if (lockFile.exists() && !lockFile.delete()) {
                throw new IOException("Unable to delete lock file.");
            }
        } else {
            storeData(cluster.lock(), basename(cid) + POSTFIX_LOCK);
        }
    }


    /**
     * Serializes an object to the given file, GZIP-compressed when configured.
     * The underlying file stream is closed even if a wrapper stream
     * constructor throws.
     */
    protected void storeData(Object obj, String key) throws IOException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "storeData(): " + key, LogWriter.DEBUG3);
        }

        OutputStream out = new FileOutputStream(key);
        try {
            if (compressClusters) {
                out = new GZIPOutputStream(out, 3 * 4096);
            } else {
                out = new BufferedOutputStream(out, 3 * 4096);
            }

            ObjectOutputStream oout = new ObjectOutputStream(out);
            out = oout;
            oout.writeObject(obj);
        } finally {
            // Closes the outermost stream that was successfully constructed,
            // which flushes/finishes the whole chain.
            out.close();
        }
    }


    /**
     * Deserializes an object from the given file, decompressing when
     * configured. The underlying file stream is closed even if a wrapper
     * stream constructor throws.
     */
    protected Object loadData(String key) throws IOException, ClassNotFoundException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "loadData(): " + key, LogWriter.DEBUG3);
        }

        InputStream in = new FileInputStream(key);
        try {
            if (compressClusters) {
                in = new GZIPInputStream(in, 3 * 4096);
            } else {
                in = new BufferedInputStream(in, 3 * 4096);
            }

            ObjectInputStream oin = new ResolvingObjectInputStream(in);
            in = oin;
            return oin.readObject();
        } finally {
            // Closes the outermost stream that was successfully constructed.
            in.close();
        }
    }

}