package org.ozoneDB.core.storage.classicStore;

import java.io.*;
import org.ozoneDB.core.*;
import org.ozoneDB.core.storage.classicStore.ClassicObjectContainer;
import org.ozoneDB.core.storage.classicStore.ClassicStore;
import org.ozoneDB.util.*;
import org.ozoneDB.DxLib.*;
import org.ozoneDB.io.stream.ResolvingObjectInputStream;


/**
 * A Cluster is the unit of disk storage of the classic store. It appends
 * serialized object containers to one append-only cluster file (".cl") and
 * keeps account of space lost to superseded entries ("leaks") in a companion
 * file (".lk"). Either file can be rewritten through a ".rec" shadow file
 * while a recovery (rollback) is in progress.
 */
public class Cluster extends Object {
    /** On-disk format version written into every cluster and leak file header. */
    public final static int VERSION = 1;
    /** Soft limit for the size of one cluster file in bytes. */
    public static int MAX_SIZE = 64 * 1024;
    public final static String CLUSTER_FILE_SUFF = ".cl";
    public final static String LEAK_FILE_SUFF = ".lk";
    public final static String RECOVERY_SUFF = ".rec";
    /** Ratio of leaked bytes to file size above which compressing pays off. */
    public final static double LEAK_WEIGHT = 0.5;

    // Bit flags for readObjects(): which parts of each entry to materialize.
    public final static int DATA = 1;
    public final static int STATE = 2;
    public final static int TRANS = 4;

    // Chunk type tags as written to disk.
    public final static byte CL_HEADER_CHUNK = 1;
    public final static byte DATA_OID_CHUNK = 2;
    public final static byte DATA_HEADER_CHUNK = 3;
    public final static byte DATA_CHUNK = 4;

    // Recovery modes; while a mode is active, the corresponding file handle
    // points at the ".rec" shadow file instead of the real file.
    public final static byte NONE = 0;
    public final static byte OBJECTS = 1;
    public final static byte LEAKS = 2;


    /**
     * One tagged byte block of the cluster file. A chunk is either backed by
     * raw bytes ({@link #data}) or by an object that {@link Cluster#writeChunk}
     * serializes lazily.
     */
    class Chunk extends Object {
        public byte id;
        public byte[] data;
        public int dataLength;
        public Object object;


        public Chunk() {
        }


        public Chunk( byte _id, byte[] _data ) {
            id = _id;
            data = _data;
            dataLength = data.length;
        }


        public Chunk( byte _id, Object obj ) {
            id = _id;
            object = obj;
        }
    }


    /** ID of this cluster; also determines both file names. */
    ClusterID cid;

    /** Currently active recovery mode: NONE, OBJECTS or LEAKS. */
    byte recoveryMode = NONE;

    /** Size of the cluster file in bytes, as of the last append/open/read. */
    long clusterSize = 0;

    /** Cached total of leaked bytes; -1 until read from the leak file. */
    long leakSize = -1;

    /** Append stream for the cluster file; null while the cluster is closed. */
    DataOutputStream stream;

    /** Entries collected by readObjects(). */
    DxCollection objects;

    Env env;
    ClassicStore classicStore;


    /** Creates a cluster whose ID will be assigned later. */
    public Cluster( Env _env, ClassicStore _classicStore ) {
        env = _env;
        classicStore = _classicStore;
    }


    /** Creates a cluster for the given cluster ID. */
    public Cluster( Env _env, ClassicStore _classicStore, ClusterID _cid ) {
        env = _env;
        classicStore = _classicStore;
        cid = _cid;
    }


    protected void finalize() throws Throwable {
        super.finalize();
        // Last-resort close; callers should close() explicitly.
        close();
    }


    /** Returns the ID of this cluster. */
    public final ClusterID cluID() {
        return cid;
    }


    /** Returns the entries materialized by the last readObjects() call. */
    public final DxCollection objects() {
        return objects;
    }


    /**
     * Enters recovery mode: from now on the handle of the affected file
     * points at its ".rec" shadow file.
     */
    public final void beginRecovery( byte mode ) {
        recoveryMode = mode;
    }


