KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > db > WebDBWriter


1 /* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3
4 package net.nutch.db;
5
6 import java.io.*;
7 import java.util.*;
8 import java.util.logging.*;
9
10 import net.nutch.io.*;
11 import net.nutch.fs.*;
12 import net.nutch.util.*;
13 import net.nutch.pagedb.*;
14 import net.nutch.linkdb.*;
15
/***************************************************
 * This is a wrapper class that allows us to reorder
 * write operations to the linkdb and pagedb. It is
 * useful only for objects like UpdateDatabaseTool,
 * which only perform writes.
 *
 * The WebDBWriter is a traditional single-pass database writer.
 * It does not cache any instructions to disk (but it does
 * cache them in memory, with possible re-sorting). It certainly
 * does nothing in a distributed fashion.
 *
 * There are other implementors of IWebDBWriter that do
 * all that fancy stuff.
 *
 * @author Mike Cafarella
 *************************************************/

32 public class WebDBWriter implements IWebDBWriter {
33     static final Logger LOG = LogFormatter.getLogger("net.nutch.db.WebDBWriter");
34     static final byte CUR_VERSION = 0;
35
36     // db opcodes
37
static final byte ADD_PAGE = 0;
38     static final byte ADD_PAGE_WITH_SCORE = 1;
39     static final byte ADD_PAGE_IFN_PRESENT = 2;
40     static final byte DEL_PAGE = 3;
41     static final int ADD_LINK = 0;
42     static final int DEL_LINK = 1;
43     static final int DEL_SINGLE_LINK = 2;
44
45     // filenames
46
static final String JavaDoc PAGES_BY_URL = "pagesByURL";
47     static final String JavaDoc PAGES_BY_MD5 = "pagesByMD5";
48     static final String JavaDoc LINKS_BY_URL = "linksByURL";
49     static final String JavaDoc LINKS_BY_MD5 = "linksByMD5";
50     static final String JavaDoc STATS_FILE = "stats";
51
52     // Result codes for page-url comparisons
53
static final int NO_OUTLINKS = 0;
54     static final int HAS_OUTLINKS = 1;
55     static final int LINK_INVALID = 2;
56
57     /********************************************
58      * PageInstruction holds an operation over a Page.
59      *********************************************/

60     public static class PageInstruction implements WritableComparable {
61         byte opcode;
62         boolean hasLink;
63         Page page;
64         Link link;
65
66         /**
67          */

68         public PageInstruction() {}
69
70         /**
71          */

72         public PageInstruction(Page page, int opcode) {
73             set(page, opcode);
74         }
75
76         /**
77          */

78         public PageInstruction(Page page, Link link, int opcode) {
79             set(page, link, opcode);
80         }
81
82         /**
83          * Init from another PageInstruction object.
84          */

85         public void set(PageInstruction that) {
86             this.opcode = that.opcode;
87
88             if (this.page == null) {
89                 this.page = new Page();
90             }
91             this.page.set(that.page);
92
93             if (this.link == null) {
94                 this.link = new Link();
95             }
96             this.hasLink = that.hasLink;
97             if (this.hasLink) {
98                 this.link.set(that.link);
99             }
100         }
101
102         /**
103          * Init PageInstruction with no Link
104          */

105         public void set(Page page, int opcode) {
106             this.opcode = (byte) opcode;
107             this.page = page;
108             this.hasLink = false;
109             this.link = null;
110         }
111
112         /**
113          * Init PageInstruction with a Link
114          */

115         public void set(Page page, Link link, int opcode) {
116             this.opcode = (byte) opcode;
117             this.page = page;
118             this.hasLink = true;
119             this.link = link;
120         }
121
122         //
123
// WritableComparable
124
//
125
public int compareTo(Object JavaDoc o) {
126             int pageResult = this.page.compareTo(((PageInstruction) o).page);
127             if (pageResult != 0) {
128                 return pageResult;
129             } else {
130                 return this.opcode - (((PageInstruction) o).opcode);
131             }
132         }
133         public void write(DataOutput out) throws IOException {
134             out.writeByte(opcode);
135             page.write(out);
136             out.writeByte(hasLink ? 1 : 0);
137             if (hasLink) {
138                 link.write(out);
139             }
140         }
141         public void readFields(DataInput in) throws IOException {
142             opcode = in.readByte();
143             if (page == null) {
144                 page = new Page();
145             }
146             page.readFields(in);
147             
148             if (link == null) {
149                 link = new Link();
150             }
151             hasLink = (1 == in.readByte());
152             if (hasLink) {
153                 link.readFields(in);
154             }
155         }
156         public Page getPage() {
157             return page;
158         }
159         public Link getLink() {
160             if (hasLink) {
161                 return link;
162             } else {
163                 return null;
164             }
165         }
166         public int getInstruction() {
167             return opcode;
168         }
169
170         /**
171          * Sorts the instruction first by Page, then by opcode.
172          */

173         public static class PageComparator extends WritableComparator {
174             private static final Page.Comparator PAGE_COMPARATOR =
175             new Page.Comparator();
176
177             public PageComparator() { super(PageInstruction.class); }
178
179             /** Optimized comparator. */
180             public int compare(byte[] b1, int s1, int l1,
181                                byte[] b2, int s2, int l2) {
182                 int opcode1 = b1[s1];
183                 int opcode2 = b2[s2];
184                 int c = PAGE_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1);
185                 if (c != 0)
186                     return c;
187                 return opcode1 - opcode2;
188             }
189         }
190  
191         /*****************************************************
192          * Sorts the instruction first by url, then by opcode.
193          *****************************************************/

194         public static class UrlComparator extends WritableComparator {
195             private static final Page.UrlComparator PAGE_COMPARATOR =
196             new Page.UrlComparator();
197
198             public UrlComparator() { super(PageInstruction.class); }
199
200             /**
201              * We need to sort by ordered URLs. First, we sort by
202              * URL, then by opcode.
203              */

204             public int compare(WritableComparable a, WritableComparable b) {
205                 PageInstruction instructionA = (PageInstruction)a;
206                 PageInstruction instructionB = (PageInstruction)b;
207                 Page pageA = instructionA.getPage();
208                 Page pageB = instructionB.getPage();
209
210                 int result = pageA.getURL().compareTo(pageB.getURL());
211                 if (result != 0) {
212                     return result;
213                 } else {
214                     return instructionA.opcode - instructionB.opcode;
215                 }
216             }
217
218             /**
219              * Optimized comparator.
220              */

221             public int compare(byte[] b1, int s1, int l1,
222                                byte[] b2, int s2, int l2) {
223                 int opcode1 = b1[s1];
224                 int opcode2 = b2[s2];
225                 int c = PAGE_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1);
226                 if (c != 0)
227                     return c;
228                 return opcode1 - opcode2;
229             }
230         }
231     }
232
233     /********************************************************
234      * PageInstructionWriter very efficiently writes a
235      * PageInstruction to a SequenceFile.Writer. Much better
236      * than calling "writer.append(new PageInstruction())"
237      ********************************************************/

238     public static class PageInstructionWriter {
239         PageInstruction pi = new PageInstruction();
240
241         /**
242          */

243         public PageInstructionWriter() {
244         }
245
246         /**
247          * Append the PageInstruction info to the indicated SequenceFile,
248          * and keep the PI for later reuse.
249          */

250         public synchronized void appendInstructionInfo(SequenceFile.Writer writer, Page page, int opcode, Writable val) throws IOException {
251             pi.set(page, opcode);
252             writer.append(pi, val);
253         }
254
255         /**
256          * Append the PageInstruction info to the indicated SequenceFile,
257          * and keep the PI for later reuse.
258          */

259         public synchronized void appendInstructionInfo(SequenceFile.Writer writer, Page page, Link link, int opcode, Writable val) throws IOException {
260             pi.set(page, link, opcode);
261             writer.append(pi, val);
262         }
263     }
264
265     /*************************************************************
266      * Reduce multiple instructions for a given url to the single effective
267      * instruction. ADD is prioritized highest, then ADD_IFN_PRESENT, and then
268      * DEL. Not coincidentally, this is opposite the order they're sorted in.
269      **************************************************************/

270     private static class DeduplicatingPageSequenceReader {
271         SequenceFile.Reader edits;
272         PageInstruction current = new PageInstruction();
273         UTF8 currentUrl = new UTF8();
274         boolean haveCurrent;
275
276         /**
277          */

278         public DeduplicatingPageSequenceReader(SequenceFile.Reader edits) throws IOException {
279             this.edits = edits;
280             this.haveCurrent = edits.next(current, NullWritable.get());
281         }
282
283         /**
284          */

285         public boolean next(PageInstruction result) throws IOException {
286             if (!haveCurrent) {
287                 return false;
288             }
289         
290             currentUrl.set(current.getPage().getURL());
291             result.set(current); // take the first instruction
292

293             do {
294                 // skip the rest
295
} while ((haveCurrent = edits.next(current, NullWritable.get())) &&
296                      currentUrl.compareTo(current.getPage().getURL()) == 0);
297             return true;
298         }
299     }
300
301
302     /*************************************************
303      * Holds an instruction over a Link.
304      *************************************************/

