KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > db > DistributedWebDBWriter


1 /* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3
4 package net.nutch.db;
5
6 import java.io.*;
7 import java.util.*;
8 import java.util.logging.*;
9 import java.nio.channels.*;
10
11 import net.nutch.io.*;
12 import net.nutch.fs.*;
13 import net.nutch.util.*;
14 import net.nutch.pagedb.*;
15 import net.nutch.linkdb.*;
16
/***************************************************
 * This is a wrapper class that allows us to reorder
 * write operations to the linkdb and pagedb. It is
 * useful only for objects like UpdateDatabaseTool,
 * which just does writes.
 *
 * NOTE(review): the two paragraphs below talk about
 * "WebDBWriter" and say it does nothing in a distributed
 * fashion, which does not match the name of this class
 * (DistributedWebDBWriter). They appear to have been copied
 * from the single-machine writer; please confirm and update.
 *
 * The WebDBWriter is a traditional single-pass database writer.
 * It does not cache any instructions to disk (but it does
 * in memory, with possible resorting). It certainly does
 * nothing in a distributed fashion.
 *
 * There are other implementors of IWebDBWriter that do
 * all that fancy stuff.
 *
 * @author Mike Cafarella
 *************************************************/

33 public class DistributedWebDBWriter implements IWebDBWriter {
34     static final Logger LOG = LogFormatter.getLogger("net.nutch.db.WebDBWriter");
35     static final byte CUR_VERSION = 0;
36     static final byte OPEN_COUNTER_VERSION = 0;
37     static final byte CLOSE_COUNTER_VERSION = 0;
38     static final byte MACHINE_INFO_VERSION = 0;
39
40     // magic number
41
static int READY_TO_USE = 0xbabecafe;
42     static int IS_COMPLETE = 0xbabe0000;
43     static int WRITE_LOCK_INFO = 0xcafe0000;
44     static long LONG_TIMEOUT = 10 * 1000;
45
46     // db opcodes
47
static final byte ADD_PAGE = 0;
48     static final byte ADD_PAGE_WITH_SCORE = 1;
49     static final byte ADD_PAGE_IFN_PRESENT = 2;
50     static final byte DEL_PAGE = 3;
51     static final int ADD_LINK = 0;
52     static final int DEL_LINK = 1;
53     static final int DEL_SINGLE_LINK = 2;
54
55     // filenames
56
static final String JavaDoc PAGES_BY_URL = "pagesByURL";
57     static final String JavaDoc PAGES_BY_MD5 = "pagesByMD5";
58     static final String JavaDoc LINKS_BY_URL = "linksByURL";
59     static final String JavaDoc LINKS_BY_MD5 = "linksByMD5";
60     static final String JavaDoc STATS_FILE = "stats";
61     static final String JavaDoc META_SHAREGROUP = "metashare";
62     static final String JavaDoc METAINFO = "metainfo";
63
64     // Result codes for page-url comparisons
65
static final int NO_OUTLINKS = 0;
66     static final int HAS_OUTLINKS = 1;
67     static final int LINK_INVALID = 2;
68
69     /********************************************
70      * PageInstruction holds an operation over a Page.
71      *********************************************/

72     public static class PageInstruction implements WritableComparable {
73         byte opcode;
74         boolean hasLink;
75         Page page;
76         Link link;
77
78         /**
79          */

80         public PageInstruction() {}
81
82         /**
83          */

84         public PageInstruction(Page page, int opcode) {
85             set(page, opcode);
86         }
87
88         /**
89          */

90         public PageInstruction(Page page, Link link, int opcode) {
91             set(page, link, opcode);
92         }
93
94         /**
95          * Init from another PageInstruction object.
96          */

97         public void set(PageInstruction that) {
98             this.opcode = that.opcode;
99
100             if (this.page == null) {
101                 this.page = new Page();
102             }
103             this.page.set(that.page);
104
105             if (this.link == null) {
106                 this.link = new Link();
107             }
108             this.hasLink = that.hasLink;
109             if (this.hasLink) {
110                 this.link.set(that.link);
111             }
112         }
113
114         /**
115          * Init PageInstruction with no Link
116          */

117         public void set(Page page, int opcode) {
118             this.opcode = (byte) opcode;
119             this.page = page;
120             this.hasLink = false;
121             this.link = null;
122         }
123
124         /**
125          * Init PageInstruction with a Link
126          */

127         public void set(Page page, Link link, int opcode) {
128             this.opcode = (byte) opcode;
129             this.page = page;
130             this.hasLink = true;
131             this.link = link;
132         }
133
134         //
135
// WritableComparable
136
//
137
public int compareTo(Object JavaDoc o) {
138             int pageResult = this.page.compareTo(((PageInstruction) o).page);
139             if (pageResult != 0) {
140                 return pageResult;
141             } else {
142                 return this.opcode - (((PageInstruction) o).opcode);
143             }
144         }
145         public void write(DataOutput out) throws IOException {
146             out.writeByte(opcode);
147             page.write(out);
148             out.writeByte(hasLink ? 1 : 0);
149             if (hasLink) {
150                 link.write(out);
151             }
152         }
153         public void readFields(DataInput in) throws IOException {
154             opcode = in.readByte();
155             if (page == null) {
156                 page = new Page();
157             }
158             page.readFields(in);
159             
160             if (link == null) {
161                 link = new Link();
162             }
163             hasLink = (1 == in.readByte());
164             if (hasLink) {
165                 link.readFields(in);
166             }
167         }
168         public Page getPage() {
169             return page;
170         }
171         public Link getLink() {
172             if (hasLink) {
173                 return link;
174             } else {
175                 return null;
176             }
177         }
178         public int getInstruction() {
179             return opcode;
180         }
181
182         /**
183          * Sorts the instruction first by Page, then by opcode.
184          */

185         public static class PageComparator extends WritableComparator {
186             private static final Page.Comparator PAGE_COMPARATOR =
187             new Page.Comparator();
188
189             public PageComparator() { super(PageInstruction.class); }
190
191             /** Optimized comparator. */
192             public int compare(byte[] b1, int s1, int l1,
193                                byte[] b2, int s2, int l2) {
194                 int opcode1 = b1[s1];
195                 int opcode2 = b2[s2];
196                 int c = PAGE_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1);
197                 if (c != 0)
198                     return c;
199                 return opcode1 - opcode2;
200             }
201         }
202  
203         /*****************************************************
204          * Sorts the instruction first by url, then by opcode.
205          *****************************************************/

206         public static class UrlComparator extends WritableComparator {
207             private static final Page.UrlComparator PAGE_COMPARATOR =
208             new Page.UrlComparator();
209
210             public UrlComparator() { super(PageInstruction.class); }
211
212             /**
213              * We need to sort by ordered URLs. First, we sort by
214              * URL, then by opcode.
215              */

216             public int compare(WritableComparable a, WritableComparable b) {
217                 PageInstruction instructionA = (PageInstruction)a;
218                 PageInstruction instructionB = (PageInstruction)b;
219                 Page pageA = instructionA.getPage();
220                 Page pageB = instructionB.getPage();
221
222                 int result = pageA.getURL().compareTo(pageB.getURL());
223                 if (result != 0) {
224                     return result;
225                 } else {
226                     return instructionA.opcode - instructionB.opcode;
227                 }
228             }
229
230             /**
231              * Optimized comparator.
232              */

233             public int compare(byte[] b1, int s1, int l1,
234                                byte[] b2, int s2, int l2) {
235                 int opcode1 = b1[s1];
236                 int opcode2 = b2[s2];
237                 int c = PAGE_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1);
238                 if (c != 0)
239                     return c;
240                 return opcode1 - opcode2;
241             }
242         }
243     }
244
245     /********************************************************
246      * PageInstructionWriter very efficiently writes a
247      * PageInstruction to an EditSectionGroupWriter. Much better
248      * than calling "writer.append(new PageInstruction())"
249      ********************************************************/

