KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > mckoi > store > JournalledSystem


1 /**
2  * com.mckoi.store.JournalledSystem 11 Jun 2003
3  *
4  * Mckoi SQL Database ( http://www.mckoi.com/database )
5  * Copyright (C) 2000, 2001, 2002 Diehl and Associates, Inc.
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License
9  * Version 2 as published by the Free Software Foundation.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License Version 2 for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * Version 2 along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19  *
20  * Change Log:
21  *
22  *
23  */

24
25 package com.mckoi.store;
26
27 import java.io.*;
28 import java.util.HashMap JavaDoc;
29 import java.util.ArrayList JavaDoc;
30 import java.util.Collections JavaDoc;
31 import java.util.Comparator JavaDoc;
32 import com.mckoi.debug.DebugLogger;
33 import com.mckoi.debug.Lvl;
34 import com.mckoi.util.ByteArrayUtil;
35 import com.mckoi.store.LoggingBufferManager.StoreDataAccessorFactory;
36
37 /**
38  * Manages a journalling data store management system. All operations are
39  * written out to a log that can be easily recovered from if a crash occurs.
40  *
41  * @author Tobias Downer
42  */

43
44 class JournalledSystem {
45
46   /**
47    * Set to true for logging behaviour.
48    */

49   private final boolean ENABLE_LOGGING;
50   
51   /**
52    * The path to the journal files.
53    */

54   private final File journal_path;
55   
56   /**
57    * If the journal system is in read only mode.
58    */

59   private final boolean read_only;
60   
61   /**
62    * The page size.
63    */

64   private final int page_size;
65   
66   /**
67    * The map of all resources that are available. (resource_name -> Resource)
68    */

69   private HashMap JavaDoc all_resources;
70
71   /**
72    * The unique sequence id counter for this session.
73    */

74   private long seq_id;
75
76   /**
77    * The archive of journal files currently pending (JournalFile).
78    */

79   private final ArrayList JavaDoc journal_archives;
80   
81   /**
82    * The current top journal file.
83    */

84   private JournalFile top_journal_file;
85
86   /**
87    * The current journal file number.
88    */

89   private long journal_number;
90
91   /**
92    * A factory that creates StoreDataAccessor objects used to access the
93    * resource with the given name.
94    */

95   private final StoreDataAccessorFactory sda_factory;
96
97   /**
98    * Mutex when accessing the top journal file.
99    */

100   private final Object JavaDoc top_journal_lock = new Object JavaDoc();
101
102   /**
103    * A thread that runs in the background and persists information that is in
104    * the journal.
105    */

106   private JournalingThread journaling_thread;
107   
108   /**
109    * A debug log to output information to.
110    */

111   private final DebugLogger debug;
112   
113
114   JournalledSystem(File journal_path, boolean read_only, int page_size,
115                    StoreDataAccessorFactory sda_factory, DebugLogger debug,
116                    boolean enable_logging) {
117     this.journal_path = journal_path;
118     this.read_only = read_only;
119     this.page_size = page_size;
120     this.sda_factory = sda_factory;
121     all_resources = new HashMap JavaDoc();
122     journal_number = 0;
123     journal_archives = new ArrayList JavaDoc();
124     this.debug = debug;
125     this.ENABLE_LOGGING = enable_logging;
126   }
127
128   
129   /**
130    * Returns a journal file name with the given number. The journal number
131    * must be between 10 and 63
132    */

133   private static String JavaDoc getJournalFileName(int number) {
134     if (number < 10 || number > 73) {
135       throw new Error JavaDoc("Journal file name out of range.");
136     }
137     return "jnl" + number;
138   }
139
140   // Lock used during initialization
141
private final Object JavaDoc init_lock = new Object JavaDoc();
142   
143   /**
144    * Starts the journal system.
145    */

146   void start() throws IOException {
147     if (ENABLE_LOGGING) {
148       synchronized (init_lock) {
149         if (journaling_thread == null) {
150           // Start the background journaling thread,
151
journaling_thread = new JournalingThread();
152           journaling_thread.start();
153           // Scan for any changes and make the changes.
154
rollForwardRecover();
155           if (!read_only) {
156             // Create a new top journal file
157
newTopJournalFile();
158           }
159         }
160         else {
161           throw new Error JavaDoc("Assertion failed - already started.");
162         }
163       }
164     }
165   }
166   
167   /**
168    * Stops the journal system. This will persist any pending changes up to the
169    * last check point and then finish.
170    */

171   void stop() throws IOException {
172     if (ENABLE_LOGGING) {
173       synchronized (init_lock) {
174         if (journaling_thread != null) {
175           // Stop the journal thread
176
journaling_thread.persistArchives(0);
177           journaling_thread.finish();
178           journaling_thread.waitUntilFinished();
179           journaling_thread = null;
180         }
181         else {
182           throw new Error JavaDoc("Assertion failed - already stopped.");
183         }
184       }
185
186       if (!read_only) {
187         // Close any remaining journals and roll forward recover (shouldn't
188
// actually be necessary but just incase...)
189
synchronized (top_journal_lock) {
190           // Close all the journals
191
int sz = journal_archives.size();
192           for (int i = 0; i < sz; ++i) {
193             JournalFile jf = (JournalFile) journal_archives.get(i);
194             jf.close();
195           }
196           // Close the top journal
197
topJournal().close();
198           // Scan for journals and make the changes.
199
rollForwardRecover();
200         }
201       }
202
203     }
204   }
205
206   /**
207    * Recovers any lost operations that are currently in the journal. This
208    * retries all logged entries. This would typically be called before any
209    * other IO operations.
210    */

211   void rollForwardRecover() throws IOException {
212 // System.out.println("rollForwardRecover()");
213

214     // The list of all journal files,
215
ArrayList JavaDoc journal_files_list = new ArrayList JavaDoc();
216
217     // Scan the journal path for any journal files.
218
for (int i = 10; i < 74; ++i) {
219       String JavaDoc journal_fn = getJournalFileName(i);
220       File f = new File(journal_path, journal_fn);
221       // If the journal exists, create a summary of the journal
222
if (f.exists()) {
223         if (read_only) {
224           throw new IOException(
225              "Journal file " + f + " exists for a read-only session. " +
226              "There may not be any pending journals for a read-only session.");
227         }
228
229         JournalFile jf = new JournalFile(f, read_only);
230         // Open the journal file for recovery. This will set various
231
// information about the journal such as the last check point and the
232
// id of the journal file.
233
JournalSummary summary = jf.openForRecovery();
234         // If the journal can be recovered from.
235
if (summary.can_be_recovered) {
236           if (debug.isInterestedIn(Lvl.INFORMATION)) {
237             debug.write(Lvl.INFORMATION, this, "Journal " + jf +
238                                                " found - can be recovered.");
239           }
240           journal_files_list.add(summary);
241         }
242         else {
243           if (debug.isInterestedIn(Lvl.INFORMATION)) {
244             debug.write(Lvl.INFORMATION, this, "Journal " + jf +
245                                           " deleting - nothing to recover.");
246           }
247           // Otherwise close and delete it
248
jf.closeAndDelete();
249         }
250       }
251     }
252
253 // if (journal_files_list.size() == 0) {
254
// System.out.println("Nothing to recover.");
255
// }
256

257     // Sort the journal file list from oldest to newest. The oldest journals
258
// are recovered first.
259
Collections.sort(journal_files_list, journal_list_comparator);
260
261     long last_journal_number = -1;
262     
263     // Persist the journals
264
for (int i = 0; i < journal_files_list.size(); ++i) {
265       JournalSummary summary = (JournalSummary) journal_files_list.get(i);
266       
267       // Check the resources for this summary
268
ArrayList JavaDoc res_list = summary.resource_list;
269       for (int n = 0; n < res_list.size(); ++n) {
270         String JavaDoc resource_name = (String JavaDoc) res_list.get(n);
271         // This puts the resource into the hash map.
272
JournalledResource resource = createResource(resource_name);
273       }
274
275       // Assert that we are recovering the journals in the correct order
276
JournalFile jf = summary.journal_file;
277       if (jf.journal_number < last_journal_number) {
278         throw new Error JavaDoc("Assertion failed, sort failed.");
279       }
280       last_journal_number = jf.journal_number;
281
282       if (debug.isInterestedIn(Lvl.INFORMATION)) {
283         debug.write(Lvl.INFORMATION, this, "Recovering: " + jf +
284                                     " (8 .. " + summary.last_checkpoint + ")");
285       }
286
287       jf.persist(8, summary.last_checkpoint);
288       // Then close and delete.
289
jf.closeAndDelete();
290       
291       // Check the resources for this summary and close them
292
for (int n = 0; n < res_list.size(); ++n) {
293         String JavaDoc resource_name = (String JavaDoc) res_list.get(n);
294         AbstractResource resource =
295                               (AbstractResource) createResource(resource_name);
296         // When we finished, make sure the resource is closed again
297
// Close the resource
298
resource.persistClose();
299         // Post recover notification
300
resource.notifyPostRecover();
301       }
302
303     }
304     
305   }
306
307   private Comparator JavaDoc journal_list_comparator = new Comparator JavaDoc() {
308     
309     public int compare(Object JavaDoc ob1, Object JavaDoc ob2) {
310       JournalSummary js1 = (JournalSummary) ob1;
311       JournalSummary js2 = (JournalSummary) ob2;
312
313       long jn1 = js1.journal_file.getJournalNumber();
314       long jn2 = js2.journal_file.getJournalNumber();
315
316       if (jn1 > jn2) {
317         return 1;
318       }
319       else if (jn1 < jn2) {
320         return -1;
321       }
322       else {
323         return 0;
324       }
325     }
326     
327   };
328
329
330   /**
331    * Creates a new top journal file.
332    */

333   private void newTopJournalFile() throws IOException {
334 // // Move the old journal to the archive?
335
// if (top_journal_file != null) {
336
// journal_archives.add(top_journal_file);
337
// }
338

339     String JavaDoc journal_fn = getJournalFileName((int) ((journal_number & 63) + 10));
340     ++journal_number;
341
342     File f = new File(journal_path, journal_fn);
343     if (f.exists()) {
344       throw new IOException("Journal file already exists.");
345     }
346
347     top_journal_file = new JournalFile(f, read_only);
348     top_journal_file.open(journal_number - 1);
349   }
350   
351   
352   
353   /**
354    * Returns the current top journal file.
355    */

356   private JournalFile topJournal() {
357     synchronized (top_journal_lock) {
358       return top_journal_file;
359     }
360   }
361
362   
363   /**
364    * Creates a resource.
365    */

366   public JournalledResource createResource(String JavaDoc resource_name) {
367     AbstractResource resource;
368     synchronized (all_resources) {
369       // Has this resource previously been open?
370
resource = (AbstractResource) all_resources.get(resource_name);
371       if (resource == null) {
372         // No...
373
// Create a unique id for this
374
final long id = seq_id;
375         ++seq_id;
376         // Create the StoreDataAccessor for this resource.
377
StoreDataAccessor accessor =
378                             sda_factory.createStoreDataAccessor(resource_name);
379         if (ENABLE_LOGGING) {
380           resource = new Resource(resource_name, id, accessor);
381         }
382         else {
383           resource = new NonLoggingResource(resource_name, id, accessor);
384         }
385         // Put this in the map.
386
all_resources.put(resource_name, resource);
387       }
388     }
389
390     // Return the resource
391
return resource;
392   }
393
394   /**
395    * Sets a check point in the log. If 'flush_journals' is true then when the
396    * method returns we are guarenteed that all the journals are flushed and the
397    * data is absolutely current. If 'flush_journals' is false then we can't
398    * assume the journals will be empty when the method returns.
399    */

400   void setCheckPoint(boolean flush_journals) throws IOException {
401     // No Logging
402
if (!ENABLE_LOGGING) {
403       return;
404     }
405     // Return if read-only
406
if (read_only) {
407       return;
408     }
409
410     boolean something_to_persist;
411
412     synchronized (top_journal_lock) {
413       JournalFile top_j = topJournal();
414       
415       // When the journal exceeds a threshold then we cycle the top journal
416
if (flush_journals || top_j.size() > (256 * 1024)) {
417         // Cycle to the next journal file
418
newTopJournalFile();
419         // Add this to the archives
420
journal_archives.add(top_j);
421       }
422       something_to_persist = journal_archives.size() > 0;
423       top_j.setCheckPoint();
424     }
425
426     if (something_to_persist) {
427       // Notifies the background thread that there is something to persist.
428
// This will block until there are at most 10 journal files open.
429
journaling_thread.persistArchives(10);
430     }
431
432   }
433   
434   /**
435    * Returns the Resource with the given name.
436    */

437   private AbstractResource getResource(String JavaDoc resource_name) {
438     synchronized (all_resources) {
439       return (AbstractResource) all_resources.get(resource_name);
440     }
441   }
442     
443
444
445   
446
447   // ---------- Inner classes ----------
448

449   /**
450    * A JournalFile represents a file in which modification are logged out to
451    * when changes are made. A JournalFile contains instructions for rebuilding
452    * a resource to a known stable state.
453    */

454   private final class JournalFile {
455
456     /**
457      * The File object of this journal in the file system.
458      */

459     private File file;
460
461     /**
462      * True if the journal file is read only.
463      */

464     private boolean read_only;
465     
466     /**
467      * The StreamFile object for reading and writing entries to/from the
468      * journal.
469      */

470     private StreamFile data;
471
472     /**
473      * A DataOutputStream object used to write entries to the journal file.
474      */

475     private DataOutputStream data_out;
476
477     /**
478      * Small buffer.
479      */

480     private byte[] buffer;
481     
482     /**
483      * A map between a resource name and an id for this journal file.
484      */

485     private HashMap JavaDoc resource_id_map;
486     
487     /**
488      * The sequence id for resources modified in this log.
489      */

490     private long cur_seq_id;
491
492     /**
493      * The journal number of this journal.
494      */

495     private long journal_number;
496
497     /**
498      * True when open.
499      */

500     private boolean is_open;
501     
502     /**
503      * The number of threads currently looking at info in this journal.
504      */

505     private int reference_count;
506
507     /**
508      * Constructs the journal file.
509      */

510     public JournalFile(File file, boolean read_only) {
511       this.file = file;
512       this.read_only = read_only;
513       this.is_open = false;
514       buffer = new byte[36];
515       resource_id_map = new HashMap JavaDoc();
516       cur_seq_id = 0;
517       reference_count = 1;
518     }
519
520     /**
521      * Returns the size of the journal file in bytes.
522      */

523     long size() {
524       return data.length();
525     }
526     
527     /**
528      * Returns the journal number assigned to this journal.
529      */

530     long getJournalNumber() {
531       return journal_number;
532     }
533     
534
535     /**
536      * Opens the journal file. If the journal file exists then an error is
537      * generated.
538      */

539     void open(long journal_number) throws IOException {
540       if (is_open) {
541         throw new IOException("Journal file is already open.");
542       }
543       if (file.exists()) {
544         throw new IOException("Journal file already exists.");
545       }
546       
547       this.journal_number = journal_number;
548       data = new StreamFile(file, read_only ? "r" : "rw");
549       data_out = new DataOutputStream(
550                              new BufferedOutputStream(data.getOutputStream()));
551       data_out.writeLong(journal_number);
552       is_open = true;
553     }
554
555     /**
556      * Opens the journal for recovery. This scans the journal and generates
557      * some statistics about the journal file such as the last check point and
558      * the journal number. If the journal file doesn't exist then an error is
559      * generated.
560      */

561     JournalSummary openForRecovery() throws IOException {
562       if (is_open) {
563         throw new IOException("Journal file is already open.");
564       }
565       if (!file.exists()) {
566         throw new IOException("Journal file does not exists.");
567       }
568
569       // Open the random access file to this journal
570
data = new StreamFile(file, read_only ? "r" : "rw");
571 // data_out = new DataOutputStream(
572
// new BufferedOutputStream(data.getOutputStream()));
573

574       is_open = true;
575       
576       // Create the summary object (by default, not recoverable).
577
JournalSummary summary = new JournalSummary(this);
578
579       long end_pointer = data.length();
580
581       // If end_pointer < 8 then can't recovert this journal
582
if (end_pointer < 8) {
583         return summary;
584       }
585
586       // The input stream.
587
final DataInputStream din = new DataInputStream(
588                               new BufferedInputStream(data.getInputStream()));
589       
590       try {
591         // Set the journal number for this
592
this.journal_number = din.readLong();
593         long position = 8;
594
595         ArrayList JavaDoc checkpoint_res_list = new ArrayList JavaDoc();
596         
597         // Start scan
598
while (true) {
599
600           // If we can't read 12 bytes ahead, return the summary
601
if (position + 12 > end_pointer) {
602             return summary;
603           }
604           
605           long type = din.readLong();
606           int size = din.readInt();
607
608 // System.out.println("Scan: " + type + " pos=" + position + " size=" + size);
609
position = position + size + 12;
610
611           boolean skip_body = true;
612
613           // If checkpoint reached then we are recoverable
614
if (type == 100) {
615             summary.last_checkpoint = position;
616             summary.can_be_recovered = true;
617
618             // Add the resources in this check point
619
summary.resource_list.addAll(checkpoint_res_list);
620             // And clear the temporary list.
621
checkpoint_res_list.clear();
622
623           }
624
625           // If end reached, or type is not understood then return
626
else if (position >= end_pointer ||
627                    type < 1 || type > 7) {
628             return summary;
629           }
630
631           // If we are resource type, then load the resource
632
if (type == 2) {
633             
634             // We don't skip body for this type, we read the content
635
skip_body = false;
636             long id = din.readLong();
637             int str_len = din.readInt();
638             StringBuffer JavaDoc str = new StringBuffer JavaDoc(str_len);
639             for (int i = 0; i < str_len; ++i) {
640               str.append(din.readChar());
641             }
642
643             String JavaDoc resource_name = new String JavaDoc(str);
644             checkpoint_res_list.add(resource_name);
645
646           }
647
648           if (skip_body) {
649             int to_skip = size;
650             while (to_skip > 0) {
651               to_skip -= din.skip(to_skip);
652             }
653           }
654
655         }
656
657       }
658       finally {
659         din.close();
660       }
661
662     }
663
664     /**
665      * Closes the journal file.
666      */

667     void close() throws IOException {
668       synchronized (this) {
669         if (!is_open) {
670           throw new IOException("Journal file is already closed.");
671         }
672
673         data.close();
674         data = null;
675         is_open = false;
676       }
677     }
678
679     /**
680      * Returns true if the journal is deleted.
681      */

682     boolean isDeleted() {
683       synchronized (this) {
684         return data == null;
685       }
686     }
687
688     /**
689      * Closes and deletes the journal file. This may not immediately close and
690      * delete the journal file if there are currently references to it (for
691      * example, in the middle of a read operation).
692      */

693     void closeAndDelete() throws IOException {
694       synchronized (this) {
695         --reference_count;
696         if (reference_count == 0) {
697           // Close and delete the journal file.
698
close();
699           boolean b = file.delete();
700           if (!b) {
701             System.out.println("Unable to delete journal file: " + file);
702           }
703         }
704       }
705     }
706
707     /**
708      * Adds a reference preventing the journal file from being deleted.
709      */

710     void addReference() {
711       synchronized (this) {
712         if (reference_count != 0) {
713           ++reference_count;
714         }
715       }
716     }
717
718     /**
719      * Removes a reference, if we are at the last reference the journal file is
720      * deleted.
721      */

722     void removeReference() throws IOException {
723       closeAndDelete();
724     }
725     
726     
727     /**
728      * Plays the log from the given offset in the file to the next checkpoint.
729      * This will actually persist the log. Returns -1 if the end of the journal
730      * is reached.
731      * <p>
732      * NOTE: This will not verify that the journal is correct. Verification
733      * should be done before the persist.
734      */

735     void persist(final long start, final long end) throws IOException {
736
737       if (debug.isInterestedIn(Lvl.INFORMATION)) {
738         debug.write(Lvl.INFORMATION, this, "Persisting: " + file);
739       }
740
741       final DataInputStream din = new DataInputStream(
742                               new BufferedInputStream(data.getInputStream()));
743       long count = start;
744       // Skip to the offset
745
while (count > 0) {
746         count -= din.skip(count);
747       }
748
749       // The list of resources we updated
750
ArrayList JavaDoc resources_updated = new ArrayList JavaDoc();
751       
752       // A map from resource id to resource name for this journal.
753
HashMap JavaDoc id_name_map = new HashMap JavaDoc();
754
755       boolean finished = false;
756       long position = start;
757
758       while (!finished) {
759         long type = din.readLong();
760         int size = din.readInt();
761         position = position + size + 12;
762
763         if (type == 2) { // Resource id tag
764
long id = din.readLong();
765           int len = din.readInt();
766           StringBuffer JavaDoc buf = new StringBuffer JavaDoc(len);
767           for (int i = 0; i < len; ++i) {
768             buf.append(din.readChar());
769           }
770           String JavaDoc resource_name = new String JavaDoc(buf);
771
772           // Put this in the map
773
id_name_map.put(new Long JavaDoc(id), resource_name);
774
775           if (debug.isInterestedIn(Lvl.INFORMATION)) {
776             debug.write(Lvl.INFORMATION, this, "Journal Command: Tag: " + id +
777                                                " = " + resource_name);
778           }
779           
780           // Add this to the list of resources we updated.
781
resources_updated.add(getResource(resource_name));
782           
783         }
784         else if (type == 6) { // Resource delete
785
long id = din.readLong();
786           String JavaDoc resource_name = (String JavaDoc) id_name_map.get(new Long JavaDoc(id));
787           AbstractResource resource = getResource(resource_name);
788
789           if (debug.isInterestedIn(Lvl.INFORMATION)) {
790             debug.write(Lvl.INFORMATION, this, "Journal Command: Delete: " +
791                                                resource_name);
792           }
793
794           resource.persistDelete();
795
796         }
797         else if (type == 3) { // Resource size change
798
long id = din.readLong();
799           long new_size = din.readLong();
800           String JavaDoc resource_name = (String JavaDoc) id_name_map.get(new Long JavaDoc(id));
801           AbstractResource resource = getResource(resource_name);
802           
803           if (debug.isInterestedIn(Lvl.INFORMATION)) {
804             debug.write(Lvl.INFORMATION, this, "Journal Command: Set Size: " +
805                         resource_name + " size = " + new_size);
806           }
807
808           resource.persistSetSize(new_size);
809
810         }
811         else if (type == 1) { // Page modification
812
long id = din.readLong();
813           long page = din.readLong();
814           int off = din.readInt();
815           int len = din.readInt();
816
817           String JavaDoc resource_name = (String JavaDoc) id_name_map.get(new Long JavaDoc(id));
818           AbstractResource resource = getResource(resource_name);
819
820           if (debug.isInterestedIn(Lvl.INFORMATION)) {
821             debug.write(Lvl.INFORMATION, this,
822                     "Journal Command: Page Modify: " + resource_name +
823                     " page = " + page + " off = " + off +
824                     " len = " + len);
825           }
826
827           resource.persistPageChange(page, off, len, din);
828
829         }
830         else if (type == 100) { // Checkpoint (end)
831

832           if (debug.isInterestedIn(Lvl.INFORMATION)) {
833             debug.write(Lvl.INFORMATION, this, "Journal Command: Check Point.");
834           }
835
836           if (position == end) {
837             finished = true;
838           }
839         }
840
841         else {
842           throw new Error JavaDoc("Unknown tag type: " + type + " position = " + position);
843         }
844
845       } // while (!finished)
846

847       // Synch all the resources that we have updated.
848
int sz = resources_updated.size();
849       for (int i = 0; i < sz; ++i) {
850         AbstractResource r = (AbstractResource) resources_updated.get(i);
851         if (debug.isInterestedIn(Lvl.INFORMATION)) {
852           debug.write(Lvl.INFORMATION, this, "Synch: " + r);
853         }
854         r.synch();
855       }
856
857       din.close();
858
859     }
860
861     /**
862      * Writes a resource identifier to the stream for the resource with the
863      * given name.
864      */

865     private Long JavaDoc writeResourceName(String JavaDoc resource_name,
866                                    DataOutputStream out) throws IOException {
867       Long JavaDoc v;
868       synchronized (resource_id_map) {
869         v = (Long JavaDoc) resource_id_map.get(resource_name);
870         if (v == null) {
871           ++cur_seq_id;
872
873           int len = resource_name.length();
874
875           // Write the header for this resource
876
out.writeLong(2);
877           out.writeInt(8 + 4 + (len * 2));
878           out.writeLong(cur_seq_id);
879           out.writeInt(len);
880           out.writeChars(resource_name);
881         
882           // Put this id in the cache
883
v = new Long JavaDoc(cur_seq_id);
884           resource_id_map.put(resource_name, v);
885         }
886       }
887       
888       return v;
889     }
890
891     /**
892      * Logs that a resource was deleted.
893      */

894     void logResourceDelete(String JavaDoc resource_name) throws IOException {
895
896       synchronized (this) {
897         // Build the header,
898
Long JavaDoc v = writeResourceName(resource_name, data_out);
899
900         // Write the header
901
long resource_id = v.longValue();
902         data_out.writeLong(6);
903         data_out.writeInt(8);
904         data_out.writeLong(resource_id);
905
906       }
907
908     }
909     
910     /**
911      * Logs a resource size change.
912      */

913     void logResourceSizeChange(String JavaDoc resource_name, long new_size)
914                                                          throws IOException {
915       synchronized (this) {
916         // Build the header,
917
Long JavaDoc v = writeResourceName(resource_name, data_out);
918
919         // Write the header
920
long resource_id = v.longValue();
921         data_out.writeLong(3);
922         data_out.writeInt(8 + 8);
923         data_out.writeLong(resource_id);
924         data_out.writeLong(new_size);
925
926       }
927
928     }
929
930     /**
931      * Sets a check point. This will add an entry to the log.
932      */

933     void setCheckPoint() throws IOException {
934       synchronized (this) {
935
936         data_out.writeLong(100);
937         data_out.writeInt(0);
938
939         // Flush and synch the journal file
940
flushAndSynch();
941       }
942     }
943
944     /**
945      * Logs a page modification to the end of the log and returns a pointer
946      * in the file to the modification.
947      */

948     JournalEntry logPageModification(String JavaDoc resource_name, long page_number,
949                              byte[] buf, int off, int len) throws IOException {
950
951       long ref;
952       synchronized (this) {
953         // Build the header,
954
Long JavaDoc v = writeResourceName(resource_name, data_out);
955
956         // The absolute position of the page,
957
final long absolute_position = page_number * page_size;
958
959         // Write the header
960
long resource_id = v.longValue();
961         data_out.writeLong(1);
962         data_out.writeInt(8 + 8 + 4 + 4 + len);
963         data_out.writeLong(resource_id);
964 // data_out.writeLong(page_number);
965
// data_out.writeInt(off);
966
data_out.writeLong(absolute_position / 8192);
967         data_out.writeInt(off + (int) (absolute_position & 8191));
968         data_out.writeInt(len);
969
970         data_out.write(buf, off, len);
971
972         // Flush the changes so we can work out the pointer.
973
data_out.flush();
974         ref = data.length() - len - 36;
975       }
976
977       // Returns a JournalEntry object
978
return new JournalEntry(resource_name, this, ref, page_number);
979     }
980
981
982
983     /**
984      * Reconstructs a modification that is logged in this journal.
985      */

986     void buildPage(long in_page_number,
987                    long position, byte[] buf, int off) throws IOException {
988       long type;
989       long resource_id;
990       long page_number;
991       int page_offset;
992       int page_length;
993
994       synchronized (this) {
995         data.readFully(position, buffer, 0, 36);
996         type = ByteArrayUtil.getLong(buffer, 0);
997         resource_id = ByteArrayUtil.getLong(buffer, 12);
998         page_number = ByteArrayUtil.getLong(buffer, 20);
999         page_offset = ByteArrayUtil.getInt(buffer, 28);
1000        page_length = ByteArrayUtil.getInt(buffer, 32);
1001
1002        // Some asserts,
1003
if (type != 1) {
1004          throw new IOException("Invalid page type. type = " + type +
1005                                " pos = " + position);
1006        }
1007        if (page_number != in_page_number) {
1008          throw new IOException("Page numbers do not match.");
1009        }
1010
1011        // Read the content.
1012
data.readFully(position + 36, buf, off + page_offset, page_length);
1013      }
1014
1015    }
1016
1017    /**
1018     * Synchronizes the log.
1019     */

1020    void flushAndSynch() throws IOException {
1021      synchronized (this) {
1022        data_out.flush();
1023        data.synch();
1024      }
1025    }
1026    
1027
1028     public String JavaDoc toString() {
1029       return "[JOURNAL: " + file.getName() + "]";
1030     }
1031     
1032  }
1033  
1034  /**
1035   * A JournalEntry represents a modification that has been logging in the
1036   * journal for a specific page of a resource. It contains the name of the
1037   * log file, the position in the journal of the modification, and the page
1038   * number.
1039   */

1040  private static final class JournalEntry {
1041
1042    /**
1043     * The resource that this page is on.
1044     */

1045    private final String JavaDoc resource_name;
1046    
1047    /**
1048     * The journal file.
1049     */

1050    private final JournalFile journal;
1051
1052    /**
1053     * The position in the journal file.
1054     */

1055    private final long position;
1056
1057    /**
1058     * The page number of this modification.
1059     */

1060    private final long page_number;
1061
1062
1063    /**
1064     * The next journal entry with the same page number
1065     */

1066    JournalEntry next_page;
1067
1068
1069    /**
1070     * Constructs the entry.
1071     */

1072    public JournalEntry(String JavaDoc resource_name, JournalFile journal,
1073                        long position, long page_number) {
1074      this.resource_name = resource_name;
1075      this.journal = journal;
1076      this.position = position;
1077      this.page_number = page_number;
1078    }
1079
1080    /**
1081     * Returns the journal file for this entry.
1082     */

1083    public JournalFile getJournalFile() {
1084      return journal;
1085    }
1086
1087    /**
1088     * Returns the position of the log entry in the journal file.
1089     */

1090    public long getPosition() {
1091      return position;
1092    }
1093
1094    /**
1095     * Returns the page number of this modification log entry.
1096     */

1097    public long getPageNumber() {
1098      return page_number;
1099    }
1100
1101  }
1102
1103
1104  /**
1105   * An abstract resource.
1106   */

1107  private abstract class AbstractResource implements JournalledResource {
1108    
1109    /**
1110     * The unique name given this resource (the file name).
1111     */

1112    protected final String JavaDoc name;
1113    
1114    /**
1115     * The id assigned to this resource by this session. This id should not
1116     * be used in any external source.
1117     */

1118    protected final long id;
1119    
1120    /**
1121     * The backing object.
1122     */

1123    protected final StoreDataAccessor data;
1124
1125    /**
1126     * True if this resource is read_only.
1127     */

1128    protected boolean read_only;
1129    
1130    /**
1131     * Constructs the resource.
1132     */

1133    AbstractResource(String JavaDoc name, long id, StoreDataAccessor data) {
1134      this.name = name;
1135      this.id = id;
1136      this.data = data;
1137    }
1138
1139
1140    // ---------- Persist methods ----------
1141

1142    abstract void persistClose() throws IOException;
1143
1144    abstract void persistDelete() throws IOException;
1145    
1146    abstract void persistSetSize(final long new_size) throws IOException;
1147    
1148    abstract void persistPageChange(final long page,
1149                                    final int off, int len,
1150                                    DataInputStream din) throws IOException;
1151
1152    abstract void synch() throws IOException;
1153
1154    // Called after a rollForwardRecover to notify the resource to update its
1155
// state to reflect the fact that changes have occurred.
1156
abstract void notifyPostRecover();
1157    
1158    // ----------
1159

1160    /**
1161     * Returns the size of the page.
1162     */

1163    public int getPageSize() {
1164      return page_size;
1165    }
1166
1167    /**
1168     * Returns the unique id of this page.
1169     */

1170    public long getID() {
1171      return id;
1172    }
1173
1174
1175    public String JavaDoc toString() {
1176      return name;
1177    }
1178
1179  }
1180
1181  /**
1182   * An implementation of AbstractResource that doesn't log.
1183   */

1184  private final class NonLoggingResource extends AbstractResource {
1185
1186    /**
1187     * Constructs the resource.
1188     */

1189    NonLoggingResource(String JavaDoc name, long id, StoreDataAccessor data) {
1190      super(name, id, data);
1191    }
1192
1193
1194    // ---------- Persist methods ----------
1195

1196    void persistClose() throws IOException {
1197      // No-op
1198
}
1199
1200    public void persistDelete() throws IOException {
1201      // No-op
1202
}
1203    
1204    public void persistSetSize(final long new_size) throws IOException {
1205      // No-op
1206
}
1207    
1208    public void persistPageChange(final long page,
1209                                  final int off, int len,
1210                                  DataInputStream din) throws IOException {
1211      // No-op
1212
}
1213
1214    public void synch() throws IOException {
1215      data.synch();
1216    }
1217
1218    public void notifyPostRecover() {
1219      // No-op
1220
}
1221    
1222    // ----------
1223

1224    /**
1225     * Opens the resource.
1226     */

1227    public void open(boolean read_only) throws IOException {
1228      this.read_only = read_only;
1229      data.open(read_only);
1230    }
1231
1232    /**
1233     * Reads a page from the resource.
1234     */

1235    public void read(final long page_number,
1236                     final byte[] buf, final int off) throws IOException {
1237      // Read the data.
1238
long page_position = page_number * page_size;
1239      data.read(page_position + off, buf, off, page_size);
1240    }
1241
1242    /**
1243     * Writes a page of some previously specified size.
1244     */

1245    public void write(final long page_number,
1246                      byte[] buf, int off, int len) throws IOException {
1247      long page_position = page_number * page_size;
1248      data.write(page_position + off, buf, off, len);
1249    }
1250
1251    /**
1252     * Sets the size of the resource.
1253     */

1254    public void setSize(long size) throws IOException {
1255      data.setSize(size);
1256    }
1257
1258    /**
1259     * Returns the size of this resource.
1260     */

1261    public long getSize() throws IOException {
1262      return data.getSize();
1263    }
1264    
1265    /**
1266     * Closes the resource.
1267     */

1268    public void close() throws IOException {
1269      data.close();
1270    }
1271
1272    /**
1273     * Deletes the resource.
1274     */

1275    public void delete() throws IOException {
1276      data.delete();
1277    }
1278
1279    /**
1280     * Returns true if the resource currently exists.
1281     */

1282    public boolean exists() {
1283      return data.exists();
1284    }
1285
1286  }
1287
1288  /**
1289   * Represents a resource in this system. A resource is backed by a
1290   * StoreDataAccessor and may have one or more modifications to it in the
1291   * journal.
1292   */

1293  private final class Resource extends AbstractResource {
1294
1295    /**
1296     * The size of the resource.
1297     */

1298    private long size;
1299    
1300    /**
1301     * True if there is actually data to be read in the above object.
1302     */

1303    private boolean there_is_backing_data;
1304
1305    /**
1306     * True if the underlying resource is really open.
1307     */

1308    private boolean really_open;
1309    
1310    /**
1311     * True if the data store exists.
1312     */

1313    private boolean data_exists;
1314
1315    /**
1316     * True if the data resource is open.
1317     */

1318    private boolean data_open;
1319
1320    /**
1321     * True if the data resource was deleted.
1322     */

1323    private boolean data_deleted;
1324    
1325    /**
1326     * The hash of all journal entries on this resource (JournalEntry).
1327     */

1328    private final JournalEntry[] journal_map;
1329
1330    /**
1331     * A temporary buffer the size of a page.
1332     */

1333    private final byte[] page_buffer;
1334    
1335    /**
1336     * Constructs the resource.
1337     */

1338    Resource(String JavaDoc name, long id, StoreDataAccessor data) {
1339      super(name, id, data);
1340      journal_map = new JournalEntry[257];
1341      data_open = false;
1342      data_exists = data.exists();
1343      data_deleted = false;
1344      if (data_exists) {
1345        try {
1346          size = data.getSize();
1347// System.out.println("Setting size of " + name + " to " + size);
1348
}
1349        catch (IOException e) {
1350          throw new Error JavaDoc("Error getting size of resource: " + e.getMessage());
1351        }
1352      }
1353      really_open = false;
1354      page_buffer = new byte[page_size];
1355    }
1356
1357
1358    // ---------- Persist methods ----------
1359

1360    private void persistOpen(boolean read_only) throws IOException {
1361// System.out.println(name + " Open");
1362
if (!really_open) {
1363        data.open(read_only);
1364        there_is_backing_data = true;
1365        really_open = true;
1366      }
1367    }
1368    
1369    void persistClose() throws IOException {
1370// System.out.println(name + " Close");
1371
if (really_open) {
1372        // When we close we reset the size attribute. We do this because of
1373
// the roll forward recovery.
1374
size = data.getSize();
1375        data.synch();
1376        data.close();
1377        really_open = false;
1378      }
1379    }
1380
1381    public void persistDelete() throws IOException {
1382// System.out.println(name + " Delete");
1383
// If open then close
1384
if (really_open) {
1385        persistClose();
1386      }
1387      data.delete();
1388      there_is_backing_data = false;
1389    }
1390    
1391    public void persistSetSize(final long new_size) throws IOException {
1392// System.out.println(name + " Set Size " + size);
1393
// If not open then open.
1394
if (!really_open) {
1395        persistOpen(false);
1396      }
1397      // Don't let us set a size that's smaller than the current size.
1398
if (new_size > data.getSize()) {
1399        data.setSize(new_size);
1400      }
1401    }
1402    
1403    public void persistPageChange(final long page,
1404                                  final int off, int len,
1405                                  DataInputStream din) throws IOException {
1406      if (!really_open) {
1407        persistOpen(false);
1408      }
1409
1410      // Buffer to read the page content into
1411
byte[] buf;
1412      if (len <= page_buffer.length) {
1413        // If length is smaller or equal to the size of a page then use the
1414
// local page buffer.
1415
buf = page_buffer;
1416      }
1417      else {
1418        // Otherwise create a new buffer of the required size (this may happen
1419
// if the page size changes between sessions).
1420
buf = new byte[len];
1421      }
1422
1423      // Read the change from the input stream
1424
din.readFully(buf, 0, len);
1425      // Write the change out to the underlying resource container
1426
long pos = page * 8192; //page_size;
1427
data.write(pos + off, buf, 0, len);
1428    }
1429
1430    public void synch() throws IOException {
1431      if (really_open) {
1432        data.synch();
1433      }
1434    }
1435
1436    public void notifyPostRecover() {
1437      data_exists = data.exists();
1438    }
1439
1440    
1441    // ----------
1442

1443    /**
1444     * Opens the resource. This method will check if the resource exists. If
1445     * it doesn't exist the 'read' method will return just the journal
1446     * modifications of a page. If it does exist it opens the resource and uses
1447     * that as the backing to any 'read' operations.
1448     */

1449    public void open(boolean read_only) throws IOException {
1450      this.read_only = read_only;
1451      
1452      if (!data_deleted && data.exists()) {
1453        // It does exist so open it.
1454
persistOpen(read_only);
1455      }
1456      else {
1457        there_is_backing_data = false;
1458        data_deleted = false;
1459      }
1460      data_open = true;
1461      data_exists = true;
1462    }
1463
1464    /**
1465     * Reads a page from the resource. This method reconstructs the page
1466     * from the underlying data, and from any journal entries. This should
1467     * read the data to be put into a buffer in memory.
1468     */

1469    public void read(final long page_number,
1470                     final byte[] buf, final int off) throws IOException {
1471
1472      synchronized (journal_map) {
1473        if (!data_open) {
1474          throw new IOException("Assertion failed: Data file is not open.");
1475        }
1476      }
1477
1478      // The list of all journal entries on this page number
1479
final ArrayList JavaDoc all_journal_entries = new ArrayList JavaDoc(4);
1480      try {
1481        // The map index.
1482
synchronized (journal_map) {
1483          int i = ((int) (page_number & 0x0FFFFFFF) % journal_map.length);
1484          JournalEntry entry = (JournalEntry) journal_map[i];
1485          JournalEntry prev = null;
1486  
1487          while (entry != null) {
1488            boolean deleted_hash = false;
1489
1490            JournalFile file = entry.getJournalFile();
1491            // Note that once we have a reference the journal file can not be
1492
// deleted.
1493
file.addReference();
1494  
1495            // If the file is closed (or deleted)
1496
if (file.isDeleted()) {
1497              deleted_hash = true;
1498              // Deleted so remove the reference to the journal
1499
file.removeReference();
1500              // Remove the journal entry from the chain.
1501
if (prev == null) {
1502                journal_map[i] = entry.next_page;
1503              }
1504              else {
1505                prev.next_page = entry.next_page;
1506              }
1507            }
1508            // Else if not closed then is this entry the page number?
1509
else if (entry.getPageNumber() == page_number) {
1510              all_journal_entries.add(entry);
1511            }
1512            else {
1513              // Not the page we are looking for so remove the reference to the
1514
// file.
1515
file.removeReference();
1516            }
1517
1518            // Only move prev is we have NOT deleted a hash entry
1519
if (!deleted_hash) {
1520              prev = entry;
1521            }
1522            entry = entry.next_page;
1523          }
1524        }
1525  
1526        // Read any data from the underlying file
1527
if (there_is_backing_data) {
1528          long page_position = page_number * page_size;
1529          // First read the page from the underlying store.
1530
data.read(page_position, buf, off, page_size);
1531        }
1532        else {
1533          // Clear the buffer
1534
for (int i = off; i < (page_size + off); ++i) {
1535            buf[i] = 0;
1536          }
1537        }
1538  
1539        // Rebuild from the journal file(s)
1540
final int sz = all_journal_entries.size();
1541        for (int i = 0; i < sz; ++i) {
1542          JournalEntry entry = (JournalEntry) all_journal_entries.get(i);
1543          JournalFile file = entry.getJournalFile();
1544          final long position = entry.getPosition();
1545          synchronized (file) {
1546            file.buildPage(page_number, position, buf, off);
1547          }
1548        }
1549
1550      }
1551      finally {
1552
1553        // Make sure we remove the reference for all the journal files.
1554
final int sz = all_journal_entries.size();
1555        for (int i = 0; i < sz; ++i) {
1556          JournalEntry entry = (JournalEntry) all_journal_entries.get(i);
1557          JournalFile file = entry.getJournalFile();
1558          file.removeReference();
1559        }
1560
1561      }
1562
1563    }
1564
1565    /**
1566     * Writes a page of some previously specified size to the top log. This
1567     * will add a single entry to the log and any 'read' operations after will
1568     * contain the written data.
1569     */

1570    public void write(final long page_number,
1571                      byte[] buf, int off, int len) throws IOException {
1572
1573      synchronized (journal_map) {
1574        if (!data_open) {
1575          throw new IOException("Assertion failed: Data file is not open.");
1576        }
1577
1578        // Make this modification in the log
1579
JournalEntry journal;
1580        synchronized (top_journal_lock) {
1581          journal = topJournal().logPageModification(name, page_number,
1582                                                     buf, off, len);
1583        }
1584
1585        // This adds the modification to the END of the hash list. This means
1586
// when we reconstruct the page the journals will always be in the
1587
// correct order - from oldest to newest.
1588

1589        // The map index.
1590
int i = ((int) (page_number & 0x0FFFFFFF) % journal_map.length);
1591        JournalEntry entry = (JournalEntry) journal_map[i];
1592        // Make sure this entry is added to the END
1593
if (entry == null) {
1594          // Add at the head if no first entry
1595
journal_map[i] = journal;
1596          journal.next_page = null;
1597        }
1598        else {
1599          // Otherwise search to the end
1600
// The number of journal entries in the linked list
1601
int journal_entry_count = 0;
1602          while (entry.next_page != null) {
1603            entry = entry.next_page;
1604            ++journal_entry_count;
1605          }
1606          // and add to the end
1607
entry.next_page = journal;
1608          journal.next_page = null;
1609
1610          // If there are over 35 journal entries, scan and remove all entries
1611
// on journals that have persisted
1612
if (journal_entry_count > 35) {
1613            int entries_cleaned = 0;
1614            entry = (JournalEntry) journal_map[i];
1615            JournalEntry prev = null;
1616
1617            while (entry != null) {
1618              boolean deleted_hash = false;
1619
1620              JournalFile file = entry.getJournalFile();
1621              // Note that once we have a reference the journal file can not be
1622
// deleted.
1623
file.addReference();
1624  
1625              // If the file is closed (or deleted)
1626
if (file.isDeleted()) {
1627                deleted_hash = true;
1628                // Deleted so remove the reference to the journal
1629
file.removeReference();
1630                // Remove the journal entry from the chain.
1631
if (prev == null) {
1632                  journal_map[i] = entry.next_page;
1633                }
1634                else {
1635                  prev.next_page = entry.next_page;
1636                }
1637                ++entries_cleaned;
1638              }
1639              // Remove the reference
1640
file.removeReference();
1641
1642              // Only move prev is we have NOT deleted a hash entry
1643
if (!deleted_hash) {
1644                prev = entry;
1645              }
1646              entry = entry.next_page;
1647            }
1648
1649          }
1650        }
1651      }
1652
1653    }
1654
1655    /**
1656     * Sets the size of the resource.
1657     */

1658    public void setSize(long size) throws IOException {
1659      synchronized (journal_map) {
1660        this.size = size;
1661      }
1662      synchronized (top_journal_lock) {
1663        topJournal().logResourceSizeChange(name, size);
1664      }
1665    }
1666
1667    /**
1668     * Returns the size of this resource.
1669     */

1670    public long getSize() throws IOException {
1671      synchronized (journal_map) {
1672        return this.size;
1673      }
1674    }
1675    
1676    /**
1677     * Closes the resource. This will actually simply log that the resource
1678     * has been closed.
1679     */

1680    public void close() throws IOException {
1681      synchronized (journal_map) {
1682        data_open = false;
1683      }
1684    }
1685
1686    /**
1687     * Deletes the resource. This will actually simply log that the resource
1688     * has been deleted.
1689     */

1690    public void delete() throws IOException {
1691      // Log that this resource was deleted.
1692
synchronized (top_journal_lock) {
1693        topJournal().logResourceDelete(name);
1694      }
1695      synchronized (journal_map) {
1696        data_exists = false;
1697        data_deleted = true;
1698        size = 0;
1699      }
1700    }
1701
1702    /**
1703     * Returns true if the resource currently exists.
1704     */

1705    public boolean exists() {
1706      return data_exists;
1707    }
1708
1709  }
1710
1711  /**
1712   * Summary information about a journal.
1713   */

1714  private static class JournalSummary {
1715
1716    /**
1717     * The JournalFile object that is a summary of.
1718     */

1719    JournalFile journal_file;
1720
1721    /**
1722     * True if the journal is recoverable (has one or more complete check
1723     * points available).
1724     */

1725    boolean can_be_recovered = false;
1726
1727    /**
1728     * The position of the last checkpoint in the journal.
1729     */

1730    long last_checkpoint;
1731
1732    /**
1733     * The list of all resource names that this journal 'touches'.
1734     */

1735    ArrayList JavaDoc resource_list = new ArrayList JavaDoc();
1736
1737    /**
1738     * Constructor.
1739     */

1740    public JournalSummary(JournalFile journal_file) {
1741      this.journal_file = journal_file;
1742    }
1743    
1744  }
1745
1746  /**
1747   * Thread that persists the journal in the backgroudn.
1748   */

1749  private class JournalingThread extends Thread JavaDoc {
1750
1751    private boolean finished = false;
1752    private boolean actually_finished;
1753
1754    /**
1755     * Constructor.
1756     */

1757    JournalingThread() {
1758      setName("Mckoi - Background Journaling");
1759      // This is a daemon thread. it should be safe if this thread
1760
// dies at any time.
1761
setDaemon(true);
1762    }
1763
1764    
1765    public void run() {
1766      boolean local_finished = false;
1767
1768      while (!local_finished) {
1769
1770        ArrayList JavaDoc to_process = null;
1771        synchronized (top_journal_lock) {
1772          if (journal_archives.size() > 0) {
1773            to_process = new ArrayList JavaDoc();
1774            to_process.addAll(journal_archives);
1775          }
1776        }
1777
1778        if (to_process == null) {
1779          // Nothing to process so wait
1780
synchronized (this) {
1781            if (!finished) {
1782              try {
1783                wait();
1784              }
1785              catch (InterruptedException JavaDoc e) { /* ignore */ }
1786            }
1787          }
1788
1789        }
1790        else if (to_process.size() > 0) {
1791          // Something to process, so go ahead and process the journals,
1792
int sz = to_process.size();
1793          // For all journals
1794
for (int i = 0; i < sz; ++i) {
1795            // Pick the lowest journal to persist
1796
JournalFile jf = (JournalFile) to_process.get(i);
1797            try {
1798              // Persist the journal
1799
jf.persist(8, jf.size());
1800              // Close and then delete the journal file
1801
jf.closeAndDelete();
1802            }
1803            catch (IOException e) {
1804              debug.write(Lvl.ERROR, this, "Error persisting journal: " + jf);
1805              debug.writeException(Lvl.ERROR, e);
1806              // If there is an error persisting the best thing to do is
1807
// finish
1808
synchronized (this) {
1809                finished = true;
1810              }
1811            }
1812          }
1813        }
1814
1815        synchronized (this) {
1816          local_finished = finished;
1817          // Remove the journals that we have just persisted.
1818
if (to_process != null) {
1819            synchronized (top_journal_lock) {
1820              int sz = to_process.size();
1821              for (int i = 0; i < sz; ++i) {
1822                journal_archives.remove(0);
1823              }
1824            }
1825          }
1826          // Notify any threads waiting
1827
notifyAll();
1828        }
1829
1830      }
1831
1832      synchronized (this) {
1833        actually_finished = true;
1834        notifyAll();
1835      }
1836    }
1837    
1838    public synchronized void finish() {
1839      finished = true;
1840      notifyAll();
1841    }
1842    
1843    public synchronized void waitUntilFinished() {
1844      try {
1845        while (!actually_finished) {
1846          wait();
1847        }
1848      }
1849      catch (InterruptedException JavaDoc e) {
1850        throw new Error JavaDoc("Interrupted: " + e.getMessage());
1851      }
1852    }
1853    
1854    /**
1855     * Persists the journal_archives list until the list is at least the
1856     * given size.
1857     */

1858    public synchronized void persistArchives(int until_size) {
1859      notifyAll();
1860      int sz;
1861      synchronized (top_journal_lock) {
1862        sz = journal_archives.size();
1863      }
1864      // Wait until the sz is smaller than 'until_size'
1865
while (sz > until_size) {
1866        try {
1867          wait();
1868        }
1869        catch (InterruptedException JavaDoc e) { /* ignore */ }
1870
1871        synchronized (top_journal_lock) {
1872          sz = journal_archives.size();
1873        }
1874      }
1875    }
1876
1877  }
1878
1879}
1880
1881
Popular Tags