package com.mckoi.store;

import java.io.*;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import com.mckoi.debug.DebugLogger;
import com.mckoi.debug.Lvl;
import com.mckoi.util.ByteArrayUtil;
import com.mckoi.store.LoggingBufferManager.StoreDataAccessorFactory;

/**
 * Manages a set of named resources whose modifications are first appended
 * to journal files ("jnl10" .. "jnl73" inside the journal path) and later
 * replayed ("persisted") onto the underlying StoreDataAccessor objects by a
 * background thread.  When logging is disabled, resources write straight
 * through to their accessor and no journal is kept.
 * <p>
 * Journal record layout, as written by JournalFile: an 8 byte type, a
 * 4 byte body size, then the body.  Record types used in this file are
 * 1 (page modification), 2 (resource name tag), 3 (set size), 6 (delete)
 * and 100 (check point).  The first 8 bytes of a journal file hold the
 * journal number.
 */
class JournalledSystem {

  /**
   * True when modifications are journalled; false when resources write
   * directly to their accessor.
   */
  private final boolean ENABLE_LOGGING;

  /**
   * The directory that contains the journal files.
   */
  private final File journal_path;

  /**
   * True if the system was created read-only.  No top journal is created
   * and check points are no-ops in that case.
   */
  private final boolean read_only;

  /**
   * The page size reported by resources and used to turn page numbers
   * into byte positions.
   */
  private final int page_size;

  /**
   * Maps resource name (String) to AbstractResource.  Guarded by
   * synchronizing on the map itself.
   */
  private HashMap all_resources;

  /**
   * The id given to the next resource that is created.
   */
  private long seq_id;

  /**
   * Journals (JournalFile) that are no longer the top journal and are
   * waiting to be persisted.  Guarded by 'top_journal_lock'.
   */
  private final ArrayList journal_archives;

  /**
   * The journal that modifications are currently appended to.
   */
  private JournalFile top_journal_file;

  /**
   * The number given to the next journal file created.
   */
  private long journal_number;

  /**
   * Creates the StoreDataAccessor for each named resource.
   */
  private final StoreDataAccessorFactory sda_factory;

  /**
   * Lock guarding 'top_journal_file' and 'journal_archives'.
   */
  private final Object top_journal_lock = new Object();

  /**
   * The background thread that persists archived journals, or null when
   * the system is not started.
   */
  private JournalingThread journaling_thread;

  /**
   * Destination for diagnostic messages.
   */
  private final DebugLogger debug;


  JournalledSystem(File journal_path, boolean read_only, int page_size,
                   StoreDataAccessorFactory sda_factory, DebugLogger debug,
                   boolean enable_logging) {
    this.journal_path = journal_path;
    this.read_only = read_only;
    this.page_size = page_size;
    this.sda_factory = sda_factory;
    all_resources = new HashMap();
    journal_number = 0;
    journal_archives = new ArrayList();
    this.debug = debug;
    this.ENABLE_LOGGING = enable_logging;
  }


  /**
   * Returns the file name of the journal in the given slot ("jnl" followed
   * by the number).
   *
   * @param number the journal slot, which must be between 10 and 73
   *   inclusive.
   * @throws Error if the number is outside that range.
   */
  private static String getJournalFileName(int number) {
    if (number < 10 || number > 73) {
      throw new Error("Journal file name out of range.");
    }
    return "jnl" + number;
  }

  /**
   * Lock that serializes 'start' and 'stop'.
   */
  private final Object init_lock = new Object();


  /**
   * Starts the journal system: starts the background journaling thread,
   * recovers any journals left in the journal path and, unless read-only,
   * creates a new top journal.  Does nothing when logging is disabled.
   *
   * @throws IOException if recovery or journal creation fails.
   * @throws Error if the system is already started.
   */
  void start() throws IOException {
    if (ENABLE_LOGGING) {
      synchronized (init_lock) {
        if (journaling_thread == null) {
          journaling_thread = new JournalingThread();
          journaling_thread.start();
          rollForwardRecover();
          if (!read_only) {
            newTopJournalFile();
          }
        }
        else {
          throw new Error("Assertion failed - already started.");
        }
      }
    }
  }

  /**
   * Stops the journal system: waits for the background thread to drain the
   * archive list and terminate, then (unless read-only) closes the
   * remaining journals and runs a recovery pass over the journal path.
   * Does nothing when logging is disabled.
   *
   * @throws IOException if closing or recovering a journal fails.
   * @throws Error if the system is not started.
   */
  void stop() throws IOException {
    if (ENABLE_LOGGING) {
      synchronized (init_lock) {
        if (journaling_thread != null) {
          // Wait until every archived journal has been persisted.
          journaling_thread.persistArchives(0);
          journaling_thread.finish();
          journaling_thread.waitUntilFinished();
          journaling_thread = null;
        }
        else {
          throw new Error("Assertion failed - already stopped.");
        }
      }

      if (!read_only) {
        synchronized (top_journal_lock) {
          // Close anything still archived and the top journal, then let the
          // recovery pass persist and delete whatever is left on disk.
          int sz = journal_archives.size();
          for (int i = 0; i < sz; ++i) {
            JournalFile jf = (JournalFile) journal_archives.get(i);
            jf.close();
          }
          topJournal().close();
          rollForwardRecover();
        }
      }

    }
  }