    /**
     * Leaves recovery mode and replaces the real file with the ".rec" shadow
     * file written while the mode was active.
     */
    public final void endRecovery( byte mode ) {
        // While recoveryMode is still set, this handle names the ".rec" file;
        // after resetting the mode, the same call yields the real file.
        File file = mode == OBJECTS ? fileHandle() : leakFileHandle();
        recoveryMode = NONE;
        // NOTE(review): the renameTo() result is ignored — a failed rename
        // goes undetected here; kept as-is to preserve behavior.
        file.renameTo( mode == OBJECTS ? fileHandle() : leakFileHandle() );
    }


    /** Handle of the cluster file (the ".rec" shadow during OBJECTS recovery). */
    public final File fileHandle() {
        return new File( env.getDatabaseDir() + Env.DATA_DIR, cid.toString() + Cluster.CLUSTER_FILE_SUFF + (recoveryMode == OBJECTS
                ? Cluster.RECOVERY_SUFF
                : "") );
    }


    /** Handle of the leak file (the ".rec" shadow during LEAKS recovery). */
    public final File leakFileHandle() {
        return new File( env.getDatabaseDir() + Env.DATA_DIR, cid.toString() + Cluster.LEAK_FILE_SUFF + (recoveryMode == LEAKS
                ? Cluster.RECOVERY_SUFF
                : "") );
    }


    /** Returns the cluster file size as of the last append/open/read. */
    public final long size() {
        return clusterSize;
    }


    /**
     * Opens the cluster file for appending, writing the header first if the
     * file is new. Opening an already open cluster is a no-op.
     *
     * @throws IOException if the file cannot be opened or the header written
     */
    public void open() throws IOException {
        if (stream != null) {
            return;
        }

        File file = fileHandle();
        boolean newCluster = !file.exists();
        stream = new DataOutputStream( new BufferedOutputStream( new FileOutputStream( file.toString(), true ),
                4 * 1024 ) );

        if (newCluster) {
            writeHeader();
        }
        clusterSize = file.length();
    }


    /** Closes the append stream if it is open. */
    public void close() throws IOException {
        if (stream != null) {
            stream.close();
            stream = null;
        }
    }


    /** Writes the cluster file header: format version plus cluster ID. */
    public void writeHeader() throws IOException {
        stream.writeInt( Cluster.VERSION );
        stream.writeLong( cid.value() );
    }


    /**
     * On-disk footprint of one object entry: data + state + 21 bytes of
     * fixed overhead (oid long, tid long, chunk id byte + length int — per
     * the layout written in appendObject/writeChunk).
     */
    private final long entrySize( DeathObject dobj ) {
        return dobj.size() + dobj.stateSize + 21;
    }


    /**
     * Appends one object entry (oid, tid, state chunk, data chunk) to the
     * cluster file, opening it if necessary.
     *
     * @param serialize if true, serialize the container's target (or its
     *                  shadow when useClone is set); otherwise write the
     *                  object's pre-serialized data bytes
     * @throws IOException if writing fails
     */
    public void appendObject( DeathObject dobj, TransactionID tid, boolean serialize, boolean useClone )
            throws IOException {
        env.logWriter.newEntry( this,
                "Cluster " + cid + " appendObject: " + dobj.objID() + ", " + tid + ", " + serialize + ", " + useClone,
                LogWriter.DEBUG3 );
        open();

        stream.writeLong( dobj.objID().value() );
        stream.writeLong( tid.value() );

        Chunk chunk = new Chunk( DATA_HEADER_CHUNK, dobj.container() );
        writeChunk( stream, chunk );
        dobj.stateSize = chunk.dataLength;

        if (serialize) {
            chunk = new Chunk( DATA_CHUNK, useClone ? dobj.container().targetShadow() : dobj.container().target() );
        } else {
            chunk = new Chunk( DATA_CHUNK, dobj.data() );
        }

        writeChunk( stream, chunk );
        dobj.setSize( chunk.data.length );

        // Flush the 4K buffer so that the file length below reflects this
        // entry; otherwise clusterSize lags behind and size() under-reports.
        stream.flush();
        clusterSize = fileHandle().length();
    }