250     public static class PageInstructionWriter {
251         PageInstruction pi = new PageInstruction();
252
253         /**
254          */

255         public PageInstructionWriter() {
256         }
257
258         /**
259          * Append the PageInstruction info to the indicated SequenceFile,
260          * and keep the PI for later reuse.
261          */

262         public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Page page, int opcode, Writable val) throws IOException {
263             pi.set(page, opcode);
264             writer.append(pi, val);
265         }
266
267         /**
268          * Append the PageInstruction info to the indicated SequenceFile,
269          * and keep the PI for later reuse.
270          */

271         public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Page page, Link link, int opcode, Writable val) throws IOException {
272             pi.set(page, link, opcode);
273             writer.append(pi, val);
274         }
275     }
276
277     /*************************************************************
278      * Reduce multiple instructions for a given url to the single effective
279      * instruction. ADD is prioritized highest, then ADD_IFN_PRESENT, and then
280      * DEL. Not coincidentally, this is opposite the order they're sorted in.
281      **************************************************************/

282     private static class DeduplicatingPageSequenceReader {
283         SequenceFile.Reader edits;
284         PageInstruction current = new PageInstruction();
285         UTF8 currentUrl = new UTF8();
286         boolean haveCurrent;
287
288         /**
289          */

290         public DeduplicatingPageSequenceReader(SequenceFile.Reader edits) throws IOException {
291             this.edits = edits;
292             this.haveCurrent = edits.next(current, NullWritable.get());
293         }
294
295         /**
296          */

297         public boolean next(PageInstruction result) throws IOException {
298             if (!haveCurrent) {
299                 return false;
300             }
301         
302             currentUrl.set(current.getPage().getURL());
303             result.set(current); // take the first instruction
304

305             do {
306                 // skip the rest
307
} while ((haveCurrent = edits.next(current, NullWritable.get())) &&
308                      currentUrl.compareTo(current.getPage().getURL()) == 0);
309             return true;
310         }
311     }
312
313
314     /*************************************************
315      * Holds an instruction over a Link.
316      *************************************************/