  /**
   * Scans the journal path for journal files (slots 10 to 73), deletes the
   * ones that contain no check point, and persists the rest in ascending
   * journal number order up to their last check point.  Each recovered
   * journal is deleted afterwards and every resource it named is closed.
   *
   * @throws IOException if a journal exists in a read-only session, or if
   *   reading or persisting a journal fails.
   */
  void rollForwardRecover() throws IOException {

    // The JournalSummary of every journal that can be recovered.
    ArrayList journal_files_list = new ArrayList();

    for (int i = 10; i < 74; ++i) {
      String journal_fn = getJournalFileName(i);
      File f = new File(journal_path, journal_fn);
      if (f.exists()) {
        if (read_only) {
          throw new IOException(
              "Journal file " + f + " exists for a read-only session. " +
              "There may not be any pending journals for a read-only session.");
        }

        JournalFile jf = new JournalFile(f, read_only);
        JournalSummary summary = jf.openForRecovery();
        if (summary.can_be_recovered) {
          if (debug.isInterestedIn(Lvl.INFORMATION)) {
            debug.write(Lvl.INFORMATION, this, "Journal " + jf +
                        " found - can be recovered.");
          }
          journal_files_list.add(summary);
        }
        else {
          if (debug.isInterestedIn(Lvl.INFORMATION)) {
            debug.write(Lvl.INFORMATION, this, "Journal " + jf +
                        " deleting - nothing to recover.");
          }
          jf.closeAndDelete();
        }
      }
    }

    // Journals must be replayed in the order they were created.
    Collections.sort(journal_files_list, journal_list_comparator);

    long last_journal_number = -1;

    for (int i = 0; i < journal_files_list.size(); ++i) {
      JournalSummary summary = (JournalSummary) journal_files_list.get(i);

      // Make sure a resource object exists for every name in the journal,
      // because 'persist' looks resources up with 'getResource'.
      ArrayList res_list = summary.resource_list;
      for (int n = 0; n < res_list.size(); ++n) {
        String resource_name = (String) res_list.get(n);
        createResource(resource_name);
      }

      JournalFile jf = summary.journal_file;
      if (jf.journal_number < last_journal_number) {
        throw new Error("Assertion failed, sort failed.");
      }
      last_journal_number = jf.journal_number;

      if (debug.isInterestedIn(Lvl.INFORMATION)) {
        debug.write(Lvl.INFORMATION, this, "Recovering: " + jf +
                    " (8 .. " + summary.last_checkpoint + ")");
      }

      // Replay from just after the journal number up to the last check
      // point, then remove the journal.
      jf.persist(8, summary.last_checkpoint);
      jf.closeAndDelete();

      // Close every resource the journal touched and let it refresh its
      // view of the backing data.
      for (int n = 0; n < res_list.size(); ++n) {
        String resource_name = (String) res_list.get(n);
        AbstractResource resource =
                            (AbstractResource) createResource(resource_name);
        resource.persistClose();
        resource.notifyPostRecover();
      }

    }

  }

  /**
   * Orders JournalSummary objects by ascending journal number.
   */
  private Comparator journal_list_comparator = new Comparator() {

    public int compare(Object ob1, Object ob2) {
      JournalSummary js1 = (JournalSummary) ob1;
      JournalSummary js2 = (JournalSummary) ob2;

      long jn1 = js1.journal_file.getJournalNumber();
      long jn2 = js2.journal_file.getJournalNumber();

      if (jn1 > jn2) {
        return 1;
      }
      else if (jn1 < jn2) {
        return -1;
      }
      else {
        return 0;
      }
    }

  };


  /**
   * Creates and opens a new top journal file.  The file slot cycles through
   * the 64 names "jnl10" .. "jnl73" as the journal number increases.
   *
   * @throws IOException if the journal file for the slot already exists or
   *   cannot be opened.
   */
  private void newTopJournalFile() throws IOException {
    String journal_fn = getJournalFileName((int) ((journal_number & 63) + 10));
    ++journal_number;

    File f = new File(journal_path, journal_fn);
    if (f.exists()) {
      throw new IOException("Journal file already exists.");
    }

    top_journal_file = new JournalFile(f, read_only);
    // The journal is stamped with the number it was allocated (the value
    // before the increment above).
    top_journal_file.open(journal_number - 1);
  }


  /**
   * Returns the current top journal file, read under 'top_journal_lock'.
   */
  private JournalFile topJournal() {
    synchronized (top_journal_lock) {
      return top_journal_file;
    }
  }


  /**
   * Returns the resource with the given name, creating and registering it
   * if it does not exist yet.  A new resource is a journalled Resource when
   * logging is enabled and a NonLoggingResource otherwise.
   *
   * @param resource_name the name of the resource.
   * @return the existing or newly created resource.
   */
  public JournalledResource createResource(String resource_name) {
    AbstractResource resource;
    synchronized (all_resources) {
      resource = (AbstractResource) all_resources.get(resource_name);
      if (resource == null) {
        final long id = seq_id;
        ++seq_id;
        StoreDataAccessor accessor =
                           sda_factory.createStoreDataAccessor(resource_name);
        if (ENABLE_LOGGING) {
          resource = new Resource(resource_name, id, accessor);
        }
        else {
          resource = new NonLoggingResource(resource_name, id, accessor);
        }
        all_resources.put(resource_name, resource);
      }
    }

    return resource;
  }

  /**
   * Writes a check point to the top journal.  If 'flush_journals' is true
   * or the top journal is larger than 256K, a new top journal is started
   * first and the old one is queued for persisting (the check point is
   * written to the old one).  When journals are queued, this call blocks
   * until at most 10 remain.  Does nothing when logging is disabled or the
   * system is read-only.
   *
   * @param flush_journals true to force the top journal to be archived.
   * @throws IOException if a journal cannot be created or written.
   */
  void setCheckPoint(boolean flush_journals) throws IOException {
    if (!ENABLE_LOGGING) {
      return;
    }
    if (read_only) {
      return;
    }

    boolean something_to_persist;

    synchronized (top_journal_lock) {
      JournalFile top_j = topJournal();

      if (flush_journals || top_j.size() > (256 * 1024)) {
        newTopJournalFile();
        journal_archives.add(top_j);
      }
      something_to_persist = journal_archives.size() > 0;
      top_j.setCheckPoint();
    }

    // Called outside 'top_journal_lock' because the journaling thread
    // takes that lock while holding its own monitor.
    if (something_to_persist) {
      journaling_thread.persistArchives(10);
    }

  }