    /**
     * Reads all valid entries of the cluster file into objects(). Entries
     * that are leaks, or that belong to rollBackTid, are skipped.
     *
     * @param whatToRead bit mask of DATA, STATE and TRANS selecting which
     *                   parts of each entry to materialize
     * @param rollBackTid transaction whose entries are to be dropped; may be null
     * @return true if no entry of rollBackTid was encountered (cluster clean)
     * @throws IOException if the cluster or leak file cannot be read
     */
    public boolean readObjects( int whatToRead, TransactionID rollBackTid ) throws IOException {
        boolean result = true;

        DataInputStream fi = new DataInputStream( new FileInputStream( fileHandle() ) );
        try {
            int version = fi.readInt();
            cid = new ClusterID( fi.readLong() );

            DxMultiMap leaks = (DxMultiMap)readLeaks( rollBackTid, true );

            objects = new DxArrayBag();

            while (fi.available() != 0) {
                TransactionID tid = null;
                ClassicObjectContainer os = null;
                DeathObject dobj = null;
                boolean isLeak = false;
                boolean rollBack = false;

                try {
                    ObjectID oid = new ObjectID( fi.readLong() );

                    // Each leak record marks one superseded entry of this oid;
                    // consume exactly one per occurrence.
                    DxDeque oidLeaks = (DxDeque)leaks.elementsForKey( oid );
                    if (oidLeaks != null) {
                        isLeak = true;
                        if (oidLeaks.count() == 1) {
                            leaks.removeForKey( oid );
                        } else {
                            oidLeaks.popBottom();
                        }
                    }

                    tid = new TransactionID( fi.readLong() );
                    if (rollBackTid != null && rollBackTid.equals( tid )) {
                        rollBack = true;
                    }
                    if (rollBack || isLeak || (whatToRead & TRANS) == 0) {
                        tid = null;
                    }

                    Chunk stateChunk = readChunk( fi, (whatToRead & STATE) == 0 || rollBack || isLeak );
                    if (stateChunk.data != null) {
                        ObjectInputStream in = new ResolvingObjectInputStream( new ByteArrayInputStream( stateChunk.data ) );
                        os = new ClassicObjectContainer();
                        os.loadExternal( in );
                        in.close();
                    }

                    Chunk dataChunk = readChunk( fi, (whatToRead & DATA) == 0 || rollBack || isLeak );
                    if (dataChunk.data != null) {
                        dobj = new DeathObject( oid );
                        dobj.stateSize = stateChunk.dataLength;
                        dobj.setData( dataChunk.data );
                        clusterSize += dobj.stateSize;
                        clusterSize += dobj.size();
                    }

                } catch (Exception e) {
                    env.fatalError( this, "exception in readObjects() of cluster " + cid, e );
                    break;
                }

                // The collection is a flat sequence of [tid, container, dobj]
                // per entry; rollBack() relies on this order.
                if (tid != null) {
                    objects.add( tid );
                }
                if (os != null) {
                    objects.add( os );
                }
                if (dobj != null) {
                    objects.add( dobj );
                }

                result &= !rollBack;
            }
        } finally {
            fi.close();
        }
        return result;
    }


    /**
     * Returns the total number of leaked bytes of this cluster, reading the
     * running total from the end of the leak file on first use.
     */
    public long leakSize() {
        if (leakSize != -1) {
            return leakSize;
        }

        // Always read the real ".lk" file, even while LEAKS recovery is active.
        File file = new File( env.getDatabaseDir() + Env.DATA_DIR, cid + Cluster.LEAK_FILE_SUFF );
        if (!file.exists()) {
            return 0;
        }

        try {
            DataInputStream leakStream = new DataInputStream( new FileInputStream( file ) );
            try {
                // The running total is the last long of each record, so the
                // current total sits in the final 8 bytes of the file.
                leakStream.skip( leakStream.available() - 8 );
                leakSize = leakStream.readLong();
            } finally {
                leakStream.close();
            }
            return leakSize;
        } catch (IOException e) {
            return 0;
        }
    }


