package org.ozoneDB.core.storage.classicStore;

import org.ozoneDB.core.*;
import org.ozoneDB.core.storage.classicStore.ClassicStore;
import org.ozoneDB.core.storage.classicStore.Cluster;
import org.ozoneDB.core.storage.classicStore.ClusterID;
import org.ozoneDB.core.storage.classicStore.DeathObject;
import org.ozoneDB.util.*;
import org.ozoneDB.DxLib.*;
import org.ozoneDB.OzoneInternalException;

import java.io.*;


/**
 * Manages the on-disk persistence of objects for the classic store.
 *
 * Objects are appended to the "current" {@link Cluster}; once a cluster grows
 * beyond {@link Cluster#MAX_SIZE} a new one is started. Every transaction is
 * guarded by an on-disk flag file ({@link #TRANSACTION_FLAG}) that records the
 * transaction id, the current cluster and every cluster touched, so that an
 * interrupted transaction can be rolled back at the next startup.
 *
 * NOTE(review): not thread-safe — callers are presumably serialized by the
 * surrounding transaction machinery; confirm before concurrent use.
 */
public class PersistenceSpace {

    /** Name of the transaction flag file inside the data directory. */
    final static String TRANSACTION_FLAG = "transaction";
    final static int TRANSACTION_FLAG_VERSION = 1;
    final static int PROPS_FILE_VERSION = 1;

    /** Property key under which the current cluster id is persisted. */
    final static String CID = "ozoneDB.classicStore.clusterID";

    Env env;
    ClassicStore classicStore;

    /** Cluster that new objects are currently appended to. */
    Cluster currentCluster;
    /** Transaction currently in progress, or null outside a transaction. */
    TransactionID currentTransaction;
    /** Ids of all clusters written to by the current transaction. */
    DxSet touchedClusters;
    /** Ids of clusters that should be compressed at commit time. */
    DxSet clustersToCompress;


    public PersistenceSpace( Env _env ) {
        env = _env;
        classicStore = (ClassicStore)env.getStoreManager();
    }


    /**
     * Brings the persistence space up: rolls back an interrupted transaction
     * (detected via a leftover flag file), then either restores the current
     * cluster from the stored properties, recovers from the files on disk, or
     * starts a fresh cluster for an empty database.
     *
     * @return true on success
     */
    protected boolean startup() throws Exception {
        File transFile = new File( env.getDatabaseDir() + Env.DATA_DIR, TRANSACTION_FLAG );
        if (transFile.exists()) {
            // A flag file survived the last shutdown: the transaction it
            // describes never committed and must be undone.
            rollBackTransaction( transFile );
        }

        if (!readProperties()) {
            String[] list = new File( env.getDatabaseDir() + Env.DATA_DIR ).list();
            if (list.length != 0) {
                // Data files exist but no properties: rebuild state from disk.
                recover();
            } else {
                newCluster();
            }
        }
        return true;
    }


    /**
     * Shuts the persistence space down, persisting the current cluster id and
     * releasing all per-transaction state.
     *
     * @return true on success
     */
    protected boolean shutdown() throws Exception {
        if (currentCluster != null) {
            writeProperties();
            currentCluster.close();
        }

        currentCluster = null;
        touchedClusters = null;
        clustersToCompress = null;

        return true;
    }


    /**
     * Restores {@link #currentCluster} from the persisted properties.
     *
     * @return false if no cluster id was stored (fresh or damaged database)
     */
    protected boolean readProperties() {
        ClusterID cid = (ClusterID)env.state.property( CID, null );
        if (cid == null) {
            return false;
        }

        currentCluster = new Cluster( env, classicStore, cid );
        return true;
    }


    /** Persists the id of the current cluster in the environment state. */
    protected void writeProperties() {
        env.state.setProperty( CID, currentCluster.cluID() );
    }