  /**
   * Returns the registered resource with the given name, or null if no
   * resource with that name has been created.
   */
  private AbstractResource getResource(String resource_name) {
    synchronized (all_resources) {
      return (AbstractResource) all_resources.get(resource_name);
    }
  }


  // ---------- Inner classes ----------

  /**
   * A single journal file.  A journal is either opened for appending
   * ('open') or opened to be summarized and replayed ('openForRecovery').
   * The file is reference counted and is closed and deleted when the count
   * drops to zero.
   */
  private final class JournalFile {

    /**
     * The journal file on disk.
     */
    private File file;

    /**
     * True if the underlying file is opened in "r" mode.
     */
    private boolean read_only;

    /**
     * The open file, or null when the journal is closed.
     */
    private StreamFile data;

    /**
     * Buffered stream that appends to 'data'.  Only set by 'open'.
     */
    private DataOutputStream data_out;

    /**
     * Scratch buffer for the 36 byte page modification header.
     */
    private byte[] buffer;

    /**
     * Maps resource name (String) to the tag id (Long) assigned to it in
     * this journal.
     */
    private HashMap resource_id_map;

    /**
     * The last tag id assigned by 'writeResourceName'.
     */
    private long cur_seq_id;

    /**
     * The number of this journal (the first 8 bytes of the file).
     */
    private long journal_number;

    /**
     * True while the journal is open.
     */
    private boolean is_open;

    /**
     * The number of references held on this journal.  Starts at 1.
     */
    private int reference_count;

    /**
     * Constructs the journal object.  The file is not touched until 'open'
     * or 'openForRecovery' is called.
     */
    public JournalFile(File file, boolean read_only) {
      this.file = file;
      this.read_only = read_only;
      this.is_open = false;
      buffer = new byte[36];
      resource_id_map = new HashMap();
      cur_seq_id = 0;
      reference_count = 1;
    }

    /**
     * Returns the current length of the journal file.
     */
    long size() {
      return data.length();
    }

    /**
     * Returns the journal number of this journal.
     */
    long getJournalNumber() {
      return journal_number;
    }


    /**
     * Creates the journal file for appending and writes the journal number
     * as its first 8 bytes.
     *
     * @param journal_number the number to stamp the journal with.
     * @throws IOException if the journal is already open, the file already
     *   exists, or the file cannot be written.
     */
    void open(long journal_number) throws IOException {
      if (is_open) {
        throw new IOException("Journal file is already open.");
      }
      if (file.exists()) {
        throw new IOException("Journal file already exists.");
      }

      this.journal_number = journal_number;
      data = new StreamFile(file, read_only ? "r" : "rw");
      data_out = new DataOutputStream(
                           new BufferedOutputStream(data.getOutputStream()));
      data_out.writeLong(journal_number);
      is_open = true;
    }

    /**
     * Opens an existing journal file and scans it to find the last check
     * point and the names of the resources tagged before it.  Scanning
     * stops at the first record that is truncated or has an unknown type.
     *
     * @return a summary whose 'can_be_recovered' flag is true only if at
     *   least one check point was found.
     * @throws IOException if the journal is already open, the file does not
     *   exist, or a read fails.
     */
    JournalSummary openForRecovery() throws IOException {
      if (is_open) {
        throw new IOException("Journal file is already open.");
      }
      if (!file.exists()) {
        throw new IOException("Journal file does not exists.");
      }

      data = new StreamFile(file, read_only ? "r" : "rw");
      is_open = true;

      JournalSummary summary = new JournalSummary(this);

      long end_pointer = data.length();

      // Too short to even hold the journal number; nothing to recover.
      if (end_pointer < 8) {
        return summary;
      }

      final DataInputStream din = new DataInputStream(
                             new BufferedInputStream(data.getInputStream()));

      try {
        this.journal_number = din.readLong();
        long position = 8;

        // Resource names seen since the last check point.  They are only
        // added to the summary once a later check point confirms them.
        ArrayList checkpoint_res_list = new ArrayList();

        while (true) {

          // Not enough data left for a record header (8 + 4 bytes).
          if (position + 12 > end_pointer) {
            return summary;
          }

          long type = din.readLong();
          int size = din.readInt();

          // 'position' now points just past this record.
          position = position + size + 12;

          boolean skip_body = true;

          if (type == 100) {
            // Check point
            summary.last_checkpoint = position;
            summary.can_be_recovered = true;

            summary.resource_list.addAll(checkpoint_res_list);
            checkpoint_res_list.clear();

          }

          else if (position >= end_pointer ||
                   type < 1 || type > 7) {
            // Truncated or unrecognized record; stop scanning here.
            return summary;
          }

          if (type == 2) {
            // Resource name tag: id, name length, then the name characters.
            skip_body = false;
            long id = din.readLong();
            int str_len = din.readInt();
            StringBuffer str = new StringBuffer(str_len);
            for (int i = 0; i < str_len; ++i) {
              str.append(din.readChar());
            }

            String resource_name = new String(str);
            checkpoint_res_list.add(resource_name);

          }

          if (skip_body) {
            int to_skip = size;
            while (to_skip > 0) {
              to_skip -= din.skip(to_skip);
            }
          }

        }

      }
      finally {
        din.close();
      }

    }

    /**
     * Closes the journal file.
     *
     * @throws IOException if the journal is not open or the close fails.
     */
    void close() throws IOException {
      synchronized (this) {
        if (!is_open) {
          throw new IOException("Journal file is already closed.");
        }

        data.close();
        data = null;
        is_open = false;
      }
    }