    /** Records the given object's whole entry as leaked space. */
    public void writeLeak( DeathObject dobj, TransactionID tid ) throws IOException {
        writeLeak( dobj.objID(), tid, entrySize( dobj ) );
    }


    /**
     * Appends one leak record (oid, tid, new running total) to the leak
     * file, creating it with a header if necessary.
     */
    public void writeLeak( ObjectID oid, TransactionID tid, long objSize ) throws IOException {
        File file = leakFileHandle();
        boolean newFile = !file.exists();
        DataOutputStream leakStream =
                new DataOutputStream( new BufferedOutputStream( new FileOutputStream( file.toString(), true ) ) );

        try {
            if (newFile) {
                leakStream.writeInt( Cluster.VERSION );
                leakStream.writeLong( cid.value() );
            }

            // Bring the cached total up to date before adding. (The previous
            // code did `leakSize(); leakSize += objSize;`, which folded the -1
            // sentinel into the total whenever leakSize() could not fill the
            // cache, e.g. for a fresh leak file.)
            leakSize = leakSize() + objSize;

            leakStream.writeLong( oid.value() );
            leakStream.writeLong( tid.value() );
            leakStream.writeLong( leakSize );
        } finally {
            leakStream.close();
        }
    }


    /**
     * Reads all leak records, dropping those that belong to rollBackTid.
     *
     * @param ordered if true, returns a DxMultiMap of oids keyed by tid;
     *                otherwise a DxListDeque of [oid, tid, total] triples in
     *                file order (bottom to top)
     */
    public DxCollection readLeaks( TransactionID rollBackTid, boolean ordered ) throws IOException {
        File file = leakFileHandle();

        DxCollection result;
        if (ordered) {
            result = new DxMultiMap( new DxHashMap(), new DxListDeque() );
        } else {
            result = new DxListDeque();
        }

        if (!file.exists()) {
            return result;
        }

        DataInputStream leakStream = new DataInputStream( new FileInputStream( file ) );
        try {
            // Skip the header (version + cluster ID).
            leakStream.readInt();
            leakStream.readLong();

            while (leakStream.available() != 0) {
                ObjectID oid = new ObjectID( leakStream.readLong() );
                TransactionID tid = new TransactionID( leakStream.readLong() );
                Long leakSize = new Long( leakStream.readLong() );

                if (rollBackTid == null || !rollBackTid.equals( tid )) {
                    if (ordered) {
                        ((DxMultiMap)result).addForKey( tid, oid );
                    } else {
                        ((DxDeque)result).pushTop( oid );
                        ((DxDeque)result).pushTop( tid );
                        ((DxDeque)result).pushTop( leakSize );
                    }
                }
            }
        } finally {
            leakStream.close();
        }

        return result;
    }


    /** Deletes the cluster file and the leak file, if they exist. */
    public void removeFromDisk() throws IOException {
        File f = fileHandle();
        if (f.exists()) {
            f.delete();
        }
        f = leakFileHandle();
        if (f.exists()) {
            f.delete();
        }
    }


    /**
     * Writes one chunk (tag byte, length int, payload) to the stream,
     * serializing the chunk's object first if it has no raw data yet.
     */
    private void writeChunk( DataOutputStream out, Chunk chunk ) throws IOException {
        if (chunk.object != null) {
            ByteArrayOutputStream bs = new ByteArrayOutputStream();
            ObjectOutputStream os = new ObjectOutputStream( bs );
            if (chunk.object instanceof ClassicObjectContainer) {
                ((ClassicObjectContainer)chunk.object).storeExternal( os );
            } else {
                os.writeObject( chunk.object );
            }
            // Flush before snapshotting: ObjectOutputStream buffers block
            // data internally, so toByteArray() without a flush can miss the
            // tail of the serialized object.
            os.flush();
            chunk.data = bs.toByteArray();
            chunk.dataLength = chunk.data.length;
            os.close();
        }

        env.logWriter.newEntry( this, "Cluster " + cid + " writeChunk: " + chunk.id + ", " + chunk.dataLength,
                LogWriter.DEBUG3 );
        out.writeByte( chunk.id );
        out.writeInt( chunk.dataLength );
        out.write( chunk.data );
    }