305     public static class LinkInstruction implements WritableComparable {
306         Link link;
307         int instruction;
308
309         /**
310          */

311         public LinkInstruction() {
312         }
313
314         /**
315          */

316         public LinkInstruction(Link link, int instruction) {
317             set(link, instruction);
318         }
319
320         /**
321          * Re-init from another LinkInstruction's info.
322          */

323         public void set(LinkInstruction that) {
324             this.instruction = that.instruction;
325           
326             if (this.link == null)
327                 this.link = new Link();
328
329             this.link.set(that.link);
330         }
331
332         /**
333          * Re-init with a Link and an instruction
334          */

335         public void set(Link link, int instruction) {
336             this.link = link;
337             this.instruction = instruction;
338         }
339
340         //
341
// WritableComparable
342
//
343
public int compareTo(Object JavaDoc o) {
344             return this.link.compareTo(((LinkInstruction) o).link);
345         }
346         public void write(DataOutput out) throws IOException {
347             out.writeByte(instruction);
348             link.write(out);
349         }
350         public void readFields(DataInput in) throws IOException {
351             this.instruction = in.readByte();
352             if (link == null)
353                 link = new Link();
354             link.readFields(in);
355         }
356         public Link getLink() {
357             return link;
358         }
359         public int getInstruction() {
360             return instruction;
361         }
362
363         /*******************************************************
364          * Sorts the instruction first by Md5, then by opcode.
365          *******************************************************/

366         public static class MD5Comparator extends WritableComparator {
367             private static final Link.MD5Comparator MD5_COMPARATOR =
368             new Link.MD5Comparator();
369
370             public MD5Comparator() { super(LinkInstruction.class); }
371
372             public int compare(WritableComparable a, WritableComparable b) {
373                 LinkInstruction instructionA = (LinkInstruction)a;
374                 LinkInstruction instructionB = (LinkInstruction)b;
375                 return instructionA.link.md5Compare(instructionB.link);
376             }
377
378             /** Optimized comparator. */
379             public int compare(byte[] b1, int s1, int l1,
380                                byte[] b2, int s2, int l2) {
381                 return MD5_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1);
382             }
383         }
384  
385         /*********************************************************
386          * Sorts the instruction first by url, then by opcode.
387          *********************************************************/

388         public static class UrlComparator extends WritableComparator {
389             private static final Link.UrlComparator URL_COMPARATOR =
390             new Link.UrlComparator();
391
392             public UrlComparator() { super(LinkInstruction.class); }
393
394             public int compare(WritableComparable a, WritableComparable b) {
395                 LinkInstruction instructionA = (LinkInstruction)a;
396                 LinkInstruction instructionB = (LinkInstruction)b;
397                 return instructionA.link.urlCompare(instructionB.link);
398
399             }
400
401             /**
402              * Optimized comparator.
403              */

404             public int compare(byte[] b1, int s1, int l1,
405                                byte[] b2, int s2, int l2) {
406                 return URL_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1);
407             }
408         }
409     }
410
411     /*******************************************************
412      * LinkInstructionWriter very efficiently writes a
413      * LinkInstruction to a SequenceFile.Writer. Much better
414      * than calling "writer.append(new LinkInstruction())"
415      ********************************************************/

416     public static class LinkInstructionWriter {
417         LinkInstruction li = new LinkInstruction();
418
419         /**
420          */

421         public LinkInstructionWriter() {
422         }
423
424         /**
425          * Append the LinkInstruction info to the indicated SequenceFile
426          * and keep the LI for later reuse.
427          */

428         public synchronized void appendInstructionInfo(SequenceFile.Writer writer, Link link, int opcode, Writable val) throws IOException {
429             li.set(link, opcode);
430             writer.append(li, val);
431         }
432     }
433
434     /********************************************************
435      * This class deduplicates link operations. We want to
436      * sort by MD5, then by URL. But all operations
437      * should be unique.
438      *********************************************************/

439     class DeduplicatingLinkSequenceReader {
440         Link currentKey = new Link();
441         LinkInstruction current = new LinkInstruction();
442         SequenceFile.Reader edits;
443         boolean haveCurrent;
444
445         /**
446          */

447         public DeduplicatingLinkSequenceReader(SequenceFile.Reader edits) throws IOException {
448             this.edits = edits;
449             this.haveCurrent = edits.next(current, NullWritable.get());
450         }
451
452
453         /**
454          * The incoming stream of edits is sorted first by MD5, then by URL.
455          * MD5-only values always come before MD5+URL.
456          */

457         public boolean next(LinkInstruction key) throws IOException {
458             if (! haveCurrent) {
459                 return false;
460             }
461
462             currentKey.set(current.getLink());
463             
464             do {
465                 key.set(current);
466             } while ((haveCurrent = edits.next(current, NullWritable.get())) &&
467                      currentKey.compareTo(current.getLink()) == 0);
468             return true;
469         }
470     }
471
472
473     /**************************************************
474      * The CloseProcessor class is used when we close down
475      * the webdb. We give it the path, members, and class values
476      * needed to apply changes to any of our 4 data tables.
477      *
478      * This is an abstract class. Each subclass must define
479      * the exact merge procedure. However, file-handling
480      * and edit-processing is standardized as much as possible.
481      *
482      **************************************************/

483     private abstract class CloseProcessor {
484         String JavaDoc basename;
485         MapFile.Reader oldDb;
486         SequenceFile.Writer editWriter;
487         SequenceFile.Sorter sorter;
488         WritableComparator comparator;
489         Class JavaDoc keyClass, valueClass;
490         long itemsWritten = 0;
491
492         /**
493          * Store away these members for later use.
494          */

495         CloseProcessor(String JavaDoc basename, MapFile.Reader oldDb, SequenceFile.Writer editWriter, SequenceFile.Sorter sorter, WritableComparator comparator, Class JavaDoc keyClass, Class JavaDoc valueClass) {
496             this.basename = basename;
497             this.oldDb = oldDb;
498             this.editWriter = editWriter;
499             this.sorter = sorter;
500             this.comparator = comparator;
501             this.keyClass = keyClass;
502             this.valueClass = valueClass;
503         }
504
505         /**
506          * Perform the shutdown sequence for this Processor.
507          * There is a lot of file-moving and edit-sorting that
508          * is common across all the 4 tables.
509          *
510          * Returns how many items were written out by this close().
511          */

512         long closeDown(File workingDir, File outputDir, long numEdits) throws IOException {
513             File editsFile = new File(workingDir, basename + ".out");
514             File newDbFile = new File(outputDir, basename);
515             File sortedEditsFile = new File(editsFile.getPath() + ".sorted");
516             editWriter.close();
517
518             // If there are edits, then process them.
519
if (numEdits != 0) {
520                 // Sort the edits
521
long startSort = System.currentTimeMillis();
522                 sorter.sort(editsFile.getPath(), sortedEditsFile.getPath());
523                 // sorter.close();
524
long endSort = System.currentTimeMillis();
525                 LOG.info("Processing " + basename + ": Sorted " + numEdits + " instructions in " + ((endSort - startSort) / 1000.0) + " seconds.");
526                 LOG.info("Processing " + basename + ": Sorted " + (numEdits / ((endSort - startSort) / 1000.0)) + " instructions/second");
527             
528                 // Rename appropriately
529
fs.delete(editsFile);
530                 fs.rename(sortedEditsFile, editsFile);
531
532                 // Read the sorted edits
533
SequenceFile.Reader sortedEdits = new SequenceFile.Reader(fs, editsFile.getPath());
534
535                 // Create a brand-new output db for the integrated data
536
MapFile.Writer newDb = (comparator == null) ? new MapFile.Writer(fs, newDbFile.getPath(), keyClass, valueClass) : new MapFile.Writer(fs, newDbFile.getPath(), comparator, valueClass);
537
538                 // Iterate through the edits, and merge changes with existing
539
// db into the brand-new file
540
oldDb.reset();
541             
542                 // Merge the edits. We did it!
543
long startMerge = System.currentTimeMillis();
544                 mergeEdits(oldDb, sortedEdits, newDb);
545                 long endMerge = System.currentTimeMillis();
546                 LOG.info("Processing " + basename + ": Merged to new DB containing " + itemsWritten + " records in " + ((endMerge - startMerge) / 1000.0) + " seconds");
547                 LOG.info("Processing " + basename + ": Merged " + (itemsWritten / ((endMerge - startMerge) / 1000.0)) + " records/second");
548
549                 // Close down readers, writers
550
sortedEdits.close();
551                 newDb.close();
552             } else {
553                 // Otherwise, simply copy the file into place,
554
// without all the processing overhead.
555
long startCopy = System.currentTimeMillis();
556                 File curFile = new File(dbFile, basename);
557                 FileUtil.recursiveCopy(fs, curFile, newDbFile);
558                 long endCopy = System.currentTimeMillis();
559
560                 LOG.info("Processing " + basename + ": Copied file (" + newDbFile.length()+ " bytes) in " + ((endCopy - startCopy) / 1000.0) + " secs.");
561             }
562
563             // Delete the now-consumed edits file to save space
564
fs.delete(editsFile);
565
566             return itemsWritten;
567         }
568
569         /**
570          * The loop that actually applies the changes and writes to
571          * a new db. This is different for every subclass!
572          */

573         abstract void mergeEdits(MapFile.Reader db, SequenceFile.Reader edits, MapFile.Writer newDb) throws IOException;
574     }
575
576     /***
577      * The PagesByURLProcessor is used during close() time for
578      * the pagesByURL table. We instantiate one of these, and it
579      * takes care of the entire shutdown process.
580      */