    /**
     * Returns true if the journal has no open file, which is the state
     * 'close' leaves it in.
     */
    boolean isDeleted() {
      synchronized (this) {
        return data == null;
      }
    }

    /**
     * Releases one reference.  When the count reaches zero the journal is
     * closed and the file is deleted; a failed delete is reported on
     * standard output.
     *
     * @throws IOException if closing the journal fails.
     */
    void closeAndDelete() throws IOException {
      synchronized (this) {
        --reference_count;
        if (reference_count == 0) {
          close();
          boolean b = file.delete();
          if (!b) {
            System.out.println("Unable to delete journal file: " + file);
          }
        }
      }
    }

    /**
     * Adds a reference to this journal, unless the count is zero.
     */
    void addReference() {
      synchronized (this) {
        if (reference_count != 0) {
          ++reference_count;
        }
      }
    }

    /**
     * Releases one reference.  Identical to 'closeAndDelete'.
     *
     * @throws IOException if closing the journal fails.
     */
    void removeReference() throws IOException {
      closeAndDelete();
    }


    /**
     * Replays the records of this journal onto the underlying resources,
     * starting at byte position 'start' and stopping after the check point
     * record that ends exactly at 'end'.  Every resource tagged in the
     * replayed range is synched before returning.
     *
     * @param start the position of the first record to replay.
     * @param end the position just past the final check point record.
     * @throws IOException if reading the journal or updating a resource
     *   fails.
     * @throws Error if a record with an unknown type is found.
     */
    void persist(final long start, final long end) throws IOException {

      if (debug.isInterestedIn(Lvl.INFORMATION)) {
        debug.write(Lvl.INFORMATION, this, "Persisting: " + file);
      }

      final DataInputStream din = new DataInputStream(
                             new BufferedInputStream(data.getInputStream()));

      // The stream is closed in 'finally' so that it is released even when
      // a record fails to replay.
      try {
        long count = start;
        while (count > 0) {
          count -= din.skip(count);
        }

        // The resources tagged in this journal (synched at the end).
        ArrayList resources_updated = new ArrayList();

        // Maps tag id (Long) to resource name for the records replayed.
        HashMap id_name_map = new HashMap();

        boolean finished = false;
        long position = start;

        while (!finished) {
          long type = din.readLong();
          int size = din.readInt();
          position = position + size + 12;

          if (type == 2) {
            // Resource name tag
            long id = din.readLong();
            int len = din.readInt();
            StringBuffer buf = new StringBuffer(len);
            for (int i = 0; i < len; ++i) {
              buf.append(din.readChar());
            }
            String resource_name = new String(buf);

            id_name_map.put(new Long(id), resource_name);

            if (debug.isInterestedIn(Lvl.INFORMATION)) {
              debug.write(Lvl.INFORMATION, this, "Journal Command: Tag: " + id +
                          " = " + resource_name);
            }

            resources_updated.add(getResource(resource_name));

          }
          else if (type == 6) {
            // Resource delete
            long id = din.readLong();
            String resource_name = (String) id_name_map.get(new Long(id));
            AbstractResource resource = getResource(resource_name);

            if (debug.isInterestedIn(Lvl.INFORMATION)) {
              debug.write(Lvl.INFORMATION, this, "Journal Command: Delete: " +
                          resource_name);
            }

            resource.persistDelete();

          }
          else if (type == 3) {
            // Resource size change
            long id = din.readLong();
            long new_size = din.readLong();
            String resource_name = (String) id_name_map.get(new Long(id));
            AbstractResource resource = getResource(resource_name);

            if (debug.isInterestedIn(Lvl.INFORMATION)) {
              debug.write(Lvl.INFORMATION, this, "Journal Command: Set Size: " +
                          resource_name + " size = " + new_size);
            }

            resource.persistSetSize(new_size);

          }
          else if (type == 1) {
            // Page modification; the page bytes are consumed from 'din' by
            // the resource.
            long id = din.readLong();
            long page = din.readLong();
            int off = din.readInt();
            int len = din.readInt();

            String resource_name = (String) id_name_map.get(new Long(id));
            AbstractResource resource = getResource(resource_name);

            if (debug.isInterestedIn(Lvl.INFORMATION)) {
              debug.write(Lvl.INFORMATION, this,
                      "Journal Command: Page Modify: " + resource_name +
                      " page = " + page + " off = " + off +
                      " len = " + len);
            }

            resource.persistPageChange(page, off, len, din);

          }
          else if (type == 100) {
            // Check point
            if (debug.isInterestedIn(Lvl.INFORMATION)) {
              debug.write(Lvl.INFORMATION, this, "Journal Command: Check Point.");
            }

            if (position == end) {
              finished = true;
            }
          }

          else {
            throw new Error("Unknown tag type: " + type + " position = " + position);
          }

        }

        // Synch every resource that was tagged.
        int sz = resources_updated.size();
        for (int i = 0; i < sz; ++i) {
          AbstractResource r = (AbstractResource) resources_updated.get(i);
          if (debug.isInterestedIn(Lvl.INFORMATION)) {
            debug.write(Lvl.INFORMATION, this, "Synch: " + r);
          }
          r.synch();
        }
      }
      finally {
        din.close();
      }

    }

