KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > mckoi > database > BlobStore


1 /**
2  * com.mckoi.database.BlobStore 18 Jan 2003
3  *
4  * Mckoi SQL Database ( http://www.mckoi.com/database )
5  * Copyright (C) 2000, 2001, 2002 Diehl and Associates, Inc.
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License
9  * Version 2 as published by the Free Software Foundation.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License Version 2 for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * Version 2 along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19  *
20  * Change Log:
21  *
22  *
23  */

24
25 package com.mckoi.database;
26
27 import java.util.ArrayList JavaDoc;
28 import java.util.zip.Deflater JavaDoc;
29 import java.util.zip.Inflater JavaDoc;
30 import java.util.zip.DataFormatException JavaDoc;
31 import java.io.IOException JavaDoc;
32 import java.io.InputStream JavaDoc;
33 import java.io.Reader JavaDoc;
34 import com.mckoi.util.PagedInputStream;
35 import com.mckoi.store.Store;
36 import com.mckoi.store.Area;
37 import com.mckoi.store.MutableArea;
38 import com.mckoi.store.AreaWriter;
39 import com.mckoi.database.jdbc.AsciiReader;
40 import com.mckoi.database.jdbc.BinaryToUnicodeReader;
41 import com.mckoi.database.global.Ref;
42 import com.mckoi.database.global.BlobRef;
43 import com.mckoi.database.global.ClobRef;
44 import com.mckoi.database.global.ByteLongObject;
45
46 /**
47  * A structure inside an Area that maintains the storage of any number of large
48  * binary objects. A blob store allows for the easy allocation of areas for
49  * storing blob data and for reading and writing blob information via BlobRef
50  * objects.
51  * <p>
52  * A BlobStore can be broken down to the following simplistic functions;
53  * <p>
54  * 1) Allocation of an area to store a new blob.<br>
55  * 2) Reading the information in a Blob given a Blob reference identifier.<br>
56  * 3) Reference counting to a particular Blob.<br>
57  * 4) Cleaning up a Blob when no static references are left.<br>
58  *
59  * @author Tobias Downer
60  */