581     private class PagesByURLProcessor extends CloseProcessor {
582         SequenceFile.Writer futureEdits;
583
584         /**
585          * We store "futureEdits" so we can write edits for the
586          * next table-db step
587          */

588         PagesByURLProcessor(MapFile.Reader db, SequenceFile.Writer editWriter, SequenceFile.Writer futureEdits) {
589             super(PAGES_BY_URL, db, editWriter, new SequenceFile.Sorter(fs, new PageInstruction.UrlComparator(), NullWritable.class), new UTF8.Comparator(), null, Page.class);
590             this.futureEdits = futureEdits;
591         }
592
593         /**
594          * Merge the existing db with the edit-stream into a brand-new file.
595          */

596         void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
597             // Create the keys and vals we'll be using
598
DeduplicatingPageSequenceReader edits = new DeduplicatingPageSequenceReader(sortedEdits);
599             WritableComparable readerKey = new UTF8();
600             Page readerVal = new Page();
601             PageInstruction editItem = new PageInstruction();
602             int futureOrdering = 0;
603
604             // Read the first items from both streams
605
boolean hasEntries = db.next(readerKey, readerVal);
606             boolean hasEdits = edits.next(editItem);
607
608             // As long as we have both edits and entries, we need to
609
// interleave them....
610
while (hasEntries && hasEdits) {
611                 int comparison = readerKey.compareTo(editItem.getPage().getURL());
612                 int curInstruction = editItem.getInstruction();
613
614                 // Perform operations
615
if ((curInstruction == ADD_PAGE) ||
616                     (curInstruction == ADD_PAGE_WITH_SCORE) ||
617                     (curInstruction == ADD_PAGE_IFN_PRESENT)) {
618
619                     if (comparison < 0) {
620                         // Write readerKey, just passing it along.
621
// Don't process the edit yet.
622
newDb.append(readerKey, readerVal);
623                         itemsWritten++;
624                         hasEntries = db.next(readerKey, readerVal);
625                     } else if (comparison == 0) {
626                         // The keys are equal. If the instruction
627
// is ADD_PAGE, we write the edit's key and
628
// replace the old one.
629
//
630
// Otherwise, if it's ADD_IFN_PRESENT,
631
// keep the reader's item intact.
632
//
633
if ((curInstruction == ADD_PAGE) ||
634                             (curInstruction == ADD_PAGE_WITH_SCORE)) {
635                             // An ADD_PAGE with an identical pair
636
// of pages replaces the existing one.
637
// We may need to note the fact for
638
// Garbage Collection.
639
//
640
// This happens in three stages.
641
// 1. We write necessary items to the future
642
// edits-list.
643
//
644
pagesByMD5Edits++;
645
646                             // If this is a replacing add, we don't want
647
// to disturb the score from the old Page! This,
648
// way, we can run some link analysis scoring
649
// while the new Pages are being fetched and
650
// not lose the info when a Page is replaced.
651
//
652
// If it is an ADD_PAGE_WITH_SCORE, then we
653
// go ahead and replace the old one.
654
//
655
// Either way, from now on we treat it
656
// as an ADD_PAGE
657
//
658
Page editItemPage = editItem.getPage();
659
660                             if (curInstruction == ADD_PAGE) {
661                                 editItemPage.setScore(readerVal.getScore(), readerVal.getNextScore());
662                             }
663
664                             piwriter.appendInstructionInfo(futureEdits, editItemPage, ADD_PAGE, NullWritable.get());
665
666                             //
667
// 2. We write the edit-page to *this* table.
668
//
669
newDb.append(editItemPage.getURL(), editItemPage);
670
671                             //
672
// 3. We want the ADD in the next step (the
673
// MD5-driven table) to be a "replacing add".
674
// But that won't happen if the readerItem and
675
// the editItem Pages are not identical.
676
// (In this scenario, that means their URLs
677
// are the same, but their MD5s are different.)
678
// So, we need to explicitly handle that
679
// case by issuing a DELETE for the now-obsolete
680
// item.
681
if (editItemPage.compareTo(readerVal) != 0) {
682                                 pagesByMD5Edits++;
683                                 piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get());
684                             }
685
686                             itemsWritten++;
687
688                             // "Delete" the readerVal by skipping it.
689
hasEntries = db.next(readerKey, readerVal);
690                         } else {
691                             // ADD_PAGE_IFN_PRESENT. We only add IF_NOT
692
// present. And it was present! So, we treat
693
// this case like we treat a no-op.
694
// Just move to the next edit.
695
}
696                         // In either case, we process the edit.
697
hasEdits = edits.next(editItem);
698
699                     } else if (comparison > 0) {
700                         // We have inserted a Page that's before some
701
// entry in the existing database. So, we just
702
// need to write down the Page from the Edit file.
703
// It's like the above case, except we don't tell
704
// the future-edits to delete anything.
705
//
706
// 1. Write the item down for the future.
707
pagesByMD5Edits++;
708
709                         //
710
// If this is an ADD_PAGE_IFN_PRESENT, then
711
// we may also have a Link we have to take care of!
712
//
713
if (curInstruction == ADD_PAGE_IFN_PRESENT) {
714                             Link editLink = editItem.getLink();
715                             if (editLink != null) {
716                                 addLink(editLink);
717                             }
718                         }
719                         piwriter.appendInstructionInfo(futureEdits, editItem.getPage(), ADD_PAGE, NullWritable.get());
720
721                         //
722
// 2. Write the edit-page to *this* table
723
newDb.append(editItem.getPage().getURL(), editItem.getPage());
724                         itemsWritten++;
725
726                         // Process the edit
727
hasEdits = edits.next(editItem);
728                     }
729                 } else if (curInstruction == DEL_PAGE) {
730                     if (comparison < 0) {
731                         // Write the readerKey, just passing it along.
732
// We don't process the edit yet.
733
newDb.append(readerKey, readerVal);
734                         itemsWritten++;
735                         hasEntries = db.next(readerKey, readerVal);
736                     } else if (comparison == 0) {
737                         // Delete it! We can only delete one item
738
// at a time, as all URLs are unique.
739
// 1. Tell the future-edits what page will need to
740
// be deleted.
741
pagesByMD5Edits++;
742                         piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get());
743
744                         //
745
// 2. "Delete" the entry by skipping the Reader
746
// key.
747
hasEntries = db.next(readerKey, readerVal);
748
749                         // Process the edit
750
hasEdits = edits.next(editItem);
751                     } else if (comparison > 0) {
752                         // Ignore it. We tried to delete an item that's
753
// not here.
754
hasEdits = edits.next(editItem);
755                     }
756                 }
757             }
758
759             // Now we have only edits. No more preexisting items!
760
while (! hasEntries && hasEdits) {
761                 int curInstruction = editItem.getInstruction();
762                 if (curInstruction == ADD_PAGE ||
763                     curInstruction == ADD_PAGE_WITH_SCORE ||
764                     curInstruction == ADD_PAGE_IFN_PRESENT) {
765                     // No more reader entries, so ADD_PAGE_IFN_PRESENT
766
// is treated like a simple ADD_PAGE.
767

768                     // 1. Tell the future edits-list about this new item
769
pagesByMD5Edits++;
770                     
771                     //
772
// If this is an ADD_PAGE_IFN_PRESENT, then
773
// we may also have a Link we have to take care of!
774
//
775
if (curInstruction == ADD_PAGE_IFN_PRESENT) {
776                         Link editLink = editItem.getLink();
777                         if (editLink != null) {
778                             addLink(editLink);
779                         }
780                     }
781                     piwriter.appendInstructionInfo(futureEdits, editItem.getPage(), ADD_PAGE, NullWritable.get());
782
783                     // 2. Write the edit page to this table.
784
newDb.append(editItem.getPage().getURL(), editItem.getPage());
785                     itemsWritten++;
786                 } else if (curInstruction == DEL_PAGE) {
787                     // Ignore it. We tried to delete an item
788
// that's not here.
789
}
790
791                 // Either way, we always process the edit.
792
hasEdits = edits.next(editItem);
793             }
794
795             // Now we have only preexisting items. We just copy
796
// them to the new file, in order.
797
while (hasEntries && ! hasEdits) {
798                 newDb.append(readerKey, readerVal);
799                 itemsWritten++;
800                 hasEntries = db.next(readerKey, readerVal);
801             }
802         }
803     }
804
805     /***
806      * The PagesByMD5Processor is used during close() time for
807      * the pagesByMD5 table. We instantiate one of these, and it
808      * takes care of the entire shutdown process.
809      */