    /**
     * Returns the tag id for the given resource name in this journal.  The
     * first time a name is seen, a resource name tag record (type 2) is
     * written to 'out' and a new id is assigned.
     *
     * @param resource_name the name of the resource.
     * @param out the stream the tag record is written to.
     * @return the tag id of the resource in this journal.
     * @throws IOException if the tag record cannot be written.
     */
    private Long writeResourceName(String resource_name,
                                   DataOutputStream out) throws IOException {
      Long v;
      synchronized (resource_id_map) {
        v = (Long) resource_id_map.get(resource_name);
        if (v == null) {
          ++cur_seq_id;

          int len = resource_name.length();

          // Type, body size (id + length + 2 bytes per char), then body.
          out.writeLong(2);
          out.writeInt(8 + 4 + (len * 2));
          out.writeLong(cur_seq_id);
          out.writeInt(len);
          out.writeChars(resource_name);

          v = new Long(cur_seq_id);
          resource_id_map.put(resource_name, v);
        }
      }

      return v;
    }

    /**
     * Appends a delete record (type 6) for the given resource.
     *
     * @throws IOException if the record cannot be written.
     */
    void logResourceDelete(String resource_name) throws IOException {

      synchronized (this) {
        Long v = writeResourceName(resource_name, data_out);

        long resource_id = v.longValue();
        data_out.writeLong(6);
        data_out.writeInt(8);
        data_out.writeLong(resource_id);

      }

    }

    /**
     * Appends a size change record (type 3) for the given resource.
     *
     * @throws IOException if the record cannot be written.
     */
    void logResourceSizeChange(String resource_name, long new_size)
                                                         throws IOException {
      synchronized (this) {
        Long v = writeResourceName(resource_name, data_out);

        long resource_id = v.longValue();
        data_out.writeLong(3);
        data_out.writeInt(8 + 8);
        data_out.writeLong(resource_id);
        data_out.writeLong(new_size);

      }

    }

    /**
     * Appends a check point record (type 100, empty body), then flushes and
     * synchs the journal.
     *
     * @throws IOException if the record cannot be written or synched.
     */
    void setCheckPoint() throws IOException {
      synchronized (this) {

        data_out.writeLong(100);
        data_out.writeInt(0);

        flushAndSynch();
      }
    }

    /**
     * Appends a page modification record (type 1) holding 'len' bytes of
     * 'buf' starting at 'off', and flushes the output stream.
     * <p>
     * The page number and offset stored in the record are expressed in
     * 8192 byte units derived from page_number * page_size.
     * NOTE(review): 'buildPage' compares the stored page number with the
     * caller's page number, which only agree when page_size is 8192 -
     * confirm that no other page size is used.
     *
     * @return an entry that records where in this journal the record
     *   starts (36 byte header plus 'len' bytes before the end of file).
     * @throws IOException if the record cannot be written.
     */
    JournalEntry logPageModification(String resource_name, long page_number,
                             byte[] buf, int off, int len) throws IOException {

      long ref;
      synchronized (this) {
        Long v = writeResourceName(resource_name, data_out);

        final long absolute_position = page_number * page_size;

        long resource_id = v.longValue();
        data_out.writeLong(1);
        data_out.writeInt(8 + 8 + 4 + 4 + len);
        data_out.writeLong(resource_id);
        data_out.writeLong(absolute_position / 8192);
        data_out.writeInt(off + (int) (absolute_position & 8191));
        data_out.writeInt(len);

        data_out.write(buf, off, len);

        // Flush so that 'data.length()' includes this record.
        data_out.flush();
        ref = data.length() - len - 36;
      }

      return new JournalEntry(resource_name, this, ref, page_number);
    }


    /**
     * Reads the page modification record at 'position' and copies its page
     * bytes into 'buf' at 'off' plus the offset stored in the record.
     *
     * @param in_page_number the page number the record is expected to hold.
     * @param position the position of the record in this journal.
     * @param buf the page buffer to update.
     * @param off the offset in 'buf' where the page starts.
     * @throws IOException if the record is not a page modification, holds a
     *   different page number, or cannot be read.
     */
    void buildPage(long in_page_number,
                   long position, byte[] buf, int off) throws IOException {
      long type;
      long resource_id;
      long page_number;
      int page_offset;
      int page_length;

      synchronized (this) {
        // Header layout: type(8) size(4) id(8) page(8) offset(4) length(4).
        data.readFully(position, buffer, 0, 36);
        type = ByteArrayUtil.getLong(buffer, 0);
        resource_id = ByteArrayUtil.getLong(buffer, 12);
        page_number = ByteArrayUtil.getLong(buffer, 20);
        page_offset = ByteArrayUtil.getInt(buffer, 28);
        page_length = ByteArrayUtil.getInt(buffer, 32);

        if (type != 1) {
          throw new IOException("Invalid page type. type = " + type +
                                " pos = " + position);
        }
        if (page_number != in_page_number) {
          throw new IOException("Page numbers do not match.");
        }

        data.readFully(position + 36, buf, off + page_offset, page_length);
      }

    }


    /**
     * Flushes the buffered output and synchs the journal file.
     *
     * @throws IOException if the flush or synch fails.
     */
    void flushAndSynch() throws IOException {
      synchronized (this) {
        data_out.flush();
        data.synch();
      }
    }


    public String toString() {
      return "[JOURNAL: " + file.getName() + "]";
    }

  }

  /**
   * The location of one page modification record: the journal that holds
   * it and the position of the record in that journal.  Entries that fall
   * in the same bucket of a resource's journal map are chained through
   * 'next_page'.
   */
  private static final class JournalEntry {

    /**
     * The name of the resource the page belongs to.
     */
    private final String resource_name;

    /**
     * The journal that contains the record.
     */
    private final JournalFile journal;

    /**
     * The position of the record in the journal.
     */
    private final long position;

    /**
     * The page number that was modified.
     */
    private final long page_number;


    /**
     * The next entry in the same bucket, or null if this is the last.
     */
    JournalEntry next_page;


    /**
     * Constructs the entry.
     */
    public JournalEntry(String resource_name, JournalFile journal,
                        long position, long page_number) {
      this.resource_name = resource_name;
      this.journal = journal;
      this.position = position;
      this.page_number = page_number;
    }