61
62 final class BlobStore implements BlobStoreInterface {
63
64   /**
65    * The magic value for fixed record list structures.
66    */

67   private final static int MAGIC = 0x012BC53A9;
68   
69   /**
70    * The outer Store object that is to contain the blob store.
71    */

72   private Store store;
73
74   /**
75    * The FixedRecordList structure that maintains a list of fixed size records
76    * for blob reference counting.
77    */

78   private FixedRecordList fixed_list;
79   
80   /**
81    * The first delete chain element.
82    */

83   private long first_delete_chain_record;
84   
85
86   /**
87    * Constructs the BlobStore on the given Store object.
88    */

89   BlobStore(Store store) {
90     this.store = store;
91     fixed_list = new FixedRecordList(store, 24);
92   }
93   
94   
95   /**
96    * Creates the blob store and returns a pointer in the store to the header
97    * information. This value is later used to initialize the store.
98    */

99   long create() throws IOException JavaDoc {
100     // Init the fixed record list area.
101
// The fixed list entries are formatted as follows;
102
// ( status (int), reference_count (int),
103
// blob_size (long), blob_pointer (long) )
104
long fixed_list_p = fixed_list.create();
105
106     // Delete chain is empty when we start
107
first_delete_chain_record = -1;
108     fixed_list.setReservedLong(-1);
109     
110     // Allocate a small header that contains the MAGIC, and the pointer to the
111
// fixed list structure.
112
AreaWriter blob_store_header = store.createArea(32);
113     long blob_store_p = blob_store_header.getID();
114     // Write the blob store header information
115
// The magic
116
blob_store_header.putInt(MAGIC);
117     // The version
118
blob_store_header.putInt(1);
119     // The pointer to the fixed list area
120
blob_store_header.putLong(fixed_list_p);
121     // And finish
122
blob_store_header.finish();
123     
124     // Return the pointer to the blob store header
125
return blob_store_p;
126   }
127   
128   /**
129    * Initializes the blob store given a pointer to the blob store pointer
130    * header (the value previously returned by the 'create' method).
131    */

132   void init(long blob_store_p) throws IOException JavaDoc {
133     // Get the header area
134
Area blob_store_header = store.getArea(blob_store_p);
135     blob_store_header.position(0);
136     // Read the magic
137
int magic = blob_store_header.getInt();
138     int version = blob_store_header.getInt();
139     if (magic != MAGIC) {
140       throw new IOException JavaDoc("MAGIC value for BlobStore is not correct.");
141     }
142     if (version != 1) {
143       throw new IOException JavaDoc("version number for BlobStore is not correct.");
144     }
145
146     // Read the pointer to the fixed area
147
long fixed_list_p = blob_store_header.getLong();
148     // Init the FixedRecordList area
149
fixed_list.init(fixed_list_p);
150     
151     // Set the delete chain
152
first_delete_chain_record = fixed_list.getReservedLong();
153   }
154
155
156   /**
157    * Simple structure used when copying blob information.
158    */

159   private static class CopyBlobInfo {
160     int ref_count;
161     long size;
162     long ob_p;
163   };
164   
165   /**
166    * Copies all the blob data from the given BlobStore into this blob store.
167    * Any blob information that already exists within this BlobStore is deleted.
168    * We assume this method is called after the blob store is created or
169    * initialized.
170    */

171   void copyFrom(StoreSystem store_system,
172                 BlobStore src_blob_store) throws IOException JavaDoc {
173     FixedRecordList src_fixed_list = src_blob_store.fixed_list;
174     long node_count;
175     synchronized (src_fixed_list) {
176       node_count = src_fixed_list.addressableNodeCount();
177     }
178
179     synchronized (fixed_list) {
180
181       // Make sure our fixed_list is big enough to accomodate the copied list,
182
while (fixed_list.addressableNodeCount() < node_count) {
183         fixed_list.increaseSize();
184       }
185       
186       // We rearrange the delete chain
187
long last_deleted = -1;
188
189       // We copy blobs in groups no larger than 1024 Blobs
190
final int BLOCK_WRITE_COUNT = 1024;
191       
192       int max_to_read = (int) Math.min(BLOCK_WRITE_COUNT, node_count);
193       long p = 0;
194
195       while (max_to_read > 0) {
196         // (CopyBlboInfo)
197
ArrayList JavaDoc src_copy_list = new ArrayList JavaDoc();
198
199         synchronized (src_fixed_list) {
200           for (int i = 0; i < max_to_read; ++i) {
201             Area a = src_fixed_list.positionOnNode(p + i);
202             int status = a.getInt();
203             // If record is not deleted
204
if (status != 0x020000) {
205               CopyBlobInfo info = new CopyBlobInfo();
206               info.ref_count = a.getInt();
207               info.size = a.getLong();
208               info.ob_p = a.getLong();
209               src_copy_list.add(info);
210             }
211             else {
212               src_copy_list.add(null);
213             }
214           }
215         }
216
217         try {
218           store.lockForWrite();
219         
220           // We now should have a list of all records from the src to copy,
221
int sz = src_copy_list.size();
222           for (int i = 0; i < sz; ++i) {
223             CopyBlobInfo info = (CopyBlobInfo) src_copy_list.get(i);
224             MutableArea a = fixed_list.positionOnNode(p + i);
225             // Either set a deleted entry or set the entry with a copied blob.
226
if (info == null) {
227               a.putInt(0x020000);
228               a.putInt(0);
229               a.putLong(-1);
230               a.putLong(last_deleted);
231               a.checkOut();
232               last_deleted = p + i;
233             }
234             else {
235               // Get the Area containing the blob header data in the source
236
// store
237
Area src_blob_header = src_blob_store.store.getArea(info.ob_p);
238               // Read the information from the header,
239
int res = src_blob_header.getInt();
240               int type = src_blob_header.getInt();
241               long total_block_size = src_blob_header.getLong();
242               long total_block_pages = src_blob_header.getLong();
243             
244               // Allocate a new header
245
AreaWriter dst_blob_header = store.createArea(
246                                       4 + 4 + 8 + 8 + (total_block_pages * 8));
247               long new_ob_header_p = dst_blob_header.getID();
248               // Copy information into the header
249
dst_blob_header.putInt(res);
250               dst_blob_header.putInt(type);
251               dst_blob_header.putLong(total_block_size);
252               dst_blob_header.putLong(total_block_pages);
253             
254               // Allocate and copy each page,
255
for (int n = 0; n < total_block_pages; ++n) {
256                 // Get the block information
257
long block_p = src_blob_header.getLong();
258                 Area src_block = src_blob_store.store.getArea(block_p);
259                 int block_type = src_block.getInt();
260                 int block_size = src_block.getInt();
261                 // Copy a new block,
262
int new_block_size = block_size + 4 + 4;
263                 AreaWriter dst_block_p = store.createArea(new_block_size);
264                 long new_block_p = dst_block_p.getID();
265                 src_block.position(0);
266                 src_block.copyTo(dst_block_p, new_block_size);
267                 // And finish
268
dst_block_p.finish();
269                 // Write the new header
270
dst_blob_header.putLong(new_block_p);
271               }
272
273               // And finish 'dst_blob_header'
274
dst_blob_header.finish();
275             
276               // Set up the data in the fixed list
277
a.putInt(1);
278               // Note all the blobs are written with 0 reference count.
279
a.putInt(0);
280               a.putLong(info.size);
281               a.putLong(new_ob_header_p);
282               // Check out the changes
283
a.checkOut();
284             }
285           }
286
287         }
288         finally {
289           store.unlockForWrite();
290         }
291           
292         node_count -= max_to_read;
293         p += max_to_read;
294         max_to_read = (int) Math.min(BLOCK_WRITE_COUNT, node_count);
295
296         // Set a checkpoint in the destination store system so we write out
297
// all pending changes from the log
298
store_system.setCheckPoint();
299
300       }
301
302       // Set the delete chain
303
first_delete_chain_record = last_deleted;
304       fixed_list.setReservedLong(last_deleted);
305
306     } // synchronized (fixed_list)
307

308   }
309   
310   /**
311    * Convenience method that converts the given String into a ClobRef
312    * object and pushes it into the given BlobStore object.
313    */

314   ClobRef putStringInBlobStore(String JavaDoc str) throws IOException JavaDoc {
315     final int BUF_SIZE = 64 * 1024;
316     
317     int size = str.length();
318
319     byte type = 4;
320     // Enable compression (ISSUE: Should this be enabled by default?)
321
type = (byte) (type | 0x010);
322
323     ClobRef ref = (ClobRef) allocateLargeObject(type, size * 2);
324     byte[] buf = new byte[BUF_SIZE];
325     long p = 0;
326     int str_i = 0;
327     while (size > 0) {
328       int to_write = Math.min(BUF_SIZE / 2, size);
329       int buf_i = 0;
330       for (int i = 0; i < to_write; ++i) {
331         char c = str.charAt(str_i);
332         buf[buf_i] = (byte) (c >> 8);
333         ++buf_i;
334         buf[buf_i] = (byte) c;
335         ++buf_i;
336         ++str_i;
337       }
338       ref.write(p, buf, buf_i);
339       size -= to_write;
340       p += to_write * 2;
341     }
342
343     ref.complete();
344
345     return ref;
346   }
347
348   /**
349    * Convenience method that converts the given ByteLongObject into a
350    * BlobRef object and pushes it into the given BlobStore object.
351    */

352   BlobRef putByteLongObjectInBlobStore(ByteLongObject blob) throws IOException JavaDoc {
353
354     final int BUF_SIZE = 64 * 1024;
355
356     byte[] src_buf = blob.getByteArray();
357     final int size = src_buf.length;
358     BlobRef ref = (BlobRef) allocateLargeObject((byte) 2, size);
359
360     byte[] copy_buf = new byte[BUF_SIZE];
361     int offset = 0;
362     int to_write = Math.min(BUF_SIZE, size);
363
364     while (to_write > 0) {
365       System.arraycopy(src_buf, offset, copy_buf, 0, to_write);
366       ref.write(offset, copy_buf, to_write);
367
368       offset += to_write;
369       to_write = Math.min(BUF_SIZE, (size - offset));
370     }
371
372     ref.complete();
373
374     return ref;
375   }
376
377   /**
378    * Finds a free place to add a record and returns an index to the record here.
379    * This may expand the record space as necessary if there are no free record
380    * slots to use.
381    * <p>
382    * NOTE: Unfortunately this is cut-and-paste from the way
383    * V2MasterTableDataSource manages recycled elements.
384    */

385   private long addToRecordList(long record_p) throws IOException JavaDoc {
386
387     synchronized (fixed_list) {
388       // If there is no free deleted records in the delete chain,
389
if (first_delete_chain_record == -1) {
390   
391         // Increase the size of the list structure.
392
fixed_list.increaseSize();
393         // The start record of the new size
394
int new_block_number = fixed_list.listBlockCount() - 1;
395         long start_index = fixed_list.listBlockFirstPosition(new_block_number);
396         long size_of_block = fixed_list.listBlockNodeCount(new_block_number);
397         // The Area object for the new position
398
MutableArea a = fixed_list.positionOnNode(start_index);
399         
400         a.putInt(0);
401         a.putInt(0);
402         a.putLong(-1); // Initially unknown size
403
a.putLong(record_p);
404         // Set the rest of the block as deleted records
405
for (long n = 1; n < size_of_block - 1; ++n) {
406           a.putInt(0x020000);
407           a.putInt(0);
408           a.putLong(-1);
409           a.putLong(start_index + n + 1);
410         }
411         // The last block is end of delete chain.
412
a.putInt(0x020000);
413         a.putInt(0);
414         a.putLong(-1);
415         a.putLong(-1);
416         // Check out the changes.
417
a.checkOut();
418         // And set the new delete chain
419
first_delete_chain_record = start_index + 1;
420         // Set the reserved area
421
fixed_list.setReservedLong(first_delete_chain_record);
422 // // Flush the changes to the store
423
// store.flush();
424

425         // Return pointer to the record we just added.
426
return start_index;
427         
428       }
429       else {
430   
431         // Pull free block from the delete chain and recycle it.
432
long recycled_record = first_delete_chain_record;
433         MutableArea block = fixed_list.positionOnNode(recycled_record);
434         int rec_pos = block.position();
435         // Status of the recycled block
436
int status = block.getInt();
437         if ((status & 0x020000) == 0) {
438           throw new Error JavaDoc("Assertion failed: record is not deleted!");
439         }
440         // Reference count (currently unused in delete chains).
441
block.getInt();
442         // The size (should be -1);
443
block.getLong();
444         // The pointer to the next in the chain.
445
long next_chain = block.getLong();
446         first_delete_chain_record = next_chain;
447         // Update the first_delete_chain_record field in the header
448
fixed_list.setReservedLong(first_delete_chain_record);
449         // Update the block
450
block.position(rec_pos);
451         block.putInt(0);
452         block.putInt(0);
453         block.putLong(-1); // Initially unknown size
454
block.putLong(record_p);
455         // Check out the changes
456
block.checkOut();
457   
458         return recycled_record;
459       }
460     }
461
462   }
463   
464   
465   
466   /**
467    * Allocates an area in the store for a large binary object to be stored.
468    * After the blob area is allocated the blob may be written. This returns
469    * a BlobRef object for future access to the blob.
470    * <p>
471    * A newly allocated blob is read and write enabled. A call to the
472    * 'completeBlob' method must be called to finalize the blob at which point
473    * the blob becomes a static read-only object.
474    */

475   Ref allocateLargeObject(byte type, long size) throws IOException JavaDoc {
476     if (size < 0) {
477       throw new IOException JavaDoc("Negative blob size not allowed.");
478     }
479
480     try {
481       store.lockForWrite();
482
483       // Allocate the area (plus header area) for storing the blob pages
484
long page_count = ((size - 1) / (64 * 1024)) + 1;
485       AreaWriter blob_area = store.createArea((page_count * 8) + 24);
486       long blob_p = blob_area.getID();
487       // Set up the area header
488
blob_area.putInt(0); // Reserved for future
489
blob_area.putInt(type);
490       blob_area.putLong(size);
491       blob_area.putLong(page_count);
492       // Initialize the empty blob area
493
for (long i = 0; i < page_count; ++i) {
494         blob_area.putLong(-1);
495       }
496       // And finish
497
blob_area.finish();
498   
499       // Update the fixed_list and return the record number for this blob
500
long reference_id = addToRecordList(blob_p);
501       byte st_type = (byte) (type & 0x0F);
502       if (st_type == 2) {
503         // Create a BlobRef implementation that can access this blob
504
return new BlobRefImpl(reference_id, type, size, true);
505       }
506       else if (st_type == 3) {
507         return new ClobRefImpl(reference_id, type, size, true);
508       }
509       else if (st_type == 4) {
510         return new ClobRefImpl(reference_id, type, size, true);
511       }
512       else {
513         throw new IOException JavaDoc("Unknown large object type");
514       }
515
516     }
517     finally {
518       store.unlockForWrite();
519     }
520
521   }
522
523   /**
524    * Returns a Ref object that allows read-only access to a large object in this
525    * blob store.
526    */

527   public Ref getLargeObject(long reference_id) throws IOException JavaDoc {
528
529     long blob_p;
530     long size;
531     synchronized (fixed_list) {
532
533       // Assert that the blob reference id given is a valid range
534
if (reference_id < 0 ||
535           reference_id >= fixed_list.addressableNodeCount()) {
536         throw new IOException JavaDoc("reference_id is out of range.");
537       }
538
539       // Position on this record
540
Area block = fixed_list.positionOnNode(reference_id);
541       // Read the information in the fixed record
542
int status = block.getInt();
543       // Assert that the status is not deleted
544
if ((status & 0x020000) != 0) {
545         throw new Error JavaDoc("Assertion failed: record is deleted!");
546       }
547       // Get the reference count
548
int reference_count = block.getInt();
549       // Get the total size of the blob
550
size = block.getLong();
551       // Get the blob pointer
552
blob_p = block.getLong();
553
554     }
555       
556     Area blob_area = store.getArea(blob_p);
557     blob_area.position(0);
558     blob_area.getInt(); // (reserved)
559
// Read the type
560
byte type = (byte) blob_area.getInt();
561     // The size of the block
562
long block_size = blob_area.getLong();
563     // The number of pages in the blob
564
long page_count = blob_area.getLong();
565
566     if (type == (byte) 2) {
567       // Create a new BlobRef object.
568
return new BlobRefImpl(reference_id, type, size, false);
569     }
570     else {
571       // Create a new ClobRef object.
572
return new ClobRefImpl(reference_id, type, size, false);
573     }
574   }
575   
576   /**
577    * Call this to complete a blob in the store after a blob has been completely
578    * written. Only Ref implementations returned by the 'allocateLargeObject'
579    * method are accepted.
580    */

581   void completeBlob(AbstractRef ref) throws IOException JavaDoc {
582     // Assert that the BlobRef is open and allocated
583
ref.assertIsOpen();
584     // Get the blob reference id (reference to the fixed record list).
585
long blob_reference_id = ref.getID();
586
587     synchronized (fixed_list) {
588
589       // Update the record in the fixed list.
590
MutableArea block = fixed_list.positionOnNode(blob_reference_id);
591       // Record the position
592
int rec_pos = block.position();
593       // Read the information in the fixed record
594
int status = block.getInt();
595       // Assert that the status is open
596
if (status != 0) {
597         throw new IOException JavaDoc("Assertion failed: record is not open.");
598       }
599       int reference_count = block.getInt();
600       long size = block.getLong();
601       long page_count = block.getLong();
602
603       try {
604         store.lockForWrite();
605
606         // Set the fixed blob record as complete.
607
block.position(rec_pos);
608         // Write the new status
609
block.putInt(1);
610         // Write the reference count
611
block.putInt(0);
612         // Write the completed size
613
block.putLong(ref.getRawSize());
614         // Write the pointer
615
block.putLong(page_count);
616         // Check out the change
617
block.checkOut();
618
619       }
620       finally {
621         store.unlockForWrite();
622       }
623       
624     }
625     // Now the blob has been finalized so change the state of the BlobRef
626
// object.
627
ref.close();
628       
629   }
630
631   /**
632    * Tells the BlobStore that a static reference has been established in a
633    * table to the blob referenced by the given id. This is used to count
634    * references to a blob, and possibly clean up a blob if there are no
635    * references remaining to it.
636    * <p>
637    * NOTE: It is the responsibility of the caller to establish a 'lockForWrite'
638    * lock on the store before this is used.
639    */

640   public void establishReference(long blob_reference_id) {
641     try {
642       synchronized (fixed_list) {
643         // Update the record in the fixed list.
644
MutableArea block = fixed_list.positionOnNode(blob_reference_id);
645         // Record the position
646
int rec_pos = block.position();
647         // Read the information in the fixed record
648
int status = block.getInt();
649         // Assert that the status is static
650
if (status != 1) {
651           throw new RuntimeException JavaDoc("Assertion failed: record is not static.");
652         }
653         int reference_count = block.getInt();
654   
655         // Set the fixed blob record as complete.
656
block.position(rec_pos + 4);
657         // Write the reference count + 1
658
block.putInt(reference_count + 1);
659         // Check out the change
660
block.checkOut();
661       }
662 // // Flush all changes to the store.
663
// store.flush();
664
}
665     catch (IOException JavaDoc e) {
666       throw new RuntimeException JavaDoc("IO Error: " + e.getMessage());
667     }
668   }
669   
670   /**
671    * Tells the BlobStore that a static reference has been released to the
672    * given blob. This would typically be called when the row in the database
673    * is removed.
674    * <p>
675    * NOTE: It is the responsibility of the caller to establish a 'lockForWrite'
676    * lock on the store before this is used.
677    */

678   public void releaseReference(long blob_reference_id) {
679     try {
680       synchronized (fixed_list) {
681         // Update the record in the fixed list.
682
MutableArea block = fixed_list.positionOnNode(blob_reference_id);
683         // Record the position
684
int rec_pos = block.position();
685         // Read the information in the fixed record
686
int status = block.getInt();
687         // Assert that the status is static
688
if (status != 1) {
689           throw new RuntimeException JavaDoc("Assertion failed: " +
690                               "Record is not static (status = " + status + ")");
691         }
692         int reference_count = block.getInt();
693         if (reference_count == 0) {
694           throw new RuntimeException JavaDoc(
695                               "Releasing when Blob reference counter is at 0.");
696         }
697
698         long object_size = block.getLong();
699         long object_p = block.getLong();
700         
701         // If reference count == 0 then we need to free all the resources
702
// associated with this Blob in the blob store.
703
if ((reference_count - 1) == 0) {
704           // Free the resources associated with this object.
705
Area blob_area = store.getArea(object_p);
706           blob_area.getInt();
707           byte type = (byte) blob_area.getInt();
708           long total_size = blob_area.getLong();
709           long page_count = blob_area.getLong();
710           // Free all of the pages in this blob.
711
for (long i = 0; i < page_count; ++i) {
712             long page_p = blob_area.getLong();
713             if (page_p > 0) {
714               store.deleteArea(page_p);
715             }
716           }
717           // Free the blob area object itself.
718
store.deleteArea(object_p);
719           // Write out the blank record.
720
block.position(rec_pos);
721           block.putInt(0x020000);
722           block.putInt(0);
723           block.putLong(-1);
724           block.putLong(first_delete_chain_record);
725           // CHeck out these changes
726
block.checkOut();
727           first_delete_chain_record = blob_reference_id;
728           // Update the first_delete_chain_record field in the header
729
fixed_list.setReservedLong(first_delete_chain_record);
730         }
731         else {
732           // Simply decrement the reference counter for this record.
733
block.position(rec_pos + 4);
734           // Write the reference count - 1
735
block.putInt(reference_count - 1);
736           // Check out this change
737
block.checkOut();
738         }
739
740       }
741 // // Flush all changes to the store.
742
// store.flush();
743
}
744     catch (IOException JavaDoc e) {
745       throw new RuntimeException JavaDoc("IO Error: " + e.getMessage());
746     }
747   }
748   
749   
750   
751   /**
752    * Reads a section of the blob referenced by the given id, offset and length
753    * into the byte array.
754    */

755   private void readBlobByteArray(long reference_id, long offset,
756                           byte[] buf, int off, int length) throws IOException JavaDoc {
757
758     // ASSERT: Read and write position must be 64K aligned.
759
if (offset % (64 * 1024) != 0) {
760       throw new RuntimeException JavaDoc("Assert failed: offset is not 64k aligned.");
761     }
762     // ASSERT: Length is less than or equal to 64K
763
if (length > (64 * 1024)) {
764       throw new RuntimeException JavaDoc("Assert failed: length is greater than 64K.");
765     }
766
767     int status;
768     int reference_count;
769     long size;
770     long blob_p;
771
772     synchronized (fixed_list) {
773       
774       // Assert that the blob reference id given is a valid range
775
if (reference_id < 0 ||
776           reference_id >= fixed_list.addressableNodeCount()) {
777         throw new IOException JavaDoc("blob_reference_id is out of range.");
778       }
779
780       // Position on this record
781
Area block = fixed_list.positionOnNode(reference_id);
782       // Read the information in the fixed record
783
status = block.getInt();
784       // Assert that the status is not deleted
785
if ((status & 0x020000) != 0) {
786         throw new Error JavaDoc("Assertion failed: record is deleted!");
787       }
788       // Get the reference count
789
reference_count = block.getInt();
790       // Get the total size of the blob
791
size = block.getLong();
792       // Get the blob pointer
793
blob_p = block.getLong();
794
795     }
796
797     // Assert that the area being read is within the bounds of the blob
798
if (offset < 0 || offset + length > size) {
799       throw new IOException JavaDoc("Blob invalid read. offset = " + offset +
800                             ", length = " + length);
801     }
802
803     // Open an Area into the blob
804
Area blob_area = store.getArea(blob_p);
805     blob_area.getInt();
806     byte type = (byte) blob_area.getInt();
807     
808     // Convert to the page number
809
long page_number = (offset / (64 * 1024));
810     blob_area.position((int) ((page_number * 8) + 24));
811     long page_p = blob_area.getLong();
812
813     // Read the page
814
Area page_area = store.getArea(page_p);
815     page_area.position(0);
816     int page_type = page_area.getInt();
817     int page_size = page_area.getInt();
818     if ((type & 0x010) != 0) {
819       // The page is compressed
820
byte[] page_buf = new byte[page_size];
821       page_area.get(page_buf, 0, page_size);
822       Inflater JavaDoc inflater = new Inflater JavaDoc();
823       inflater.setInput(page_buf, 0, page_size);
824       try {
825         int result_length = inflater.inflate(buf, off, length);
826         if (result_length != length) {
827           throw new RuntimeException JavaDoc(
828                           "Assert failed: decompressed length is incorrect.");
829         }
830       }
831       catch (DataFormatException JavaDoc e) {
832         throw new IOException JavaDoc("ZIP Data Format Error: " + e.getMessage());
833       }
834       inflater.end();
835     }
836     else {
837       // The page is not compressed
838
page_area.get(buf, off, length);
839     }
840
841   }
842
843   /**
844    * Writes a section of the blob referenced by the given id, offset and
845    * length to the byte array. Note that this does not perform any checks on
846    * whether we are allowed to write to this blob.
847    */

848   private void writeBlobByteArray(long reference_id, long offset,
849                                   byte[] buf, int length) throws IOException JavaDoc {
850
851     // ASSERT: Read and write position must be 64K aligned.
852
if (offset % (64 * 1024) != 0) {
853       throw new RuntimeException JavaDoc("Assert failed: offset is not 64k aligned.");
854     }
855     // ASSERT: Length is less than or equal to 64K
856
if (length > (64 * 1024)) {
857       throw new RuntimeException JavaDoc("Assert failed: length is greater than 64K.");
858     }
859
860     int status;
861     int reference_count;
862     long size;
863     long blob_p;
864
865     synchronized (fixed_list) {
866       
867       // Assert that the blob reference id given is a valid range
868
if (reference_id < 0 ||
869           reference_id >= fixed_list.addressableNodeCount()) {
870         throw new IOException JavaDoc("blob_reference_id is out of range.");
871       }
872
873       // Position on this record
874
Area block = fixed_list.positionOnNode(reference_id);
875       // Read the information in the fixed record
876
status = block.getInt();
877       // Assert that the status is not deleted
878
if ((status & 0x020000) != 0) {
879         throw new Error JavaDoc("Assertion failed: record is deleted!");
880       }
881       // Get the reference count
882
reference_count = block.getInt();
883       // Get the total size of the blob
884
size = block.getLong();
885       // Get the blob pointer
886
blob_p = block.getLong();
887
888     }
889
890     // Open an Area into the blob
891
MutableArea blob_area = store.getMutableArea(blob_p);
892     blob_area.getInt();
893     byte type = (byte) blob_area.getInt();
894     size = blob_area.getLong();
895
896     // Assert that the area being read is within the bounds of the blob
897
if (offset < 0 || offset + length > size) {
898       throw new IOException JavaDoc("Blob invalid write. offset = " + offset +
899                             ", length = " + length + ", size = " + size);
900     }
901
902     // Convert to the page number
903
long page_number = (offset / (64 * 1024));
904     blob_area.position((int) ((page_number * 8) + 24));
905     long page_p = blob_area.getLong();
906
907     // Assert that 'page_p' is -1
908
if (page_p != -1) {
909       // This means we are trying to rewrite a page we've already written
910
// before.
911
throw new RuntimeException JavaDoc("Assert failed: page_p is not -1");
912     }
913
914     // Is the compression bit set?
915
byte[] to_write;
916     int write_length;
917     if ((type & 0x010) != 0) {
918       // Yes, compression
919
Deflater JavaDoc deflater = new Deflater JavaDoc();
920       deflater.setInput(buf, 0, length);
921       deflater.finish();
922       to_write = new byte[65 * 1024];
923       write_length = deflater.deflate(to_write);
924     }
925     else {
926       // No compression
927
to_write = buf;
928       write_length = length;
929     }
930
931     try {
932       store.lockForWrite();
933     
934       // Allocate and write the page.
935
AreaWriter page_area = store.createArea(write_length + 8);
936       page_p = page_area.getID();
937       page_area.putInt(1);
938       page_area.putInt(write_length);
939       page_area.put(to_write, 0, write_length);
940       // Finish this page
941
page_area.finish();
942
943       // Update the page in the header.
944
blob_area.position((int) ((page_number * 8) + 24));
945       blob_area.putLong(page_p);
946       // Check out this change.
947
blob_area.checkOut();
948
949     }
950     finally {
951       store.unlockForWrite();
952     }
953
954   }
955
956   /**
957    * An InputStream implementation that reads from the underlying blob data as
958    * fixed size pages.
959    */

960   private class BLOBInputStream extends PagedInputStream {
961       
962     final static int B_SIZE = 64 * 1024;
963     
964     private long reference_id;
965       
966     public BLOBInputStream(final long reference_id, final long size) {
967       super(B_SIZE, size);
968       this.reference_id = reference_id;
969     }
970
971     public void readPageContent(byte[] buf, long pos, int length)
972                                                           throws IOException JavaDoc {
973       readBlobByteArray(reference_id, pos, buf, 0, length);
974     }
975
976   }
977   
978   /**
979    * An abstract implementation of a Ref object for referencing large objects
980    * in this blob store.
981    */

982   private class AbstractRef {
983     
984     /**
985      * The reference identifier. This is a pointer into the fixed list
986      * structure.
987      */

988     protected final long reference_id;
989
990     /**
991      * The total size of the large object in bytes.
992      */

993     protected final long size;
994     
995     /**
996      * The type of large object.
997      */

998     protected final byte type;
999
1000    /**
1001     * Set to true if this large object is open for writing, otherwise the
1002     * object is an immutable static object.
1003     */

1004    private boolean open_for_write;
1005    
1006    /**
1007     * Constructs the Ref implementation.
1008     */

1009    AbstractRef(long reference_id, byte type, long size,
1010                boolean open_for_write) {
1011      this.reference_id = reference_id;
1012      this.size = size;
1013      this.type = type;
1014      this.open_for_write = open_for_write;
1015    }
1016    
1017    /**
1018     * Asserts that this blob is open for writing.
1019     */

1020    void assertIsOpen() {
1021      if (!open_for_write) {
1022        throw new Error JavaDoc("Large object ref is newly allocated.");
1023      }
1024    }
1025    
1026    public long getRawSize() {
1027      return size;
1028    }
1029    
1030    /**
1031     * Marks this large object as closed to write operations.
1032     */

1033    void close() {
1034      open_for_write = false;
1035    }
1036
1037    public int length() {
1038      return (int) size;
1039    }
1040    
1041    public long getID() {
1042      return reference_id;
1043    }
1044
1045    public byte getType() {
1046      return type;
1047    }
1048    
1049    public void read(long offset, byte[] buf, int length) throws IOException JavaDoc {
1050      // Reads the section of the blob into the given buffer byte array at the
1051
// given offset of the blob.
1052
readBlobByteArray(reference_id, offset, buf, 0, length);
1053    }
1054
1055    public void write(long offset, byte[] buf, int length) throws IOException JavaDoc {
1056      if (open_for_write) {
1057        writeBlobByteArray(reference_id, offset, buf, length);
1058      }
1059      else {
1060        throw new IOException JavaDoc("Blob is read-only.");
1061      }
1062    }
1063
1064    public void complete() throws IOException JavaDoc {
1065      completeBlob(this);
1066    }
1067    
1068  }
1069   
1070  /**
1071   * An implementation of ClobRef used to represent a reference to a large
1072   * character object inside this blob store.
1073   */

1074  private class ClobRefImpl extends AbstractRef implements ClobRef {
1075    
1076    /**
1077     * Constructs the ClobRef implementation.
1078     */

1079    ClobRefImpl(long reference_id, byte type, long size,
1080                boolean open_for_write) {
1081      super(reference_id, type, size, open_for_write);
1082    }
1083    
1084    // ---------- Implemented from ClobRef ----------
1085

1086    public int length() {
1087      byte st_type = (byte) (type & 0x0F);
1088      if (st_type == 3) {
1089        return (int) size;
1090      }
1091      else if (st_type == 4) {
1092        return (int) (size / 2);
1093      }
1094      else {
1095        throw new RuntimeException JavaDoc("Unknown type.");
1096      }
1097    }
1098
1099    public Reader JavaDoc getReader() {
1100      byte st_type = (byte) (type & 0x0F);
1101      if (st_type == 3) {
1102        return new AsciiReader(new BLOBInputStream(reference_id, size));
1103      }
1104      else if (st_type == 4) {
1105        return new BinaryToUnicodeReader(
1106                               new BLOBInputStream(reference_id, size));
1107      }
1108      else {
1109        throw new RuntimeException JavaDoc("Unknown type.");
1110      }
1111    }
1112
1113    public String JavaDoc toString() {
1114      final int BUF_SIZE = 8192;
1115      Reader JavaDoc r = getReader();
1116      StringBuffer JavaDoc buf = new StringBuffer JavaDoc(length());
1117      char[] c = new char[BUF_SIZE];
1118      try {
1119        while(true) {
1120          int has_read = r.read(c, 0, BUF_SIZE);
1121          if (has_read == 0 || has_read == -1) {
1122            return new String JavaDoc(buf);
1123          }
1124          buf.append(c);
1125        }
1126      }
1127      catch (IOException JavaDoc e) {
1128        throw new RuntimeException JavaDoc("IO Error: " + e.getMessage());
1129      }
1130    }
1131
1132  }
1133
1134  /**
1135   * An implementation of BlobRef used to represent a blob reference inside this
1136   * blob store.
1137   */

1138  private class BlobRefImpl extends AbstractRef implements BlobRef {
1139    
1140    /**
1141     * Constructs the BlobRef implementation.
1142     */

1143    BlobRefImpl(long reference_id, byte type, long size,
1144                boolean open_for_write) {
1145      super(reference_id, type, size, open_for_write);
1146    }
1147
1148    // ---------- Implemented from BlobRef ----------
1149

1150    public InputStream JavaDoc getInputStream() {
1151      return new BLOBInputStream(reference_id, size);
1152    }
1153
1154  }
1155
1156}
1157
1158
Popular Tags