    /**
     * Reads one chunk from the stream. When skip is set, the payload is
     * skipped and chunk.data stays null; the tag and length are always read.
     */
    private Chunk readChunk( DataInputStream in, boolean skip ) throws IOException {
        Chunk chunk = new Chunk();
        chunk.id = in.readByte();
        chunk.dataLength = in.readInt();
        if (skip) {
            in.skip( chunk.dataLength );
        } else {
            chunk.data = new byte[chunk.dataLength];
            // read() may return fewer bytes than requested; readFully()
            // guarantees the whole payload (or throws EOFException).
            in.readFully( chunk.data );
        }

        return chunk;
    }


    /**
     * Rolls back all entries of the given transaction by rewriting the
     * cluster file (through a ".rec" shadow) without them, or removing the
     * cluster entirely if nothing survives.
     */
    public void rollBack( TransactionID rollBackTid ) throws Exception {
        rollBackLeaks( rollBackTid );

        boolean clusterIsClean = false;

        try {
            clusterIsClean = readObjects( Cluster.STATE | Cluster.TRANS | Cluster.DATA, rollBackTid );
        } catch (Exception e) {
            // A partly written cluster cannot be read to the end; treat it as
            // dirty and rebuild it from whatever readObjects() collected.
        }

        if (!clusterIsClean) {
            if (objects().count() > 0) {
                beginRecovery( OBJECTS );
                open();

                // objects() holds flat [tid, container, deathObject] triples
                // in the order readObjects() stored them.
                DxIterator it = objects().iterator();
                while (it.next() != null) {
                    TransactionID tid = (TransactionID)it.object();
                    ObjectContainer os = (ObjectContainer)it.next();
                    DeathObject dobj = (DeathObject)it.next();
                    // Re-register the container so the object space points at
                    // the rewritten entry.
                    classicStore.objectSpace.deleteObject( os );
                    classicStore.objectSpace.addObject( os );
                    appendObject( dobj, tid, false, false );
                }

                close();
                endRecovery( OBJECTS );
            } else {
                // Nothing survives the rollback: drop the cluster entirely.
                removeFromDisk();
            }
        }
    }


    /**
     * Rolls back the leak file: rewrites it (through a ".rec" shadow)
     * without the records of the given transaction, or deletes it if no
     * records survive.
     */
    public void rollBackLeaks( TransactionID rollBackTid ) throws Exception {
        DxDeque leaks = null;
        try {
            leaks = (DxDeque)readLeaks( rollBackTid, false );
        } catch (Exception e) {
            // An unreadable leak file yields no recoverable records; fall
            // through to the delete branch below. (The previous code left
            // leaks null here and crashed with an NPE on leaks.count().)
        }

        if (leaks != null && leaks.count() > 0) {
            beginRecovery( LEAKS );

            while (leaks.count() > 0) {
                // Arguments are evaluated left to right, matching the
                // [oid, tid, total] push order of readLeaks().
                writeLeak( (ObjectID)leaks.popBottom(), (TransactionID)leaks.popBottom(),
                        ((Long)leaks.popBottom()).longValue() );
            }

            endRecovery( LEAKS );
        } else {
            if (leakFileHandle().exists()) {
                leakFileHandle().delete();
            }
        }
    }


    /**
     * Returns true if the share of leaked bytes in this cluster file exceeds
     * LEAK_WEIGHT, i.e. the cluster is worth compressing.
     */
    protected boolean needsCompressing() {
        boolean result = false;

        long clSize = fileHandle().length();
        if (clSize > 0) {
            result = (double)leakSize() / clSize > Cluster.LEAK_WEIGHT;
        }

        return result;
    }

}