    /**
     * Returns the journal that contains the record.
     */
    public JournalFile getJournalFile() {
      return journal;
    }

    /**
     * Returns the position of the record in the journal.
     */
    public long getPosition() {
      return position;
    }

    /**
     * Returns the page number that was modified.
     */
    public long getPageNumber() {
      return page_number;
    }

  }


  /**
   * Base class of the resources handed out by 'createResource'.  The
   * abstract 'persist' methods are the operations a journal replay applies
   * to the underlying data.
   */
  private abstract class AbstractResource implements JournalledResource {

    /**
     * The name of the resource.
     */
    protected final String name;

    /**
     * The id assigned to the resource when it was created.
     */
    protected final long id;

    /**
     * The accessor for the underlying data of the resource.
     */
    protected final StoreDataAccessor data;

    /**
     * The read-only flag passed to the last 'open' call.
     */
    protected boolean read_only;

    /**
     * Constructs the resource.
     */
    AbstractResource(String name, long id, StoreDataAccessor data) {
      this.name = name;
      this.id = id;
      this.data = data;
    }


    // ---------- Persist methods ----------

    abstract void persistClose() throws IOException;

    abstract void persistDelete() throws IOException;

    abstract void persistSetSize(final long new_size) throws IOException;

    abstract void persistPageChange(final long page,
                                    final int off, int len,
                                    DataInputStream din) throws IOException;

    abstract void synch() throws IOException;

    abstract void notifyPostRecover();


    // ----------

    /**
     * Returns the page size of the journal system.
     */
    public int getPageSize() {
      return page_size;
    }

    /**
     * Returns the id assigned to this resource.
     */
    public long getID() {
      return id;
    }


    public String toString() {
      return name;
    }

  }


  /**
   * A resource that is not journalled.  Every operation goes directly to
   * the underlying accessor and the persist methods do nothing.
   */
  private final class NonLoggingResource extends AbstractResource {

    /**
     * Constructs the resource.
     */
    NonLoggingResource(String name, long id, StoreDataAccessor data) {
      super(name, id, data);
    }


    // ---------- Persist methods (no-ops, there is no journal) ----------

    void persistClose() throws IOException {
    }

    public void persistDelete() throws IOException {
    }

    public void persistSetSize(final long new_size) throws IOException {
    }

    public void persistPageChange(final long page,
                                  final int off, int len,
                                  DataInputStream din) throws IOException {
    }

    public void synch() throws IOException {
      data.synch();
    }

    public void notifyPostRecover() {
    }


    // ----------

    /**
     * Opens the underlying data.
     */
    public void open(boolean read_only) throws IOException {
      this.read_only = read_only;
      data.open(read_only);
    }

    /**
     * Reads the page with the given number into 'buf' starting at 'off'.
     * 'off' is only an offset into the buffer; the read starts at the
     * beginning of the page, as it does in Resource.read.
     */
    public void read(final long page_number,
                     final byte[] buf, final int off) throws IOException {
      long page_position = page_number * page_size;
      data.read(page_position, buf, off, page_size);
    }

    /**
     * Writes 'len' bytes of 'buf' starting at 'off' to the page with the
     * given number.  'off' is used both as the offset in the buffer and as
     * the offset inside the page.
     */
    public void write(final long page_number,
                      byte[] buf, int off, int len) throws IOException {
      long page_position = page_number * page_size;
      data.write(page_position + off, buf, off, len);
    }

    /**
     * Sets the size of the underlying data.
     */
    public void setSize(long size) throws IOException {
      data.setSize(size);
    }

    /**
     * Returns the size of the underlying data.
     */
    public long getSize() throws IOException {
      return data.getSize();
    }

    /**
     * Closes the underlying data.
     */
    public void close() throws IOException {
      data.close();
    }

    /**
     * Deletes the underlying data.
     */
    public void delete() throws IOException {
      data.delete();
    }

    /**
     * Returns true if the underlying data exists.
     */
    public boolean exists() {
      return data.exists();
    }

  }


  /**
   * A journalled resource.  Writes, size changes and deletes are appended
   * to the top journal; reads combine the backing data with the page
   * modifications still held in journals.  The backing data is only
   * changed through the persist methods.
   */
  private final class Resource extends AbstractResource {

    /**
     * The size of the resource as seen by its users.
     */
    private long size;

    /**
     * True if reads should start from the backing data rather than from a
     * zeroed page.
     */
    private boolean there_is_backing_data;

    /**
     * True if the underlying accessor is open.
     */
    private boolean really_open;

    /**
     * True if the resource exists as seen by its users.
     */
    private boolean data_exists;

    /**
     * True if the resource is open as seen by its users.
     */
    private boolean data_open;

    /**
     * True if the resource was deleted by its users since it was last
     * opened.
     */
    private boolean data_deleted;

    /**
     * Hash table of JournalEntry chains, indexed by page number modulo the
     * table length.  Also used as the lock for the logical state.
     */
    private final JournalEntry[] journal_map;

    /**
     * Scratch buffer of one page used when replaying page changes.
     */
    private final byte[] page_buffer;

    /**
     * Constructs the resource.
     *
     * @throws Error if the data exists but its size cannot be read.
     */
    Resource(String name, long id, StoreDataAccessor data) {
      super(name, id, data);
      journal_map = new JournalEntry[257];
      data_open = false;
      data_exists = data.exists();
      data_deleted = false;
      if (data_exists) {
        try {
          size = data.getSize();
        }
        catch (IOException e) {
          throw new Error("Error getting size of resource: " + e.getMessage());
        }
      }
      really_open = false;
      page_buffer = new byte[page_size];
    }


    // ---------- Persist methods ----------

