1 24 25 package com.mckoi.database; 26 27 import java.util.ArrayList ; 28 import java.util.zip.Deflater ; 29 import java.util.zip.Inflater ; 30 import java.util.zip.DataFormatException ; 31 import java.io.IOException ; 32 import java.io.InputStream ; 33 import java.io.Reader ; 34 import com.mckoi.util.PagedInputStream; 35 import com.mckoi.store.Store; 36 import com.mckoi.store.Area; 37 import com.mckoi.store.MutableArea; 38 import com.mckoi.store.AreaWriter; 39 import com.mckoi.database.jdbc.AsciiReader; 40 import com.mckoi.database.jdbc.BinaryToUnicodeReader; 41 import com.mckoi.database.global.Ref; 42 import com.mckoi.database.global.BlobRef; 43 import com.mckoi.database.global.ClobRef; 44 import com.mckoi.database.global.ByteLongObject; 45 46 61 62 final class BlobStore implements BlobStoreInterface { 63 64 67 private final static int MAGIC = 0x012BC53A9; 68 69 72 private Store store; 73 74 78 private FixedRecordList fixed_list; 79 80 83 private long first_delete_chain_record; 84 85 86 89 BlobStore(Store store) { 90 this.store = store; 91 fixed_list = new FixedRecordList(store, 24); 92 } 93 94 95 99 long create() throws IOException { 100 long fixed_list_p = fixed_list.create(); 105 106 first_delete_chain_record = -1; 108 fixed_list.setReservedLong(-1); 109 110 AreaWriter blob_store_header = store.createArea(32); 113 long blob_store_p = blob_store_header.getID(); 114 blob_store_header.putInt(MAGIC); 117 blob_store_header.putInt(1); 119 blob_store_header.putLong(fixed_list_p); 121 blob_store_header.finish(); 123 124 return blob_store_p; 126 } 127 128 132 void init(long blob_store_p) throws IOException { 133 Area blob_store_header = store.getArea(blob_store_p); 135 blob_store_header.position(0); 136 int magic = blob_store_header.getInt(); 138 int version = blob_store_header.getInt(); 139 if (magic != MAGIC) { 140 throw new IOException ("MAGIC value for BlobStore is not correct."); 141 } 142 if (version != 1) { 143 throw new IOException ("version number for BlobStore is not correct."); 144 } 145 146 long fixed_list_p = blob_store_header.getLong(); 148 fixed_list.init(fixed_list_p); 150 151 first_delete_chain_record = fixed_list.getReservedLong(); 153 } 154 155 156 159 private static class CopyBlobInfo { 160 int ref_count; 161 long size; 162 long ob_p; 163 }; 164 165 171 void copyFrom(StoreSystem store_system, 172 BlobStore src_blob_store) throws IOException { 173 FixedRecordList src_fixed_list = src_blob_store.fixed_list; 174 long node_count; 175 synchronized (src_fixed_list) { 176 node_count = src_fixed_list.addressableNodeCount(); 177 } 178 179 synchronized (fixed_list) { 180 181 while (fixed_list.addressableNodeCount() < node_count) { 183 fixed_list.increaseSize(); 184 } 185 186 long last_deleted = -1; 188 189 final int BLOCK_WRITE_COUNT = 1024; 191 192 int max_to_read = (int) Math.min(BLOCK_WRITE_COUNT, node_count); 193 long p = 0; 194 195 while (max_to_read > 0) { 196 ArrayList src_copy_list = new ArrayList (); 198 199 synchronized (src_fixed_list) { 200 for (int i = 0; i < max_to_read; ++i) { 201 Area a = src_fixed_list.positionOnNode(p + i); 202 int status = a.getInt(); 203 if (status != 0x020000) { 205 CopyBlobInfo info = new CopyBlobInfo(); 206 info.ref_count = a.getInt(); 207 info.size = a.getLong(); 208 info.ob_p = a.getLong(); 209 src_copy_list.add(info); 210 } 211 else { 212 src_copy_list.add(null); 213 } 214 } 215 } 216 217 try { 218 store.lockForWrite(); 219 220 int sz = src_copy_list.size(); 222 for (int i = 0; i < sz; ++i) { 223 CopyBlobInfo info = (CopyBlobInfo) src_copy_list.get(i); 224 MutableArea a = fixed_list.positionOnNode(p + i); 225 if (info == null) { 227 a.putInt(0x020000); 228 a.putInt(0); 229 a.putLong(-1); 230 a.putLong(last_deleted); 231 a.checkOut(); 232 last_deleted = p + i; 233 } 234 else { 235 Area src_blob_header = src_blob_store.store.getArea(info.ob_p); 238 int res = src_blob_header.getInt(); 240 int type = src_blob_header.getInt(); 241 long total_block_size = src_blob_header.getLong(); 242 long total_block_pages = src_blob_header.getLong(); 243 244 AreaWriter dst_blob_header = store.createArea( 246 4 + 4 + 8 + 8 + (total_block_pages * 8)); 247 long new_ob_header_p = dst_blob_header.getID(); 248 dst_blob_header.putInt(res); 250 dst_blob_header.putInt(type); 251 dst_blob_header.putLong(total_block_size); 252 dst_blob_header.putLong(total_block_pages); 253 254 for (int n = 0; n < total_block_pages; ++n) { 256 long block_p = src_blob_header.getLong(); 258 Area src_block = src_blob_store.store.getArea(block_p); 259 int block_type = src_block.getInt(); 260 int block_size = src_block.getInt(); 261 int new_block_size = block_size + 4 + 4; 263 AreaWriter dst_block_p = store.createArea(new_block_size); 264 long new_block_p = dst_block_p.getID(); 265 src_block.position(0); 266 src_block.copyTo(dst_block_p, new_block_size); 267 dst_block_p.finish(); 269 dst_blob_header.putLong(new_block_p); 271 } 272 273 dst_blob_header.finish(); 275 276 a.putInt(1); 278 a.putInt(0); 280 a.putLong(info.size); 281 a.putLong(new_ob_header_p); 282 a.checkOut(); 284 } 285 } 286 287 } 288 finally { 289 store.unlockForWrite(); 290 } 291 292 node_count -= max_to_read; 293 p += max_to_read; 294 max_to_read = (int) Math.min(BLOCK_WRITE_COUNT, node_count); 295 296 store_system.setCheckPoint(); 299 300 } 301 302 first_delete_chain_record = last_deleted; 304 fixed_list.setReservedLong(last_deleted); 305 306 } 308 } 309 310 314 ClobRef putStringInBlobStore(String str) throws IOException { 315 final int BUF_SIZE = 64 * 1024; 316 317 int size = str.length(); 318 319 byte type = 4; 320 type = (byte) (type | 0x010); 322 323 ClobRef ref = (ClobRef) allocateLargeObject(type, size * 2); 324 byte[] buf = new byte[BUF_SIZE]; 325 long p = 0; 326 int str_i = 0; 327 while (size > 0) { 328 int to_write = Math.min(BUF_SIZE / 2, size); 329 int buf_i = 0; 330 for (int i = 0; i < to_write; ++i) { 331 char c = str.charAt(str_i); 332 buf[buf_i] = (byte) (c >> 8); 333 ++buf_i; 334 buf[buf_i] = (byte) c; 335 ++buf_i; 336 ++str_i; 337 } 338 ref.write(p, buf, buf_i); 339 size -= to_write; 340 p += to_write * 2; 341 } 342 343 ref.complete(); 344 345 return ref; 346 } 347 348 352 BlobRef putByteLongObjectInBlobStore(ByteLongObject blob) throws IOException { 353 354 final int BUF_SIZE = 64 * 1024; 355 356 byte[] src_buf = blob.getByteArray(); 357 final int size = src_buf.length; 358 BlobRef ref = (BlobRef) allocateLargeObject((byte) 2, size); 359 360 byte[] copy_buf = new byte[BUF_SIZE]; 361 int offset = 0; 362 int to_write = Math.min(BUF_SIZE, size); 363 364 while (to_write > 0) { 365 System.arraycopy(src_buf, offset, copy_buf, 0, to_write); 366 ref.write(offset, copy_buf, to_write); 367 368 offset += to_write; 369 to_write = Math.min(BUF_SIZE, (size - offset)); 370 } 371 372 ref.complete(); 373 374 return ref; 375 } 376 377 385 private long addToRecordList(long record_p) throws IOException { 386 387 synchronized (fixed_list) { 388 if (first_delete_chain_record == -1) { 390 391 fixed_list.increaseSize(); 393 int new_block_number = fixed_list.listBlockCount() - 1; 395 long start_index = fixed_list.listBlockFirstPosition(new_block_number); 396 long size_of_block = fixed_list.listBlockNodeCount(new_block_number); 397 MutableArea a = fixed_list.positionOnNode(start_index); 399 400 a.putInt(0); 401 a.putInt(0); 402 a.putLong(-1); a.putLong(record_p); 404 for (long n = 1; n < size_of_block - 1; ++n) { 406 a.putInt(0x020000); 407 a.putInt(0); 408 a.putLong(-1); 409 a.putLong(start_index + n + 1); 410 } 411 a.putInt(0x020000); 413 a.putInt(0); 414 a.putLong(-1); 415 a.putLong(-1); 416 a.checkOut(); 418 first_delete_chain_record = start_index + 1; 420 fixed_list.setReservedLong(first_delete_chain_record); 422 425 return start_index; 427 428 } 429 else { 430 431 long recycled_record = first_delete_chain_record; 433 MutableArea block = fixed_list.positionOnNode(recycled_record); 434 int rec_pos = block.position(); 435 int status = block.getInt(); 437 if ((status & 0x020000) == 0) { 438 throw new Error ("Assertion failed: record is not deleted!"); 439 } 440 block.getInt(); 442 block.getLong(); 444 long next_chain = block.getLong(); 446 first_delete_chain_record = next_chain; 447 fixed_list.setReservedLong(first_delete_chain_record); 449 block.position(rec_pos); 451 block.putInt(0); 452 block.putInt(0); 453 block.putLong(-1); block.putLong(record_p); 455 block.checkOut(); 457 458 return recycled_record; 459 } 460 } 461 462 } 463 464 465 466 475 Ref allocateLargeObject(byte type, long size) throws IOException { 476 if (size < 0) { 477 throw new IOException ("Negative blob size not allowed."); 478 } 479 480 try { 481 store.lockForWrite(); 482 483 long page_count = ((size - 1) / (64 * 1024)) + 1; 485 AreaWriter blob_area = store.createArea((page_count * 8) + 24); 486 long blob_p = blob_area.getID(); 487 blob_area.putInt(0); blob_area.putInt(type); 490 blob_area.putLong(size); 491 blob_area.putLong(page_count); 492 for (long i = 0; i < page_count; ++i) { 494 blob_area.putLong(-1); 495 } 496 blob_area.finish(); 498 499 long reference_id = addToRecordList(blob_p); 501 byte st_type = (byte) (type & 0x0F); 502 if (st_type == 2) { 503 return new BlobRefImpl(reference_id, type, size, true); 505 } 506 else if (st_type == 3) { 507 return new ClobRefImpl(reference_id, type, size, true); 508 } 509 else if (st_type == 4) { 510 return new ClobRefImpl(reference_id, type, size, true); 511 } 512 else { 513 throw new IOException ("Unknown large object type"); 514 } 515 516 } 517 finally { 518 store.unlockForWrite(); 519 } 520 521 } 522 523 527 public Ref getLargeObject(long reference_id) throws IOException { 528 529 long blob_p; 530 long size; 531 synchronized (fixed_list) { 532 533 if (reference_id < 0 || 535 reference_id >= fixed_list.addressableNodeCount()) { 536 throw new IOException ("reference_id is out of range."); 537 } 538 539 Area block = fixed_list.positionOnNode(reference_id); 541 int status = block.getInt(); 543 if ((status & 0x020000) != 0) { 545 throw new Error ("Assertion failed: record is deleted!"); 546 } 547 int reference_count = block.getInt(); 549 size = block.getLong(); 551 blob_p = block.getLong(); 553 554 } 555 556 Area blob_area = store.getArea(blob_p); 557 blob_area.position(0); 558 blob_area.getInt(); byte type = (byte) blob_area.getInt(); 561 long block_size = blob_area.getLong(); 563 long page_count = blob_area.getLong(); 565 566 if (type == (byte) 2) { 567 return new BlobRefImpl(reference_id, type, size, false); 569 } 570 else { 571 return new ClobRefImpl(reference_id, type, size, false); 573 } 574 } 575 576 581 void completeBlob(AbstractRef ref) throws IOException { 582 ref.assertIsOpen(); 584 long blob_reference_id = ref.getID(); 586 587 synchronized (fixed_list) { 588 589 MutableArea block = fixed_list.positionOnNode(blob_reference_id); 591 int rec_pos = block.position(); 593 int status = block.getInt(); 595 if (status != 0) { 597 throw new IOException ("Assertion failed: record is not open."); 598 } 599 int reference_count = block.getInt(); 600 long size = block.getLong(); 601 long page_count = block.getLong(); 602 603 try { 604 store.lockForWrite(); 605 606 block.position(rec_pos); 608 block.putInt(1); 610 block.putInt(0); 612 block.putLong(ref.getRawSize()); 614 block.putLong(page_count); 616 block.checkOut(); 618 619 } 620 finally { 621 store.unlockForWrite(); 622 } 623 624 } 625 ref.close(); 628 629 } 630 631 640 public void establishReference(long blob_reference_id) { 641 try { 642 synchronized (fixed_list) { 643 MutableArea block = fixed_list.positionOnNode(blob_reference_id); 645 int rec_pos = block.position(); 647 int status = block.getInt(); 649 if (status != 1) { 651 throw new RuntimeException ("Assertion failed: record is not static."); 652 } 653 int reference_count = block.getInt(); 654 655 block.position(rec_pos + 4); 657 block.putInt(reference_count + 1); 659 block.checkOut(); 661 } 662 } 665 catch (IOException e) { 666 throw new RuntimeException ("IO Error: " + e.getMessage()); 667 } 668 } 669 670 678 public void releaseReference(long blob_reference_id) { 679 try { 680 synchronized (fixed_list) { 681 MutableArea block = fixed_list.positionOnNode(blob_reference_id); 683 int rec_pos = block.position(); 685 int status = block.getInt(); 687 if (status != 1) { 689 throw new RuntimeException ("Assertion failed: " + 690 "Record is not static (status = " + status + ")"); 691 } 692 int reference_count = block.getInt(); 693 if (reference_count == 0) { 694 throw new RuntimeException ( 695 "Releasing when Blob reference counter is at 0."); 696 } 697 698 long object_size = block.getLong(); 699 long object_p = block.getLong(); 700 701 if ((reference_count - 1) == 0) { 704 Area blob_area = store.getArea(object_p); 706 blob_area.getInt(); 707 byte type = (byte) blob_area.getInt(); 708 long total_size = blob_area.getLong(); 709 long page_count = blob_area.getLong(); 710 for (long i = 0; i < page_count; ++i) { 712 long page_p = blob_area.getLong(); 713 if (page_p > 0) { 714 store.deleteArea(page_p); 715 } 716 } 717 store.deleteArea(object_p); 719 block.position(rec_pos); 721 block.putInt(0x020000); 722 block.putInt(0); 723 block.putLong(-1); 724 block.putLong(first_delete_chain_record); 725 block.checkOut(); 727 first_delete_chain_record = blob_reference_id; 728 fixed_list.setReservedLong(first_delete_chain_record); 730 } 731 else { 732 block.position(rec_pos + 4); 734 block.putInt(reference_count - 1); 736 block.checkOut(); 738 } 739 740 } 741 } 744 catch (IOException e) { 745 throw new RuntimeException ("IO Error: " + e.getMessage()); 746 } 747 } 748 749 750 751 755 private void readBlobByteArray(long reference_id, long offset, 756 byte[] buf, int off, int length) throws IOException { 757 758 if (offset % (64 * 1024) != 0) { 760 throw new RuntimeException ("Assert failed: offset is not 64k aligned."); 761 } 762 if (length > (64 * 1024)) { 764 throw new RuntimeException ("Assert failed: length is greater than 64K."); 765 } 766 767 int status; 768 int reference_count; 769 long size; 770 long blob_p; 771 772 synchronized (fixed_list) { 773 774 if (reference_id < 0 || 776 reference_id >= fixed_list.addressableNodeCount()) { 777 throw new IOException ("blob_reference_id is out of range."); 778 } 779 780 Area block = fixed_list.positionOnNode(reference_id); 782 status = block.getInt(); 784 if ((status & 0x020000) != 0) { 786 throw new Error ("Assertion failed: record is deleted!"); 787 } 788 reference_count = block.getInt(); 790 size = block.getLong(); 792 blob_p = block.getLong(); 794 795 } 796 797 if (offset < 0 || offset + length > size) { 799 throw new IOException ("Blob invalid read. offset = " + offset + 800 ", length = " + length); 801 } 802 803 Area blob_area = store.getArea(blob_p); 805 blob_area.getInt(); 806 byte type = (byte) blob_area.getInt(); 807 808 long page_number = (offset / (64 * 1024)); 810 blob_area.position((int) ((page_number * 8) + 24)); 811 long page_p = blob_area.getLong(); 812 813 Area page_area = store.getArea(page_p); 815 page_area.position(0); 816 int page_type = page_area.getInt(); 817 int page_size = page_area.getInt(); 818 if ((type & 0x010) != 0) { 819 byte[] page_buf = new byte[page_size]; 821 page_area.get(page_buf, 0, page_size); 822 Inflater inflater = new Inflater (); 823 inflater.setInput(page_buf, 0, page_size); 824 try { 825 int result_length = inflater.inflate(buf, off, length); 826 if (result_length != length) { 827 throw new RuntimeException ( 828 "Assert failed: decompressed length is incorrect."); 829 } 830 } 831 catch (DataFormatException e) { 832 throw new IOException ("ZIP Data Format Error: " + e.getMessage()); 833 } 834 inflater.end(); 835 } 836 else { 837 page_area.get(buf, off, length); 839 } 840 841 } 842 843 848 private void writeBlobByteArray(long reference_id, long offset, 849 byte[] buf, int length) throws IOException { 850 851 if (offset % (64 * 1024) != 0) { 853 throw new RuntimeException ("Assert failed: offset is not 64k aligned."); 854 } 855 if (length > (64 * 1024)) { 857 throw new RuntimeException ("Assert failed: length is greater than 64K."); 858 } 859 860 int status; 861 int reference_count; 862 long size; 863 long blob_p; 864 865 synchronized (fixed_list) { 866 867 if (reference_id < 0 || 869 reference_id >= fixed_list.addressableNodeCount()) { 870 throw new IOException ("blob_reference_id is out of range."); 871 } 872 873 Area block = fixed_list.positionOnNode(reference_id); 875 status = block.getInt(); 877 if ((status & 0x020000) != 0) { 879 throw new Error ("Assertion failed: record is deleted!"); 880 } 881 reference_count = block.getInt(); 883 size = block.getLong(); 885 blob_p = block.getLong(); 887 888 } 889 890 MutableArea blob_area = store.getMutableArea(blob_p); 892 blob_area.getInt(); 893 byte type = (byte) blob_area.getInt(); 894 size = blob_area.getLong(); 895 896 if (offset < 0 || offset + length > size) { 898 throw new IOException ("Blob invalid write. offset = " + offset + 899 ", length = " + length + ", size = " + size); 900 } 901 902 long page_number = (offset / (64 * 1024)); 904 blob_area.position((int) ((page_number * 8) + 24)); 905 long page_p = blob_area.getLong(); 906 907 if (page_p != -1) { 909 throw new RuntimeException ("Assert failed: page_p is not -1"); 912 } 913 914 byte[] to_write; 916 int write_length; 917 if ((type & 0x010) != 0) { 918 Deflater deflater = new Deflater (); 920 deflater.setInput(buf, 0, length); 921 deflater.finish(); 922 to_write = new byte[65 * 1024]; 923 write_length = deflater.deflate(to_write); 924 } 925 else { 926 to_write = buf; 928 write_length = length; 929 } 930 931 try { 932 store.lockForWrite(); 933 934 AreaWriter page_area = store.createArea(write_length + 8); 936 page_p = page_area.getID(); 937 page_area.putInt(1); 938 page_area.putInt(write_length); 939 page_area.put(to_write, 0, write_length); 940 page_area.finish(); 942 943 blob_area.position((int) ((page_number * 8) + 24)); 945 blob_area.putLong(page_p); 946 blob_area.checkOut(); 948 949 } 950 finally { 951 store.unlockForWrite(); 952 } 953 954 } 955 956 960 private class BLOBInputStream extends PagedInputStream { 961 962 final static int B_SIZE = 64 * 1024; 963 964 private long reference_id; 965 966 public BLOBInputStream(final long reference_id, final long size) { 967 super(B_SIZE, size); 968 this.reference_id = reference_id; 969 } 970 971 public void readPageContent(byte[] buf, long pos, int length) 972 throws IOException { 973 readBlobByteArray(reference_id, pos, buf, 0, length); 974 } 975 976 } 977 978 982 private class AbstractRef { 983 984 988 protected final long reference_id; 989 990 993 protected final long size; 994 995 998 protected final byte type; 999 1000 1004 private boolean open_for_write; 1005 1006 1009 AbstractRef(long reference_id, byte type, long size, 1010 boolean open_for_write) { 1011 this.reference_id = reference_id; 1012 this.size = size; 1013 this.type = type; 1014 this.open_for_write = open_for_write; 1015 } 1016 1017 1020 void assertIsOpen() { 1021 if (!open_for_write) { 1022 throw new Error ("Large object ref is newly allocated."); 1023 } 1024 } 1025 1026 public long getRawSize() { 1027 return size; 1028 } 1029 1030 1033 void close() { 1034 open_for_write = false; 1035 } 1036 1037 public int length() { 1038 return (int) size; 1039 } 1040 1041 public long getID() { 1042 return reference_id; 1043 } 1044 1045 public byte getType() { 1046 return type; 1047 } 1048 1049 public void read(long offset, byte[] buf, int length) throws IOException { 1050 readBlobByteArray(reference_id, offset, buf, 0, length); 1053 } 1054 1055 public void write(long offset, byte[] buf, int length) throws IOException { 1056 if (open_for_write) { 1057 writeBlobByteArray(reference_id, offset, buf, length); 1058 } 1059 else { 1060 throw new IOException ("Blob is read-only."); 1061 } 1062 } 1063 1064 public void complete() throws IOException { 1065 completeBlob(this); 1066 } 1067 1068 } 1069 1070 1074 private class ClobRefImpl extends AbstractRef implements ClobRef { 1075 1076 1079 ClobRefImpl(long reference_id, byte type, long size, 1080 boolean open_for_write) { 1081 super(reference_id, type, size, open_for_write); 1082 } 1083 1084 1086 public int length() { 1087 byte st_type = (byte) (type & 0x0F); 1088 if (st_type == 3) { 1089 return (int) size; 1090 } 1091 else if (st_type == 4) { 1092 return (int) (size / 2); 1093 } 1094 else { 1095 throw new RuntimeException ("Unknown type."); 1096 } 1097 } 1098 1099 public Reader getReader() { 1100 byte st_type = (byte) (type & 0x0F); 1101 if (st_type == 3) { 1102 return new AsciiReader(new BLOBInputStream(reference_id, size)); 1103 } 1104 else if (st_type == 4) { 1105 return new BinaryToUnicodeReader( 1106 new BLOBInputStream(reference_id, size)); 1107 } 1108 else { 1109 throw new RuntimeException ("Unknown type."); 1110 } 1111 } 1112 1113 public String toString() { 1114 final int BUF_SIZE = 8192; 1115 Reader r = getReader(); 1116 StringBuffer buf = new StringBuffer (length()); 1117 char[] c = new char[BUF_SIZE]; 1118 try { 1119 while(true) { 1120 int has_read = r.read(c, 0, BUF_SIZE); 1121 if (has_read == 0 || has_read == -1) { 1122 return new String (buf); 1123 } 1124 buf.append(c); 1125 } 1126 } 1127 catch (IOException e) { 1128 throw new RuntimeException ("IO Error: " + e.getMessage()); 1129 } 1130 } 1131 1132 } 1133 1134 1138 private class BlobRefImpl extends AbstractRef implements BlobRef { 1139 1140 1143 BlobRefImpl(long reference_id, byte type, long size, 1144 boolean open_for_write) { 1145 super(reference_id, type, size, open_for_write); 1146 } 1147 1148 1150 public InputStream getInputStream() { 1151 return new BLOBInputStream(reference_id, size); 1152 } 1153 1154 } 1155 1156} 1157 1158 | Popular Tags |