1 24 25 package com.mckoi.store; 26 27 import java.util.ArrayList ; 28 import java.util.Comparator ; 29 import java.util.Arrays ; 30 import java.io.IOException ; 31 import java.io.File ; 32 import com.mckoi.debug.DebugLogger; 33 import com.mckoi.debug.Lvl; 34 35 42 43 public class LoggingBufferManager { 44 45 48 private static boolean PARANOID_CHECKS = false; 49 50 53 private long current_T; 54 55 58 private int current_page_count; 59 60 63 private ArrayList page_list; 64 65 69 private final Object T_lock = new Object (); 70 71 76 private final BMPage[] page_map; 77 78 81 private int unique_id_seq; 82 83 86 private JournalledSystem journalled_system; 87 88 92 private final int max_pages; 93 94 97 private final int page_size; 98 99 101 104 private boolean check_point_in_progress; 105 106 111 private int write_lock_count; 112 113 116 private final Object write_lock = new Object (); 117 118 119 129 130 131 134 public LoggingBufferManager(File journal_path, boolean read_only, 135 int max_pages, int page_size, 136 StoreDataAccessorFactory sda_factory, 137 DebugLogger debug, boolean enable_logging) { 138 this.max_pages = max_pages; 139 this.page_size = page_size; 140 141 check_point_in_progress = false; 142 write_lock_count = 0; 143 144 current_T = 0; 145 page_list = new ArrayList (); 146 page_map = new BMPage[257]; 147 unique_id_seq = 0; 148 149 journalled_system = new JournalledSystem(journal_path, read_only, 150 page_size, sda_factory, debug, enable_logging); 151 } 152 153 157 public LoggingBufferManager(final File resource_path, 158 final File journal_path, final boolean read_only, final int max_pages, 159 final int page_size, final String file_ext, final long max_slice_size, 160 DebugLogger debug, boolean enable_logging) { 161 this(journal_path, read_only, max_pages, page_size, 162 new StoreDataAccessorFactory() { 163 public StoreDataAccessor createStoreDataAccessor(String resource_name) { 164 return new ScatteringStoreDataAccessor(resource_path, resource_name, 165 file_ext, max_slice_size); 166 } 167 }, debug, enable_logging); 168 } 169 170 173 public void start() throws IOException { 174 journalled_system.start(); 175 } 176 177 180 public void stop() throws IOException { 181 journalled_system.stop(); 182 } 183 184 186 189 JournalledResource createResource(String resource_name) { 190 return journalled_system.createResource(resource_name); 191 } 192 193 197 public void lockForWrite() throws InterruptedException { 198 synchronized (write_lock) { 199 while (check_point_in_progress) { 200 write_lock.wait(); 201 } 202 ++write_lock_count; 203 } 204 } 205 206 211 public void unlockForWrite() { 212 synchronized (write_lock) { 213 --write_lock_count; 214 write_lock.notifyAll(); 215 } 216 } 217 218 232 public void setCheckPoint(boolean flush_journals) 233 throws IOException , InterruptedException { 234 235 synchronized (write_lock) { 238 while (write_lock_count > 0) { 239 write_lock.wait(); 240 } 241 check_point_in_progress = true; 242 } 243 244 try { 245 synchronized (page_map) { 247 for (int i = 0; i < page_map.length; ++i) { 249 BMPage page = page_map[i]; 250 BMPage prev = null; 251 252 while (page != null) { 253 boolean deleted_hash = false; 254 synchronized (page) { 255 page.flush(); 257 258 if (page.notInUse()) { 260 deleted_hash = true; 261 if (prev == null) { 262 page_map[i] = page.hash_next; 263 } 264 else { 265 prev.hash_next = page.hash_next; 266 } 267 } 268 269 } 270 if (!deleted_hash) { 272 prev = page; 273 } 274 page = page.hash_next; 275 } 276 } 277 } 278 279 journalled_system.setCheckPoint(flush_journals); 280 281 } 282 finally { 283 synchronized (write_lock) { 286 check_point_in_progress = false; 287 write_lock.notifyAll(); 288 } 289 } 290 291 } 292 293 294 297 private void pageCreated(final BMPage page) throws IOException { 298 synchronized (T_lock) { 299 300 if (PARANOID_CHECKS) { 301 int i = page_list.indexOf(page); 302 if (i != -1) { 303 BMPage f = (BMPage) page_list.get(i); 304 if (f == page) { 305 throw new Error ("Same page added multiple times."); 306 } 307 if (f != null) { 308 throw new Error ("Duplicate pages."); 309 } 310 } 311 } 312 313 page.t = current_T; 314 ++current_T; 315 316 ++current_page_count; 317 page_list.add(page); 318 319 324 if (current_page_count > max_pages) { 326 Object [] pages = page_list.toArray(); 333 Arrays.sort(pages, PAGE_CACHE_COMPARATOR); 334 335 int purge_size = Math.max((int) (pages.length * 0.20f), 2); 336 for (int i = 0; i < purge_size; ++i) { 337 BMPage dpage = (BMPage) pages[pages.length - (i + 1)]; 338 synchronized (dpage) { 339 dpage.dispose(); 340 } 341 } 342 343 page_list.clear(); 346 for (int i = 0; i < pages.length - purge_size; ++i) { 347 page_list.add(pages[i]); 348 } 349 350 current_page_count -= purge_size; 351 352 } 353 } 354 } 355 356 359 private void pageAccessed(BMPage page) { 360 synchronized (T_lock) { 361 page.t = current_T; 362 ++current_T; 363 ++page.access_count; 364 } 365 } 366 367 370 private static int calcHashCode(long id, long page_number) { 371 return (int) ((id << 6) + (page_number * ((id + 25) << 2))); 372 } 373 374 379 private BMPage fetchPage(JournalledResource data, 380 final long page_number) throws IOException { 381 final long id = data.getID(); 382 383 BMPage prev_page = null; 384 boolean new_page = false; 385 BMPage page; 386 387 synchronized (page_map) { 388 final int p = (calcHashCode(id, page_number) & 0x07FFFFFFF) % 390 page_map.length; 391 page = page_map[p]; 393 while (page != null && !page.isPage(id, page_number)) { 394 prev_page = page; 395 page = page.hash_next; 396 } 397 398 if (page == null) { 400 page = new BMPage(data, page_number, page_size); 401 page.hash_next = page_map[p]; 403 page_map[p] = page; 404 } 405 else { 406 if (prev_page != null) { 408 prev_page.hash_next = page.hash_next; 409 page.hash_next = page_map[p]; 410 page_map[p] = page; 411 } 412 } 413 414 synchronized (page) { 415 if (page.notInUse()) { 418 page.reset(); 419 new_page = true; 420 page.referenceAdd(); 421 } 422 page.referenceAdd(); 424 } 425 426 } 427 428 if (new_page) { 430 pageCreated(page); 431 } 432 else { 433 pageAccessed(page); 434 } 435 436 return page; 438 439 } 440 441 442 450 int readByteFrom(JournalledResource data, long position) throws IOException { 451 final long page_number = position / page_size; 452 int v; 453 454 BMPage page = fetchPage(data, page_number); 455 synchronized (page) { 456 try { 457 page.initialize(); 458 v = ((int) page.read((int) (position % page_size))) & 0x0FF; 459 } 460 finally { 461 page.dispose(); 462 } 463 } 464 465 return v; 466 } 467 468 int readByteArrayFrom(JournalledResource data, 469 long position, byte[] buf, int off, int len) throws IOException { 470 471 final int orig_len = len; 472 long page_number = position / page_size; 473 int start_offset = (int) (position % page_size); 474 int to_read = Math.min(len, page_size - start_offset); 475 476 BMPage page = fetchPage(data, page_number); 477 synchronized (page) { 478 try { 479 page.initialize(); 480 page.read(start_offset, buf, off, to_read); 481 } 482 finally { 483 page.dispose(); 484 } 485 } 486 487 len -= to_read; 488 while (len > 0) { 489 off += to_read; 490 position += to_read; 491 ++page_number; 492 to_read = Math.min(len, page_size); 493 494 page = fetchPage(data, page_number); 495 synchronized (page) { 496 try { 497 page.initialize(); 498 page.read(0, buf, off, to_read); 499 } 500 finally { 501 page.dispose(); 502 } 503 } 504 len -= to_read; 505 } 506 507 return orig_len; 508 } 509 510 void writeByteTo(JournalledResource data, 511 long position, int b) throws IOException { 512 513 if (PARANOID_CHECKS) { 514 synchronized (write_lock) { 515 if (write_lock_count == 0) { 516 System.out.println("Write without a lock!"); 517 new Error ().printStackTrace(); 518 } 519 } 520 } 521 522 final long page_number = position / page_size; 523 524 BMPage page = fetchPage(data, page_number); 525 synchronized (page) { 526 try { 527 page.initialize(); 528 page.write((int) (position % page_size), (byte) b); 529 } 530 finally { 531 page.dispose(); 532 } 533 } 534 } 535 536 void writeByteArrayTo(JournalledResource data, 537 long position, byte[] buf, int off, int len) throws IOException { 538 539 if (PARANOID_CHECKS) { 540 synchronized (write_lock) { 541 if (write_lock_count == 0) { 542 System.out.println("Write without a lock!"); 543 new Error ().printStackTrace(); 544 } 545 } 546 } 547 548 long page_number = position / page_size; 549 int start_offset = (int) (position % page_size); 550 int to_write = Math.min(len, page_size - start_offset); 551 552 BMPage page = fetchPage(data, page_number); 553 synchronized (page) { 554 try { 555 page.initialize(); 556 page.write(start_offset, buf, off, to_write); 557 } 558 finally { 559 page.dispose(); 560 } 561 } 562 len -= to_write; 563 564 while (len > 0) { 565 off += to_write; 566 position += to_write; 567 ++page_number; 568 to_write = Math.min(len, page_size); 569 570 page = fetchPage(data, page_number); 571 synchronized (page) { 572 try { 573 page.initialize(); 574 page.write(0, buf, off, to_write); 575 } 576 finally { 577 page.dispose(); 578 } 579 } 580 len -= to_write; 581 } 582 583 } 584 585 void setDataAreaSize(JournalledResource data, 586 long new_size) throws IOException { 587 data.setSize(new_size); 588 } 589 590 long getDataAreaSize(JournalledResource data) throws IOException { 591 return data.getSize(); 592 } 593 594 void close(JournalledResource data) throws IOException { 595 long id = data.getID(); 596 synchronized (page_map) { 598 for (int i = 0; i < page_map.length; ++i) { 603 BMPage page = page_map[i]; 604 BMPage prev = null; 605 606 while (page != null) { 607 boolean deleted_hash = false; 608 if (page.getID() == id) { 609 synchronized (page) { 611 page.flush(); 613 614 if (page.notInUse()) { 616 deleted_hash = true; 617 if (prev == null) { 618 page_map[i] = page.hash_next; 619 } 620 else { 621 prev.hash_next = page.hash_next; 622 } 623 } 624 } 625 626 } 627 628 if (!deleted_hash) { 630 prev = page; 631 } 632 page = page.hash_next; 633 634 } 635 } 636 } 637 638 data.close(); 639 } 640 641 642 643 645 649 private static final class BMPage { 650 651 654 private final JournalledResource data; 655 656 659 private final long page; 660 661 664 private final int page_size; 665 666 667 670 private byte[] buffer; 671 672 675 private boolean initialized; 676 677 678 681 BMPage hash_next; 682 683 684 688 long t; 689 690 693 int access_count; 694 695 696 699 private int first_write_position; 700 701 704 private int last_write_position; 705 706 709 private int reference_count; 710 711 712 715 BMPage(JournalledResource data, long page, int page_size) { 716 this.data = data; 717 this.page = page; 718 this.reference_count = 0; 719 this.page_size = page_size; 720 reset(); 721 } 722 723 726 void reset() { 727 if (reference_count != 0) { 729 throw new Error ("reset when 'reference_count' is != 0 ( = " + 730 reference_count + " )"); 731 } 732 this.initialized = false; 733 this.t = 0; 734 this.access_count = 0; 735 } 736 737 740 long getID() { 741 return data.getID(); 742 } 743 744 747 void referenceAdd() { 748 ++reference_count; 749 } 750 751 754 private void referenceRemove() { 755 if (reference_count <= 0) { 756 throw new Error ("Too many reference remove."); 757 } 758 --reference_count; 759 } 760 761 765 boolean notInUse() { 766 return reference_count == 0; 767 } 769 770 773 boolean isPage(long in_id, long in_page) { 774 return (getID() == in_id && 775 page == in_page); 776 } 777 778 782 private void readPageContent( 783 long page_number, byte[] buf, int pos) throws IOException { 784 if (pos != 0) { 785 throw new Error ("Assert failed: pos != 0"); 786 } 787 data.read(page_number, buf, pos); 789 } 790 791 795 void flush() throws IOException { 796 if (initialized) { 797 if (last_write_position > -1) { 798 data.write(page, buffer, first_write_position, 800 last_write_position - first_write_position); 801 } 804 first_write_position = Integer.MAX_VALUE; 805 last_write_position = -1; 806 } 807 } 808 809 814 void initialize() throws IOException { 815 if (!initialized) { 816 817 try { 818 819 buffer = new byte[page_size]; 821 readPageContent(page, buffer, 0); 824 initialized = true; 825 826 first_write_position = Integer.MAX_VALUE; 828 last_write_position = -1; 829 830 } 831 catch (IOException e) { 832 System.out.println("IO Error during page initialize: " + e.getMessage()); 835 e.printStackTrace(); 836 throw e; 837 } 838 839 } 840 } 841 842 848 void dispose() throws IOException { 849 referenceRemove(); 850 if (reference_count == 0) { 851 if (initialized) { 852 853 flush(); 856 857 initialized = false; 859 buffer = null; 861 862 } 863 else { 864 buffer = null; 870 } 873 } 874 } 875 876 877 880 byte read(int pos) { 881 return buffer[pos]; 882 } 883 884 887 void read(int pos, byte[] buf, int off, int len) { 888 System.arraycopy(buffer, pos, buf, off, len); 889 } 890 891 894 void write(int pos, byte v) { 895 first_write_position = Math.min(pos, first_write_position); 896 last_write_position = Math.max(pos + 1, last_write_position); 897 898 buffer[pos] = v; 899 } 900 901 904 void write(int pos, byte[] buf, int off, int len) { 905 first_write_position = Math.min(pos, first_write_position); 906 last_write_position = Math.max(pos + len, last_write_position); 907 908 System.arraycopy(buf, off, buffer, pos, len); 909 } 910 911 public boolean equals(Object ob) { 912 BMPage dest_page = (BMPage) ob; 913 return isPage(dest_page.getID(), dest_page.page); 914 } 915 916 } 917 918 921 private static class BResource { 922 923 926 private final long id; 927 928 931 private final String name; 932 933 936 BResource(long id, String name) { 937 this.id = id; 938 this.name = name; 939 } 940 941 944 long getID() { 945 return id; 946 } 947 948 951 String getName() { 952 return name; 953 } 954 955 } 956 957 960 private final Comparator PAGE_CACHE_COMPARATOR = new Comparator () { 961 962 967 private final float pageEnumValue(BMPage page) { 968 final long bounded_page_count = Math.min(page.access_count, 10000); 973 final float v = (1f / bounded_page_count) * ( current_T - page.t ); 974 return v; 975 } 976 977 public int compare(Object ob1, Object ob2) { 978 float v1 = pageEnumValue((BMPage) ob1); 979 float v2 = pageEnumValue((BMPage) ob2); 980 if (v1 > v2) { 981 return 1; 982 } 983 else if (v1 < v2) { 984 return -1; 985 } 986 return 0; 987 } 988 989 }; 990 991 995 public static interface StoreDataAccessorFactory { 996 997 1000 public StoreDataAccessor createStoreDataAccessor(String resource_name); 1001 1002 } 1003 1004 1005} 1006 1007 | Popular Tags |