    /**
     * Opens the underlying accessor if it is not already open.
     */
    private void persistOpen(boolean read_only) throws IOException {
      if (!really_open) {
        data.open(read_only);
        there_is_backing_data = true;
        really_open = true;
      }
    }

    /**
     * Synchs and closes the underlying accessor if it is open, refreshing
     * 'size' from the accessor first.
     */
    void persistClose() throws IOException {
      if (really_open) {
        size = data.getSize();
        data.synch();
        data.close();
        really_open = false;
      }
    }

    /**
     * Closes the underlying accessor if necessary and deletes the data.
     */
    public void persistDelete() throws IOException {
      if (really_open) {
        persistClose();
      }
      data.delete();
      there_is_backing_data = false;
    }

    /**
     * Grows the underlying data to 'new_size'.  The data is never shrunk
     * by this method.
     */
    public void persistSetSize(final long new_size) throws IOException {
      if (!really_open) {
        persistOpen(false);
      }
      if (new_size > data.getSize()) {
        data.setSize(new_size);
      }
    }

    /**
     * Reads 'len' bytes from 'din' and writes them to the underlying data
     * at offset 'off' of the given page.  'page' is in the 8192 byte units
     * that 'logPageModification' writes.
     */
    public void persistPageChange(final long page,
                                  final int off, int len,
                                  DataInputStream din) throws IOException {
      if (!really_open) {
        persistOpen(false);
      }

      // Reuse the scratch page buffer when the change fits into it.
      byte[] buf;
      if (len <= page_buffer.length) {
        buf = page_buffer;
      }
      else {
        buf = new byte[len];
      }

      din.readFully(buf, 0, len);
      long pos = page * 8192;
      data.write(pos + off, buf, 0, len);
    }

    /**
     * Synchs the underlying data if it is open.
     */
    public void synch() throws IOException {
      if (really_open) {
        data.synch();
      }
    }

    /**
     * Refreshes 'data_exists' from the underlying accessor.
     */
    public void notifyPostRecover() {
      data_exists = data.exists();
    }


    // ----------

    /**
     * Opens the resource.  The underlying accessor is only opened if the
     * data exists and the resource was not deleted; otherwise the resource
     * starts out with no backing data.
     */
    public void open(boolean read_only) throws IOException {
      this.read_only = read_only;

      if (!data_deleted && data.exists()) {
        persistOpen(read_only);
      }
      else {
        there_is_backing_data = false;
        data_deleted = false;
      }
      data_open = true;
      data_exists = true;
    }

    /**
     * Reads the page with the given number into 'buf' starting at 'off'.
     * The page is read from the backing data (or zero filled if there is
     * none) and every page modification still held in a journal is then
     * applied in the order it was logged.
     *
     * @throws IOException if the resource is not open or a read fails.
     */
    public void read(final long page_number,
                     final byte[] buf, final int off) throws IOException {

      synchronized (journal_map) {
        if (!data_open) {
          throw new IOException("Assertion failed: Data file is not open.");
        }
      }

      // The journal entries for this page.  A reference is held on the
      // journal of each entry until the 'finally' block below.
      final ArrayList all_journal_entries = new ArrayList(4);
      try {
        synchronized (journal_map) {
          int i = ((int) (page_number & 0x0FFFFFFF) % journal_map.length);
          JournalEntry entry = journal_map[i];
          JournalEntry prev = null;

          while (entry != null) {
            boolean deleted_hash = false;

            JournalFile file = entry.getJournalFile();
            // Hold a reference so the journal is not deleted while in use.
            file.addReference();

            if (file.isDeleted()) {
              // The journal is gone, so unlink the entry from the chain.
              deleted_hash = true;
              file.removeReference();
              if (prev == null) {
                journal_map[i] = entry.next_page;
              }
              else {
                prev.next_page = entry.next_page;
              }
            }
            else if (entry.getPageNumber() == page_number) {
              all_journal_entries.add(entry);
            }
            else {
              // Entry for another page in the same bucket.
              file.removeReference();
            }

            if (!deleted_hash) {
              prev = entry;
            }
            entry = entry.next_page;
          }
        }

        if (there_is_backing_data) {
          long page_position = page_number * page_size;
          data.read(page_position, buf, off, page_size);
        }
        else {
          // No backing data, so the page starts out zeroed.
          for (int i = off; i < (page_size + off); ++i) {
            buf[i] = 0;
          }
        }

        // Apply the journalled modifications on top.
        final int sz = all_journal_entries.size();
        for (int i = 0; i < sz; ++i) {
          JournalEntry entry = (JournalEntry) all_journal_entries.get(i);
          JournalFile file = entry.getJournalFile();
          final long position = entry.getPosition();
          synchronized (file) {
            file.buildPage(page_number, position, buf, off);
          }
        }

      }
      finally {

        // Release the references taken above.
        final int sz = all_journal_entries.size();
        for (int i = 0; i < sz; ++i) {
          JournalEntry entry = (JournalEntry) all_journal_entries.get(i);
          JournalFile file = entry.getJournalFile();
          file.removeReference();
        }

      }

    }