810     private class PagesByMD5Processor extends CloseProcessor {
811         /**
812          */

813         PagesByMD5Processor(MapFile.Reader db, SequenceFile.Writer editWriter) {
814             super(PAGES_BY_MD5, db, editWriter, new SequenceFile.Sorter(fs, new PageInstruction.PageComparator(), NullWritable.class), null, Page.class, NullWritable.class);
815         }
816
817         /**
818          */

819         void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
820             // Create the keys and vals
821
Page readerItem = new Page();
822             PageInstruction editItem = new PageInstruction();
823
824             // For computing the GC list
825
Page deletedItem = new Page(), lastItem = new Page();
826             boolean justDeletedItem = false;
827             boolean newReaderItem = false;
828             int itemRepeats = 0;
829
830             // Read the first items from both streams
831
boolean hasEntries = db.next(readerItem, NullWritable.get());
832             boolean hasEdits = sortedEdits.next(editItem, NullWritable.get());
833             if (hasEntries) {
834                 // The first thing we read should become
835
// the "previous key". We need this for
836
// garbage collection.
837
outBuf.reset();
838                 readerItem.write(outBuf);
839                 inBuf.reset(outBuf.getData(), outBuf.getLength());
840                 lastItem.readFields(inBuf);
841                 itemRepeats = 0;
842             }
843
844             // As long we have both edits and entries, we need to
845
// interleave them.
846
while (hasEdits && hasEntries) {
847                 int comparison = readerItem.compareTo(editItem.getPage());
848                 int curInstruction = editItem.getInstruction();
849
850                 //
851
// OK! Now perform operations
852
//
853
if (curInstruction == ADD_PAGE) {
854                     if (comparison < 0) {
855                         // Write readerItem, just passing it along.
856
// Don't process the edit yet.
857
newDb.append(readerItem, NullWritable.get());
858                         itemsWritten++;
859                         hasEntries = db.next(readerItem, NullWritable.get());
860                         newReaderItem = true;
861                     } else if (comparison == 0) {
862                         //
863
// This is a "replacing ADD", which is generated
864
// by the above-sequence. We should skip over the
865
// existing item, and add the new one instead.
866
//
867
// Note that by this point, the new version of the
868
// Page from the edit sequence is guaranteed to
869
// have the correct score. We make sure of it in
870
// the mergeEdits() for PagesByURLProcessor.
871
//
872
newDb.append(editItem.getPage(), NullWritable.get());
873                         itemsWritten++;
874                         hasEntries = db.next(readerItem, NullWritable.get());
875                         newReaderItem = true;
876                         hasEdits = sortedEdits.next(editItem, NullWritable.get());
877                     } else if (comparison > 0) {
878                         // Write the edit item. We've inserted an item
879
// that comes before any others.
880
newDb.append(editItem.getPage(), NullWritable.get());
881                         itemsWritten++;
882                         hasEdits = sortedEdits.next(editItem, NullWritable.get());
883                     }
884                 } else if (curInstruction == ADD_PAGE_IFN_PRESENT) {
885                     throw new IOException("Should never process ADD_PAGE_IFN_PRESENT for the index: " + editItem);
886                 } else if (curInstruction == DEL_PAGE) {
887                     if (comparison < 0) {
888                         // Write the readerKey, just passing it along.
889
// Don't process the edit yet.
890
newDb.append(readerItem, NullWritable.get());
891                         itemsWritten++;
892                         hasEntries = db.next(readerItem, NullWritable.get());
893                         newReaderItem = true;
894                     } else if (comparison == 0) {
895                         // Delete it! Remember only one entry can
896
// be deleted at a time!
897
//
898
// "Delete" the entry by skipping over the reader
899
// item. We move onto the next item in the existing
900
// index, as well as the next edit instruction.
901
hasEntries = db.next(readerItem, NullWritable.get());
902                         newReaderItem = true;
903                         hasEdits = sortedEdits.next(editItem, NullWritable.get());
904
905                         // We need to set this flag for GC'ing.
906
justDeletedItem = true;
907                     } else if (comparison > 0) {
908                         // This should never happen! We should only be
909
// deleting items that actually appear!
910
throw new IOException("An unapplicable DEL_PAGE should never appear during index-merge: " + editItem);
911                     }
912                 }
913
914                 // GARBAGE COLLECTION
915
// We want to detect when we have deleted the
916
// last MD5 of a certain value. We can have
917
// multiple MD5s in the same index, as long as
918
// they have different URLs. When the last MD5
919
// is deleted, we want to know so we can modify
920
// the LinkDB.
921
if (newReaderItem) {
922                     // If we have a different readerItem which is just
923
// the same as our last one, then we know it's a
924
// repeat!
925
if (hasEntries && readerItem.getMD5().compareTo(lastItem.getMD5()) == 0) {
926                         itemRepeats++;
927                     } else {
928                         // The current readerItem and the lastItem
929
// MD5s are not equal.
930
//
931
// If the last item was deleted, AND if the
932
// deleted item is not a repeat of the current item,
933
// then that MD5 should be garbage collected.
934
if (justDeletedItem && itemRepeats == 0) {
935                             deleteLink(lastItem.getMD5());
936                         }
937
938                         // The current readerItem is the new "last key".
939
outBuf.reset();
940                         readerItem.write(outBuf);
941                         inBuf.reset(outBuf.getData(), outBuf.getLength());
942                         lastItem.readFields(inBuf);
943                         itemRepeats = 0;
944                     }
945                     // Clear "new-reader-item" bit
946
newReaderItem = false;
947                 }
948                 // Clear "last-deleted" bit
949
justDeletedItem = false;
950             }
951         
952             // Now we have only edits. No more preexisting items!
953
while (! hasEntries && hasEdits) {
954                 int curInstruction = editItem.getInstruction();
955                 if (curInstruction == ADD_PAGE) {
956                     // Just write down the new page!
957
newDb.append(editItem.getPage(), NullWritable.get());
958                     itemsWritten++;
959                 } else if (curInstruction == ADD_PAGE_IFN_PRESENT) {
960                     throw new IOException("Should never process ADD_PAGE_IFN_PRESENT for the index: " + editItem);
961                 } else if (curInstruction == DEL_PAGE) {
962                     // This should never happen! We should only be
963
// deleting items that actually appear!
964
throw new IOException("An unapplicable DEL_PAGE should never appear during index-merge: " + editItem);
965                 }
966                 hasEdits = sortedEdits.next(editItem, NullWritable.get());
967             }
968
969             // Now we have only preexisting items. We just copy them
970
// to the new file, in order
971
while (hasEntries && ! hasEdits) {
972                 // Simply copy through the remaining database items
973
newDb.append(readerItem, NullWritable.get());
974                 itemsWritten++;
975                 hasEntries = db.next(readerItem, NullWritable.get());
976                 newReaderItem = true;
977             }
978         }
979     }
980
981     /**
982      * The LinksByMD5Processor is used during close() for
983      * the pagesByMD5 table. It processes all the edits to
984      * this table, and also generates edits for the linksByURL
985      * table.
986      */