    /**
     * Begins a transaction: resets the per-transaction bookkeeping and writes
     * the transaction flag file (version, transaction id, current cluster id)
     * so a crash before commit can be rolled back.
     *
     * @throws OzoneInternalException if the flag file cannot be written
     */
    protected void startTransaction( TransactionID tid ) throws OzoneInternalException {
        currentTransaction = tid;
        touchedClusters = new DxHashSet();
        clustersToCompress = new DxHashSet();

        DataOutputStream out = null;
        try {
            out = new DataOutputStream( new FileOutputStream(
                    new File( env.getDatabaseDir() + Env.DATA_DIR, TRANSACTION_FLAG ) ) );
            out.writeInt( TRANSACTION_FLAG_VERSION );
            out.writeLong( currentTransaction.value() );
            out.writeLong( currentCluster.cluID().value() );
        } catch (IOException e) {
            throw new OzoneInternalException("Failed to start transaction", e);
        } finally {
            // Close in finally so a failed write cannot leak the descriptor.
            if (out != null) {
                try {
                    out.close();
                } catch (IOException ignored) {
                    // best effort; the write either succeeded or was reported above
                }
            }
        }
    }


    /**
     * First commit phase: closes the current cluster, then compresses and
     * removes every cluster queued in {@link #clustersToCompress}.
     *
     * @throws OzoneInternalException if compression fails
     */
    protected void prepareCommitTransaction( TransactionID tid ) throws OzoneInternalException {
        try {
            currentCluster.close();

            DxIterator it = clustersToCompress.iterator();
            while (it.next() != null) {
                // Copies all live objects of the cluster into the current one.
                compressCluster( (ClusterID)it.object() );
            }
            it.reset();
            while (it.next() != null) {
                new Cluster( env, classicStore, (ClusterID)it.object() ).removeFromDisk();
            }
        } catch (IOException e) {
            throw new OzoneInternalException("Failed to prepare to commit the transaction", e);
        }
    }


    /**
     * Second commit phase: deletes the transaction flag file (the transaction
     * is now durable) and clears the per-transaction state.
     */
    protected void commitTransaction( TransactionID tid ) {
        File f = new File( env.getDatabaseDir() + Env.DATA_DIR, TRANSACTION_FLAG );
        if (f.exists()) {
            f.delete();
        }

        touchedClusters = null;
        clustersToCompress = null;
        currentTransaction = null;
    }


    /**
     * Aborts a transaction. Intentionally a no-op here: the actual undo is
     * performed by {@link #rollBackTransaction} from the flag file.
     */
    protected void abortTransaction( TransactionID tid ) {
    }


    /**
     * Records that a cluster was modified by the current transaction: adds it
     * to {@link #touchedClusters} and appends its id to the flag file so a
     * rollback after a crash knows which clusters to undo.
     *
     * @throws OzoneInternalException if the flag file cannot be appended to
     */
    private void registerCluster( ClusterID cid ) throws OzoneInternalException {
        if (!touchedClusters.contains( cid )) {
            touchedClusters.add( cid );

            DataOutputStream out = null;
            try {
                // Append mode: the flag file already holds the transaction header.
                out = new DataOutputStream( new FileOutputStream(
                        new File( env.getDatabaseDir() + Env.DATA_DIR, TRANSACTION_FLAG ).toString(), true ) );
                out.writeLong( cid.value() );
            } catch (IOException e) {
                throw new OzoneInternalException("Failed to register cluster", e);
            } finally {
                // Close in finally so a failed write cannot leak the descriptor.
                if (out != null) {
                    try {
                        out.close();
                    } catch (IOException ignored) {
                        // best effort
                    }
                }
            }
        }
    }


    /**
     * Closes the current cluster and starts a fresh one with a newly generated
     * id. If the old cluster accumulated enough leaks it is queued for
     * compression. The new cluster id is persisted immediately.
     *
     * @return the new current cluster
     */
    private Cluster newCluster() throws IOException {
        Cluster oldCluster = null;
        if (currentCluster != null) {
            oldCluster = currentCluster;
            currentCluster.close();
        }

        currentCluster = new Cluster( env, classicStore, new ClusterID( env.keyGenerator.nextID() ) );

        if (oldCluster != null && oldCluster.needsCompressing()) {
            clustersToCompress.add( oldCluster.cluID() );
        }

        writeProperties();

        return currentCluster;
    }