317     public static class LinkInstruction implements WritableComparable {
318         Link link;
319         int instruction;
320
321         /**
322          */

323         public LinkInstruction() {
324         }
325
326         /**
327          */

328         public LinkInstruction(Link link, int instruction) {
329             set(link, instruction);
330         }
331
332         /**
333          * Re-init from another LinkInstruction's info.
334          */

335         public void set(LinkInstruction that) {
336             this.instruction = that.instruction;
337           
338             if (this.link == null)
339                 this.link = new Link();
340
341             this.link.set(that.link);
342         }
343
344         /**
345          * Re-init with a Link and an instruction
346          */

347         public void set(Link link, int instruction) {
348             this.link = link;
349             this.instruction = instruction;
350         }
351
352         //
353
// WritableComparable
354
//
355
public int compareTo(Object JavaDoc o) {
356             return this.link.compareTo(((LinkInstruction) o).link);
357         }
358         public void write(DataOutput out) throws IOException {
359             out.writeByte(instruction);
360             link.write(out);
361         }
362         public void readFields(DataInput in) throws IOException {
363             this.instruction = in.readByte();
364             if (link == null)
365                 link = new Link();
366             link.readFields(in);
367         }
368         public Link getLink() {
369             return link;
370         }
371         public int getInstruction() {
372             return instruction;
373         }
374
375         /*******************************************************
376          * Sorts the instruction first by Md5, then by opcode.
377          *******************************************************/

378         public static class MD5Comparator extends WritableComparator {
379             private static final Link.MD5Comparator MD5_COMPARATOR =
380             new Link.MD5Comparator();
381
382             public MD5Comparator() { super(LinkInstruction.class); }
383
384             public int compare(WritableComparable a, WritableComparable b) {
385                 LinkInstruction instructionA = (LinkInstruction)a;
386                 LinkInstruction instructionB = (LinkInstruction)b;
387                 return instructionA.link.md5Compare(instructionB.link);
388             }
389
390             /** Optimized comparator. */
391             public int compare(byte[] b1, int s1, int l1,
392                                byte[] b2, int s2, int l2) {
393                 return MD5_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1);
394             }
395         }
396  
397         /*********************************************************
398          * Sorts the instruction first by url, then by opcode.
399          *********************************************************/