987     private class LinksByMD5Processor extends CloseProcessor {
988         SequenceFile.Writer futureEdits;
989
990         /**
991          */

992         public LinksByMD5Processor(MapFile.Reader db, SequenceFile.Writer editWriter, SequenceFile.Writer futureEdits) {
993             super(LINKS_BY_MD5, db, editWriter, new SequenceFile.Sorter(fs, new LinkInstruction.MD5Comparator(), NullWritable.class), new Link.MD5Comparator(), Link.class, NullWritable.class);
994             this.futureEdits = futureEdits;
995         }
996
997         /**
998          * Merges edits into the md5-driven link table. Also generates
999          * edit sequence to apply to the URL-driven table.
1000         */

1001        void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
1002            WritableComparator comparator = new Link.MD5Comparator();
1003            DeduplicatingLinkSequenceReader edits = new DeduplicatingLinkSequenceReader(sortedEdits);
1004
1005            // Create the keys and vals we'll use
1006
LinkInstruction editItem = new LinkInstruction();
1007            Link readerItem = new Link();
1008
1009            // Read the first items from both streams
1010
boolean hasEntries = db.next(readerItem, NullWritable.get());
1011            boolean hasEdits = edits.next(editItem);
1012
1013            // As long as we have both edits and entries to process,
1014
// we need to interleave them
1015
while (hasEntries && hasEdits) {
1016                int curInstruction = editItem.getInstruction();
1017
1018                // Perform operations
1019
if (curInstruction == ADD_LINK) {
1020                    // When we add a link, we may replace a previous
1021
// link with identical URL and MD5 values. The
1022
// MD5FirstComparator will use both values.
1023
//
1024
int comparison = comparator.compare(readerItem, editItem.getLink());
1025
1026                    if (comparison < 0) {
1027                        // Write the readerKey, just passing it along.
1028
// Don't process the edit yet.
1029
newDb.append(readerItem, NullWritable.get());
1030                        itemsWritten++;
1031                        hasEntries = db.next(readerItem, NullWritable.get());
1032                    } else if (comparison == 0) {
1033                        // 1. Write down the item for table-edits
1034
if (futureEdits != null) {
1035                            linksByURLEdits++;
1036                            liwriter.appendInstructionInfo(futureEdits, editItem.getLink(), ADD_LINK, NullWritable.get());
1037                        }
1038
1039                        // 2. Write the new item, "replacing" the old one.
1040
// We move to the next edit instruction and move
1041
// past the replaced db entry.
1042
newDb.append(editItem.getLink(), NullWritable.get());
1043                        itemsWritten++;
1044                        hasEntries = db.next(readerItem, NullWritable.get());
1045                        hasEdits = edits.next(editItem);
1046                    } else if (comparison > 0) {
1047                        // 1. Write down the item for table-edits
1048
if (futureEdits != null) {
1049                            linksByURLEdits++;
1050                            liwriter.appendInstructionInfo(futureEdits, editItem.getLink(), ADD_LINK, NullWritable.get());
1051                        }
1052
1053                        // 2. Write the new item. We stay at the current
1054
// db entry.
1055
newDb.append(editItem.getLink(), NullWritable.get());
1056                        itemsWritten++;
1057                        hasEdits = edits.next(editItem);
1058                    }
1059                } else if ((curInstruction == DEL_LINK) ||
1060                           (curInstruction == DEL_SINGLE_LINK)) {
1061                    // When we delete a link, we might delete many
1062
// at once! We are interested only in the MD5
1063
// here. If there are entries with identical MD5
1064
// values, but different URLs, we get rid of them
1065
// all.
1066
int comparison = 0;
1067                    if (curInstruction == DEL_LINK) {
1068                        comparison = readerItem.getFromID().compareTo(editItem.getLink().getFromID());
1069                    } else {
1070                        comparison = readerItem.md5Compare(editItem.getLink());
1071                    }
1072
1073                    if (comparison < 0) {
1074                        // Write the readerKey, just passing it along.
1075
// Don't process the edit yet.
1076
newDb.append(readerItem, NullWritable.get());
1077                        itemsWritten++;
1078                        hasEntries = db.next(readerItem, NullWritable.get());
1079                    } else if (comparison == 0) {
1080                        // Delete it (or them!)
1081
// 1. Write the full instruction for the next
1082
// delete-stage. That includes the read-in
1083
// value
1084
// 2. "Delete" the entry by skipping the
1085
// readerKey. We DO NOT go to the next edit
1086
// instruction! There might still be more
1087
// entries in the database to which we should
1088
// apply this delete-edit.
1089
//
1090
// Step 1. Write entry for future table-edits
1091
if (futureEdits != null) {
1092                            linksByURLEdits++;
1093                            liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_LINK, NullWritable.get());
1094                        }
1095
1096                        // Step 2.
1097
// We might want to delete multiple MD5s with
1098
// a single delete() operation, so keep this
1099
// edit instruction around
1100
hasEntries = db.next(readerItem, NullWritable.get());
1101                        if (curInstruction == DEL_SINGLE_LINK) {
1102                            hasEdits = edits.next(editItem);
1103                        }
1104                    } else if (comparison > 0) {
1105                        // Ignore, move on to next instruction
1106
hasEdits = edits.next(editItem);
1107                    }
1108                }
1109            }
1110
1111            // Now we have only edits. No more preexisting items!
1112
while (! hasEntries && hasEdits) {
1113                int curInstruction = editItem.getInstruction();
1114
1115                if (curInstruction == ADD_LINK) {
1116                    // 1. Write down the item for future table-edits
1117
if (futureEdits != null) {
1118                        linksByURLEdits++;
1119                        liwriter.appendInstructionInfo(futureEdits, editItem.getLink(), ADD_LINK, NullWritable.get());
1120                    }
1121
1122                    // 2. Just add the item from the edit list
1123
newDb.append(editItem.getLink(), NullWritable.get());
1124                    itemsWritten++;
1125                } else if (curInstruction == DEL_LINK) {
1126                    // Ignore operation
1127
}
1128                // Move on to next edit
1129
hasEdits = edits.next(editItem);
1130            }
1131
1132            // Now we have only preexisting items. Just copy them
1133
// to the new file, in order.
1134
while (hasEntries && ! hasEdits) {
1135                newDb.append(readerItem, NullWritable.get());
1136                itemsWritten++;
1137                hasEntries = db.next(readerItem, NullWritable.get());
1138            }
1139        }
1140    }
1141
1142    /**
1143     * This class helps the LinksByURLProcessor test a list of
1144     * Page objects, sorted by URL, for outlink-counts. We query
1145     * this class with a series of questions, based on Links sorted
1146     * by target URL.
1147     */

1148    private class TargetTester {
1149        MapFile.Reader pagedb;
1150        boolean hasPage = false;
1151        UTF8 pageURL = null;
1152        Page page = null;
1153
1154        /**
1155         */

1156        public TargetTester(MapFile.Reader pagedb) throws IOException {
1157            this.pagedb = pagedb;
1158            this.pageURL = new UTF8();
1159            this.page = new Page();
1160            this.hasPage = pagedb.next(pageURL, page);
1161        }
1162
1163        /**
1164         * Match the given URL against the sorted series of Page URLs.
1165         */

1166        public int hasOutlinks(UTF8 curURL) throws IOException {
1167            int returnCode = NO_OUTLINKS;
1168            int comparison = pageURL.compareTo(curURL);
1169
1170            while (hasPage && comparison < 0) {
1171                hasPage = pagedb.next(pageURL, page);
1172                if (hasPage) {
1173                    comparison = pageURL.compareTo(curURL);
1174                }
1175            }
1176
1177            if (hasPage) {
1178                if (comparison == 0) {
1179                    returnCode = (page.getNumOutlinks() > 0) ? HAS_OUTLINKS : NO_OUTLINKS;
1180                } else if (comparison > 0) {
1181                    //
1182
// This situation indicates that the Link's
1183
// target page has been deleted, probably
1184
// because we repeatedly failed to fetch the URL.
1185
// So, we should delete the Link.
1186
//
1187
returnCode = LINK_INVALID;
1188                }
1189            }
1190            return returnCode;
1191        }
1192
1193        /**
1194         */

1195        public void close() throws IOException {
1196            pagedb.close();
1197        }
1198    }
1199
1200    /**
1201     * Closes down and merges changes to the URL-driven link
1202     * table. This does nothing fancy, and propagates nothing
1203     * to a further stage. There is no next stage!
1204     */