    /**
     * Reads a cluster from disk. If the requested cluster is the current one
     * it is closed for the duration of the read and reopened afterwards.
     *
     * @param cid        id of the cluster to read
     * @param whatToRead which parts to load (see the {@link Cluster} constants)
     * @return the cluster with its objects loaded
     * @throws OzoneInternalException if reading fails
     */
    protected Cluster readCluster( ClusterID cid, int whatToRead ) throws OzoneInternalException {
        Cluster cl = null;
        try {
            if (cid.equals( currentCluster.cluID() )) {
                currentCluster.close();
            }

            cl = new Cluster( env, classicStore, cid );
            cl.readObjects( whatToRead, null );

            if (cid.equals( currentCluster.cluID() )) {
                currentCluster.open();
            }
        } catch (Exception e) {
            throw new OzoneInternalException("Failed to read cluster", e);
        }

        return cl;
    }


    /**
     * Compresses a cluster by rewriting all of its live objects into the
     * current cluster; the old cluster file is removed later by
     * {@link #prepareCommitTransaction}.
     */
    protected void compressCluster( ClusterID cid ) throws IOException {
        Cluster cl = new Cluster( env, classicStore, cid );
        cl.readObjects( Cluster.DATA, null );

        DeathObject dobj;
        DxIterator it = cl.objects().iterator();
        while ((dobj = (DeathObject)it.next()) != null) {
            // No re-serialization, no clone: the raw data is copied as-is.
            writeObject( dobj, false, false );
        }
    }


    /**
     * Scans the data directory and returns the ids of all cluster files,
     * parsed from their file names.
     *
     * @return all cluster ids found on disk; empty if the directory is
     *         missing or unreadable
     */
    protected ClusterID[] allClusters() {
        File path = new File( env.getDatabaseDir() + Env.DATA_DIR );
        String[] fileList = path.list( new FilenameFilter() {

            public boolean accept( File dir, String name ) {
                return name.endsWith( Cluster.CLUSTER_FILE_SUFF );
            }
        } );
        // File.list() returns null if the directory does not exist or an
        // I/O error occurs; treat that as "no clusters" instead of a NPE.
        if (fileList == null) {
            return new ClusterID[0];
        }

        ClusterID[] result = new ClusterID[fileList.length];
        for (int i = 0; i < fileList.length; i++) {
            // File name is "<clusterId><suffix>"; strip the suffix and parse.
            result[i] = new ClusterID( Long.parseLong( fileList[i].substring( 0,
                    fileList[i].length() - Cluster.CLUSTER_FILE_SUFF.length() ) ) );
        }

        return result;
    }


    /**
     * Appends an object to the current cluster, rolling over to a new cluster
     * first if the current one exceeded {@link Cluster#MAX_SIZE}.
     *
     * @param dobj      object to write
     * @param serialize whether the object must be (re-)serialized
     * @param useClone  whether to write a clone of the object
     * @return the id of the cluster the object was written to
     */
    protected ClusterID writeObject( DeathObject dobj, boolean serialize, boolean useClone ) throws IOException {
        if (currentCluster.size() > Cluster.MAX_SIZE) {
            newCluster();
        }

        registerCluster( currentCluster.cluID() );

        dobj.container().setClusterID( currentCluster.cluID() );
        currentCluster.appendObject( dobj, currentTransaction, serialize, useClone );
        return currentCluster.cluID();
    }