400         public static class UrlComparator extends WritableComparator {
401             private static final Link.UrlComparator URL_COMPARATOR =
402             new Link.UrlComparator();
403
404             public UrlComparator() { super(LinkInstruction.class); }
405
406             public int compare(WritableComparable a, WritableComparable b) {
407                 LinkInstruction instructionA = (LinkInstruction)a;
408                 LinkInstruction instructionB = (LinkInstruction)b;
409                 return instructionA.link.urlCompare(instructionB.link);
410
411             }
412
413             /**
414              * Optimized comparator.
415              */

416             public int compare(byte[] b1, int s1, int l1,
417                                byte[] b2, int s2, int l2) {
418                 return URL_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1);
419             }
420         }
421     }
422
423     /*******************************************************
424      * LinkInstructionWriter very efficiently writes a
425      * LinkInstruction to an EditSectionGroupWriter. Much better
426      * than calling "writer.append(new LinkInstruction())"
427      ********************************************************/

428     public static class LinkInstructionWriter {
429         LinkInstruction li = new LinkInstruction();
430
431         /**
432          */

433         public LinkInstructionWriter() {
434         }
435
436         /**
437          * Append the LinkInstruction info to the indicated SequenceFile
438          * and keep the LI for later reuse.
439          */

440         public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Link link, int opcode, Writable val) throws IOException {
441             li.set(link, opcode);
442             writer.append(li, val);
443         }
444     }
445
446     /********************************************************
447      * This class deduplicates link operations. We want to
448      * sort by MD5, then by URL. But all operations
449      * should be unique.
450      *********************************************************/

451     class DeduplicatingLinkSequenceReader {
452         Link currentKey = new Link();
453         LinkInstruction current = new LinkInstruction();
454         SequenceFile.Reader edits;
455         boolean haveCurrent;
456
457         /**
458          */

459         public DeduplicatingLinkSequenceReader(SequenceFile.Reader edits) throws IOException {
460             this.edits = edits;
461             this.haveCurrent = edits.next(current, NullWritable.get());
462         }
463
464
465         /**
466          * The incoming stream of edits is sorted first by MD5, then by URL.
467          * MD5-only values always come before MD5+URL.
468          */

469         public boolean next(LinkInstruction key) throws IOException {
470             if (! haveCurrent) {
471                 return false;
472             }
473
474             currentKey.set(current.getLink());
475             
476             do {
477                 key.set(current);
478             } while ((haveCurrent = edits.next(current, NullWritable.get())) &&
479                      currentKey.compareTo(current.getLink()) == 0);
480             return true;
481         }
482     }
483
484
485     /**************************************************
486      * The CloseProcessor class is used when we close down
487      * the webdb. We give it the path, members, and class values
488      * needed to apply changes to any of our 4 data tables.
489      *
490      * This is an abstract class. Each subclass must define
491      * the exact merge procedure. However, file-handling
492      * and edit-processing is standardized as much as possible.
493      *
494      **************************************************/