1205    private class LinksByURLProcessor extends CloseProcessor {
1206        MapFile.Reader pageDb;
1207        SequenceFile.Writer futureEdits;
1208
1209        /**
1210         */

1211        public LinksByURLProcessor(MapFile.Reader db, SequenceFile.Writer editWriter, MapFile.Reader pageDb, SequenceFile.Writer futureEdits) {
1212            super(LINKS_BY_URL, db, editWriter, new SequenceFile.Sorter(fs, new LinkInstruction.UrlComparator(), NullWritable.class), new Link.UrlComparator(), Link.class, NullWritable.class);
1213            this.pageDb = pageDb;
1214            this.futureEdits = futureEdits;
1215        }
1216
1217        /**
1218         */

1219        public long closeDown(File workingDir, File outputDir, long numEdits) throws IOException {
1220            long result = super.closeDown(workingDir, outputDir, numEdits);
1221            //pageDb.close();
1222
return result;
1223        }
1224
1225        /**
1226         * Merge the existing db with the edit-stream into a brand-new file.
1227         */

1228        void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
1229            WritableComparator comparator = new Link.UrlComparator();
1230
1231            // Create the keys and vals we'll use
1232
LinkInstruction editItem = new LinkInstruction();
1233            Link readerItem = new Link();
1234        
1235            // Read the first items from both streams
1236
boolean hasEntries = db.next(readerItem, NullWritable.get());
1237            boolean hasEdits = sortedEdits.next(editItem, NullWritable.get());
1238            TargetTester targetTester = new TargetTester(pageDb);
1239
1240            // As long as we have both edits and entries to process,
1241
// we need to interleave them
1242
while (hasEntries && hasEdits) {
1243                int curInstruction = editItem.getInstruction();
1244
1245                if (curInstruction == ADD_LINK) {
1246                    // When we add a link, we may replace a previous
1247
// link with identical URL and MD5 values. Our
1248
// comparator will test both
1249
//
1250
int comparison = comparator.compare(readerItem, editItem.getLink());
1251
1252                    if (comparison < 0) {
1253                        // Write the readerKey, just passing it along.
1254
// Don't process the edit yet.
1255
int linkTest = targetTester.hasOutlinks(readerItem.getURL());
1256
1257                        if (linkTest == LINK_INVALID) {
1258                            liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get());
1259                            targetOutlinkEdits++;
1260                        } else {
1261                            boolean oldOutlinkStatus = readerItem.targetHasOutlink();
1262                            boolean newOutlinkStatus = (linkTest == HAS_OUTLINKS);
1263                            // Do the conditional so we minimize unnecessary
1264
// mod-writes.
1265
if (oldOutlinkStatus != newOutlinkStatus) {
1266                                readerItem.setTargetHasOutlink(newOutlinkStatus);
1267                                liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get());
1268                                targetOutlinkEdits++;
1269                            }
1270                            newDb.append(readerItem, NullWritable.get());
1271                            itemsWritten++;
1272                        }
1273                        hasEntries = db.next(readerItem, NullWritable.get());
1274                    } else if (comparison == 0) {
1275                        // Write the new item, "replacing" the old one.
1276
// We move to the next edit instruction and move
1277
// past the replaced db entry.
1278
Link editLink = editItem.getLink();
1279                        int linkTest = targetTester.hasOutlinks(editLink.getURL());
1280
1281                        // Delete the edit/readerItem from the other table if it's
1282
// found to be invalid.
1283
if (linkTest == LINK_INVALID) {
1284                            liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get());
1285                        } else {
1286                            editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
1287                            liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get());
1288
1289                            newDb.append(editLink, NullWritable.get());
1290                            itemsWritten++;
1291                        }
1292                        targetOutlinkEdits++;
1293
1294                        hasEntries = db.next(readerItem, NullWritable.get());
1295                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
1296                    } else if (comparison > 0) {
1297                        // Write the new item. We stay at the current
1298
// db entry.
1299
Link editLink = editItem.getLink();
1300                        int linkTest = targetTester.hasOutlinks(editLink.getURL());
1301
1302                        // Delete the edit from the other table if it's invalid
1303
if (linkTest == LINK_INVALID) {
1304                            liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get());
1305                        } else {
1306                            editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
1307                            liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get());
1308                            newDb.append(editLink, NullWritable.get());
1309                            itemsWritten++;
1310                        }
1311                        targetOutlinkEdits++;
1312
1313                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
1314                    }
1315                } else if (curInstruction == DEL_LINK) {
1316                    // When we delete a link, we do it by MD5 and apply
1317
// it to the index first. A single delete instruction
1318
// may remove many items in the db, during the earlier
1319
// processing. However, unlike the index-processing stage,
1320
// here we can expect a new DEL instruction for every
1321
// item that we remove from the db.
1322
//
1323
int comparison = comparator.compare(readerItem, editItem.getLink());
1324
1325                    if (comparison < 0) {
1326                        // Write readerKey, just passing it along. Don't
1327
// process the edit yet.
1328
int linkTest = targetTester.hasOutlinks(readerItem.getURL());
1329
1330                        // Delete the reader item if it's found to be invalid
1331
if (linkTest == LINK_INVALID) {
1332                            liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get());
1333                        } else {
1334                            readerItem.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
1335                            liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get());
1336                            newDb.append(readerItem, NullWritable.get());
1337                            itemsWritten++;
1338                        }
1339                        targetOutlinkEdits++;
1340
1341                        hasEntries = db.next(readerItem, NullWritable.get());
1342                    } else if (comparison == 0) {
1343                        // "Delete" the item by passing by the readerKey.
1344
// We want a new entry, as well as the next instruction
1345
// to process.
1346
hasEntries = db.next(readerItem, NullWritable.get());
1347                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
1348                    } else if (comparison > 0) {
1349                        // Ignore, move on to next instruction
1350
hasEdits = sortedEdits.next(editItem, NullWritable.get());
1351                    }
1352                }
1353            }
1354
1355            // Now we have only edits. No more preexisting items!
1356
while (! hasEntries && hasEdits) {
1357                int curInstruction = editItem.getInstruction();
1358
1359                if (curInstruction == ADD_LINK) {
1360                    //
1361
// Add the item from the edit list.
1362
//
1363

1364                    //
1365
// Make sure the outlinks flag is set properly.
1366
//
1367
Link editLink = editItem.getLink();
1368                    int linkTest = targetTester.hasOutlinks(editLink.getURL());
1369                    if (linkTest == LINK_INVALID) {
1370                        liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get());
1371                    } else {
1372                        editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
1373                        liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get());
1374                        newDb.append(editLink, NullWritable.get());
1375                        itemsWritten++;
1376                    }
1377                    targetOutlinkEdits++;
1378                } else if (curInstruction == DEL_LINK) {
1379                    // Ignore operation
1380
}
1381                // Move on to next edit
1382
hasEdits = sortedEdits.next(editItem, NullWritable.get());
1383            }
1384
1385            // Now we have only preexisting items. Just copy them
1386
// to the new file, in order.
1387
while (hasEntries && ! hasEdits) {
1388                //
1389
// Simply copy the remaining database items.
1390
//
1391

1392                //
1393
// First, make sure the 'outlinks' flag is set properly.
1394
//
1395
int linkTest = targetTester.hasOutlinks(readerItem.getURL());
1396                if (linkTest == LINK_INVALID) {
1397                    liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get());
1398                    targetOutlinkEdits++;
1399                } else {
1400                    boolean oldOutlinkStatus = readerItem.targetHasOutlink();
1401                    boolean newOutlinkStatus = (linkTest == HAS_OUTLINKS);
1402                    if (oldOutlinkStatus != newOutlinkStatus) {
1403                        readerItem.setTargetHasOutlink(newOutlinkStatus);
1404                        liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get());
1405                        targetOutlinkEdits++;
1406                    }
1407
1408                    // Now copy the object
1409
newDb.append(readerItem, NullWritable.get());
1410                    itemsWritten++;
1411                }
1412
1413                // Move on to next
1414
hasEntries = db.next(readerItem, NullWritable.get());
1415            }
1416
1417            targetTester.close();
1418        }
1419    }
1420
1421    /**
1422     * Create the WebDB for the first time.
1423     */

1424    public static void createWebDB(NutchFileSystem nfs, File dbDir) throws IOException {
1425        WebDBWriter starter = new WebDBWriter(nfs, dbDir, true);
1426        starter.close();
1427    }
1428    
1429    boolean haveEdits = false;
1430    NutchFileSystem fs = null;
1431    File dbDir, dbFile, oldDbFile, newDbFile, tmp;
1432    MapFile.Reader pagesByURL, pagesByMD5, linksByURL, linksByMD5;
1433    SequenceFile.Writer pagesByURLWriter, pagesByMD5Writer, linksByURLWriter, linksByMD5Writer;
1434    long pagesByURLEdits = 0, pagesByMD5Edits = 0, linksByURLEdits = 0, linksByMD5Edits = 0, targetOutlinkEdits = 0;
1435    PageInstructionWriter piwriter = new PageInstructionWriter();
1436    LinkInstructionWriter liwriter = new LinkInstructionWriter();
1437    DataInputBuffer inBuf = new DataInputBuffer();
1438    DataOutputBuffer outBuf = new DataOutputBuffer();
1439
1440    /**
1441     * Create a WebDBWriter.
1442     */

1443    public WebDBWriter(NutchFileSystem fs, File dbDir) throws IOException {
1444        this(fs, dbDir, false);
1445    }
1446
1447    /**
1448     * Private constructor, so we can either open or create the db files.
1449     */

