// You can redistribute this software and/or modify it under the terms of
// the Ozone Core License version 1 published by ozone-db.org.
//
// The original code and portions created by SMB are
// Copyright (C) 1997-@year@ by SMB GmbH. All rights reserved.
//
// $Id$

package org.ozoneDB.core.storage.classicStore;

import java.io.*;
import org.ozoneDB.core.*;
import org.ozoneDB.core.storage.classicStore.ClassicObjectContainer;
import org.ozoneDB.core.storage.classicStore.ClassicStore;
import org.ozoneDB.util.*;
import org.ozoneDB.DxLib.*;
import org.ozoneDB.io.stream.ResolvingObjectInputStream;

/**
 * @author <a HREF="http://www.softwarebuero.de/">SMB</a>
 */

public class Cluster extends Object {
    public final static int VERSION = 1;
    public static int MAX_SIZE = 64 * 1024;
    public final static String CLUSTER_FILE_SUFF = ".cl";
    public final static String LEAK_FILE_SUFF = ".lk";
    public final static String RECOVERY_SUFF = ".rec";
    public final static double LEAK_WEIGHT = 0.5;

    public final static int DATA = 1;
    public final static int STATE = 2;
    public final static int TRANS = 4;

    // chunk ids
    public final static byte CL_HEADER_CHUNK = 1;
    public final static byte DATA_OID_CHUNK = 2;
    public final static byte DATA_HEADER_CHUNK = 3;
    public final static byte DATA_CHUNK = 4;

    // recovery modes
    public final static byte NONE = 0;
    public final static byte OBJECTS = 1;
    public final static byte LEAKS = 2;

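    /*
     * On disk, a chunk is stored as a 1-byte chunk id followed by a 4-byte
     * payload length and the payload itself; see writeChunk() and
     * readChunk() below.
     */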
    class Chunk extends Object {
        public byte id;
        public byte[] data;
        public int dataLength;
        public Object object;


        public Chunk() {
        }


        public Chunk( byte _id, byte[] _data ) {
            id = _id;
            data = _data;
            dataLength = data.length;
        }


        public Chunk( byte _id, Object obj ) {
            id = _id;
            object = obj;
        }
    }

    /** The id of this cluster. */
    ClusterID cid;
    /** Current recovery mode: NONE, OBJECTS or LEAKS. */
    byte recoveryMode = NONE;
    /** See size() for the two meanings of this value. */
    long clusterSize = 0;
    /** Accumulated size of all leaks; -1 until leakSize() has been called. */
    long leakSize = -1;
    /** Output stream of the cluster file while it is open. */
    DataOutputStream stream;

    /** Transaction ids, containers and objects read by readObjects(). */
    DxCollection objects;

    /** */
    Env env;
    ClassicStore classicStore;


    /**
     */
    public Cluster( Env _env, ClassicStore _classicStore ) {
        env = _env;
        classicStore = _classicStore;
    }


    /**
     */
    public Cluster( Env _env, ClassicStore _classicStore, ClusterID _cid ) {
        env = _env;
        classicStore = _classicStore;
        cid = _cid;
    }


    protected void finalize() throws Throwable {
        super.finalize();
        close();
    }


    /**
     */
    public final ClusterID cluID() {
        return cid;
    }


    /**
     */
    public final DxCollection objects() {
        return objects;
    }


    /**
     */
    public final void beginRecovery( byte mode ) {
        recoveryMode = mode;
    }


    /**
     * Leaves recovery mode and replaces the regular file with the recovery
     * file: the handle is resolved while still in recovery mode, so it points
     * at the ".rec" file, and is renamed to the regular name after the mode
     * has been reset.
     */
    public final void endRecovery( byte mode ) {
        File file = mode == OBJECTS ? fileHandle() : leakFileHandle();
        recoveryMode = NONE;
        file.renameTo( mode == OBJECTS ? fileHandle() : leakFileHandle() );
    }


    /**
     */
    public final File fileHandle() {
        return new File( env.getDatabaseDir() + Env.DATA_DIR, cid.toString() + Cluster.CLUSTER_FILE_SUFF + (recoveryMode == OBJECTS
                ? Cluster.RECOVERY_SUFF
                : "") );
    }


    /**
     */
    public final File leakFileHandle() {
        return new File( env.getDatabaseDir() + Env.DATA_DIR, cid.toString() + Cluster.LEAK_FILE_SUFF + (recoveryMode == LEAKS
                ? Cluster.RECOVERY_SUFF
                : "") );
    }

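    /*
     * A cluster <cid> is thus stored in "<cid>.cl" with its leaks in
     * "<cid>.lk"; while the corresponding recovery mode is active, ".rec" is
     * appended to the respective file name.
     */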


    /**
     * The cluster size has two different meanings:
     * - while writing, it is the size of the cluster file, so that we can
     *   limit the file's size as closely as possible to the user-defined
     *   maximum cluster size
     * - while reading, it is the sum of the sizes of all of its death
     *   objects, so that we can determine exactly the space we will need
     *   in the object buffer of the cluster space
     */
    public final long size() {
        return clusterSize;
    }


    /** */
    public void open() throws IOException {
        //env.logWriter.newEntry ("Cluster.open: " + cid, LogWriter.DEBUG);

        if (stream != null) {
            return;
        }

        File file = fileHandle();
        boolean newCluster = !file.exists();
        stream = new DataOutputStream( new BufferedOutputStream( new FileOutputStream( file.toString(), true ),
                4 * 1024 ) );

        if (newCluster) {
            writeHeader();
        }
        clusterSize = file.length();
    }


    /** */
    public void close() throws IOException {
        //env.logWriter.newEntry ("Cluster.close: " + cid, LogWriter.DEBUG);

        if (stream != null) {
            stream.close();
            stream = null;
        }
    }


    /** */
    public void writeHeader() throws IOException {
        //env.logWriter.newEntry ("Cluster.writeHeader: " + cid, LogWriter.DEBUG);

        // write cluster version and id
        stream.writeInt( Cluster.VERSION );
        stream.writeLong( cid.value() );
    }


    /**
     * Size of an object entry:
     * ObjectID : 8 bytes
     * TransactionID : 8 bytes
     * streamed ObjectContainer : comes from DeathObject.stateSize
     * ChunkID : 1 byte
     * data length : 4 bytes
     * data itself : n bytes
     * -> data overhead: 21 bytes + object state
     */
    private final long entrySize( DeathObject dobj ) {
        return dobj.size() + dobj.stateSize + 21;
    }
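
    /*
     * For example, a DeathObject carrying 100 bytes of data and a 40-byte
     * streamed container yields 100 + 40 + 21 = 161 bytes.
     */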


    /**
     */
    public void appendObject( DeathObject dobj, TransactionID tid, boolean serialize, boolean useClone )
            throws IOException {
        env.logWriter.newEntry( this,
                "Cluster " + cid + " appendObject: " + dobj.objID() + ", " + tid + ", " + serialize + ", " + useClone,
                LogWriter.DEBUG3 );
        open();

        // write the object id
        stream.writeLong( dobj.objID().value() );

        // write transaction id
        stream.writeLong( tid.value() );

        // write the object's container
        Chunk chunk = new Chunk( DATA_HEADER_CHUNK, dobj.container() );
        writeChunk( stream, chunk );
        dobj.stateSize = chunk.dataLength;

        // write the object itself: if we are in recovery mode, we use the
        // dobj.data() directly because it is already set
        if (serialize) {
            chunk = new Chunk( DATA_CHUNK, useClone ? dobj.container().targetShadow() : dobj.container().target() );
        } else {
            chunk = new Chunk( DATA_CHUNK, dobj.data() );
        }

        writeChunk( stream, chunk );
        dobj.setSize( chunk.data.length );

        clusterSize = fileHandle().length();
    }
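
    /*
     * Each appended entry therefore has the following on-disk layout:
     *
     *   ObjectID (8 bytes) | TransactionID (8 bytes)
     *   | DATA_HEADER_CHUNK (the streamed container)
     *   | DATA_CHUNK (the serialized object itself)
     */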


    /**
     * Reads all objects from the cluster, dropping leaks in normal mode and
     * broken-transaction objects in recovery mode; returns false if any
     * broken-transaction objects were encountered.
     */
    public boolean readObjects( int whatToRead, TransactionID rollBackTid ) throws IOException {
        //env.logWriter.newEntry ("Cluster " + cid + " readObjects: " + whatToRead + ", " + rollBackTid, LogWriter.DEBUG);

        boolean result = true;

        DataInputStream fi = new DataInputStream( new FileInputStream( fileHandle() ) );
        int version = fi.readInt();
        cid = new ClusterID( fi.readLong() );

        DxMultiMap leaks = (DxMultiMap)readLeaks( rollBackTid, true );
        //env.logWriter.newEntry ("leaks for " + cid + ": " + leaks.count(), LogWriter.DEBUG);

        objects = new DxArrayBag();

        while (fi.available() != 0) {
            TransactionID tid = null;
            ClassicObjectContainer os = null;
            DeathObject dobj = null;
            boolean isLeak = false;
            boolean rollBack = false;

            try {
                // read the object id
                ObjectID oid = new ObjectID( fi.readLong() );
                //env.logWriter.newEntry ("\tnext object: " + oid, LogWriter.DEBUG);

                DxDeque oidLeaks = (DxDeque)leaks.elementsForKey( oid );
                if (oidLeaks != null) {
                    isLeak = true;
                    //env.logWriter.newEntry ("\t" + oid + " is a leak", LogWriter.DEBUG);

                    if (oidLeaks.count() == 1) {
                        leaks.removeForKey( oid );
                    } else {
                        oidLeaks.popBottom();
                    }
                }

                // read TransactionID
                tid = new TransactionID( fi.readLong() );
                // check if this is an object of the broken transaction
                if (rollBackTid != null && rollBackTid.equals( tid )) {
                    rollBack = true;
                }
                if (rollBack || isLeak || (whatToRead & TRANS) == 0) {
                    tid = null;
                }

                // read object state, if necessary
                Chunk stateChunk = readChunk( fi, (whatToRead & STATE) == 0 || rollBack || isLeak );
                if (stateChunk.data != null) {
                    ObjectInputStream in = new ResolvingObjectInputStream( new ByteArrayInputStream( stateChunk.data ) );
                    os = new ClassicObjectContainer();
                    os.loadExternal( in );
                    in.close();
                }

                // create a new DeathObject and read the data
                Chunk dataChunk = readChunk( fi, (whatToRead & DATA) == 0 || rollBack || isLeak );
                if (dataChunk.data != null) {
                    dobj = new DeathObject( oid );
                    dobj.stateSize = stateChunk.dataLength;
                    dobj.setData( dataChunk.data );
                    clusterSize += dobj.stateSize;
                    clusterSize += dobj.size();
                }

            } catch (Exception e) {
                env.fatalError( this, "exception in readObjects() of cluster " + cid, e );
                break;
            }

            // if everything was fine, add all required information;
            // we have to do this last, because only complete objects
            // should be added
            if (tid != null) {
                objects.add( tid );
            }
            if (os != null) {
                objects.add( os );
            }
            if (dobj != null) {
                objects.add( dobj );
            }

            result &= !rollBack;
        }

        fi.close();
        return result;
    }
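
    /*
     * Note that objects() afterwards holds, per surviving entry, the
     * TransactionID, the ClassicObjectContainer and the DeathObject in that
     * order; rollBack() below relies on this ordering when it walks the
     * collection three elements at a time.
     */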


    /** */
    public long leakSize() {
        if (leakSize != -1) {
            return leakSize;
        }

        File file = new File( env.getDatabaseDir() + Env.DATA_DIR, cid + Cluster.LEAK_FILE_SUFF );
        if (!file.exists()) {
            return 0;
        }

        try {
            DataInputStream leakStream = new DataInputStream( new FileInputStream( file ) );
            leakStream.skip( leakStream.available() - 8 );
            leakSize = leakStream.readLong();
            leakStream.close();
            return leakSize;
        } catch (IOException e) {
            return 0;
        }
    }
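
    /*
     * This works because writeLeak() stores the running total as the last
     * long of every leak entry, so the final 8 bytes of the leak file always
     * hold the currently accumulated leak size.
     */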


    /** */
    public void writeLeak( DeathObject dobj, TransactionID tid ) throws IOException {
        writeLeak( dobj.objID(), tid, entrySize( dobj ) );
    }


    /** */
    public void writeLeak( ObjectID oid, TransactionID tid, long objSize ) throws IOException {
        //env.logWriter.newEntry ("Cluster " + cid + " writeLeak: " + oid + ", " + tid + ", " + objSize, LogWriter.DEBUG);

        File file = leakFileHandle();
        boolean newFile = !file.exists();
        DataOutputStream leakStream =
                new DataOutputStream( new BufferedOutputStream( new FileOutputStream( file.toString(), true ) ) );

        // write header, if necessary
        if (newFile) {
            leakStream.writeInt( Cluster.VERSION );
            leakStream.writeLong( cid.value() );
        }

        // initialize the cached leakSize, if necessary
        leakSize();
        // increase the leak size
        leakSize += objSize;

        // write leak entry
        leakStream.writeLong( oid.value() );
        leakStream.writeLong( tid.value() );
        leakStream.writeLong( leakSize );

        leakStream.close();
    }


    /** */
    public DxCollection readLeaks( TransactionID rollBackTid, boolean ordered ) throws IOException {
        File file = leakFileHandle();

        DxCollection result;
        if (ordered) {
            result = new DxMultiMap( new DxHashMap(), new DxListDeque() );
        } else {
            result = new DxListDeque();
        }

        if (!file.exists()) {
            return result;
        }

        DataInputStream leakStream = new DataInputStream( new FileInputStream( file ) );

        // skip version and cluster id
        leakStream.readInt();
        leakStream.readLong();

        while (leakStream.available() != 0) {
            // read object id
            ObjectID oid = new ObjectID( leakStream.readLong() );
            // read transaction id
            TransactionID tid = new TransactionID( leakStream.readLong() );
            // read leak size
            Long leakSize = new Long( leakStream.readLong() );

            if (rollBackTid == null || !rollBackTid.equals( tid )) {
                if (ordered) {
                    ((DxMultiMap)result).addForKey( tid, oid );
                } else {
                    ((DxDeque)result).pushTop( oid );
                    ((DxDeque)result).pushTop( tid );
                    ((DxDeque)result).pushTop( leakSize );
                }
            }
        }

        leakStream.close();

        return result;
    }
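
    /*
     * In the unordered case each leak is pushed as the triple (oid, tid,
     * size); rollBackLeaks() below pops the triples from the bottom of the
     * deque in that same order.
     */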


    /** */
    public void removeFromDisk() throws IOException {
        //env.logWriter.newEntry ("Cluster " + cid + " removeFromDisk", LogWriter.DEBUG);

        File f = fileHandle();
        if (f.exists()) {
            f.delete();
        }
        f = leakFileHandle();
        if (f.exists()) {
            f.delete();
        }
    }


    /** */
    private void writeChunk( DataOutputStream out, Chunk chunk ) throws IOException {
        if (chunk.object != null) {
            ByteArrayOutputStream bs = new ByteArrayOutputStream();
            ObjectOutputStream os = new ObjectOutputStream( bs );
            if (chunk.object instanceof ClassicObjectContainer) {
                ((ClassicObjectContainer)chunk.object).storeExternal( os );
            } else {
                os.writeObject( chunk.object );
            }
            // flush the object stream so that buffered data reaches the
            // byte array before it is copied
            os.flush();
            chunk.data = bs.toByteArray();
            chunk.dataLength = chunk.data.length;
            os.close();
        }

        env.logWriter.newEntry( this, "Cluster " + cid + " writeChunk: " + chunk.id + ", " + chunk.dataLength,
                LogWriter.DEBUG3 );
        out.writeByte( chunk.id );
        out.writeInt( chunk.dataLength );
        out.write( chunk.data );
    }


    /** */
    private Chunk readChunk( DataInputStream in, boolean skip ) throws IOException {
        Chunk chunk = new Chunk();
        chunk.id = in.readByte();
        chunk.dataLength = in.readInt();
        //env.logWriter.newEntry ("Cluster " + cid + " readChunk: " + chunk.id + ", " + chunk.dataLength, LogWriter.DEBUG);

        if (skip) {
            in.skip( chunk.dataLength );
        } else {
            chunk.data = new byte[chunk.dataLength];
            // readFully: a plain read() may return before the buffer is
            // completely filled
            in.readFully( chunk.data );
        }

        return chunk;
    }


    public void rollBack( TransactionID rollBackTid ) throws Exception {
        //env.logWriter.newEntry ("Cluster " + cid + " rollback: " + rollBackTid, LogWriter.DEBUG);

        rollBackLeaks( rollBackTid );

        boolean clusterIsClean = false;

        try {
            clusterIsClean = readObjects( Cluster.STATE | Cluster.TRANS | Cluster.DATA, rollBackTid );
        } catch (Exception e) {
            //env.logWriter.newEntry (this, "rollBack: cluster " + cid + " corrupted", LogWriter.WARN);
        }

        if (!clusterIsClean) {
            if (objects().count() > 0) {
                // switch into recovery mode
                beginRecovery( OBJECTS );
                open();

                // rewrite all valid objects in the shadow cluster
                DxIterator it = objects().iterator();
                while (it.next() != null) {
                    TransactionID tid = (TransactionID)it.object();
                    ObjectContainer os = (ObjectContainer)it.next();
                    DeathObject dobj = (DeathObject)it.next();
                    // swap the object state, because it may have changed
                    // during transaction rollBackTid
                    classicStore.objectSpace.deleteObject( os );
                    classicStore.objectSpace.addObject( os );
                    appendObject( dobj, tid, false, false );
                }

                close();
                // switch back to normal mode
                endRecovery( OBJECTS );
            } else {
                // if all objects of the cluster are invalid, simply delete it
                removeFromDisk();
            }
        }
    }


    public void rollBackLeaks( TransactionID rollBackTid ) throws Exception {
        DxDeque leaks = null;
        try {
            leaks = (DxDeque)readLeaks( rollBackTid, false );
        } catch (Exception e) {
            //env.logWriter.newEntry (this, "rollBackLeaks: leaks " + cid + " corrupted", LogWriter.WARN);
        }

        // leaks stays null if the leak file was corrupted; in that case the
        // file is simply deleted below
        if (leaks != null && leaks.count() > 0) {
            beginRecovery( LEAKS );

            while (leaks.count() > 0) {
                writeLeak( (ObjectID)leaks.popBottom(), (TransactionID)leaks.popBottom(),
                        ((Long)leaks.popBottom()).longValue() );
            }

            endRecovery( LEAKS );
        } else {
            if (leakFileHandle().exists()) {
                leakFileHandle().delete();
            }
        }
    }


    /**
     * Checks if this cluster needs to be compressed.
     */
    protected boolean needsCompressing() {
        boolean result = false;

        // derive the cluster size simply from its file size;
        // this is much faster than reading the whole cluster
        long clSize = fileHandle().length();
        if (clSize > 0) {
            result = (double)leakSize() / clSize > Cluster.LEAK_WEIGHT;
        }

        return result;
    }
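
    /*
     * For example, a 100 KB cluster file with 60 KB of accumulated leaks
     * gives a leak ratio of 0.6 > LEAK_WEIGHT (0.5), so the cluster would be
     * compressed.
     */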
}