495     private abstract class CloseProcessor {
496         String JavaDoc basename;
497         String JavaDoc curDBPart;
498         MapFile.Reader oldDb;
499         EditSectionGroupWriter editWriter;
500         SequenceFile.Sorter sorter;
501         WritableComparator comparator;
502         Class JavaDoc keyClass, valueClass;
503         long itemsWritten = 0;
504
505         /**
506          * Store away these members for later use.
507          */

508         CloseProcessor(String JavaDoc basename, MapFile.Reader oldDb, EditSectionGroupWriter editWriter, SequenceFile.Sorter sorter, WritableComparator comparator, Class JavaDoc keyClass, Class JavaDoc valueClass, String JavaDoc curDBPart) {
509             this.basename = basename;
510             this.oldDb = oldDb;
511             this.editWriter = editWriter;
512             this.sorter = sorter;
513             this.comparator = comparator;
514             this.keyClass = keyClass;
515             this.valueClass = valueClass;
516             this.curDBPart = curDBPart;
517         }
518
519         /**
520          * Perform the shutdown sequence for this Processor.
521          * There is a lot of file-moving and edit-sorting that
522          * is common across all the 4 tables.
523          *
524          * Returns how many items were written out by this close().
525          */

526         long closeDown(File workingDir, File outputDir) throws IOException {
527             //
528
// Done adding edits, so close edit-writer.
529
//
530
editWriter.close();
531
532             //
533
// Where the output is going
534
//
535
File sectionDir = new File(outputDir, "dbsection." + machineNum);
536             File newDbFile = new File(sectionDir, basename);
537
538             //
539
// Grab all the edits that we need to process. We build an EditSectionGroupReader
540
// and aim it at the right location. The ESR will wait until all its
541
// component Sections are written and completed before returning from
542
// any method (other than the constructor). So we expect to possibly wait
543
// inside the call to numEdits().
544
//
545
EditSectionGroupReader edits = new EditSectionGroupReader(nfs, basename, machineNum, totalMachines);
546             int numEdits = edits.numEdits();
547
548             // If there are edits, then process them.
549
if (numEdits != 0) {
550                 File mergedEditsFile = new File(sectionDir, "mergedEdits");
551                 edits.mergeSectionComponents(mergedEditsFile);
552                 File sortedEditsFile = new File(mergedEditsFile.getPath() + ".sorted");
553
554                 // Sort the edits
555
long startSort = System.currentTimeMillis();
556                 sorter.sort(mergedEditsFile.getPath(), sortedEditsFile.getPath());
557                 long endSort = System.currentTimeMillis();
558
559                 LOG.info("Processing " + basename + ": Sorted " + numEdits + " instructions in " + ((endSort - startSort) / 1000.0) + " seconds.");
560                 LOG.info("Processing " + basename + ": Sorted " + (numEdits / ((endSort - startSort) / 1000.0)) + " instructions/second");
561             
562                 // Delete old file
563
nfs.delete(mergedEditsFile);
564
565                 // Read the sorted edits. That means read all
566
// the edits from the local subsection of the
567
// database. We must merge every machine's
568
// contribution to the edit-list first (which
569
// also means waiting until each machine has
570
// completed that step).
571

572                 // Read the sorted edits
573
SequenceFile.Reader sortedEdits = new SequenceFile.Reader(nfs, sortedEditsFile.getPath());
574
575                 // Create a brand-new output db for the integrated data
576
MapFile.Writer newDb = (comparator == null) ? new MapFile.Writer(nfs, newDbFile.getPath(), keyClass, valueClass) : new MapFile.Writer(nfs, newDbFile.getPath(), comparator, valueClass);
577
578                 // Iterate through the edits, and merge changes with existing
579
// db into the brand-new file
580
oldDb.reset();
581             
582                 // Merge the edits. We did it!
583
long startMerge = System.currentTimeMillis();
584                 mergeEdits(oldDb, sortedEdits, newDb);
585                 long endMerge = System.currentTimeMillis();
586                 LOG.info("Processing " + basename + ": Merged to new DB containing " + itemsWritten + " records in " + ((endMerge - startMerge) / 1000.0) + " seconds");
587                 LOG.info("Processing " + basename + ": Merged " + (itemsWritten / ((endMerge - startMerge) / 1000.0)) + " records/second");
588
589                 // Close down readers, writers
590
sortedEdits.close();
591                 newDb.close();
592
593                 // Delete the (sorted) merged-edits
594
nfs.delete(sortedEditsFile);
595             } else {
596                 // Otherwise, simply copy the original file into place,
597
// without all the processing overhead.
598
long startCopy = System.currentTimeMillis();
599
600                 File srcSectionDir = new File(oldDbDir, "dbsection." + machineNum);
601                 File srcDbFile = new File(srcSectionDir, basename);
602                 nfs.rename(srcDbFile, newDbFile);
603                 long endCopy = System.currentTimeMillis();
604                 LOG.info("Processing " + basename + ": Copied file (" + srcDbFile.length()+ " bytes) in " + ((endCopy - startCopy) / 1000.0) + " secs.");
605             }
606
607             // Delete the now-consumed edits file to save space
608
edits.delete();
609             return itemsWritten;
610         }
611
612         /**
613          * The loop that actually applies the changes and writes to
614          * a new db. This is different for every subclass!
615          */

616         abstract void mergeEdits(MapFile.Reader db, SequenceFile.Reader edits, MapFile.Writer newDb) throws IOException;
617     }
618
619     /***
620      * The PagesByURLProcessor is used during close() time for
621      * the pagesByURL table. We instantiate one of these, and it
622      * takes care of the entire shutdown process.
623      */

