/**
 * com.mckoi.database.V2MasterTableDataSource  01 Sep 2002
 *
 * Mckoi SQL Database ( http://www.mckoi.com/database )
 * Copyright (C) 2000, 2001, 2002 Diehl and Associates, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * Version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License Version 2 for more details.
 *
 * You should have received a copy of the GNU General Public License
 * Version 2 along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * Change Log:
 *
 *
 */

package com.mckoi.database;

import com.mckoi.store.*;
import com.mckoi.debug.*;
import com.mckoi.util.ByteArrayUtil;
import com.mckoi.util.IntegerListInterface;
import com.mckoi.util.UserTerminal;
import com.mckoi.database.global.ObjectTransfer;
import com.mckoi.database.global.StringObject;
import com.mckoi.database.global.ClobRef;
import com.mckoi.database.global.Ref;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.List;
import java.io.*;
/**
 * A MasterTableDataSource that is backed by a non-shared com.mckoi.store.Store
 * object.  The store interface gives us a great deal of flexibility because
 * we can map a store around different underlying devices.  For example, a
 * store could map to a memory region, a memory mapped file, or a standard
 * random access file.
 * <p>
 * Unlike V1MasterTableDataSource, this manages data and index information in
 * a single store which can be backed by a single file in the file system.
 * <p>
 * The structure of the store consists of a header block that contains the
 * following information;
 * <p><pre>
 * HEADER BLOCK
 * +-------------------------------+
 * | version                       |
 * | table id                      |
 * | table sequence id             |
 * | pointer to DataTableDef       |
 * | pointer to DataIndexSetDef    |
 * | pointer to index block        |
 * | LIST BLOCK HEADER pointer     |
 * +-------------------------------+
 * </pre>
 * <p>
 * Each record is comprised of a header which contains offsets to the fields
 * in the record, followed by a serialization of the fields themselves.
 *
 * @author Tobias Downer
 */

public final class V2MasterTableDataSource extends MasterTableDataSource {

  /**
   * The file name of this store in the conglomerate path.
   */
  private String file_name;

  /**
   * The backing store object.
   */
  private Store store;

  /**
   * An IndexSetStore object that manages the indexes for this table.
   */
  private IndexSetStore index_store;

  /**
   * The current sequence id.
   */
  private long sequence_id;


  // ---------- Pointers into the store ----------

//  /**
//   * Points to the store header area.
//   */
//  private long header_p;

  /**
   * Points to the index header area.
   */
  private long index_header_p;

  /**
   * Points to the block list header area.
   */
  private long list_header_p;

  /**
   * The header area itself.
   */
  private MutableArea header_area;


  /**
   * The structure that manages the pointers to the records.
   */
  private FixedRecordList list_structure;

  /**
   * The index of the first record in the delete chain, or -1 if the chain is
   * empty.
   */
  private long first_delete_chain_record;



  /**
   * Set to true when the VM has shut down and writes should no longer be
   * possible on the object.
   */
  private boolean has_shutdown;


  /**
   * The Constructor.
   */
  public V2MasterTableDataSource(TransactionSystem system,
                                 StoreSystem store_system,
                                 OpenTransactionList open_transactions,
                                 BlobStoreInterface blob_store_interface) {
    super(system, store_system, open_transactions, blob_store_interface);
    first_delete_chain_record = -1;
    has_shutdown = false;
  }

  /**
   * Convenience - wraps a buffered data output stream around the given
   * output stream.
   */
  private static DataOutputStream getDOut(OutputStream out) {
//    return new DataOutputStream(out);
    return new DataOutputStream(new BufferedOutputStream(out, 512));
  }

  /**
   * Convenience - wraps a buffered data input stream around the given input
   * stream.
   */
  private static DataInputStream getDIn(InputStream in) {
//    return new DataInputStream(in);
    return new DataInputStream(new BufferedInputStream(in, 512));
  }

  /**
   * Sets up an initial store (should only be called from the 'create'
   * method).
   */
  private void setupInitialStore() throws IOException {
    // Serialize the DataTableDef object
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    DataOutputStream dout = new DataOutputStream(bout);
    dout.writeInt(1);
    getDataTableDef().write(dout);
    // Convert to a byte array
    byte[] data_table_def_buf = bout.toByteArray();

    // Serialize the DataIndexSetDef object
    bout = new ByteArrayOutputStream();
    dout = new DataOutputStream(bout);
    dout.writeInt(1);
    getDataIndexSetDef().write(dout);
    // Convert to byte array
    byte[] index_set_def_buf = bout.toByteArray();

    bout = null;
    dout = null;

    try {
      store.lockForWrite();

      // Allocate an 80 byte header
      AreaWriter header_writer = store.createArea(80);
      long header_p = header_writer.getID();
      // Allocate space to store the DataTableDef serialization
      AreaWriter data_table_def_writer =
                                 store.createArea(data_table_def_buf.length);
      long data_table_def_p = data_table_def_writer.getID();
      // Allocate space to store the DataIndexSetDef serialization
      AreaWriter data_index_set_writer =
                                  store.createArea(index_set_def_buf.length);
      long data_index_set_def_p = data_index_set_writer.getID();

      // Allocate space for the list header
      list_header_p = list_structure.create();
      list_structure.setReservedLong(-1);
      first_delete_chain_record = -1;

      // Create the index store
      index_store = new IndexSetStore(store, getSystem());
      index_header_p = index_store.create();

      // Write the main header
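      // (layout implied by the writes below: version int at offset 0,
      //  table_id int at 4, sequence id long at 8, DataTableDef pointer at
      //  16, DataIndexSetDef pointer at 24, index header pointer at 32 and
      //  list header pointer at 40)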
      header_writer.putInt(1);                      // Version
      header_writer.putInt(table_id);               // table_id
      header_writer.putLong(sequence_id);           // initial sequence id
      header_writer.putLong(data_table_def_p);      // pointer to DataTableDef
      header_writer.putLong(data_index_set_def_p);  // pointer to DataIndexSetDef
      header_writer.putLong(index_header_p);        // index header pointer
      header_writer.putLong(list_header_p);         // list header pointer
      header_writer.finish();

      // Write the data_table_def
      data_table_def_writer.put(data_table_def_buf);
      data_table_def_writer.finish();

      // Write the data_index_set_def
      data_index_set_writer.put(index_set_def_buf);
      data_index_set_writer.finish();

      // Set the pointer to the header in the reserved area.
      MutableArea fixed_area = store.getMutableArea(-1);
      fixed_area.putLong(header_p);
      fixed_area.checkOut();

      // Set the header area
      header_area = store.getMutableArea(header_p);

    }
    finally {
      store.unlockForWrite();
    }

  }

  /**
   * Read the store headers and initialize any internal object state.  This
   * is called by the 'open' method.
   */
  private void readStoreHeaders() throws IOException {
    // Read the fixed header
    Area fixed_area = store.getArea(-1);
    // Set the header area
    header_area = store.getMutableArea(fixed_area.getLong());

    // Open a stream to the header
    int version = header_area.getInt();           // version
    if (version != 1) {
      throw new IOException("Incorrect version identifier.");
    }
    this.table_id = header_area.getInt();         // table_id
    this.sequence_id = header_area.getLong();     // sequence id
    long def_p = header_area.getLong();           // pointer to DataTableDef
    long index_def_p = header_area.getLong();     // pointer to DataIndexSetDef
    this.index_header_p = header_area.getLong();  // pointer to index header
    this.list_header_p = header_area.getLong();   // pointer to list header

    // Read the data table def
    DataInputStream din = getDIn(store.getAreaInputStream(def_p));
    version = din.readInt();
    if (version != 1) {
      throw new IOException("Incorrect DataTableDef version identifier.");
    }
    table_def = DataTableDef.read(din);
    din.close();

    // Read the data index set def
    din = getDIn(store.getAreaInputStream(index_def_p));
    version = din.readInt();
    if (version != 1) {
      throw new IOException("Incorrect DataIndexSetDef version identifier.");
    }
    index_def = DataIndexSetDef.read(din);
    din.close();

    // Read the list header
    list_structure.init(list_header_p);
    first_delete_chain_record = list_structure.getReservedLong();

    // Init the index store
    index_store = new IndexSetStore(store, getSystem());
    try {
      index_store.init(index_header_p);
    }
    catch (IOException e) {
      // If this failed try writing out a new empty index set.
      // ISSUE: Should this occur here? This is really an attempt at repairing
      //   the index store.
      index_store = new IndexSetStore(store, getSystem());
      index_header_p = index_store.create();
      index_store.addIndexLists(table_def.columnCount() + 1, (byte) 1, 1024);
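      // Patch the main header so its index header pointer (offset 32)
      // references the newly created index set.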
      header_area.position(32);
      header_area.putLong(index_header_p);
      header_area.position(0);
      header_area.checkOut();
    }

  }

  /**
   * Create this master table in the file system at the given path.  This
   * will initialise the various file objects and result in a new empty
   * master table to store data in.
   */
  void create(int table_id, DataTableDef table_def) throws IOException {

    // Set the data table def object
    setupDataTableDef(table_def);

    // Initially set the table sequence_id to 1
    this.sequence_id = 1;

    // Generate the name of the store file name.
    this.file_name = makeTableFileName(getSystem(), table_id, getTableName());

    // Create and open the store.
    store = storeSystem().createStore(file_name);

    try {
      store.lockForWrite();

      // Setup the list structure
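      // (the 12 here is the node size in bytes: a 4-byte status word
      // followed by an 8-byte record pointer, matching the reads made
      // elsewhere in this class)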
      list_structure = new FixedRecordList(store, 12);
    }
    finally {
      store.unlockForWrite();
    }

    // Set up internal state of this object
    this.table_id = table_id;

    // Initialize the store to an empty state,
    setupInitialStore();
    index_store.addIndexLists(table_def.columnCount() + 1, (byte) 1, 1024);

    // Load internal state
    loadInternal();

//    synchAll();

  }

  /**
   * Returns true if the master table data source with the given source
   * identity exists.
   */
  boolean exists(String identity) throws IOException {
    return storeSystem().storeExists(identity);
  }

  /**
   * Opens an existing master table from the file system at the path of the
   * conglomerate this belongs to.  This will set up the internal state of
   * this object with the data read in.
   */
  public void open(String file_name) throws IOException {

    // Set read only flag.
    this.file_name = file_name;

    // Open the store.
    store = storeSystem().openStore(file_name);
    boolean need_check = !store.lastCloseClean();

    // Setup the list structure
    list_structure = new FixedRecordList(store, 12);

    // Read and setup the pointers
    readStoreHeaders();

    // Set the column count
    column_count = table_def.columnCount();

    // Open table indices
    table_indices = new MultiVersionTableIndices(getSystem(),
                           table_def.getTableName(), table_def.columnCount());
    // The column rid list cache
    column_rid_list = new RIDList[table_def.columnCount()];

    // Load internal state
    loadInternal();

    if (need_check) {
      // Do an opening scan of the table.  Any records that are uncommitted
      // must be marked as deleted.
      doOpeningScan();

      // Scan for any leaks in the file,
      Debug().write(Lvl.INFORMATION, this,
                    "Scanning File: " + file_name + " for leaks.");
      scanForLeaks();
    }

//    HashMap properties = new HashMap();
//    file_store.statsScan(properties);
//    System.out.println("File: " + file_name);
//    Iterator key_i = properties.keySet().iterator();
//    while (key_i.hasNext()) {
//      String key = (String) key_i.next();
//      System.out.print(key);
//      System.out.print(" = ");
//      System.out.println(properties.get(key));
//    }

  }

  /**
   * Closes this master table in the file system.  This frees up all the
   * resources associated with this master table.
   * <p>
   * This method is typically called when the database is shut down.
   */
  synchronized void close(boolean pending_drop) throws IOException {
    // NOTE: This method MUST be synchronized over the table to prevent
    //   establishing a root lock on this table.  If a root lock is
    //   established then the collection event could fail.

    synchronized (list_structure) {

      // If we are root locked, we must become un root locked.
      clearAllRootLocks();

      try {
        try {
          store.lockForWrite();

          // Force a garbage collection event.
          if (!isReadOnly()) {
            garbage_collector.performCollectionEvent(true);
          }

          // If we are closing pending a drop, we need to remove all blob
          // references in the table.
          // NOTE: This must only happen after the above collection event.
          if (pending_drop) {
            // Scan and remove all blob references for this dropped table.
            dropAllBlobReferences();
          }
        }
        finally {
          store.unlockForWrite();
        }
      }
      catch (Throwable e) {
        Debug().write(Lvl.ERROR, this,
                      "Exception during table (" + toString() + ") close: " +
                      e.getMessage());
        Debug().writeException(e);
      }

      // Synchronize the store
      index_store.close();
//      store.flush();

      // Close the store in the store system.
      storeSystem().closeStore(store);

      table_def = null;
      table_indices = null;
      column_rid_list = null;
      is_closed = true;
    }
  }

  /**
   * Creates a new v2 master table data source that is a copy of the given
   * MasterTableDataSource object.
   *
   * @param table_id the table id to give the new table.
   * @param src_master_table the table to copy.
   * @param index_set the view of the table to be copied.
   */
  void copy(int table_id, MasterTableDataSource src_master_table,
            IndexSet index_set) throws IOException {

    // Basically we need to copy all the data and then set the new index view.
    create(table_id, src_master_table.getDataTableDef());

    // The record list.
    IntegerListInterface master_index = index_set.getIndex(0);

    // For each row in the master table
    int sz = src_master_table.rawRowCount();
    for (int i = 0; i < sz; ++i) {
      // Is this row in the set we are copying from?
      if (master_index.contains(i)) {
        // Yes so copy the record into this table.
        copyRecordFrom(src_master_table, i);
      }
    }

    // Copy the index set
    if (src_master_table instanceof V2MasterTableDataSource) {
      index_store.copyAllFrom(index_set);
    }
    else if (src_master_table instanceof V1MasterTableDataSource) {
      // HACK: This is a bit of a hack.  We should copy the index_set into
      //   this newly created object but instead we rebuild the indexes from
      //   scratch.
      //   This is only used when converting a 0.93 database to the
      //   V2MasterTableDataSource format.
      buildIndexes();
    }

    // Finally set the unique id
    long un_id = src_master_table.nextUniqueID();
    setUniqueID(un_id);

  }

  // ---------- Low level operations ----------

  /**
   * Writes a record to the store and returns a pointer to the area that
   * represents the new record.  This does not manipulate the fixed structure
   * in any way.  This method only allocates an area to store the record and
   * serializes the record.  It is the responsibility of the caller to add
   * the record into the general file structure.
   * <p>
   * Note that if the RowData contains any references to Blob objects then a
   * reference count to the blob is generated at this point.
   */
  private long writeRecordToStore(RowData data) throws IOException {

    // Calculate how much space this record will use
    int row_cells = data.getColumnCount();

    int[] cell_sizes = new int[row_cells];
    int[] cell_type = new int[row_cells];

    try {
      store.lockForWrite();

      // Establish a reference to any blobs in the record
      int all_records_size = 0;
      for (int i = 0; i < row_cells; ++i) {
        TObject cell = data.getCellData(i);
        int sz;
        int ctype;
        if (cell.getObject() instanceof Ref) {
          Ref large_object_ref = (Ref) cell.getObject();
          // TBinaryType that are BlobRef objects have to be handled
          // separately.
          sz = 16;
          ctype = 2;
          if (large_object_ref != null) {
            // Tell the blob store interface that we've made a static
            // reference to this blob.
            blob_store_interface.establishReference(large_object_ref.getID());
          }
        }
        else {
          sz = ObjectTransfer.exactSize(cell.getObject());
          ctype = 1;
        }
        cell_sizes[i] = sz;
        cell_type[i] = ctype;
        all_records_size += sz;
      }

      long record_p;

      // Allocate space for the record,
      AreaWriter writer =
                     store.createArea(all_records_size + (row_cells * 8) + 4);
      record_p = writer.getID();

      // The record output stream
      DataOutputStream dout = getDOut(writer.getOutputStream());

      // Write the record header first,
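      // Record layout as written below:
      //   int reserved
      //   { int cell_type, int cell_offset }  - one pair per cell
      //   cell data, each cell beginning at its cell_offset
      // So a cell's header pair starts at byte 4 + (column * 8), and its
      // data starts at byte 4 + (row_cells * 8) + cell_offset.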
      dout.writeInt(0);  // reserved for future use
      int cell_skip = 0;
      for (int i = 0; i < row_cells; ++i) {
        dout.writeInt((int) cell_type[i]);
        dout.writeInt(cell_skip);
        cell_skip += cell_sizes[i];
      }

      // Now write a serialization of the cells themselves,
      for (int i = 0; i < row_cells; ++i) {
        TObject t_object = data.getCellData(i);
        int ctype = cell_type[i];
        if (ctype == 1) {
          // Regular object
          ObjectTransfer.writeTo(dout, t_object.getObject());
        }
        else if (ctype == 2) {
          // This is a binary large object and must be represented as a ref
          // to a blob in the BlobStore.
          Ref large_object_ref = (Ref) t_object.getObject();
          if (large_object_ref == null) {
            // null value
            dout.writeInt(1);
            dout.writeInt(0);  // Reserved for future use
            dout.writeLong(-1);
          }
          else {
            dout.writeInt(0);
            dout.writeInt(0);  // Reserved for future use
            dout.writeLong(large_object_ref.getID());
          }
        }
        else {
          throw new IOException("Unrecognised cell type.");
        }
      }

      // Flush the output
      dout.flush();

      // Finish the record
      writer.finish();

      // Return the record
      return record_p;

    }
    finally {
      store.unlockForWrite();
    }

  }

  /**
   * Copies the record at the given index in the source table to the same
   * record index in this table.  Note that this may need to expand the
   * fixed list record heap as necessary to copy the record into the given
   * position.  The record is NOT copied into the first free record position.
   */
  private void copyRecordFrom(MasterTableDataSource src_master_table,
                              int record_id) throws IOException {

    // Copy the record from the source table into a RowData object,
    int sz = src_master_table.getDataTableDef().columnCount();
    RowData row_data = new RowData(getSystem(), sz);
    for (int i = 0; i < sz; ++i) {
      TObject tob = src_master_table.getCellContents(i, record_id);
      row_data.setColumnDataFromTObject(i, tob);
    }

    try {
      store.lockForWrite();

      // Write record to this table but don't update any structures for the
      // new record.
      long record_p = writeRecordToStore(row_data);

      // Add this record into the table structure at the given index
      addToRecordList(record_id, record_p);

      // Set the record type for this record (committed added).
      writeRecordType(record_id, 0x010);

    }
    finally {
      store.unlockForWrite();
    }

  }

  /**
   * Removes all blob references in the record area pointed to by 'record_p'.
   * This should only be used when the record is being reclaimed.
   */
  private void removeAllBlobReferencesForRecord(long record_p)
                                                          throws IOException {
    // NOTE: Does this need to be optimized?
    Area record_area = store.getArea(record_p);
    int reserved = record_area.getInt();  // reserved
    // Look for any blob references in the row
    for (int i = 0; i < column_count; ++i) {
      int ctype = record_area.getInt();
      int cell_offset = record_area.getInt();
      if (ctype == 1) {
        // Type 1 is not a large object
      }
      else if (ctype == 2) {
        int cur_p = record_area.position();
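        // Seek to the cell data: skip the 4-byte record header and the
        // cell_type/cell_offset table (8 bytes per column), then add this
        // cell's offset.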
        record_area.position(cell_offset + 4 + (column_count * 8));
        int btype = record_area.getInt();
        record_area.getInt();  // (reserved)
        if (btype == 0) {
          long blob_ref_id = record_area.getLong();
          // Release this reference
          blob_store_interface.releaseReference(blob_ref_id);
        }
        // Revert the area pointer
        record_area.position(cur_p);
      }
      else {
        throw new RuntimeException("Unrecognised type.");
      }
    }
  }

  /**
   * Scans the table and drops ALL blob references in this table.  This is
   * used when a table is dropped while it still contains elements referenced
   * in the BlobStore.  This will decrease the reference count in the
   * BlobStore for all blobs.  In effect, this is like calling 'delete' on
   * all the data in the table.
   * <p>
   * This method should only be called when the table is about to be deleted
   * from the file system.
   */
  private void dropAllBlobReferences() throws IOException {

    synchronized (list_structure) {
      long elements = list_structure.addressableNodeCount();
      for (long i = 0; i < elements; ++i) {
        Area a = list_structure.positionOnNode(i);
        int status = a.getInt();
        // Is the record not deleted?
        if ((status & 0x020000) == 0) {
          // Get the record pointer
          long record_p = a.getLong();
          removeAllBlobReferencesForRecord(record_p);
        }
      }
    }

  }

  // ---------- Diagnostic and repair ----------

  /**
   * Looks for any leaks in the file.  This works by walking through the
   * file and index area graph and 'remembering' all areas that were read.
   * The store is then checked that all other areas except these are deleted.
   * <p>
   * Assumes the master table is open.
   */
  public void scanForLeaks() throws IOException {

    synchronized (list_structure) {

      // The list of pointers to areas (as Long).
      ArrayList used_areas = new ArrayList();

      // Add the header_p pointer
      used_areas.add(new Long(header_area.getID()));

      header_area.position(16);
      // Add the DataTableDef and DataIndexSetDef objects
      used_areas.add(new Long(header_area.getLong()));
      used_areas.add(new Long(header_area.getLong()));

      // Add all the used areas in the list_structure itself.
      list_structure.addAllAreasUsed(used_areas);

      // Adds all the user areas in the index store.
      index_store.addAllAreasUsed(used_areas);

      // Search the list structure for all areas
      long elements = list_structure.addressableNodeCount();
      for (long i = 0; i < elements; ++i) {
        Area a = list_structure.positionOnNode(i);
        int status = a.getInt();
        if ((status & 0x020000) == 0) {
          long pointer = a.getLong();
//          System.out.println("Not deleted = " + pointer);
          // Record is not deleted,
          used_areas.add(new Long(pointer));
        }
      }

      // Following depends on store implementation
      if (store instanceof AbstractStore) {
        AbstractStore a_store = (AbstractStore) store;
        ArrayList leaked_areas = a_store.findAllocatedAreasNotIn(used_areas);
        if (leaked_areas.size() == 0) {
          Debug().write(Lvl.INFORMATION, this, "No leaked areas.");
        }
        else {
          Debug().write(Lvl.INFORMATION, this, "There were " +
                        leaked_areas.size() + " leaked areas found.");
          for (int n = 0; n < leaked_areas.size(); ++n) {
            Long area_pointer = (Long) leaked_areas.get(n);
            store.deleteArea(area_pointer.longValue());
          }
          Debug().write(Lvl.INFORMATION, this,
                        "Leaked areas successfully freed.");
        }
      }

    }

  }

  /**
   * Performs a complete check and repair of the table.  The table must not
   * have been opened before this method is called.  The given UserTerminal
   * parameter is an implementation of a user interface that is used to ask
   * any questions and output the results of the check.
   */
  public void checkAndRepair(String file_name,
                             UserTerminal terminal) throws IOException {

    this.file_name = file_name;

    terminal.println("+ Repairing V2MasterTableDataSource " + file_name);

    store = storeSystem().openStore(file_name);
    // If AbstractStore then fix
    if (store instanceof AbstractStore) {
      ((AbstractStore) store).openScanAndFix(terminal);
    }

    // Setup the list structure
    list_structure = new FixedRecordList(store, 12);

    try {
      // Read and setup the pointers
      readStoreHeaders();
      // Set the column count
      column_count = table_def.columnCount();
    }
    catch (IOException e) {
      // If this fails, the table is not recoverable.
      terminal.println(
            "! Table is not repairable because the file headers are corrupt.");
      terminal.println("  Error reported: " + e.getMessage());
      e.printStackTrace();
      return;
    }

    // From here, we at least have intact headers.
    terminal.println("- Checking record integrity.");

    // Get the sorted list of all areas in the file.
    List all_areas = store.getAllAreas();
    // The list of all records generated when we check each record
    ArrayList all_records = new ArrayList();

    // Look up each record and check it's intact.  Any records that are
    // deleted are added to the delete chain.
    first_delete_chain_record = -1;
    int record_count = 0;
    int free_count = 0;
    int sz = rawRowCount();
    for (int i = sz - 1; i >= 0; --i) {
      boolean record_valid = checkAndRepairRecord(i, all_areas, terminal);
      if (record_valid) {
        all_records.add(new Long(i));
        ++record_count;
      }
      else {
        ++free_count;
      }
    }
    // Set the reserved area
    list_structure.setReservedLong(first_delete_chain_record);

    terminal.print("* Record count = " + record_count);
    terminal.println("  Free count = " + free_count);

    // Check indexes
    terminal.println("- Rebuilding all table index information.");

    int index_count = table_def.columnCount() + 1;
    for (int i = 0; i < index_count; ++i) {
      index_store.commitDropIndex(i);
    }
//    store.flush();
    buildIndexes();

    terminal.println("- Table check complete.");
//    // Flush any changes
//    store.flush();

  }

  /**
   * Checks and repairs a record if it requires repairing.  Returns true if
   * the record is valid, or false otherwise (record is/was deleted).
   */
  private boolean checkAndRepairRecord(
                  int row_index, List all_areas, UserTerminal terminal)
                                                           throws IOException {
    synchronized (list_structure) {
      // Position in the list structure
      MutableArea block_area = list_structure.positionOnNode(row_index);
      int p = block_area.position();
      int status = block_area.getInt();
      // If it is not deleted,
      if ((status & 0x020000) == 0) {
        long record_p = block_area.getLong();
//        System.out.println("row_index = " + row_index +
//                           " record_p = " + record_p);
        // Is this pointer valid?
        int i = Collections.binarySearch(all_areas, new Long(record_p));
        if (i >= 0) {
          // Pointer is valid in the store,
          // Try reading from column 0
          try {
            internalGetCellContents(0, row_index);
            // Return because the record is valid.
            return true;
          }
          catch (Throwable e) {
            // If an exception is generated when accessing the data, delete
            // the record.
            terminal.println("+ Error accessing record: " + e.getMessage());
          }

        }

        // If we get here, the record needs to be deleted and added to the
        // delete chain
        terminal.println("+ Record area not valid: row = " + row_index +
                         " pointer = " + record_p);
        terminal.println("+ Deleting record.");
      }
      // Put this record in the delete chain
      block_area.position(p);
      block_area.putInt(0x020000);
      block_area.putLong(first_delete_chain_record);
      block_area.checkOut();
      first_delete_chain_record = row_index;

      return false;

    }

  }


  /**
   * Grows the list structure to accommodate more entries.  The new entries
   * are added to the free chain pool.  Assumes we are synchronized over
   * list_structure.
   */
  private void growListStructure() throws IOException {
    try {
      store.lockForWrite();

      // Increase the size of the list structure.
      list_structure.increaseSize();
      // The start record of the new size
      int new_block_number = list_structure.listBlockCount() - 1;
      long start_index =
                     list_structure.listBlockFirstPosition(new_block_number);
      long size_of_block = list_structure.listBlockNodeCount(new_block_number);

      // The Area object for the new position
      MutableArea a = list_structure.positionOnNode(start_index);

      // Set the rest of the block as deleted records
      for (long n = 0; n < size_of_block - 1; ++n) {
        a.putInt(0x020000);
        a.putLong(start_index + n + 1);
      }
      // The last block is end of delete chain.
      a.putInt(0x020000);
      a.putLong(first_delete_chain_record);
      a.checkOut();
      // And set the new delete chain
      first_delete_chain_record = start_index;
      // Set the reserved area
      list_structure.setReservedLong(first_delete_chain_record);

    }
    finally {
      store.unlockForWrite();
    }

  }

  /**
   * Adds a record to the given position in the fixed structure.  If the
   * place is already used by a record then an exception is thrown, otherwise
   * the record is set.
   */
  private long addToRecordList(long index, long record_p) throws IOException {
    synchronized (list_structure) {
      if (has_shutdown) {
        throw new IOException("IO operation while VM shutting down.");
      }

      long addr_count = list_structure.addressableNodeCount();
      // First make sure there are enough nodes to accommodate this entry,
      while (index >= addr_count) {
        growListStructure();
        addr_count = list_structure.addressableNodeCount();
      }

      // Remove this from the delete chain by searching for the index in the
      // delete chain.
      long prev = -1;
      long chain = first_delete_chain_record;
      while (chain != -1 && chain != index) {
        Area a = list_structure.positionOnNode(chain);
        if (a.getInt() == 0x020000) {
          prev = chain;
          chain = a.getLong();
        }
        else {
          throw new IOException("Not deleted record is in delete chain!");
        }
      }
      // Wasn't found
      if (chain == -1) {
        throw new IOException(
                     "Unable to add record because index is not available.");
      }
      // Read the next entry in the delete chain.
      Area a = list_structure.positionOnNode(chain);
      if (a.getInt() != 0x020000) {
        throw new IOException("Not deleted record is in delete chain!");
      }
      long next_p = a.getLong();

      try {
        store.lockForWrite();

        // If prev == -1 then first_delete_chain_record points to this record
        if (prev == -1) {
          first_delete_chain_record = next_p;
          list_structure.setReservedLong(first_delete_chain_record);
        }
        else {
          // Otherwise we need to set the previous node to point to the next
          // node
          MutableArea ma = list_structure.positionOnNode(prev);
          ma.putInt(0x020000);
          ma.putLong(next_p);
          ma.checkOut();
        }

        // Finally set the record_p
        MutableArea ma = list_structure.positionOnNode(index);
        ma.putInt(0);
        ma.putLong(record_p);
        ma.checkOut();

      }
      finally {
        store.unlockForWrite();
      }

    }

    return index;
  }

  /**
   * Finds a free place to add a record and returns an index to the record
   * there.  This may expand the record space as necessary if there are no
   * free record slots to use.
   */
  private long addToRecordList(long record_p) throws IOException {
    synchronized (list_structure) {
      if (has_shutdown) {
        throw new IOException("IO operation while VM shutting down.");
      }

      // If there are no free deleted records in the delete chain,
      if (first_delete_chain_record == -1) {
        // Grow the fixed structure to allow more nodes,
        growListStructure();
      }

      // Pull free block from the delete chain and recycle it.
      long recycled_record = first_delete_chain_record;
      MutableArea block = list_structure.positionOnNode(recycled_record);
      int rec_pos = block.position();
      // Status of the recycled block
      int status = block.getInt();
      if ((status & 0x020000) == 0) {
        throw new Error("Assertion failed: record is not deleted.  " +
                        "status = " + status + ", rec_pos = " + rec_pos);
      }
      // The pointer to the next in the chain.
      long next_chain = block.getLong();
      first_delete_chain_record = next_chain;

      try {

        store.lockForWrite();

        // Update the first_delete_chain_record field in the header
        list_structure.setReservedLong(first_delete_chain_record);
        // Update the block
        block.position(rec_pos);
        block.putInt(0);
        block.putLong(record_p);
        block.checkOut();

      }
      finally {
        store.unlockForWrite();
      }

      return recycled_record;

    }

  }


  // ---------- Implemented from AbstractMasterTableDataSource ----------

  String getSourceIdent() {
    return file_name;
  }

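  // Each list node's 32-bit status word packs two fields: the low 16 bits
  // hold the record state (e.g. 0x010 = committed added) and bit 0x020000
  // marks the node as deleted / linked into the delete chain.
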
  int writeRecordType(int row_index, int row_state) throws IOException {
    synchronized (list_structure) {
      if (has_shutdown) {
        throw new IOException("IO operation while VM shutting down.");
      }

      // Find the record entry in the block list.
      MutableArea block_area = list_structure.positionOnNode(row_index);
      int pos = block_area.position();
      // Get the status.
      int old_status = block_area.getInt();
      int mod_status = (old_status & 0x0FFFF0000) | (row_state & 0x0FFFF);

      // Write the new status
      try {

        store.lockForWrite();

        block_area.position(pos);
        block_area.putInt(mod_status);
        block_area.checkOut();

      }
      finally {
        store.unlockForWrite();
      }

      return old_status & 0x0FFFF;
    }
  }

  int readRecordType(int row_index) throws IOException {
    synchronized (list_structure) {
      // Find the record entry in the block list.
      Area block_area = list_structure.positionOnNode(row_index);
      // Get the status.
      return block_area.getInt() & 0x0FFFF;
    }
  }

  boolean recordDeleted(int row_index) throws IOException {
    synchronized (list_structure) {
      // Find the record entry in the block list.
      Area block_area = list_structure.positionOnNode(row_index);
      // If the deleted bit is set for the record
      return (block_area.getInt() & 0x020000) != 0;
    }
  }

  int rawRowCount() throws IOException {
    synchronized (list_structure) {
      long total = list_structure.addressableNodeCount();
      // 32-bit row limitation here - we should return a long.
      return (int) total;
    }
  }

  void internalDeleteRow(int row_index) throws IOException {
    long record_p;
    synchronized (list_structure) {
      if (has_shutdown) {
        throw new IOException("IO operation while VM shutting down.");
      }

      // Find the record entry in the block list.
      MutableArea block_area = list_structure.positionOnNode(row_index);
      int p = block_area.position();
      int status = block_area.getInt();
      // Check it is not already deleted
      if ((status & 0x020000) != 0) {
        throw new IOException("Record is already marked as deleted.");
      }
      record_p = block_area.getLong();

      // Update the status record.
      try {
        store.lockForWrite();

        block_area.position(p);
        block_area.putInt(0x020000);
        block_area.putLong(first_delete_chain_record);
        block_area.checkOut();
        first_delete_chain_record = row_index;
        // Update the first_delete_chain_record field in the header
        list_structure.setReservedLong(first_delete_chain_record);

        // If the record contains any references to blobs, remove the
        // reference here.
        removeAllBlobReferencesForRecord(record_p);

        // Free the record from the store
        store.deleteArea(record_p);

      }
      finally {
        store.unlockForWrite();
      }

    }

  }
1228
1229  IndexSet createIndexSet() {
1230    return index_store.getSnapshotIndexSet();
1231  }
1232
1233
1234  void commitIndexSet(IndexSet index_set) {
1235    index_store.commitIndexSet(index_set);
1236    index_set.dispose();
1237  }
1238
1239  int internalAddRow(RowData data) throws IOException {
1240
1241    long row_number;
1242    int int_row_number;
1243
1244    // Write the record to the store.
1245
synchronized (list_structure) {
1246      long record_p = writeRecordToStore(data);
1247      // Now add this record into the record block list,
1248
row_number = addToRecordList(record_p);
1249      int_row_number = (int) row_number;
1250    }
1251
1252    // Update the cell cache as appropriate
1253
if (DATA_CELL_CACHING) {
1254      int row_cells = data.getColumnCount();
1255      for (int i = 0; i < row_cells; ++i) {
1256        // Put the row/column/TObject into the cache.
1257
cache.put(table_id, int_row_number, i, data.getCellData(i));
1258      }
1259    }
1260
1261    // Return the record index of the new data in the table
1262
// NOTE: We are casting this from a long to int which means we are limited
1263
// to ~2 billion record references.
1264
return (int) row_number;
1265
1266  }

  synchronized void checkForCleanup() {
//    index_store.cleanUpEvent();
    garbage_collector.performCollectionEvent(false);
  }



  // ---- getCellContents ----

  private void skipStream(InputStream in, final long amount)
                                                          throws IOException {
    long count = amount;
    long skipped = 0;
    while (skipped < amount) {
      long last_skipped = in.skip(count);
      skipped += last_skipped;
      count -= last_skipped;
    }
  }


//  private short s_run_total_hits = 0;
  private short s_run_file_hits = Short.MAX_VALUE;

  // ---- Optimization that saves some cycles -----

  TObject internalGetCellContents(int column, int row) {

    // NOTES:
    //   This is called *A LOT*.  It's a key part of the 20% of the program
    //   that's run 80% of the time.
    //   This performs very nicely for rows that are completely contained
    //   within 1 sector.  However, rows that contain large cells (eg. a
    //   large binary or a large string) and span many sectors will not be
    //   utilizing memory as well as they could.
    //   The reason is because all the data for a row is read from the store
    //   even if only 1 cell of the column is requested.  This will have a
    //   big impact on column scans and searches.  The cell cache takes some
    //   of this performance bottleneck away.
    //   However, a better implementation of this method is made difficult by
    //   the fact that sector spans can be compressed.  We should perhaps
    //   revise the low level data storage so only sectors can be compressed.

//    // If the database stats need updating then do so now.
//    if (s_run_total_hits >= 1600) {
//      getSystem().stats().add(s_run_total_hits, total_hits_key);
//      getSystem().stats().add(s_run_file_hits, file_hits_key);
//      s_run_total_hits = 0;
//      s_run_file_hits = 0;
//    }

//    // Increment the total hits counter
//    ++s_run_total_hits;

    // First check if this is within the cache before we continue.
    TObject cell;
    if (DATA_CELL_CACHING) {
      cell = cache.get(table_id, row, column);
      if (cell != null) {
        return cell;
      }
    }

    // We maintain a cache of byte[] arrays that contain the rows read in
    // from the file.  If consecutive reads are made to the same row, then
    // this will cause lots of fast cache hits.

    long record_p = -1;
    try {
      synchronized (list_structure) {

        // Increment the file hits counter
        ++s_run_file_hits;

        if (s_run_file_hits >= 100) {
          getSystem().stats().add(s_run_file_hits, file_hits_key);
          s_run_file_hits = 0;
        }

        // Get the node for the record
        Area list_block = list_structure.positionOnNode(row);
        int status = list_block.getInt();
        // Check it's not deleted
        if ((status & 0x020000) != 0) {
          throw new Error("Unable to read deleted record.");
        }
        // Get the pointer to the record we are reading
        record_p = list_block.getLong();

      }

      // Open a stream to the record
      DataInputStream din = getDIn(store.getAreaInputStream(record_p));

      skipStream(din, 4 + (column * 8));
      int cell_type = din.readInt();
      int cell_offset = din.readInt();

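      // We are now 8 + 4 + (column * 8) bytes into the record (the header
      // int, the skipped entries, plus the 8 bytes just read).  The cell
      // data region begins after the full header table at
      // 4 + (column_count * 8), so skip the remaining entries plus the
      // cell's offset.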
      int cur_at = 8 + 4 + (column * 8);
      int be_at = 4 + (column_count * 8);
      int skip_amount = (be_at - cur_at) + cell_offset;

      skipStream(din, skip_amount);

      Object ob;
      if (cell_type == 1) {
        // If standard object type
        ob = ObjectTransfer.readFrom(din);
      }
      else if (cell_type == 2) {
        // If reference to a blob in the BlobStore
        int f_type = din.readInt();
        int f_reserved = din.readInt();
        long ref_id = din.readLong();
        if (f_type == 0) {
          // Resolve the reference
          ob = blob_store_interface.getLargeObject(ref_id);
        }
        else if (f_type == 1) {
          ob = null;
        }
        else {
          throw new RuntimeException("Unknown blob type.");
        }
      }
      else {
        throw new RuntimeException("Unrecognised cell type in data.");
      }

      // Get the TType for this column
      // NOTE: It's possible this call may need optimizing?
      TType ttype = getDataTableDef().columnAt(column).getTType();
      // Wrap it around a TObject
      cell = new TObject(ttype, ob);

      // And close the reader.
      din.close();

    }
    catch (IOException e) {
      Debug().writeException(e);
//      System.out.println("Pointer = " + row_pointer);
      throw new RuntimeException("IOError getting cell at (" + column + ", " +
                                 row + ") pointer = " + record_p + ".");
    }

    // And put in the cache and return it.
    if (DATA_CELL_CACHING) {
      cache.put(table_id, row, column, cell);
    }

    return cell;

  }

  long currentUniqueID() {
    synchronized (list_structure) {
      return sequence_id - 1;
    }
  }


  long nextUniqueID() {
    synchronized (list_structure) {
      long v = sequence_id;
      ++sequence_id;
      if (has_shutdown) {
        throw new RuntimeException("IO operation while VM shutting down.");
      }
      try {
        try {
          store.lockForWrite();
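          // The sequence id is stored at offset 8 of the main header,
          // directly after the two leading ints (version and table_id).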
          header_area.position(4 + 4);
          header_area.putLong(sequence_id);
          header_area.checkOut();
        }
        finally {
          store.unlockForWrite();
        }
      }
      catch (IOException e) {
        Debug().writeException(e);
        throw new Error("IO Error: " + e.getMessage());
      }
      return v;
    }
  }

  void setUniqueID(long value) {
    synchronized (list_structure) {
      sequence_id = value;
      if (has_shutdown) {
        throw new RuntimeException("IO operation while VM shutting down.");
      }
      try {
        try {
          store.lockForWrite();
          header_area.position(4 + 4);
          header_area.putLong(sequence_id);
          header_area.checkOut();
        }
        finally {
          store.unlockForWrite();
        }
      }
      catch (IOException e) {
        Debug().writeException(e);
        throw new Error("IO Error: " + e.getMessage());
      }
    }
  }
1483  synchronized void dispose(boolean pending_drop) throws IOException {
1484    synchronized (list_structure) {
1485      if (!is_closed) {
1486        close(pending_drop);
1487      }
1488    }
1489  }
1490  
1491  synchronized boolean drop() throws IOException {
1492    synchronized (list_structure) {
1493      
1494      if (!is_closed) {
1495        close(true);
1496      }
1497      
1498      boolean b = storeSystem().deleteStore(store);
1499      if (b) {
1500        Debug().write(Lvl.MESSAGE, this, "Dropped: " + getSourceIdent());
1501      }
1502      return b;
1503
1504    }
1505  }
1506  
1507  void shutdownHookCleanup() {
1508// try {
1509
synchronized (list_structure) {
1510        index_store.close();
1511// store.synch();
1512
has_shutdown = true;
1513      }
1514// }
1515
// catch (IOException e) {
1516
// Debug().write(Lvl.ERROR, this, "IO Error during shutdown hook.");
1517
// Debug().writeException(e);
1518
// }
1519
}
1520
1521  boolean isWorthCompacting() {
1522    // PENDING: We should perform some analysis on the data to decide if a
1523
// compact is necessary or not.
1524
return true;
1525  }
1526
1527
1528  /**
1529   * For diagnostic.
1530   */

1531  public String JavaDoc toString() {
1532    return "[V2MasterTableDataSource: " + file_name + "]";
1533  }
1534
1535}