1450    private WebDBWriter(NutchFileSystem fs, File dbDir, boolean create) throws IOException {
1451        this.fs = fs;
1452        this.dbDir = dbDir;
1453        this.dbFile = new File(dbDir, "webdb");
1454        this.oldDbFile = new File(dbDir, "webdb.old");
1455        this.newDbFile = new File(dbDir, "webdb.new");
1456        this.tmp = new File(newDbFile, "tmp");
1457
1458        if ((! fs.exists(dbDir)) && create) {
1459            fs.mkdirs(dbDir);
1460        }
1461        if (! fs.exists(dbDir) || ! fs.isDirectory(dbDir)) {
1462            throw new IOException("Database " + dbDir + " is not a directory.");
1463        }
1464
1465        // Lock the writeLock immediately.
1466
fs.lock(new File(dbDir, "dbwritelock"), false);
1467
1468        // Resolve any partial-state dirs from the last run.
1469
if (fs.exists(oldDbFile)) {
1470            if (fs.exists(dbFile)) {
1471                throw new IOException("Impossible condition: directories " + oldDbFile + " and " + dbFile + " cannot exist simultaneously");
1472            }
1473            if (fs.exists(newDbFile)) {
1474                fs.rename(newDbFile, dbFile);
1475            }
1476            FileUtil.fullyDelete(fs, oldDbFile);
1477        } else if (fs.exists(newDbFile)) {
1478            FileUtil.fullyDelete(fs, newDbFile);
1479        }
1480
1481        // Create the directory, if necessary
1482
if ((! fs.exists(dbFile)) && create) {
1483            fs.mkdirs(dbFile);
1484        }
1485
1486        // Delete any partial edits from last time.
1487
if (fs.exists(tmp)) {
1488            FileUtil.fullyDelete(fs, tmp);
1489        }
1490        fs.mkdirs(tmp);
1491
1492        // Create the file names we need
1493
if (create) {
1494            new MapFile.Writer(fs, new File(dbFile, PAGES_BY_URL).getPath(), new UTF8.Comparator(), Page.class).close();
1495            new MapFile.Writer(fs, new File(dbFile, PAGES_BY_MD5).getPath(), new Page.Comparator(), NullWritable.class).close();
1496            new MapFile.Writer(fs, new File(dbFile, LINKS_BY_URL).getPath(), new Link.UrlComparator(), NullWritable.class).close();
1497            new MapFile.Writer(fs, new File(dbFile, LINKS_BY_MD5).getPath(), new Link.MD5Comparator(), NullWritable.class).close();
1498        }
1499
1500        // Create the Readers for those files
1501
this.pagesByURL = new MapFile.Reader(fs, new File(dbFile, PAGES_BY_URL).getPath(), new UTF8.Comparator());
1502        this.pagesByMD5 = new MapFile.Reader(fs, new File(dbFile, PAGES_BY_MD5).getPath(), new Page.Comparator());
1503        this.linksByURL = new MapFile.Reader(fs, new File(dbFile, LINKS_BY_URL).getPath(), new Link.UrlComparator());
1504        this.linksByMD5 = new MapFile.Reader(fs, new File(dbFile, LINKS_BY_MD5).getPath(), new Link.MD5Comparator());
1505
1506        // Create writers for new edit-files. We write changes
1507
// into these files, then apply them to the db upon close().
1508
pagesByURLWriter = new SequenceFile.Writer(fs, new File(tmp, PAGES_BY_URL + ".out").getPath(), PageInstruction.class, NullWritable.class);
1509        pagesByMD5Writer = new SequenceFile.Writer(fs, new File(tmp, PAGES_BY_MD5 + ".out").getPath(), PageInstruction.class, NullWritable.class);
1510        linksByURLWriter = new SequenceFile.Writer(fs, new File(tmp, LINKS_BY_URL + ".out").getPath(), LinkInstruction.class, NullWritable.class);
1511        linksByMD5Writer = new SequenceFile.Writer(fs, new File(tmp, LINKS_BY_MD5 + ".out").getPath(), LinkInstruction.class, NullWritable.class);
1512    }
1513
1514    /**
1515     * Shutdown
1516     */

1517    public synchronized void close() throws IOException {
1518        if (haveEdits) {
1519            fs.mkdirs(newDbFile);
1520
1521            // Process the 4 tables:
1522
// 1. pagesByURL
1523
// 2. pagesByMD5
1524
// 3. linksByMD5
1525
// 4. linksByURL
1526

1527            // 1. Process pagesByURL. Processing this stream will
1528
// generate a number of edits for the pagesByMD5 step.
1529
//
1530
CloseProcessor pagesByURLProcessor = new PagesByURLProcessor(pagesByURL, pagesByURLWriter, pagesByMD5Writer);
1531            long numPBUItems = pagesByURLProcessor.closeDown(tmp, newDbFile, pagesByURLEdits);
1532
1533            //
1534
// 2. Process the pagesByMD5 edit stream. This will
1535
// make calls to deleteLink(), which are processed later.
1536
//
1537
CloseProcessor pagesByMD5Processor = new PagesByMD5Processor(pagesByMD5, pagesByMD5Writer);
1538            long numPBMItems = pagesByMD5Processor.closeDown(tmp, newDbFile, pagesByMD5Edits);
1539
1540            //
1541
// 3. Process the linksByMD5 edit stream first. This
1542
// will generate a number of edits for the linksByURL
1543
// stream. This also processes the calls to deleteLink()
1544
// that may have been invoked as part of the above call
1545
// to process pagesByMD5.
1546
CloseProcessor linksByMD5Processor = new LinksByMD5Processor(linksByMD5, linksByMD5Writer, linksByURLWriter);
1547            long numLBMItems = linksByMD5Processor.closeDown(tmp, newDbFile, linksByMD5Edits);
1548
1549            //
1550
// 4. Process the linksByURL edit stream. This will also
1551
// read through the sorted PagesByURL file, and modify
1552
// the Links so that they indicated whether the target
1553
// Page has any outlinks or not.
1554
//
1555
SequenceFile.Writer targetOutlinkEditsWriter = new SequenceFile.Writer(fs, new File(tmp, LINKS_BY_MD5 + ".out").getPath(), LinkInstruction.class, NullWritable.class);
1556            CloseProcessor linksByURLProcessor = new LinksByURLProcessor(linksByURL, linksByURLWriter, new MapFile.Reader(fs, new File(newDbFile, PAGES_BY_URL).getPath(), new UTF8.Comparator()), targetOutlinkEditsWriter);
1557            long numLBUItems = linksByURLProcessor.closeDown(tmp, newDbFile, linksByURLEdits);
1558
1559            //
1560
// If the number of linksByURL processed is zero, then
1561
// there's no reason to do all of the following with
1562
// a 2nd pass through linksByMD5.
1563
//
1564
if (numLBUItems == 0) {
1565                targetOutlinkEditsWriter.close();
1566
1567                //
1568
// Need to load in the previous number of links, so we
1569
// don't write over with the wrong value.
1570
//
1571
File stats = new File(dbFile, STATS_FILE);
1572                if (fs.exists(stats)) {
1573                    DataInputStream in = new DataInputStream(fs.open(stats));
1574                    try {
1575                        in.read(); // version
1576
in.readLong(); // previous num of pages
1577
numLBMItems = in.readLong(); // previous num of links
1578
} finally {
1579                        in.close();
1580                    }
1581                }
1582            } else {
1583                //
1584
// 5. Step 4 did several things to the LinksByURL db.
1585
// First, it implemented all the changes generated
1586
// by instructions from LinksByMD5Processor. Second,
1587
// it made lots of calls to setTargetHasOutlink. This
1588
// changes the content of the Link objects.
1589
//
1590
// So now we need to reconstruct the LinksByMD5
1591
// list, using the Links we created in step #4.
1592
//
1593

1594                File stageTwoDbFile = new File(newDbFile, "stage2.subdir");
1595                fs.mkdirs(stageTwoDbFile);
1596
1597                MapFile.Reader linksByMD5ForStageTwo = new MapFile.Reader(fs, new File(newDbFile, LINKS_BY_MD5).getPath(), new Link.MD5Comparator());
1598                CloseProcessor linksByMD5StageTwoProcessor = new LinksByMD5Processor(linksByMD5ForStageTwo, targetOutlinkEditsWriter, null);
1599                numLBMItems = linksByMD5StageTwoProcessor.closeDown(tmp, stageTwoDbFile, targetOutlinkEdits);
1600
1601                //
1602
// 6. Now move the Stage2 LinksByMD5 file up to replace
1603
// the one at the primary level
1604
//
1605
linksByMD5ForStageTwo.close();
1606                File stageOneLinksByMD5 = new File(newDbFile, LINKS_BY_MD5);
1607                fs.delete(stageOneLinksByMD5);
1608                fs.rename(new File(stageTwoDbFile, LINKS_BY_MD5), stageOneLinksByMD5);
1609                fs.delete(stageTwoDbFile);
1610            }
1611
1612            //
1613
// 7. Finally, write out the total num of pages and links
1614
//
1615
File stats = new File(newDbFile, STATS_FILE);
1616            DataOutputStream out = new DataOutputStream(fs.create(stats));
1617            try {
1618                //
1619
// These counts are guaranteed to be correct; they're
1620
// based on the counts made during processing of primary-key
1621
// edits. Pages are always counted by URL first, and only
1622
// subsequently by MD5 if there are any edits to make. Links
1623
// are always counted by MD5 first, and only by URL subsequently
1624
// and conditionally.
1625
//
1626
// If there are a bunch of edits that result in no modifications
1627
// to the db, the two sets of counts (one for URL, one for
1628
// MD5) could become out of sync. So we use the ones that
1629
// are sure to be accurate.
1630
//
1631
out.write(CUR_VERSION);
1632                out.writeLong(numPBUItems);
1633                out.writeLong(numLBMItems);
1634            } finally {
1635                out.close();
1636            }
1637        } else {
1638            pagesByURLWriter.close();
1639            pagesByMD5Writer.close();
1640            linksByMD5Writer.close();
1641            linksByURLWriter.close();
1642        }
1643
1644        // Close down the db-readers
1645
pagesByURL.close();
1646        pagesByMD5.close();
1647        linksByMD5.close();
1648        linksByURL.close();
1649
1650        // Delete the edits directory.
1651
fs.delete(tmp);
1652
1653        // Before we can move the newdb into place over the
1654
// old one, we need to make sure there are no processes
1655
// reading the old db. This obtains an exclusive lock
1656
// on the directory that holds our dbs (old and new).
1657
fs.lock(new File(dbDir, "dbreadlock"), false);
1658
1659        // We're done! Now we rename the directories and
1660
// all is well.
1661
if (haveEdits) {
1662            fs.rename(dbFile, oldDbFile);
1663            fs.rename(newDbFile, dbFile);
1664            FileUtil.fullyDelete(fs, oldDbFile);
1665        } else {
1666            // Sometimes the "newdb" is created as a side-effect
1667
// of creating the tmp dir, even when there are no edits.
1668
// Get rid of it.
1669
FileUtil.fullyDelete(fs, newDbFile);
1670        }
1671
1672        // release the readlock
1673
fs.release(new File(dbDir, "dbreadlock"));
1674
1675        // release the writelock
1676
fs.release(new File(dbDir, "dbwritelock"));
1677    }
1678
/////////////////////
// Methods for adding, and managing, db operations
////////////////////
1682