    /**
     * Marks an object in the given cluster as a leak (dead space). If the
     * cluster's leak ratio exceeds {@link Cluster#LEAK_WEIGHT} it is queued
     * for compression — except for the current cluster, which is still being
     * written to.
     *
     * @throws OzoneInternalException if the leak cannot be written
     */
    protected void writeLeak( ClusterID cid, DeathObject dobj ) throws OzoneInternalException {
        registerCluster( cid );

        Cluster cl = new Cluster( env, classicStore, cid );
        try {
            cl.writeLeak( dobj, currentTransaction );
        } catch (IOException e) {
            throw new OzoneInternalException("Failed to write leak", e);
        }

        // The current cluster is never compressed while it is active.
        if (currentCluster.cluID().equals( cid )) {
            return;
        }

        long clSize = cl.fileHandle().length();
        if (clSize > 0) {
            if ((double)cl.leakSize() / clSize > Cluster.LEAK_WEIGHT) {
                clustersToCompress.add( cid );
            }
        }
    }


    /**
     * Rebuilds the in-memory object space from all clusters on disk by
     * re-registering every object container's state. Failures for individual
     * clusters are reported via {@link Env#fatalError} and do not stop the
     * scan of the remaining clusters.
     */
    protected void fillObjectSpace() {
        env.logWriter.newEntry( this, "ObjectSpace recovery ...", LogWriter.INFO );
        int count = 0;
        ClusterID[] clusters = allClusters();
        for (int i = 0; i < clusters.length; i++) {
            try {
                ObjectContainer os;
                Cluster cl = new Cluster( env, classicStore, clusters[i] );
                cl.readObjects( Cluster.STATE, null );
                DxIterator it = cl.objects().iterator();
                while ((os = (ObjectContainer)it.next()) != null) {
                    // Delete-then-add replaces any stale entry for the container.
                    classicStore.objectSpace.deleteObject( os );
                    classicStore.objectSpace.addObject( os );
                    count++;
                }
            } catch (Exception e) {
                env.fatalError( this, "fillObjectSpace: " + e.toString(), e );
            }
        }
        env.logWriter.newEntry( this, count + " objects found.", LogWriter.INFO );
    }


    /**
     * Recovers the persistence space from the raw files on disk when the
     * stored properties are missing. Not implemented.
     */
    protected void recover() {
    }


    /**
     * Rolls back the interrupted transaction described by the given flag
     * file: reads the transaction id, the then-current cluster id and the
     * list of touched clusters, undoes the transaction on each of them, and
     * finally deletes the flag file.
     *
     * @param transFlag the leftover transaction flag file
     */
    protected void rollBackTransaction( File transFlag ) throws Exception {
        TransactionID rollBackTid = null;
        DxBag clusters = new DxArrayBag();
        FileInputStream fi = null;
        try {
            fi = new FileInputStream( transFlag );
            DataInputStream in = new DataInputStream( fi );

            in.readInt();   // flag file version; currently unchecked
            rollBackTid = new TransactionID( in.readLong() );
            currentCluster = new Cluster( env, classicStore, new ClusterID( in.readLong() ) );

            // The remainder of the file is the list of touched cluster ids.
            while (fi.available() != 0) {
                clusters.add( new ClusterID( in.readLong() ) );
            }
        } catch (IOException e) {
            // A truncated flag file is tolerated: roll back whatever was read.
            env.logWriter.newEntry( this, "rollback transaction: flag file corrupted", LogWriter.WARN );
        } finally {
            // Close in finally so a corrupted file cannot leak the descriptor
            // (the flag file is deleted below, which would fail on some
            // platforms while the stream is still open).
            if (fi != null) {
                try {
                    fi.close();
                } catch (IOException ignored) {
                    // best effort
                }
            }
        }

        ClusterID cid;
        DxIterator it = clusters.iterator();
        while ((cid = (ClusterID)it.next()) != null) {
            Cluster cl = new Cluster( env, classicStore, cid );
            cl.rollBack( rollBackTid );
        }

        writeProperties();

        transFlag.delete();
        touchedClusters = null;
        clustersToCompress = null;
        currentTransaction = null;
    }
}