624     private class PagesByURLProcessor extends CloseProcessor {
625         EditSectionGroupWriter futureEdits;
626
627         /**
628          * We store "futureEdits" so we can write edits for the
629          * next table-db step
630          */

631         PagesByURLProcessor(MapFile.Reader db, EditSectionGroupWriter editWriter, EditSectionGroupWriter futureEdits) {
632             super(PAGES_BY_URL, db, editWriter, new SequenceFile.Sorter(nfs, new PageInstruction.UrlComparator(), NullWritable.class), new UTF8.Comparator(), null, Page.class, "PagesByURLPart");
633             this.futureEdits = futureEdits;
634         }
635
636         /**
637          * Merge the existing db with the edit-stream into a brand-new file.
638          */

639         void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
640             // Create the keys and vals we'll be using
641
DeduplicatingPageSequenceReader edits = new DeduplicatingPageSequenceReader(sortedEdits);
642             WritableComparable readerKey = new UTF8();
643             Page readerVal = new Page();
644             PageInstruction editItem = new PageInstruction();
645             int futureOrdering = 0;
646
647             // Read the first items from both streams
648
boolean hasEntries = db.next(readerKey, readerVal);
649             boolean hasEdits = edits.next(editItem);
650
651             // As long as we have both edits and entries, we need to
652
// interleave them....
653
while (hasEntries && hasEdits) {
654                 int comparison = readerKey.compareTo(editItem.getPage().getURL());
655                 int curInstruction = editItem.getInstruction();
656
657                 // Perform operations
658
if ((curInstruction == ADD_PAGE) ||
659                     (curInstruction == ADD_PAGE_WITH_SCORE) ||
660                     (curInstruction == ADD_PAGE_IFN_PRESENT)) {
661
662                     if (comparison < 0) {
663                         // Write readerKey, just passing it along.
664
// Don't process the edit yet.
665
newDb.append(readerKey, readerVal);
666                         itemsWritten++;
667                         hasEntries = db.next(readerKey, readerVal);
668                     } else if (comparison == 0) {
669                         // The keys are equal. If the instruction
670
// is ADD_PAGE, we write the edit's key and
671
// replace the old one.
672
//
673
// Otherwise, if it's ADD_IFN_PRESENT,
674
// keep the reader's item intact.
675
//
676
if ((curInstruction == ADD_PAGE) ||
677                             (curInstruction == ADD_PAGE_WITH_SCORE)) {
678                             // An ADD_PAGE with an identical pair
679
// of pages replaces the existing one.
680
// We may need to note the fact for
681
// Garbage Collection.
682
//
683
// This happens in three stages.
684
// 1. We write necessary items to the future
685
// edits-list.
686
//
687
pagesByMD5Edits++;
688
689                             // If this is a replacing add, we don't want
690
// to disturb the score from the old Page! This,
691
// way, we can run some link analysis scoring
692
// while the new Pages are being fetched and
693
// not lose the info when a Page is replaced.
694
//
695
// If it is an ADD_PAGE_WITH_SCORE, then we
696
// go ahead and replace the old one.
697
//
698
// Either way, from now on we treat it
699
// as an ADD_PAGE
700
//
701
Page editItemPage = editItem.getPage();
702
703                             if (curInstruction == ADD_PAGE) {
704                                 editItemPage.setScore(readerVal.getScore(), readerVal.getNextScore());
705                             }
706
707                             piwriter.appendInstructionInfo(futureEdits, editItemPage, ADD_PAGE, NullWritable.get());
708
709                             //
710
// 2. We write the edit-page to *this* table.
711
//
712
newDb.append(editItemPage.getURL(), editItemPage);
713
714                             //
715
// 3. We want the ADD in the next step (the
716
// MD5-driven table) to be a "replacing add".
717
// But that won't happen if the readerItem and
718
// the editItem Pages are not identical.
719
// (In this scenario, that means their URLs
720
// are the same, but their MD5s are different.)
721
// So, we need to explicitly handle that
722
// case by issuing a DELETE for the now-obsolete
723
// item.
724
if (editItemPage.compareTo(readerVal) != 0) {
725                                 pagesByMD5Edits++;
726                                 piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get());
727                             }
728
729                             itemsWritten++;
730
731                             // "Delete" the readerVal by skipping it.
732
hasEntries = db.next(readerKey, readerVal);
733                         } else {
734                             // ADD_PAGE_IFN_PRESENT. We only add IF_NOT
735
// present. And it was present! So, we treat
736
// this case like we treat a no-op.
737
// Just move to the next edit.
738
}
739                         // In either case, we process the edit.
740
hasEdits = edits.next(editItem);
741
742                     } else if (comparison > 0) {
743                         // We have inserted a Page that's before some
744
// entry in the existing database. So, we just
745
// need to write down the Page from the Edit file.
746
// It's like the above case, except we don't tell
747
// the future-edits to delete anything.
748
//
749
// 1. Write the item down for the future.
750
pagesByMD5Edits++;
751
752                         //
753
// If this is an ADD_PAGE_IFN_PRESENT, then
754
// we may also have a Link we have to take care of!
755
//
756
if (curInstruction == ADD_PAGE_IFN_PRESENT) {
757                             Link editLink = editItem.getLink();
758                             if (editLink != null) {
759                                 addLink(editLink);
760                             }
761                         }
762                         piwriter.appendInstructionInfo(futureEdits, editItem.getPage(), ADD_PAGE, NullWritable.get());
763
764                         //
765
// 2. Write the edit-page to *this* table
766
newDb.append(editItem.getPage().getURL(), editItem.getPage());
767                         itemsWritten++;
768
769                         // Process the edit
770
hasEdits = edits.next(editItem);
771                     }
772                 } else if (curInstruction == DEL_PAGE) {
773                     if (comparison < 0) {
774                         // Write the readerKey, just passing it along.
775
// We don't process the edit yet.
776
newDb.append(readerKey, readerVal);
777                         itemsWritten++;
778                         hasEntries = db.next(readerKey, readerVal);
779                     } else if (comparison == 0) {
780                         // Delete it! We can only delete one item
781
// at a time, as all URLs are unique.
782
// 1. Tell the future-edits what page will need to
783
// be deleted.
784
pagesByMD5Edits++;
785                         piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get());
786
787                         //
788
// 2. "Delete" the entry by skipping the Reader
789
// key.
790
hasEntries = db.next(readerKey, readerVal);
791
792                         // Process the edit
793
hasEdits = edits.next(editItem);
794                     } else if (comparison > 0) {
795                         // Ignore it. We tried to delete an item that's
796
// not here.
797
hasEdits = edits.next(editItem);
798                     }
799                 }
800             }
801
802             // Now we have only edits. No more preexisting items!
803
while (! hasEntries && hasEdits) {
804                 int curInstruction = editItem.getInstruction();
805                 if (curInstruction == ADD_PAGE ||
806                     curInstruction == ADD_PAGE_WITH_SCORE ||
807                   &