1683    /**
1684     * Add a page to the page database
1685     */

1686    public synchronized void addPage(Page page) throws IOException {
1687        // The 2nd (byMD5) part is handled during processing of the 1st.
1688
haveEdits = true;
1689        pagesByURLEdits++;
1690        piwriter.appendInstructionInfo(pagesByURLWriter, page, ADD_PAGE, NullWritable.get());
1691    }
1692
1693    /**
1694     * Add a page to the page database, with a brand-new score
1695     */

1696    public synchronized void addPageWithScore(Page page) throws IOException {
1697        // The 2nd (byMD5) part is handled during processing of the 1st.
1698
haveEdits = true;
1699        pagesByURLEdits++;
1700        piwriter.appendInstructionInfo(pagesByURLWriter, page, ADD_PAGE_WITH_SCORE, NullWritable.get());
1701    }
1702
1703    /**
1704     * Don't replace the one in the database, if there is one.
1705     */

1706    public synchronized void addPageIfNotPresent(Page page) throws IOException {
1707        // The 2nd (index) part is handled during processing of the 1st.
1708
haveEdits = true;
1709        pagesByURLEdits++;
1710        piwriter.appendInstructionInfo(pagesByURLWriter, page, ADD_PAGE_IFN_PRESENT, NullWritable.get());
1711    }
1712
1713    /**
1714     * Don't replace the one in the database, if there is one.
1715     *
1716     * If we do insert the new Page, then we should also insert
1717     * the given Link object.
1718     */

1719    public synchronized void addPageIfNotPresent(Page page, Link link) throws IOException {
1720        // The 2nd (index) part is handled during processing of the 1st.
1721
haveEdits = true;
1722        pagesByURLEdits++;
1723        piwriter.appendInstructionInfo(pagesByURLWriter, page, link, ADD_PAGE_IFN_PRESENT, NullWritable.get());
1724    }
1725
1726    /**
1727     * Remove a page from the page database.
1728     */

1729    public synchronized void deletePage(String JavaDoc url) throws IOException {
1730        // The 2nd (index) part is handled during processing of the 1st.
1731
haveEdits = true;
1732        Page p = new Page();
1733        p.setURL(url);
1734        pagesByURLEdits++;
1735        piwriter.appendInstructionInfo(pagesByURLWriter, p, DEL_PAGE, NullWritable.get());
1736    }
1737
1738    /**
1739     * Add a link to the link database
1740     */

1741    public synchronized void addLink(Link lr) throws IOException {
1742        haveEdits = true;
1743        linksByMD5Edits++;
1744        liwriter.appendInstructionInfo(linksByMD5Writer, lr, ADD_LINK, NullWritable.get());
1745    }
1746
1747    /**
1748     * Remove links with the given MD5 from the db.
1749     */

1750    public synchronized void deleteLink(MD5Hash md5) throws IOException {
1751        haveEdits = true;
1752        linksByMD5Edits++;
1753        liwriter.appendInstructionInfo(linksByMD5Writer, new Link(md5, 0, "", ""), DEL_LINK, NullWritable.get());
1754    }
1755
1756    /**
1757     * The WebDBWriter.main() provides some handy methods for
1758     * testing the WebDB.
1759     */

1760    public static void main(String JavaDoc argv[]) throws FileNotFoundException, IOException {
1761        if (argv.length < 2) {
1762            System.out.println("Usage: java net.nutch.db.WebDBWriter (-local | -ndfs <namenode:port>) <db> [-create] | [-addpage id url] | [-addpageifnp id url] | [-deletepage url] | [-addlink fromID url] | [-deletelink fromID]");
1763            return;
1764        }
1765
1766        int i = 0;
1767        NutchFileSystem nfs = NutchFileSystem.parseArgs(argv, i);
1768        File dbDir = new File(argv[i++]);
1769        try {
1770            if ("-create".equals(argv[i])) {
1771                i++;
1772                WebDBWriter.createWebDB(nfs, dbDir);
1773                System.out.println("Created webdb at " + nfs + ":" + dbDir);
1774            } else if ("-addpage".equals(argv[i])) {
1775                i++;
1776                MD5Hash md5 = new MD5Hash(argv[i++]);
1777                String JavaDoc url = argv[i++];
1778
1779                WebDBWriter writer = new WebDBWriter(nfs, dbDir);
1780                try {
1781                    Page page = new Page(url, md5);
1782                    writer.addPageWithScore(page);
1783                    System.out.println("Added page (with score): " + page);
1784                } finally {
1785                    writer.close();
1786                }
1787            } else if ("-addpageifnp".equals(argv[i])) {
1788                i++;
1789                MD5Hash md5 = new MD5Hash(argv[i++]);
1790                String JavaDoc url = argv[i++];
1791
1792                WebDBWriter writer = new WebDBWriter(nfs, dbDir);
1793                try {
1794                    Page page = new Page(url, md5);
1795                    writer.addPageIfNotPresent(page);
1796                    System.out.println("Added page: " + page);
1797                } finally {
1798                    writer.close();
1799                }
1800            } else if ("-deletepage".equals(argv[i])) {
1801                i++;
1802                String JavaDoc url = argv[i++];
1803
1804                WebDBWriter writer = new WebDBWriter(nfs, dbDir);
1805                try {
1806                    writer.deletePage(url.trim());
1807                    System.out.println("Deleted item(s)");
1808                } finally {
1809                    writer.close();
1810                }
1811            } else if ("-addlink".equals(argv[i])) {
1812                i++;
1813                MD5Hash fromID = new MD5Hash(argv[i++]);
1814                String JavaDoc url = argv[i++];
1815
1816                WebDBWriter writer = new WebDBWriter(nfs, dbDir);
1817                try {
1818                    Link link = new Link(fromID, MD5Hash.digest("randomstring.com").halfDigest(), url, "SomeRandomAnchorText_" + System.currentTimeMillis());
1819                    writer.addLink(link);
1820                    System.out.println("Added link: " + link);
1821                } finally {
1822                    writer.close();
1823                }
1824            } else if ("-deletelink".equals(argv[i])) {
1825                i++;
1826                MD5Hash fromID = new MD5Hash(argv[i++]);
1827
1828                WebDBWriter writer = new WebDBWriter(nfs, dbDir);
1829                try {
1830                    writer.deleteLink(fromID);
1831                    System.out.println("Deleted item(s)");
1832                } finally {
1833                    writer.close();
1834                }
1835            } else {
1836                System.out.println("Sorry, no command with name " + argv[i]);
1837            }
1838        } finally {
1839            nfs.close();
1840        }
1841    }
1842}
1843
1844
1845
1846
Popular Tags