    /**
     * Logs a page modification to the top journal and records where it was
     * written in the journal map.  When the bucket chain had more than 35
     * links before the append, entries whose journal has been deleted are
     * removed from it.
     *
     * @throws IOException if the resource is not open or the journal cannot
     *   be written.
     */
    public void write(final long page_number,
                      byte[] buf, int off, int len) throws IOException {

      synchronized (journal_map) {
        if (!data_open) {
          throw new IOException("Assertion failed: Data file is not open.");
        }

        JournalEntry journal;
        synchronized (top_journal_lock) {
          journal = topJournal().logPageModification(name, page_number,
                                                     buf, off, len);
        }

        int i = ((int) (page_number & 0x0FFFFFFF) % journal_map.length);
        JournalEntry entry = journal_map[i];
        if (entry == null) {
          journal_map[i] = journal;
          journal.next_page = null;
        }
        else {
          // Append to the end of the chain, counting the links walked.
          int journal_entry_count = 0;
          while (entry.next_page != null) {
            entry = entry.next_page;
            ++journal_entry_count;
          }
          entry.next_page = journal;
          journal.next_page = null;

          if (journal_entry_count > 35) {
            // The chain is long; drop entries of deleted journals.
            entry = journal_map[i];
            JournalEntry prev = null;

            while (entry != null) {
              boolean deleted_hash = false;

              JournalFile file = entry.getJournalFile();
              file.addReference();

              if (file.isDeleted()) {
                deleted_hash = true;
                if (prev == null) {
                  journal_map[i] = entry.next_page;
                }
                else {
                  prev.next_page = entry.next_page;
                }
              }
              // Exactly one release for the reference taken above.
              file.removeReference();

              if (!deleted_hash) {
                prev = entry;
              }
              entry = entry.next_page;
            }

          }
        }
      }

    }

    /**
     * Sets the size of the resource and logs the change to the top
     * journal.
     */
    public void setSize(long size) throws IOException {
      synchronized (journal_map) {
        this.size = size;
      }
      synchronized (top_journal_lock) {
        topJournal().logResourceSizeChange(name, size);
      }
    }

    /**
     * Returns the size of the resource as seen by its users.
     */
    public long getSize() throws IOException {
      synchronized (journal_map) {
        return this.size;
      }
    }

    /**
     * Marks the resource as closed.  The underlying accessor is not
     * touched; it is closed by 'persistClose'.
     */
    public void close() throws IOException {
      synchronized (journal_map) {
        data_open = false;
      }
    }

    /**
     * Logs the delete to the top journal and marks the resource as
     * deleted.  The underlying data is removed by 'persistDelete'.
     */
    public void delete() throws IOException {
      synchronized (top_journal_lock) {
        topJournal().logResourceDelete(name);
      }
      synchronized (journal_map) {
        data_exists = false;
        data_deleted = true;
        size = 0;
      }
    }

    /**
     * Returns true if the resource exists as seen by its users.
     */
    public boolean exists() {
      return data_exists;
    }

  }


  /**
   * The result of scanning a journal with 'openForRecovery'.
   */
  private static class JournalSummary {

    /**
     * The journal that was scanned.
     */
    JournalFile journal_file;

    /**
     * True if the journal contains at least one check point.
     */
    boolean can_be_recovered = false;

    /**
     * The position just past the last check point record.
     */
    long last_checkpoint;

    /**
     * The names (String) of the resources tagged before the last check
     * point.
     */
    ArrayList resource_list = new ArrayList();

    /**
     * Constructs the summary.
     */
    public JournalSummary(JournalFile journal_file) {
      this.journal_file = journal_file;
    }

  }


  /**
   * Daemon thread that persists the journals queued in 'journal_archives'
   * and removes them from the queue.
   */
  private class JournalingThread extends Thread {

    private boolean finished = false;
    private boolean actually_finished;

    /**
     * Constructs the thread as a named daemon thread.
     */
    JournalingThread() {
      setName("Mckoi - Background Journaling");
      setDaemon(true);
    }


    public void run() {
      boolean local_finished = false;

      while (!local_finished) {

        // Snapshot the archive queue.
        ArrayList to_process = null;
        synchronized (top_journal_lock) {
          if (journal_archives.size() > 0) {
            to_process = new ArrayList();
            to_process.addAll(journal_archives);
          }
        }

        if (to_process == null) {
          // Nothing queued; sleep until notified.
          synchronized (this) {
            if (!finished) {
              try {
                wait();
              }
              catch (InterruptedException e) { /* ignore */ }
            }
          }

        }
        else if (to_process.size() > 0) {
          int sz = to_process.size();
          for (int i = 0; i < sz; ++i) {
            JournalFile jf = (JournalFile) to_process.get(i);
            try {
              // Persist the whole journal, then remove it.
              jf.persist(8, jf.size());
              jf.closeAndDelete();
            }
            catch (IOException e) {
              debug.write(Lvl.ERROR, this, "Error persisting journal: " + jf);
              debug.writeException(Lvl.ERROR, e);
              // A failure stops the thread after this pass.
              synchronized (this) {
                finished = true;
              }
            }
          }
        }

        synchronized (this) {
          local_finished = finished;
          // Remove the processed journals from the head of the queue.
          if (to_process != null) {
            synchronized (top_journal_lock) {
              int sz = to_process.size();
              for (int i = 0; i < sz; ++i) {
                journal_archives.remove(0);
              }
            }
          }
          // Wake callers blocked in 'persistArchives'.
          notifyAll();
        }

      }

      synchronized (this) {
        actually_finished = true;
        notifyAll();
      }
    }

    /**
     * Asks the thread to stop once its current pass completes.
     */
    public synchronized void finish() {
      finished = true;
      notifyAll();
    }

    /**
     * Blocks until the 'run' method has completed.
     *
     * @throws Error if the calling thread is interrupted while waiting.
     */
    public synchronized void waitUntilFinished() {
      try {
        while (!actually_finished) {
          wait();
        }
      }
      catch (InterruptedException e) {
        throw new Error("Interrupted: " + e.getMessage());
      }
    }

    /**
     * Wakes the thread and blocks until the number of queued journals is
     * no greater than 'until_size'.
     */
    public synchronized void persistArchives(int until_size) {
      notifyAll();
      int sz;
      synchronized (top_journal_lock) {
        sz = journal_archives.size();
      }
      while (sz > until_size) {
        try {
          wait();
        }
        catch (InterruptedException e) { /* ignore */ }

        synchronized (top_journal_lock) {
          sz = journal_archives.size();
        }
      }
    }

  }

}