KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > db > DistributedWebDBWriter


/* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
4 package net.nutch.db;
5
6 import java.io.*;
7 import java.util.*;
8 import java.util.logging.*;
9 import java.nio.channels.*;
10
11 import net.nutch.io.*;
12 import net.nutch.fs.*;
13 import net.nutch.util.*;
14 import net.nutch.pagedb.*;
15 import net.nutch.linkdb.*;
16
/***************************************************
 * This is a wrapper class that allows us to reorder
 * write operations to the linkdb and pagedb. It is
 * useful only for objects like UpdateDatabaseTool,
 * which just does writes.
 *
 * The WebDBWriter is a traditional single-pass database writer.
 * It does not cache any instructions to disk (but it does
 * in memory, with possible resorting). It certainly does
 * nothing in a distributed fashion.
 *
 * There are other implementors of IWebDBWriter that do
 * all that fancy stuff.
 *
 * @author Mike Cafarella
 *************************************************/

33 public class DistributedWebDBWriter implements IWebDBWriter {
34     static final Logger LOG = LogFormatter.getLogger("net.nutch.db.WebDBWriter");
35     static final byte CUR_VERSION = 0;
36     static final byte OPEN_COUNTER_VERSION = 0;
37     static final byte CLOSE_COUNTER_VERSION = 0;
38     static final byte MACHINE_INFO_VERSION = 0;
39
40     // magic number
41
static int READY_TO_USE = 0xbabecafe;
42     static int IS_COMPLETE = 0xbabe0000;
43     static int WRITE_LOCK_INFO = 0xcafe0000;
44     static long LONG_TIMEOUT = 10 * 1000;
45
46     // db opcodes
47
static final byte ADD_PAGE = 0;
48     static final byte ADD_PAGE_WITH_SCORE = 1;
49     static final byte ADD_PAGE_IFN_PRESENT = 2;
50     static final byte DEL_PAGE = 3;
51     static final int ADD_LINK = 0;
52     static final int DEL_LINK = 1;
53     static final int DEL_SINGLE_LINK = 2;
54
55     // filenames
56
static final String JavaDoc PAGES_BY_URL = "pagesByURL";
57     static final String JavaDoc PAGES_BY_MD5 = "pagesByMD5";
58     static final String JavaDoc LINKS_BY_URL = "linksByURL";
59     static final String JavaDoc LINKS_BY_MD5 = "linksByMD5";
60     static final String JavaDoc STATS_FILE = "stats";
61     static final String JavaDoc META_SHAREGROUP = "metashare";
62     static final String JavaDoc METAINFO = "metainfo";
63
64     // Result codes for page-url comparisons
65
static final int NO_OUTLINKS = 0;
66     static final int HAS_OUTLINKS = 1;
67     static final int LINK_INVALID = 2;
68
69     /********************************************
70      * PageInstruction holds an operation over a Page.
71      *********************************************/

72     public static class PageInstruction implements WritableComparable {
73         byte opcode;
74         boolean hasLink;
75         Page page;
76         Link link;
77
78         /**
79          */

80         public PageInstruction() {}
81
82         /**
83          */

84         public PageInstruction(Page page, int opcode) {
85             set(page, opcode);
86         }
87
88         /**
89          */

90         public PageInstruction(Page page, Link link, int opcode) {
91             set(page, link, opcode);
92         }
93
94         /**
95          * Init from another PageInstruction object.
96          */

97         public void set(PageInstruction that) {
98             this.opcode = that.opcode;
99
100             if (this.page == null) {
101                 this.page = new Page();
102             }
103             this.page.set(that.page);
104
105             if (this.link == null) {
106                 this.link = new Link();
107             }
108             this.hasLink = that.hasLink;
109             if (this.hasLink) {
110                 this.link.set(that.link);
111             }
112         }
113
114         /**
115          * Init PageInstruction with no Link
116          */

117         public void set(Page page, int opcode) {
118             this.opcode = (byte) opcode;
119             this.page = page;
120             this.hasLink = false;
121             this.link = null;
122         }
123
124         /**
125          * Init PageInstruction with a Link
126          */

127         public void set(Page page, Link link, int opcode) {
128             this.opcode = (byte) opcode;
129             this.page = page;
130             this.hasLink = true;
131             this.link = link;
132         }
133
134         //
135
// WritableComparable
136
//
137
public int compareTo(Object JavaDoc o) {
138             int pageResult = this.page.compareTo(((PageInstruction) o).page);
139             if (pageResult != 0) {
140                 return pageResult;
141             } else {
142                 return this.opcode - (((PageInstruction) o).opcode);
143             }
144         }
145         public void write(DataOutput out) throws IOException {
146             out.writeByte(opcode);
147             page.write(out);
148             out.writeByte(hasLink ? 1 : 0);
149             if (hasLink) {
150                 link.write(out);
151             }
152         }
153         public void readFields(DataInput in) throws IOException {
154             opcode = in.readByte();
155             if (page == null) {
156                 page = new Page();
157             }
158             page.readFields(in);
159             
160             if (link == null) {
161                 link = new Link();
162             }
163             hasLink = (1 == in.readByte());
164             if (hasLink) {
165                 link.readFields(in);
166             }
167         }
168         public Page getPage() {
169             return page;
170         }
171         public Link getLink() {
172             if (hasLink) {
173                 return link;
174             } else {
175                 return null;
176             }
177         }
178         public int getInstruction() {
179             return opcode;
180         }
181
182         /**
183          * Sorts the instruction first by Page, then by opcode.
184          */

185         public static class PageComparator extends WritableComparator {
186             private static final Page.Comparator PAGE_COMPARATOR =
187             new Page.Comparator();
188
189             public PageComparator() { super(PageInstruction.class); }
190
191             /** Optimized comparator. */
192             public int compare(byte[] b1, int s1, int l1,
193                                byte[] b2, int s2, int l2) {
194                 int opcode1 = b1[s1];
195                 int opcode2 = b2[s2];
196                 int c = PAGE_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1);
197                 if (c != 0)
198                     return c;
199                 return opcode1 - opcode2;
200             }
201         }
202  
203         /*****************************************************
204          * Sorts the instruction first by url, then by opcode.
205          *****************************************************/

206         public static class UrlComparator extends WritableComparator {
207             private static final Page.UrlComparator PAGE_COMPARATOR =
208             new Page.UrlComparator();
209
210             public UrlComparator() { super(PageInstruction.class); }
211
212             /**
213              * We need to sort by ordered URLs. First, we sort by
214              * URL, then by opcode.
215              */

216             public int compare(WritableComparable a, WritableComparable b) {
217                 PageInstruction instructionA = (PageInstruction)a;
218                 PageInstruction instructionB = (PageInstruction)b;
219                 Page pageA = instructionA.getPage();
220                 Page pageB = instructionB.getPage();
221
222                 int result = pageA.getURL().compareTo(pageB.getURL());
223                 if (result != 0) {
224                     return result;
225                 } else {
226                     return instructionA.opcode - instructionB.opcode;
227                 }
228             }
229
230             /**
231              * Optimized comparator.
232              */

233             public int compare(byte[] b1, int s1, int l1,
234                                byte[] b2, int s2, int l2) {
235                 int opcode1 = b1[s1];
236                 int opcode2 = b2[s2];
237                 int c = PAGE_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1);
238                 if (c != 0)
239                     return c;
240                 return opcode1 - opcode2;
241             }
242         }
243     }
244
245     /********************************************************
246      * PageInstructionWriter very efficiently writes a
247      * PageInstruction to an EditSectionGroupWriter. Much better
248      * than calling "writer.append(new PageInstruction())"
249      ********************************************************/

250     public static class PageInstructionWriter {
251         PageInstruction pi = new PageInstruction();
252
253         /**
254          */

255         public PageInstructionWriter() {
256         }
257
258         /**
259          * Append the PageInstruction info to the indicated SequenceFile,
260          * and keep the PI for later reuse.
261          */

262         public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Page page, int opcode, Writable val) throws IOException {
263             pi.set(page, opcode);
264             writer.append(pi, val);
265         }
266
267         /**
268          * Append the PageInstruction info to the indicated SequenceFile,
269          * and keep the PI for later reuse.
270          */

271         public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Page page, Link link, int opcode, Writable val) throws IOException {
272             pi.set(page, link, opcode);
273             writer.append(pi, val);
274         }
275     }
276
277     /*************************************************************
278      * Reduce multiple instructions for a given url to the single effective
279      * instruction. ADD is prioritized highest, then ADD_IFN_PRESENT, and then
280      * DEL. Not coincidentally, this is opposite the order they're sorted in.
281      **************************************************************/

282     private static class DeduplicatingPageSequenceReader {
283         SequenceFile.Reader edits;
284         PageInstruction current = new PageInstruction();
285         UTF8 currentUrl = new UTF8();
286         boolean haveCurrent;
287
288         /**
289          */

290         public DeduplicatingPageSequenceReader(SequenceFile.Reader edits) throws IOException {
291             this.edits = edits;
292             this.haveCurrent = edits.next(current, NullWritable.get());
293         }
294
295         /**
296          */

297         public boolean next(PageInstruction result) throws IOException {
298             if (!haveCurrent) {
299                 return false;
300             }
301         
302             currentUrl.set(current.getPage().getURL());
303             result.set(current); // take the first instruction
304

305             do {
306                 // skip the rest
307
} while ((haveCurrent = edits.next(current, NullWritable.get())) &&
308                      currentUrl.compareTo(current.getPage().getURL()) == 0);
309             return true;
310         }
311     }
312
313
314     /*************************************************
315      * Holds an instruction over a Link.
316      *************************************************/

317     public static class LinkInstruction implements WritableComparable {
318         Link link;
319         int instruction;
320
321         /**
322          */

323         public LinkInstruction() {
324         }
325
326         /**
327          */

328         public LinkInstruction(Link link, int instruction) {
329             set(link, instruction);
330         }
331
332         /**
333          * Re-init from another LinkInstruction's info.
334          */

335         public void set(LinkInstruction that) {
336             this.instruction = that.instruction;
337           
338             if (this.link == null)
339                 this.link = new Link();
340
341             this.link.set(that.link);
342         }
343
344         /**
345          * Re-init with a Link and an instruction
346          */

347         public void set(Link link, int instruction) {
348             this.link = link;
349             this.instruction = instruction;
350         }
351
352         //
353
// WritableComparable
354
//
355
public int compareTo(Object JavaDoc o) {
356             return this.link.compareTo(((LinkInstruction) o).link);
357         }
358         public void write(DataOutput out) throws IOException {
359             out.writeByte(instruction);
360             link.write(out);
361         }
362         public void readFields(DataInput in) throws IOException {
363             this.instruction = in.readByte();
364             if (link == null)
365                 link = new Link();
366             link.readFields(in);
367         }
368         public Link getLink() {
369             return link;
370         }
371         public int getInstruction() {
372             return instruction;
373         }
374
375         /*******************************************************
376          * Sorts the instruction first by Md5, then by opcode.
377          *******************************************************/

378         public static class MD5Comparator extends WritableComparator {
379             private static final Link.MD5Comparator MD5_COMPARATOR =
380             new Link.MD5Comparator();
381
382             public MD5Comparator() { super(LinkInstruction.class); }
383
384             public int compare(WritableComparable a, WritableComparable b) {
385                 LinkInstruction instructionA = (LinkInstruction)a;
386                 LinkInstruction instructionB = (LinkInstruction)b;
387                 return instructionA.link.md5Compare(instructionB.link);
388             }
389
390             /** Optimized comparator. */
391             public int compare(byte[] b1, int s1, int l1,
392                                byte[] b2, int s2, int l2) {
393                 return MD5_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1);
394             }
395         }
396  
397         /*********************************************************
398          * Sorts the instruction first by url, then by opcode.
399          *********************************************************/

400         public static class UrlComparator extends WritableComparator {
401             private static final Link.UrlComparator URL_COMPARATOR =
402             new Link.UrlComparator();
403
404             public UrlComparator() { super(LinkInstruction.class); }
405
406             public int compare(WritableComparable a, WritableComparable b) {
407                 LinkInstruction instructionA = (LinkInstruction)a;
408                 LinkInstruction instructionB = (LinkInstruction)b;
409                 return instructionA.link.urlCompare(instructionB.link);
410
411             }
412
413             /**
414              * Optimized comparator.
415              */

416             public int compare(byte[] b1, int s1, int l1,
417                                byte[] b2, int s2, int l2) {
418                 return URL_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1);
419             }
420         }
421     }
422
423     /*******************************************************
424      * LinkInstructionWriter very efficiently writes a
425      * LinkInstruction to an EditSectionGroupWriter. Much better
426      * than calling "writer.append(new LinkInstruction())"
427      ********************************************************/

428     public static class LinkInstructionWriter {
429         LinkInstruction li = new LinkInstruction();
430
431         /**
432          */

433         public LinkInstructionWriter() {
434         }
435
436         /**
437          * Append the LinkInstruction info to the indicated SequenceFile
438          * and keep the LI for later reuse.
439          */

440         public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Link link, int opcode, Writable val) throws IOException {
441             li.set(link, opcode);
442             writer.append(li, val);
443         }
444     }
445
446     /********************************************************
447      * This class deduplicates link operations. We want to
448      * sort by MD5, then by URL. But all operations
449      * should be unique.
450      *********************************************************/

451     class DeduplicatingLinkSequenceReader {
452         Link currentKey = new Link();
453         LinkInstruction current = new LinkInstruction();
454         SequenceFile.Reader edits;
455         boolean haveCurrent;
456
457         /**
458          */

459         public DeduplicatingLinkSequenceReader(SequenceFile.Reader edits) throws IOException {
460             this.edits = edits;
461             this.haveCurrent = edits.next(current, NullWritable.get());
462         }
463
464
465         /**
466          * The incoming stream of edits is sorted first by MD5, then by URL.
467          * MD5-only values always come before MD5+URL.
468          */

469         public boolean next(LinkInstruction key) throws IOException {
470             if (! haveCurrent) {
471                 return false;
472             }
473
474             currentKey.set(current.getLink());
475             
476             do {
477                 key.set(current);
478             } while ((haveCurrent = edits.next(current, NullWritable.get())) &&
479                      currentKey.compareTo(current.getLink()) == 0);
480             return true;
481         }
482     }
483
484
485     /**************************************************
486      * The CloseProcessor class is used when we close down
487      * the webdb. We give it the path, members, and class values
488      * needed to apply changes to any of our 4 data tables.
489      *
490      * This is an abstract class. Each subclass must define
491      * the exact merge procedure. However, file-handling
492      * and edit-processing is standardized as much as possible.
493      *
494      **************************************************/

495     private abstract class CloseProcessor {
496         String JavaDoc basename;
497         String JavaDoc curDBPart;
498         MapFile.Reader oldDb;
499         EditSectionGroupWriter editWriter;
500         SequenceFile.Sorter sorter;
501         WritableComparator comparator;
502         Class JavaDoc keyClass, valueClass;
503         long itemsWritten = 0;
504
505         /**
506          * Store away these members for later use.
507          */

508         CloseProcessor(String JavaDoc basename, MapFile.Reader oldDb, EditSectionGroupWriter editWriter, SequenceFile.Sorter sorter, WritableComparator comparator, Class JavaDoc keyClass, Class JavaDoc valueClass, String JavaDoc curDBPart) {
509             this.basename = basename;
510             this.oldDb = oldDb;
511             this.editWriter = editWriter;
512             this.sorter = sorter;
513             this.comparator = comparator;
514             this.keyClass = keyClass;
515             this.valueClass = valueClass;
516             this.curDBPart = curDBPart;
517         }
518
519         /**
520          * Perform the shutdown sequence for this Processor.
521          * There is a lot of file-moving and edit-sorting that
522          * is common across all the 4 tables.
523          *
524          * Returns how many items were written out by this close().
525          */

526         long closeDown(File workingDir, File outputDir) throws IOException {
527             //
528
// Done adding edits, so close edit-writer.
529
//
530
editWriter.close();
531
532             //
533
// Where the output is going
534
//
535
File sectionDir = new File(outputDir, "dbsection." + machineNum);
536             File newDbFile = new File(sectionDir, basename);
537
538             //
539
// Grab all the edits that we need to process. We build an EditSectionGroupReader
540
// and aim it at the right location. The ESR will wait until all its
541
// component Sections are written and completed before returning from
542
// any method (other than the constructor). So we expect to possibly wait
543
// inside the call to numEdits().
544
//
545
EditSectionGroupReader edits = new EditSectionGroupReader(nfs, basename, machineNum, totalMachines);
546             int numEdits = edits.numEdits();
547
548             // If there are edits, then process them.
549
if (numEdits != 0) {
550                 File mergedEditsFile = new File(sectionDir, "mergedEdits");
551                 edits.mergeSectionComponents(mergedEditsFile);
552                 File sortedEditsFile = new File(mergedEditsFile.getPath() + ".sorted");
553
554                 // Sort the edits
555
long startSort = System.currentTimeMillis();
556                 sorter.sort(mergedEditsFile.getPath(), sortedEditsFile.getPath());
557                 long endSort = System.currentTimeMillis();
558
559                 LOG.info("Processing " + basename + ": Sorted " + numEdits + " instructions in " + ((endSort - startSort) / 1000.0) + " seconds.");
560                 LOG.info("Processing " + basename + ": Sorted " + (numEdits / ((endSort - startSort) / 1000.0)) + " instructions/second");
561             
562                 // Delete old file
563
nfs.delete(mergedEditsFile);
564
565                 // Read the sorted edits. That means read all
566
// the edits from the local subsection of the
567
// database. We must merge every machine's
568
// contribution to the edit-list first (which
569
// also means waiting until each machine has
570
// completed that step).
571

572                 // Read the sorted edits
573
SequenceFile.Reader sortedEdits = new SequenceFile.Reader(nfs, sortedEditsFile.getPath());
574
575                 // Create a brand-new output db for the integrated data
576
MapFile.Writer newDb = (comparator == null) ? new MapFile.Writer(nfs, newDbFile.getPath(), keyClass, valueClass) : new MapFile.Writer(nfs, newDbFile.getPath(), comparator, valueClass);
577
578                 // Iterate through the edits, and merge changes with existing
579
// db into the brand-new file
580
oldDb.reset();
581             
582                 // Merge the edits. We did it!
583
long startMerge = System.currentTimeMillis();
584                 mergeEdits(oldDb, sortedEdits, newDb);
585                 long endMerge = System.currentTimeMillis();
586                 LOG.info("Processing " + basename + ": Merged to new DB containing " + itemsWritten + " records in " + ((endMerge - startMerge) / 1000.0) + " seconds");
587                 LOG.info("Processing " + basename + ": Merged " + (itemsWritten / ((endMerge - startMerge) / 1000.0)) + " records/second");
588
589                 // Close down readers, writers
590
sortedEdits.close();
591                 newDb.close();
592
593                 // Delete the (sorted) merged-edits
594
nfs.delete(sortedEditsFile);
595             } else {
596                 // Otherwise, simply copy the original file into place,
597
// without all the processing overhead.
598
long startCopy = System.currentTimeMillis();
599
600                 File srcSectionDir = new File(oldDbDir, "dbsection." + machineNum);
601                 File srcDbFile = new File(srcSectionDir, basename);
602                 nfs.rename(srcDbFile, newDbFile);
603                 long endCopy = System.currentTimeMillis();
604                 LOG.info("Processing " + basename + ": Copied file (" + srcDbFile.length()+ " bytes) in " + ((endCopy - startCopy) / 1000.0) + " secs.");
605             }
606
607             // Delete the now-consumed edits file to save space
608
edits.delete();
609             return itemsWritten;
610         }
611
612         /**
613          * The loop that actually applies the changes and writes to
614          * a new db. This is different for every subclass!
615          */

616         abstract void mergeEdits(MapFile.Reader db, SequenceFile.Reader edits, MapFile.Writer newDb) throws IOException;
617     }
618
619     /***
620      * The PagesByURLProcessor is used during close() time for
621      * the pagesByURL table. We instantiate one of these, and it
622      * takes care of the entire shutdown process.
623      */

624     private class PagesByURLProcessor extends CloseProcessor {
625         EditSectionGroupWriter futureEdits;
626
627         /**
628          * We store "futureEdits" so we can write edits for the
629          * next table-db step
630          */

631         PagesByURLProcessor(MapFile.Reader db, EditSectionGroupWriter editWriter, EditSectionGroupWriter futureEdits) {
632             super(PAGES_BY_URL, db, editWriter, new SequenceFile.Sorter(nfs, new PageInstruction.UrlComparator(), NullWritable.class), new UTF8.Comparator(), null, Page.class, "PagesByURLPart");
633             this.futureEdits = futureEdits;
634         }
635
636         /**
637          * Merge the existing db with the edit-stream into a brand-new file.
638          */

639         void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
640             // Create the keys and vals we'll be using
641
DeduplicatingPageSequenceReader edits = new DeduplicatingPageSequenceReader(sortedEdits);
642             WritableComparable readerKey = new UTF8();
643             Page readerVal = new Page();
644             PageInstruction editItem = new PageInstruction();
645             int futureOrdering = 0;
646
647             // Read the first items from both streams
648
boolean hasEntries = db.next(readerKey, readerVal);
649             boolean hasEdits = edits.next(editItem);
650
651             // As long as we have both edits and entries, we need to
652
// interleave them....
653
while (hasEntries && hasEdits) {
654                 int comparison = readerKey.compareTo(editItem.getPage().getURL());
655                 int curInstruction = editItem.getInstruction();
656
657                 // Perform operations
658
if ((curInstruction == ADD_PAGE) ||
659                     (curInstruction == ADD_PAGE_WITH_SCORE) ||
660                     (curInstruction == ADD_PAGE_IFN_PRESENT)) {
661
662                     if (comparison < 0) {
663                         // Write readerKey, just passing it along.
664
// Don't process the edit yet.
665
newDb.append(readerKey, readerVal);
666                         itemsWritten++;
667                         hasEntries = db.next(readerKey, readerVal);
668                     } else if (comparison == 0) {
669                         // The keys are equal. If the instruction
670
// is ADD_PAGE, we write the edit's key and
671
// replace the old one.
672
//
673
// Otherwise, if it's ADD_IFN_PRESENT,
674
// keep the reader's item intact.
675
//
676
if ((curInstruction == ADD_PAGE) ||
677                             (curInstruction == ADD_PAGE_WITH_SCORE)) {
678                             // An ADD_PAGE with an identical pair
679
// of pages replaces the existing one.
680
// We may need to note the fact for
681
// Garbage Collection.
682
//
683
// This happens in three stages.
684
// 1. We write necessary items to the future
685
// edits-list.
686
//
687
pagesByMD5Edits++;
688
689                             // If this is a replacing add, we don't want
690
// to disturb the score from the old Page! This,
691
// way, we can run some link analysis scoring
692
// while the new Pages are being fetched and
693
// not lose the info when a Page is replaced.
694
//
695
// If it is an ADD_PAGE_WITH_SCORE, then we
696
// go ahead and replace the old one.
697
//
698
// Either way, from now on we treat it
699
// as an ADD_PAGE
700
//
701
Page editItemPage = editItem.getPage();
702
703                             if (curInstruction == ADD_PAGE) {
704                                 editItemPage.setScore(readerVal.getScore(), readerVal.getNextScore());
705                             }
706
707                             piwriter.appendInstructionInfo(futureEdits, editItemPage, ADD_PAGE, NullWritable.get());
708
709                             //
710
// 2. We write the edit-page to *this* table.
711
//
712
newDb.append(editItemPage.getURL(), editItemPage);
713
714                             //
715
// 3. We want the ADD in the next step (the
716
// MD5-driven table) to be a "replacing add".
717
// But that won't happen if the readerItem and
718
// the editItem Pages are not identical.
719
// (In this scenario, that means their URLs
720
// are the same, but their MD5s are different.)
721
// So, we need to explicitly handle that
722
// case by issuing a DELETE for the now-obsolete
723
// item.
724
if (editItemPage.compareTo(readerVal) != 0) {
725                                 pagesByMD5Edits++;
726                                 piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get());
727                             }
728
729                             itemsWritten++;
730
731                             // "Delete" the readerVal by skipping it.
732
hasEntries = db.next(readerKey, readerVal);
733                         } else {
734                             // ADD_PAGE_IFN_PRESENT. We only add IF_NOT
735
// present. And it was present! So, we treat
736
// this case like we treat a no-op.
737
// Just move to the next edit.
738
}
739                         // In either case, we process the edit.
740
hasEdits = edits.next(editItem);
741
742                     } else if (comparison > 0) {
743                         // We have inserted a Page that's before some
744
// entry in the existing database. So, we just
745
// need to write down the Page from the Edit file.
746
// It's like the above case, except we don't tell
747
// the future-edits to delete anything.
748
//
749
// 1. Write the item down for the future.
750
pagesByMD5Edits++;
751
752                         //
753
// If this is an ADD_PAGE_IFN_PRESENT, then
754
// we may also have a Link we have to take care of!
755
//
756
if (curInstruction == ADD_PAGE_IFN_PRESENT) {
757                             Link editLink = editItem.getLink();
758                             if (editLink != null) {
759                                 addLink(editLink);
760                             }
761                         }
762                         piwriter.appendInstructionInfo(futureEdits, editItem.getPage(), ADD_PAGE, NullWritable.get());
763
764                         //
765
// 2. Write the edit-page to *this* table
766
newDb.append(editItem.getPage().getURL(), editItem.getPage());
767                         itemsWritten++;
768
769                         // Process the edit
770
hasEdits = edits.next(editItem);
771                     }
772                 } else if (curInstruction == DEL_PAGE) {
773                     if (comparison < 0) {
774                         // Write the readerKey, just passing it along.
775
// We don't process the edit yet.
776
newDb.append(readerKey, readerVal);
777                         itemsWritten++;
778                         hasEntries = db.next(readerKey, readerVal);
779                     } else if (comparison == 0) {
780                         // Delete it! We can only delete one item
781
// at a time, as all URLs are unique.
782
// 1. Tell the future-edits what page will need to
783
// be deleted.
784
pagesByMD5Edits++;
785                         piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get());
786
787                         //
788
// 2. "Delete" the entry by skipping the Reader
789
// key.
790
hasEntries = db.next(readerKey, readerVal);
791
792                         // Process the edit
793
hasEdits = edits.next(editItem);
794                     } else if (comparison > 0) {
795                         // Ignore it. We tried to delete an item that's
796
// not here.
797
hasEdits = edits.next(editItem);
798                     }
799                 }
800             }
801
802             // Now we have only edits. No more preexisting items!
803
while (! hasEntries && hasEdits) {
804                 int curInstruction = editItem.getInstruction();
805                 if (curInstruction == ADD_PAGE ||
806                     curInstruction == ADD_PAGE_WITH_SCORE ||
807                     curInstruction == ADD_PAGE_IFN_PRESENT) {
808                     // No more reader entries, so ADD_PAGE_IFN_PRESENT
809
// is treated like a simple ADD_PAGE.
810

811                     // 1. Tell the future edits-list about this new item
812
pagesByMD5Edits++;
813                     piwriter.appendInstructionInfo(futureEdits, editItem.getPage(), ADD_PAGE, NullWritable.get());
814
815                     // 2. Write the edit page to this table.
816
newDb.append(editItem.getPage().getURL(), editItem.getPage());
817                     itemsWritten++;
818                 } else if (curInstruction == DEL_PAGE) {
819                     // Ignore it. We tried to delete an item
820
// that's not here.
821
}
822
823                 // Either way, we always process the edit.
824
hasEdits = edits.next(editItem);
825             }
826
827             // Now we have only preexisting items. We just copy
828
// them to the new file, in order.
829
while (hasEntries && ! hasEdits) {
830                 newDb.append(readerKey, readerVal);
831                 itemsWritten++;
832                 hasEntries = db.next(readerKey, readerVal);
833             }
834         }
835     }
836
837     /***
838      * The PagesByMD5Processor is used during close() time for
839      * the pagesByMD5 table. We instantiate one of these, and it
840      * takes care of the entire shutdown process.
841      */

842     private class PagesByMD5Processor extends CloseProcessor {
843         /**
844          */

845         PagesByMD5Processor(MapFile.Reader db, EditSectionGroupWriter editWriter) {
846             super(PAGES_BY_MD5, db, editWriter, new SequenceFile.Sorter(nfs, new PageInstruction.PageComparator(), NullWritable.class), null, Page.class, NullWritable.class, "PagesByMD5Part");
847         }
848
849         /**
850          */

851         void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
852             // Create the keys and vals
853
Page readerItem = new Page();
854             PageInstruction editItem = new PageInstruction();
855
856             // For computing the GC list
857
Page deletedItem = new Page(), lastItem = new Page();
858             boolean justDeletedItem = false;
859             boolean newReaderItem = false;
860             int itemRepeats = 0;
861
862             // Read the first items from both streams
863
boolean hasEntries = db.next(readerItem, NullWritable.get());
864             boolean hasEdits = sortedEdits.next(editItem, NullWritable.get());
865             if (hasEntries) {
866                 // The first thing we read should become
867
// the "previous key". We need this for
868
// garbage collection.
869
outBuf.reset();
870                 readerItem.write(outBuf);
871                 inBuf.reset(outBuf.getData(), outBuf.getLength());
872                 lastItem.readFields(inBuf);
873                 itemRepeats = 0;
874             }
875
876             // As long we have both edits and entries, we need to
877
// interleave them.
878
while (hasEdits && hasEntries) {
879                 int comparison = readerItem.compareTo(editItem.getPage());
880                 int curInstruction = editItem.getInstruction();
881
882                 //
883
// OK! Now perform operations
884
//
885
if (curInstruction == ADD_PAGE) {
886                     if (comparison < 0) {
887                         // Write readerItem, just passing it along.
888
// Don't process the edit yet.
889
newDb.append(readerItem, NullWritable.get());
890                         itemsWritten++;
891                         hasEntries = db.next(readerItem, NullWritable.get());
892                         newReaderItem = true;
893                     } else if (comparison == 0) {
894                         //
895
// This is a "replacing ADD", which is generated
896
// by the above-sequence. We should skip over the
897
// existing item, and add the new one instead.
898
//
899
// Note that by this point, the new version of the
900
// Page from the edit sequence is guaranteed to
901
// have the correct score. We make sure of it in
902
// the mergeEdits() for PagesByURLProcessor.
903
//
904
newDb.append(editItem.getPage(), NullWritable.get());
905                         itemsWritten++;
906                         hasEntries = db.next(readerItem, NullWritable.get());
907                         newReaderItem = true;
908                         hasEdits = sortedEdits.next(editItem, NullWritable.get());
909                     } else if (comparison > 0) {
910                         // Write the edit item. We've inserted an item
911
// that comes before any others.
912
newDb.append(editItem.getPage(), NullWritable.get());
913                         itemsWritten++;
914                         hasEdits = sortedEdits.next(editItem, NullWritable.get());
915                     }
916                 } else if (curInstruction == ADD_PAGE_IFN_PRESENT) {
917                     throw new IOException("Should never process ADD_PAGE_IFN_PRESENT for the index: " + editItem);
918                 } else if (curInstruction == DEL_PAGE) {
919                     if (comparison < 0) {
920                         // Write the readerKey, just passing it along.
921
// Don't process the edit yet.
922
newDb.append(readerItem, NullWritable.get());
923                         itemsWritten++;
924                         hasEntries = db.next(readerItem, NullWritable.get());
925                         newReaderItem = true;
926                     } else if (comparison == 0) {
927                         // Delete it! Remember only one entry can
928
// be deleted at a time!
929
//
930
// "Delete" the entry by skipping over the reader
931
// item. We move onto the next item in the existing
932
// index, as well as the next edit instruction.
933
hasEntries = db.next(readerItem, NullWritable.get());
934                         newReaderItem = true;
935                         hasEdits = sortedEdits.next(editItem, NullWritable.get());
936                         
937                         // We need to set this flag for GC'ing.
938
justDeletedItem = true;
939                     } else if (comparison > 0) {
940                         // This should never happen! We should only be
941
// deleting items that actually appear!
942
throw new IOException("An unapplicable DEL_PAGE should never appear during index-merge: " + editItem);
943                     }
944                 }
945
946                 // GARBAGE COLLECTION
947
// We want to detect when we have deleted the
948
// last MD5 of a certain value. We can have
949
// multiple MD5s in the same index, as long as
950
// they have different URLs. When the last MD5
951
// is deleted, we want to know so we can modify
952
// the LinkDB.
953
if (newReaderItem) {
954                     // If we have a different readerItem which is just
955
// the same as our last one, then we know it's a
956
// repeat!
957
if (hasEntries && readerItem.getMD5().compareTo(lastItem.getMD5()) == 0) {
958                         itemRepeats++;
959                     } else {
960                         // The current readerItem and the lastItem
961
// MD5s are not equal.
962
//
963
// If the last item was deleted, AND if the
964
// deleted item is not a repeat of the current item,
965
// then that MD5 should be garbage collected.
966
if (justDeletedItem && itemRepeats == 0) {
967                             deleteLink(lastItem.getMD5());
968                         }
969
970                         // The current readerItem is the new "last key".
971
outBuf.reset();
972                         readerItem.write(outBuf);
973                         inBuf.reset(outBuf.getData(), outBuf.getLength());
974                         lastItem.readFields(inBuf);
975                         itemRepeats = 0;
976                     }
977                     // Clear "new-reader-item" bit
978
newReaderItem = false;
979                 }
980                 // Clear "last-deleted" bit
981
justDeletedItem = false;
982             }
983         
984             // Now we have only edits. No more preexisting items!
985
while (! hasEntries && hasEdits) {
986                 int curInstruction = editItem.getInstruction();
987                 if (curInstruction == ADD_PAGE) {
988                     // Just write down the new page!
989
newDb.append(editItem.getPage(), NullWritable.get());
990                     itemsWritten++;
991                 } else if (curInstruction == ADD_PAGE_IFN_PRESENT) {
992                     throw new IOException("Should never process ADD_PAGE_IFN_PRESENT for the index: " + editItem);
993                 } else if (curInstruction == DEL_PAGE) {
994                     // This should never happen! We should only be
995
// deleting items that actually appear!
996
throw new IOException("An unapplicable DEL_PAGE should never appear during index-merge: " + editItem);
997                 }
998                 hasEdits = sortedEdits.next(editItem, NullWritable.get());
999             }
1000
1001            // Now we have only preexisting items. We just copy them
1002
// to the new file, in order
1003
while (hasEntries && ! hasEdits) {
1004                // Simply copy through the remaining database items
1005
newDb.append(readerItem, NullWritable.get());
1006                itemsWritten++;
1007                hasEntries = db.next(readerItem, NullWritable.get());
1008                newReaderItem = true;
1009            }
1010        }
1011    }
1012
1013    /**
1014     * The LinksByMD5Processor is used during close() for
1015     * the pagesByMD5 table. It processes all the edits to
1016     * this table, and also generates edits for the linksByURL
1017     * table.
1018     */

1019    private class LinksByMD5Processor extends CloseProcessor {
1020        EditSectionGroupWriter futureEdits;
1021
1022        /**
1023         */

1024        public LinksByMD5Processor(MapFile.Reader db, EditSectionGroupWriter editWriter, EditSectionGroupWriter futureEdits) {
1025            super(LINKS_BY_MD5, db, editWriter, new SequenceFile.Sorter(nfs, new LinkInstruction.MD5Comparator(), NullWritable.class), new Link.MD5Comparator(), Link.class, NullWritable.class, "LinksByMD5Part");
1026            this.futureEdits = futureEdits;
1027        }
1028
1029        /**
1030         * Merges edits into the md5-driven link table. Also generates
1031         * edit sequence to apply to the URL-driven table.
1032         */

1033        void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
1034            WritableComparator comparator = new Link.MD5Comparator();
1035            DeduplicatingLinkSequenceReader edits = new DeduplicatingLinkSequenceReader(sortedEdits);
1036
1037            // Create the keys and vals we'll use
1038
LinkInstruction editItem = new LinkInstruction();
1039            Link readerItem = new Link();
1040
1041            // Read the first items from both streams
1042
boolean hasEntries = db.next(readerItem, NullWritable.get());
1043            boolean hasEdits = edits.next(editItem);
1044
1045            // As long as we have both edits and entries to process,
1046
// we need to interleave them
1047
while (hasEntries && hasEdits) {
1048                int curInstruction = editItem.getInstruction();
1049
1050                // Perform operations
1051
if (curInstruction == ADD_LINK) {
1052                    // When we add a link, we may replace a previous
1053
// link with identical URL and MD5 values. The
1054
// MD5FirstComparator will use both values.
1055
//
1056
int comparison = comparator.compare(readerItem, editItem.getLink());
1057
1058                    if (comparison < 0) {
1059                        // Write the readerKey, just passing it along.
1060
// Don't process the edit yet.
1061
newDb.append(readerItem, NullWritable.get());
1062                        itemsWritten++;
1063                        hasEntries = db.next(readerItem, NullWritable.get());
1064                    } else if (comparison == 0) {
1065                        // 1. Write down the item for table-edits
1066
if (futureEdits != null) {
1067                            linksByURLEdits++;
1068                            liwriter.appendInstructionInfo(futureEdits, editItem.getLink(), ADD_LINK, NullWritable.get());
1069                        }
1070
1071                        // 2. Write the new item, "replacing" the old one.
1072
// We move to the next edit instruction and move
1073
// past the replaced db entry.
1074
newDb.append(editItem.getLink(), NullWritable.get());
1075                        itemsWritten++;
1076                        hasEntries = db.next(readerItem, NullWritable.get());
1077                        hasEdits = edits.next(editItem);
1078                    } else if (comparison > 0) {
1079                        // 1. Write down the item for table-edits
1080
if (futureEdits != null) {
1081                            linksByURLEdits++;
1082                            liwriter.appendInstructionInfo(futureEdits, editItem.getLink(), ADD_LINK, NullWritable.get());
1083                        }
1084
1085                        // 2. Write the new item. We stay at the current
1086
// db entry.
1087
newDb.append(editItem.getLink(), NullWritable.get());
1088                        itemsWritten++;
1089                        hasEdits = edits.next(editItem);
1090                    }
1091                } else if ((curInstruction == DEL_LINK) ||
1092                           (curInstruction == DEL_SINGLE_LINK)) {
1093                    // When we delete a link, we might delete many
1094
// at once! We are interested only in the MD5
1095
// here. If there are entries with identical MD5
1096
// values, but different URLs, we get rid of them
1097
// all.
1098
int comparison = 0;
1099                    if (curInstruction == DEL_LINK) {
1100                        comparison = readerItem.getFromID().compareTo(editItem.getLink().getFromID());
1101                    } else {
1102                        comparison = readerItem.md5Compare(editItem.getLink());
1103                    }
1104
1105                    if (comparison < 0) {
1106                        // Write the readerKey, just passing it along.
1107
// Don't process the edit yet.
1108
newDb.append(readerItem, NullWritable.get());
1109                        itemsWritten++;
1110                        hasEntries = db.next(readerItem, NullWritable.get());
1111                    } else if (comparison == 0) {
1112                        // Delete it (or them!)
1113
// 1. Write the full instruction for the next
1114
// delete-stage. That includes the read-in
1115
// value
1116
// 2. "Delete" the entry by skipping the
1117
// readerKey. We DO NOT go to the next edit
1118
// instruction! There might still be more
1119
// entries in the database to which we should
1120
// apply this delete-edit.
1121
//
1122
// Step 1. Write entry for future table-edits
1123
if (futureEdits != null) {
1124                            linksByURLEdits++;
1125                            liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_LINK, NullWritable.get());
1126                        }
1127
1128                        // Step 2.
1129
// We might want to delete multiple MD5s with
1130
// a single delete() operation, so keep this
1131
// edit instruction around
1132
hasEntries = db.next(readerItem, NullWritable.get());
1133                        if (curInstruction == DEL_SINGLE_LINK) {
1134                            hasEdits = edits.next(editItem);
1135                        }
1136                    } else if (comparison > 0) {
1137                        // Ignore, move on to next instruction
1138
hasEdits = edits.next(editItem);
1139                    }
1140                }
1141            }
1142
1143            // Now we have only edits. No more preexisting items!
1144
while (! hasEntries && hasEdits) {
1145                int curInstruction = editItem.getInstruction();
1146
1147                if (curInstruction == ADD_LINK) {
1148                    // 1. Write down the item for future table-edits
1149
if (futureEdits != null) {
1150                        linksByURLEdits++;
1151                        liwriter.appendInstructionInfo(futureEdits, editItem.getLink(), ADD_LINK, NullWritable.get());
1152                    }
1153
1154                    // 2. Just add the item from the edit list
1155
newDb.append(editItem.getLink(), NullWritable.get());
1156                    itemsWritten++;
1157                } else if (curInstruction == DEL_LINK) {
1158                    // Ignore operation
1159
}
1160                // Move on to next edit
1161
hasEdits = edits.next(editItem);
1162            }
1163
1164            // Now we have only preexisting items. Just copy them
1165
// to the new file, in order.
1166
while (hasEntries && ! hasEdits) {
1167                newDb.append(readerItem, NullWritable.get());
1168                itemsWritten++;
1169                hasEntries = db.next(readerItem, NullWritable.get());
1170            }
1171        }
1172    }
1173
1174    /**
1175     * This class helps the LinksByURLProcessor test a list of
1176     * Page objects, sorted by URL, for outlink-counts. We query
1177     * this class with a series of questions, based on Links sorted
1178     * by target URL.
1179     */

1180    private class TargetTester {
1181        MapFile.Reader pagedb;
1182        boolean hasPage = false;
1183        UTF8 pageURL = null;
1184        Page page = null;
1185
1186        /**
1187         */

1188        public TargetTester(MapFile.Reader pagedb) throws IOException {
1189            this.pagedb = pagedb;
1190            this.pageURL = new UTF8();
1191            this.page = new Page();
1192            this.hasPage = pagedb.next(pageURL, page);
1193        }
1194
1195        /**
1196         * Match the given URL against the sorted series of Page URLs.
1197         */

1198        public int hasOutlinks(UTF8 curURL) throws IOException {
1199            int returnCode = NO_OUTLINKS;
1200            int comparison = pageURL.compareTo(curURL);
1201
1202            while (hasPage && comparison < 0) {
1203                hasPage = pagedb.next(pageURL, page);
1204                if (hasPage) {
1205                    comparison = pageURL.compareTo(curURL);
1206                }
1207            }
1208
1209            if (hasPage) {
1210                if (comparison == 0) {
1211                    returnCode = (page.getNumOutlinks() > 0) ? HAS_OUTLINKS : NO_OUTLINKS;
1212                } else if (comparison > 0) {
1213                    //
1214
// This situation indicates that the Link's
1215
// target page has been deleted, probably
1216
// because we repeatedly failed to fetch the URL.
1217
// So, we should delete the Link.
1218
//
1219
returnCode = LINK_INVALID;
1220                }
1221            }
1222            return returnCode;
1223        }
1224
1225        /**
1226         */

1227        public void close() throws IOException {
1228            pagedb.close();
1229        }
1230    }
1231
1232    /**
1233     * Closes down and merges changes to the URL-driven link
1234     * table. This does nothing fancy, and propagates nothing
1235     * to a further stage. There is no next stage!
1236     */

1237    private class LinksByURLProcessor extends CloseProcessor {
1238        MapFile.Reader pageDb;
1239        EditSectionGroupWriter futureEdits;
1240
1241        /**
1242         */

1243        public LinksByURLProcessor(MapFile.Reader db, EditSectionGroupWriter editWriter, MapFile.Reader pageDb, EditSectionGroupWriter futureEdits) {
1244            super(LINKS_BY_URL, db, editWriter, new SequenceFile.Sorter(nfs, new LinkInstruction.UrlComparator(), NullWritable.class), new Link.UrlComparator(), Link.class, NullWritable.class, "LinksByURLPart");
1245            this.pageDb = pageDb;
1246            this.futureEdits = futureEdits;
1247        }
1248
1249        /**
1250         */

1251        public long closeDown(File workingDir, File outputDir) throws IOException {
1252            long result = super.closeDown(workingDir, outputDir);
1253            pageDb.close();
1254            return result;
1255        }
1256
1257        /**
1258         * Merge the existing db with the edit-stream into a brand-new file.
1259         */

1260        void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
1261            WritableComparator comparator = new Link.UrlComparator();
1262
1263            // Create the keys and vals we'll use
1264
LinkInstruction editItem = new LinkInstruction();
1265            Link readerItem = new Link();
1266        
1267            // Read the first items from both streams
1268
boolean hasEntries = db.next(readerItem, NullWritable.get());
1269            boolean hasEdits = sortedEdits.next(editItem, NullWritable.get());
1270            TargetTester targetTester = new TargetTester(pageDb);
1271
1272            // As long as we have both edits and entries to process,
1273
// we need to interleave them
1274
while (hasEntries && hasEdits) {
1275                int curInstruction = editItem.getInstruction();
1276
1277                if (curInstruction == ADD_LINK) {
1278                    // When we add a link, we may replace a previous
1279
// link with identical URL and MD5 values. Our
1280
// comparator will test both
1281
//
1282
int comparison = comparator.compare(readerItem, editItem.getLink());
1283
1284                    if (comparison < 0) {
1285                        // Write the readerKey, just passing it along.
1286
// Don't process the edit yet.
1287
int linkTest = targetTester.hasOutlinks(readerItem.getURL());
1288
1289                        if (linkTest == LINK_INVALID) {
1290                            liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get());
1291                            targetOutlinkEdits++;
1292                        } else {
1293                            boolean oldOutlinkStatus = readerItem.targetHasOutlink();
1294                            boolean newOutlinkStatus = (linkTest == HAS_OUTLINKS);
1295                            // Do the conditional so we minimize unnecessary
1296
// mod-writes.
1297
if (oldOutlinkStatus != newOutlinkStatus) {
1298                                readerItem.setTargetHasOutlink(newOutlinkStatus);
1299                                liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get());
1300                                targetOutlinkEdits++;
1301                            }
1302                            newDb.append(readerItem, NullWritable.get());
1303                            itemsWritten++;
1304                        }
1305                        hasEntries = db.next(readerItem, NullWritable.get());
1306                    } else if (comparison == 0) {
1307                        // Write the new item, "replacing" the old one.
1308
// We move to the next edit instruction and move
1309
// past the replaced db entry.
1310
Link editLink = editItem.getLink();
1311                        int linkTest = targetTester.hasOutlinks(editLink.getURL());
1312
1313                        // Delete the edit/readerItem from the other table if it's
1314
// found to be invalid.
1315
if (linkTest == LINK_INVALID) {
1316                            liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get());
1317                        } else {
1318                            editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
1319                            liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get());
1320
1321                            newDb.append(editLink, NullWritable.get());
1322                            itemsWritten++;
1323                        }
1324                        targetOutlinkEdits++;
1325
1326                        hasEntries = db.next(readerItem, NullWritable.get());
1327                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
1328                    } else if (comparison > 0) {
1329                        // Write the new item. We stay at the current
1330
// db entry.
1331
Link editLink = editItem.getLink();
1332                        int linkTest = targetTester.hasOutlinks(editLink.getURL());
1333
1334                        // Delete the edit from the other table if it's invalid
1335
if (linkTest == LINK_INVALID) {
1336                            liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get());
1337                        } else {
1338                            editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
1339                            liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get());
1340                            newDb.append(editLink, NullWritable.get());
1341                            itemsWritten++;
1342                        }
1343                        targetOutlinkEdits++;
1344
1345                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
1346                    }
1347                } else if (curInstruction == DEL_LINK) {
1348                    // When we delete a link, we do it by MD5 and apply
1349
// it to the index first. A single delete instruction
1350
// may remove many items in the db, during the earlier
1351
// processing. However, unlike the index-processing stage,
1352
// here we can expect a new DEL instruction for every
1353
// item that we remove from the db.
1354
//
1355
int comparison = comparator.compare(readerItem, editItem.getLink());
1356
1357                    if (comparison < 0) {
1358                        // Write readerKey, just passing it along. Don't
1359
// process the edit yet.
1360
int linkTest = targetTester.hasOutlinks(readerItem.getURL());
1361
1362                        // Delete the reader item if it's found to be invalid
1363
if (linkTest == LINK_INVALID) {
1364                            liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get());
1365                        } else {
1366                            readerItem.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
1367                            liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get());
1368                            newDb.append(readerItem, NullWritable.get());
1369                            itemsWritten++;
1370                        }
1371                        targetOutlinkEdits++;
1372
1373                        hasEntries = db.next(readerItem, NullWritable.get());
1374                    } else if (comparison == 0) {
1375                        // "Delete" the item by passing by the readerKey.
1376
// We want a new entry, as well as the next instruction
1377
// to process.
1378
hasEntries = db.next(readerItem, NullWritable.get());
1379                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
1380                    } else if (comparison > 0) {
1381                        // Ignore, move on to next instruction
1382
hasEdits = sortedEdits.next(editItem, NullWritable.get());
1383                    }
1384                }
1385            }
1386
1387            // Now we have only edits. No more preexisting items!
1388
while (! hasEntries && hasEdits) {
1389                int curInstruction = editItem.getInstruction();
1390
1391                if (curInstruction == ADD_LINK) {
1392                    //
1393
// Add the item from the edit list.
1394
//
1395

1396                    //
1397
// Make sure the outlinks flag is set properly.
1398
//
1399
Link editLink = editItem.getLink();
1400                    int linkTest = targetTester.hasOutlinks(editLink.getURL());
1401                    if (linkTest == LINK_INVALID) {
1402                        liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get());
1403                    } else {
1404                        editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
1405                        liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get());
1406                        newDb.append(editLink, NullWritable.get());
1407                        itemsWritten++;
1408                    }
1409                    targetOutlinkEdits++;
1410                } else if (curInstruction == DEL_LINK) {
1411                    // Ignore operation
1412
}
1413                // Move on to next edit
1414
hasEdits = sortedEdits.next(editItem, NullWritable.get());
1415            }
1416
1417            // Now we have only preexisting items. Just copy them
1418
// to the new file, in order.
1419
while (hasEntries && ! hasEdits) {
1420                //
1421
// Simply copy the remaining database items.
1422
//
1423

1424                //
1425
// First, make sure the 'outlinks' flag is set properly.
1426
//
1427
int linkTest = targetTester.hasOutlinks(readerItem.getURL());
1428                if (linkTest == LINK_INVALID) {
1429                    liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get());
1430                    targetOutlinkEdits++;
1431                } else {
1432                    boolean oldOutlinkStatus = readerItem.targetHasOutlink();
1433                    boolean newOutlinkStatus = (linkTest == HAS_OUTLINKS);
1434                    if (oldOutlinkStatus != newOutlinkStatus) {
1435                        readerItem.setTargetHasOutlink(newOutlinkStatus);
1436                        liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get());
1437                        targetOutlinkEdits++;
1438                    }
1439
1440                    // Now copy the object
1441
newDb.append(readerItem, NullWritable.get());
1442                    itemsWritten++;
1443                }
1444
1445                // Move on to next
1446
hasEntries = db.next(readerItem, NullWritable.get());
1447            }
1448
1449            targetTester.close();
1450        }
1451    }
1452
1453    /**
1454     * Method useful for the first time we create a distributed db project.
1455     * Basically need to write down the number of dirs we can expect.
1456     */

1457    public static void createDB(NutchFileSystem nfs, File root, int totalMachines) throws IOException {
1458        //
1459
// Check to see if the db already exists
1460
//
1461
File stdDir = new File(root, "standard");
1462        File machineInfo = new File(stdDir, "machineinfo");
1463        if (nfs.exists(machineInfo)) {
1464            throw new IOException("Cannot create DistributedWebDB at " + nfs + ", as it already exists.");
1465        }
1466
1467        //
1468
// Write down how many machines live in the distributed pool
1469
//
1470
DataOutputStream out = new DataOutputStream(nfs.create(machineInfo));
1471        try {
1472            out.write(MACHINE_INFO_VERSION);
1473            out.writeInt(totalMachines);
1474        } finally {
1475            out.close();
1476        }
1477
1478        //
1479
// Create the lower directory structures for each machine in pool.
1480
//
1481
for (int i = 0; i < totalMachines; i++) {
1482            File webdbDir = new File(stdDir, "webdb");
1483            File sectionDir = new File(webdbDir, "dbsection." + i);
1484            File pagesByURLFile = new File(sectionDir, PAGES_BY_URL);
1485            File pagesByMD5File = new File(sectionDir, PAGES_BY_MD5);
1486            File linksByURLFile = new File(sectionDir, LINKS_BY_URL);
1487            File linksByMD5File = new File(sectionDir, LINKS_BY_MD5);
1488
1489            //
1490
// If we're creating the db, we make a zero-length file for each
1491
// db file
1492
//
1493
new MapFile.Writer(nfs, pagesByURLFile.getPath(), new UTF8.Comparator(), Page.class).close();
1494            new MapFile.Writer(nfs, pagesByMD5File.getPath(), new Page.Comparator(), NullWritable.class).close();
1495            new MapFile.Writer(nfs, linksByURLFile.getPath(), new Link.UrlComparator(), NullWritable.class).close();
1496            new MapFile.Writer(nfs, linksByMD5File.getPath(), new Link.MD5Comparator(), NullWritable.class).close();
1497        }
1498
1499        //
1500
// Create the "ready-to-use" flag that tells all subsequent
1501
// WebDBWriters it's OK to proceed.
1502
//
1503
File readyToUse = new File(stdDir, "readyToUse");
1504        out = new DataOutputStream(nfs.create(readyToUse));
1505        try {
1506            out.writeInt(READY_TO_USE); // Magic number
1507
} finally {
1508            out.close();
1509        }
1510    }
1511
1512    PageInstructionWriter piwriter = new PageInstructionWriter();
1513    LinkInstructionWriter liwriter = new LinkInstructionWriter();
1514    DataInputBuffer inBuf = new DataInputBuffer();
1515    DataOutputBuffer outBuf = new DataOutputBuffer();
1516
1517    NutchFileSystem nfs;
1518    File root, dbDir, oldDbDir, newDbDir, tmpDir;
1519    File localWriteLock, globalWriteLock, closeCounter, openCounter;
1520
1521    EditSectionGroupWriter pagesByURLWriter, pagesByMD5Writer, linksByURLWriter, linksByMD5Writer;
1522    MapFile.Reader pagesByURL, pagesByMD5, linksByURL, linksByMD5;
1523    long pagesByURLEdits = 0, pagesByMD5Edits = 0, linksByURLEdits = 0, linksByMD5Edits = 0, targetOutlinkEdits = 0;
1524    int machineNum, totalMachines;
1525
1526
1527    /**
1528     * Open the db files.
1529     */

1530    public DistributedWebDBWriter(NutchFileSystem nfs, File root, int machineNum) throws IOException {
1531        //
1532
// Store the nfs. Build dir set.
1533
//
1534
this.nfs = nfs;
1535        this.root = root;
1536        this.machineNum = machineNum;
1537
1538        File stdDir = new File(root, "standard");
1539        this.dbDir = new File(stdDir, "webdb");
1540        this.oldDbDir = new File(stdDir, "webdb.old");
1541        this.newDbDir = new File(stdDir, "webdb.new");
1542        this.tmpDir = new File(newDbDir, "tmp");
1543
1544        //
1545
// Wait indefinitely for "ready-to-use-flag".
1546
//
1547
File readyToUse = new File(stdDir, "readyToUse");
1548        while (! nfs.exists(readyToUse)) {
1549            try {
1550                Thread.sleep(2000);
1551            } catch (InterruptedException JavaDoc ie) {
1552            }
1553        }
1554
1555        //////////////////////////////////////////////////////////
1556
// Locks
1557
//////////////////////////////////////////////////////////
1558

1559        // 1. Each dbsection has a lock so only one writer ever accesses
1560
// it at once. Lock the local one immediately.
1561
this.localWriteLock = new File(stdDir, "sectionLock." + machineNum);
1562        nfs.lock(localWriteLock, true);
1563
1564        // 2. A global writeLock protects writers that need to make
1565
// changes that affect many processors (such as moving dbDir or
1566
// deleting tmp).
1567
//
1568
// Readers will obtain this lock non-exclusively. When it comes
1569
// time for global changes to the db, writers will obtain it
1570
// exclusively. Readers need to leave before these changes can
1571
// be made.
1572
this.globalWriteLock = new File(stdDir, "globalWriteLock");
1573
1574        // 3. Not quite a lock, but related: the closeCounter, which
1575
// tracks how many processors have made it through the db close
1576
// sequence. This is protected by globalWriteLock.
1577
this.openCounter = new File(newDbDir, "openCounter");
1578        this.closeCounter = new File(newDbDir, "closeCounter");
1579
1580
1581        //////////////////////////////////////////////////////////
1582
// Setup and Initialization
1583
//////////////////////////////////////////////////////////
1584

1585        // Load # of machines
1586
File machineInfo = new File(stdDir, "machineinfo");
1587        DataInputStream in = new DataInputStream(nfs.open(machineInfo));
1588        try {
1589            in.read(); // version
1590
this.totalMachines = in.readInt();
1591        } finally {
1592            in.close();
1593        }
1594
1595        //
1596
// Seize global lock
1597
//
1598
nfs.lock(globalWriteLock, true);
1599
1600        // Now we use these locks to resolve any partially-completed
1601
// state directories from a previous run.
1602
// REMIND - mjc - Fixing/defining the db/newdb and tmp-delete
1603
// sequence is the most important next step!
1604
/***
1605            File oldDbDirFile = nutchfs.get(oldDbDir, SHORT_TIMEOUT);
1606            if (oldDbDirFile != null) {
1607            File dbDirFile = nutchfs.get(dbDir, SHORT_TIMEOUT);
1608            if (dbDirFile != null) {
1609            throw new IOException("Impossible condition: directories " + oldDbDir + " and " + dbDir + " cannot exist simultaneously");
1610            }
1611
1612            File newDbDirFile = nutchfs.get(newDbDir, SHORT_TIMEOUT);
1613            if (newDbDirFile != null) {
1614            nutchfs.renameTo(newDbDir, dbDir);
1615            }
1616            nutchfs.delete(oldDbDir);
1617            } else {
1618            File newDbDirFile = nutchfs.get(newDbDir, SHORT_TIMEOUT);
1619            if (newDbDirFile != null) {
1620            nutchfs.delete(newDbDir);
1621            }
1622            }
1623
1624            // Delete any partial edits from last time.
1625            if (nutchfs.get(tmpDir, LONG_TIMEOUT) != null) {
1626            nutchfs.delete(tmpDir);
1627            }
1628        ****/

1629
1630        // Load how many machines have started yet. If we're the
1631
// first one, then we have to create the EditSectionWriter
1632
// structures.
1633
int numOpens = 0;
1634
1635        if (nfs.exists(openCounter)) {
1636            in = new DataInputStream(nfs.open(openCounter));
1637            try {
1638                in.read(); // version
1639
numOpens = in.readInt();
1640            } finally {
1641                in.close();
1642            }
1643        }
1644
1645        // Bump number by 1.
1646
DataOutputStream out = new DataOutputStream(nfs.create(openCounter, true));
1647        try {
1648            out.write(OPEN_COUNTER_VERSION);
1649            out.writeInt(numOpens + 1);
1650        } finally {
1651            out.close();
1652        }
1653        
1654        // Check if we're the first ones to open.
1655
if (numOpens == 0) {
1656            // Build an edit-section for each of the 4 edit types
1657
// REMIND - mjc - I'm not sure about the args to these 4 calls.
1658
EditSectionGroupWriter.createEditGroup(nfs, dbDir, PAGES_BY_URL, totalMachines, EditSectionGroupWriter.URL_KEYSPACE);
1659            EditSectionGroupWriter.createEditGroup(nfs, dbDir, PAGES_BY_MD5, totalMachines, EditSectionGroupWriter.MD5_KEYSPACE);
1660            EditSectionGroupWriter.createEditGroup(nfs, dbDir, LINKS_BY_URL, totalMachines, EditSectionGroupWriter.URL_KEYSPACE);
1661            EditSectionGroupWriter.createEditGroup(nfs, dbDir, LINKS_BY_MD5, totalMachines, EditSectionGroupWriter.MD5_KEYSPACE);
1662
1663            // Remove the flag that tells readers it's OK to proceed
1664
File dirIsComplete = new File(dbDir, "dbIsComplete");
1665            nfs.delete(dirIsComplete);
1666        }
1667
1668        // These are the NutchFiles for this section of the read-only
1669
// db.
1670
File sectionDir = new File(dbDir, "dbsection." + machineNum);
1671        File pagesByURL = new File(sectionDir, PAGES_BY_URL);
1672        File pagesByMD5 = new File(sectionDir, PAGES_BY_MD5);
1673        File linksByURL = new File(sectionDir, LINKS_BY_URL);
1674        File linksByMD5 = new File(sectionDir, LINKS_BY_MD5);
1675
1676        //
1677
// Release the global lock
1678
//
1679
nfs.release(globalWriteLock);
1680
1681        // Create Readers for the above NutchFiles
1682
this.pagesByURL = new MapFile.Reader(nfs, pagesByURL.getPath(), new UTF8.Comparator());
1683        this.pagesByMD5 = new MapFile.Reader(nfs, pagesByMD5.getPath(), new Page.Comparator());
1684        this.linksByURL = new MapFile.Reader(nfs, linksByURL.getPath(), new Link.UrlComparator());
1685        this.linksByMD5 = new MapFile.Reader(nfs, linksByMD5.getPath(), new Link.MD5Comparator());
1686
1687        // Create writers for new edit-files. We write changes
1688
// into these files, then apply them to the db upon close().
1689
this.pagesByURLWriter = new EditSectionGroupWriter(nfs, machineNum, totalMachines, PAGES_BY_URL, PageInstruction.class, NullWritable.class, new EditSectionGroupWriter.PageURLExtractor());
1690        this.pagesByMD5Writer = new EditSectionGroupWriter(nfs, machineNum, totalMachines, PAGES_BY_MD5, PageInstruction.class, NullWritable.class, new EditSectionGroupWriter.PageMD5Extractor());
1691        this.linksByURLWriter = new EditSectionGroupWriter(nfs, machineNum, totalMachines, LINKS_BY_URL, LinkInstruction.class, NullWritable.class, new EditSectionGroupWriter.LinkURLExtractor());
1692        this.linksByMD5Writer = new EditSectionGroupWriter(nfs, machineNum, totalMachines, LINKS_BY_MD5, LinkInstruction.class, NullWritable.class, new EditSectionGroupWriter.LinkMD5Extractor());
1693    }
1694
1695    /**
1696     * Shutdown
1697     */

1698    public synchronized void close() throws IOException {
1699        // Process the 4 tables:
1700
// 1. pagesByURL
1701
// 2. pagesByMD5
1702
// 3. linksByMD5
1703
// 4. linksByURL
1704

1705        // 1. Process pagesByURL. Processing this stream will
1706
// generate a number of edits for the pagesByMD5 step.
1707
//
1708
CloseProcessor pagesByURLProcessor = new PagesByURLProcessor(pagesByURL, pagesByURLWriter, pagesByMD5Writer);
1709        long numPBUItems = pagesByURLProcessor.closeDown(tmpDir, newDbDir);
1710
1711        //
1712
// 2. Process the pagesByMD5 edit stream. This will
1713
// make calls to deleteLink(), which are processed later.
1714
//
1715
CloseProcessor pagesByMD5Processor = new PagesByMD5Processor(pagesByMD5, pagesByMD5Writer);
1716        long numPBMItems = pagesByMD5Processor.closeDown(tmpDir, newDbDir);
1717
1718        //
1719
// 3. Process the linksByMD5 edit stream first. This
1720
// will generate a number of edits for the linksByURL
1721
// stream. This also processes the calls to deleteLink()
1722
// that may have been invoked as part of the above call
1723
// to process pagesByMD5.
1724
CloseProcessor linksByMD5Processor = new LinksByMD5Processor(linksByMD5, linksByMD5Writer, linksByURLWriter);
1725        long numLBMItems = linksByMD5Processor.closeDown(tmpDir, newDbDir);
1726
1727        //
1728
// 4. Process the linksByURL edit stream. This will also
1729
// read through the sorted PagesByURL file, and modify
1730
// the Links so that they indicated whether the target
1731
// Page has any outlinks or not.
1732
//
1733

1734        // Duplicate the LINKS_BY_MD5 editsWriter, because the 1st one has
1735
// already been closed.
1736
EditSectionGroupWriter targetOutlinkEditsWriter = new EditSectionGroupWriter(nfs, machineNum, totalMachines, LINKS_BY_MD5, LinkInstruction.class, NullWritable.class, new EditSectionGroupWriter.LinkMD5Extractor());
1737
1738        // Find the just-written dbsection output for PAGES_BY_URL
1739
File newSectionDir = new File(newDbDir, "dbsection." + machineNum);
1740        File newPagesByURL = new File(newSectionDir, PAGES_BY_URL);
1741
1742        CloseProcessor linksByURLProcessor = new LinksByURLProcessor(linksByURL, linksByURLWriter, new MapFile.Reader(nfs, newPagesByURL.getPath(), new UTF8.Comparator()), targetOutlinkEditsWriter);
1743        long numLBUItems = linksByURLProcessor.closeDown(tmpDir, newDbDir);
1744
1745        //
1746
// If the number of linksByURL processed is zero, then
1747
// there's no reason to do all of the following with
1748
// a 2nd pass through linksByMD5.
1749
//
1750
if (numLBUItems != 0) {
1751            //
1752
// 5. Step 4 did several things to the LinksByURL db.
1753
// First, it implemented all the changes generated
1754
// by instructions from LinksByMD5Processor. Second,
1755
// it made lots of calls to setTargetHasOutlink. This
1756
// changes the content of the Link objects.
1757
//
1758
// So now we need to reconstruct the LinksByMD5
1759
// list, using the Links we created in step #4.
1760
//
1761
File newLinksByMD5 = new File(newSectionDir, LINKS_BY_MD5);
1762            MapFile.Reader linksByMD5ForStageTwo = new MapFile.Reader(nfs, newLinksByMD5.getPath(), new Link.MD5Comparator());
1763
1764            File stageTwoDbDir = new File(newDbDir, "stage2.subdir");
1765            CloseProcessor linksByMD5StageTwoProcessor = new LinksByMD5Processor(linksByMD5ForStageTwo, targetOutlinkEditsWriter, null);
1766            numLBMItems = linksByMD5StageTwoProcessor.closeDown(tmpDir, stageTwoDbDir);
1767
1768            //
1769
// 6. Now move the Stage2 LinksByMD5 file up to replace
1770
// the one at the primary level
1771
//
1772
linksByMD5ForStageTwo.close();
1773            File stageOneLinksByMD5 = new File(newDbDir, LINKS_BY_MD5);
1774            File stageTwoLinksByMD5 = new File(stageTwoDbDir, LINKS_BY_MD5);
1775            nfs.delete(stageOneLinksByMD5);
1776            nfs.rename(stageTwoLinksByMD5, stageOneLinksByMD5);
1777        }
1778
1779        //
1780
// 7. Finally, write out the total num of pages and links
1781
//
1782
File sectionStats = new File(newSectionDir, STATS_FILE);
1783        DataOutputStream out = new DataOutputStream(nfs.create(sectionStats, true));
1784        try {
1785            //
1786
// These counts are guaranteed to be correct; they're
1787
// based on the counts made during processing of primary-key
1788
// edits. Pages are always counted by URL first, and only
1789
// subsequently by MD5 if there are any edits to make. Links
1790
// are always counted by MD5 first, and only by URL subsequently
1791
// and conditionally.
1792
//
1793
// If there are a bunch of edits that result in no modifications
1794
// to the db, the two sets of counts (one for URL, one for
1795
// MD5) could become out of sync. So we use the ones that
1796
// are sure to be accurate.
1797
//
1798
out.write(CUR_VERSION);
1799            out.writeLong(numPBUItems);
1800            out.writeLong(numLBMItems);
1801        } finally {
1802            out.close();
1803        }
1804
1805        // Close down the db-readers
1806
pagesByURL.close();
1807        pagesByMD5.close();
1808        linksByMD5.close();
1809        linksByURL.close();
1810
1811        //////////////////////////////////////////////////////////////
1812
// Now we need to do a distributed-close. It works by
1813
// the "last person out turns off the lights" protocol.
1814
// All the processors but one will exit without doing anything.
1815
// The last one to exit does all the directory moves.
1816
//////////////////////////////////////////////////////////////
1817

1818        //
1819
// First step is to obtain the global writeLock exclusively.
1820
// DBReaders will try to obtain this non-exclusively. That
1821
// way, there can be many readers at once, but these must
1822
// leave before a single process can blow away the directories.
1823
//
1824
nfs.lock(globalWriteLock, true);
1825
1826        //
1827
// Read in how many processes have closed already
1828
//
1829
int numCloses = 0;
1830        if (nfs.exists(closeCounter)) {
1831            DataInputStream in = new DataInputStream(nfs.open(closeCounter));
1832            try {
1833                in.read(); // version
1834
numCloses = in.readInt();
1835            } finally {
1836                in.close();
1837            }
1838        }
1839        if (numCloses == totalMachines) {
1840            throw new IOException("All the processors have already shut down. Impossible condition!");
1841        }
1842        
1843        // Bump that number by 1.
1844
out = new DataOutputStream(nfs.create(closeCounter, true));
1845        try {
1846            out.write(CLOSE_COUNTER_VERSION);
1847            out.writeInt(numCloses + 1);
1848        } finally {
1849            out.close();
1850        }
1851
1852        // Check if this processor is the last one to close.
1853
if (numCloses == totalMachines - 1) {
1854            // Delete edits that might still be lingering around...
1855
for (int i = 0; i < totalMachines; i++) {
1856                new EditSectionGroupReader(nfs, PAGES_BY_URL, i, totalMachines).delete();
1857                new EditSectionGroupReader(nfs, PAGES_BY_MD5, i, totalMachines).delete();
1858                new EditSectionGroupReader(nfs, LINKS_BY_URL, i, totalMachines).delete();
1859                new EditSectionGroupReader(nfs, LINKS_BY_MD5, i, totalMachines).delete();
1860            }
1861            
1862            //
1863
// Write out the "complete" flag, which tells
1864
// readers it's OK to proceed
1865
//
1866
File dirIsComplete = new File(newDbDir, "dbIsComplete");
1867            out = new DataOutputStream(nfs.create(dirIsComplete));
1868            try {
1869                out.writeInt(IS_COMPLETE); // Magic number
1870
} finally {
1871                out.close();
1872            }
1873
1874            // Here we need to 'finish' the db operation.
1875
// That involves: 1. Removing the tmpdir.
1876
// 2. Moving the dbDir to oldDbDir
1877
// 3. Renaming the newDbDir to dbDir
1878
// 4. Removing the oldDbDir
1879
//
1880

1881            // 1.
1882
nfs.delete(tmpDir);
1883
1884            // 2.
1885
nfs.rename(dbDir, oldDbDir);
1886            
1887            // 3.
1888
nfs.rename(newDbDir, dbDir);
1889
1890            // 4.
1891
nfs.delete(oldDbDir);
1892        }
1893
1894        // Done.
1895
nfs.release(globalWriteLock);
1896        nfs.release(localWriteLock);
1897    }
1898
    /////////////////////////////////////////////////////
    // Methods for adding, and managing, db operations
    /////////////////////////////////////////////////////

1903    /**
1904     * Add a page to the page database
1905     */

1906    public synchronized void addPage(Page page) throws IOException {
1907        // The 2nd (byMD5) part is handled during processing of the 1st.
1908
pagesByURLEdits++;
1909        piwriter.appendInstructionInfo(pagesByURLWriter, page, ADD_PAGE, NullWritable.get());
1910    }
1911
1912    /**
1913     * Add a page to the page database, with a brand-new score
1914     */

1915    public synchronized void addPageWithScore(Page page) throws IOException {
1916        // The 2nd (byMD5) part is handled during processing of the 1st.
1917
pagesByURLEdits++;
1918        piwriter.appendInstructionInfo(pagesByURLWriter, page, ADD_PAGE_WITH_SCORE, NullWritable.get());
1919    }
1920
1921    /**
1922     * Don't replace the one in the database, if there is one.
1923     */

1924    public synchronized void addPageIfNotPresent(Page page) throws IOException {
1925        // The 2nd (index) part is handled during processing of the 1st.
1926
pagesByURLEdits++;
1927        piwriter.appendInstructionInfo(pagesByURLWriter, page, ADD_PAGE_IFN_PRESENT, NullWritable.get());
1928    }
1929
1930    /**
1931     * Don't replace the one in the database, if there is one.
1932     *
1933     * If we do insert the new Page, then we should also insert
1934     * the given Link object.
1935     */

1936    public synchronized void addPageIfNotPresent(Page page, Link link) throws IOException {
1937        // The 2nd (index) part is handled during processing of the 1st.
1938
pagesByURLEdits++;
1939        piwriter.appendInstructionInfo(pagesByURLWriter, page, link, ADD_PAGE_IFN_PRESENT, NullWritable.get());
1940    }
1941
1942    /**
1943     * Remove a page from the page database.
1944     */

1945    public synchronized void deletePage(String JavaDoc url) throws IOException {
1946        // The 2nd (index) part is handled during processing of the 1st.
1947
Page p = new Page();
1948        p.setURL(url);
1949        pagesByURLEdits++;
1950        piwriter.appendInstructionInfo(pagesByURLWriter, p, DEL_PAGE, NullWritable.get());
1951    }
1952
1953    /**
1954     * Add a link to the link database
1955     */

1956    public synchronized void addLink(Link lr) throws IOException {
1957        linksByMD5Edits++;
1958        liwriter.appendInstructionInfo(linksByMD5Writer, lr, ADD_LINK, NullWritable.get());
1959    }
1960
1961    /**
1962     * Remove links with the given MD5 from the db.
1963     */

1964    private synchronized void deleteLink(MD5Hash md5) throws IOException {
1965        linksByMD5Edits++;
1966        liwriter.appendInstructionInfo(linksByMD5Writer, new Link(md5, 0, "", ""), DEL_LINK, NullWritable.get());
1967    }
1968
1969    /**
1970     * The WebDBWriter.main() provides some handy methods for
1971     * testing the WebDB.
1972     */

1973    public static void main(String JavaDoc argv[]) throws FileNotFoundException, IOException {
1974        if (argv.length < 2) {
1975            System.out.println("Usage: java net.nutch.db.DistributedWebDBWriter (-local | -ndfs <namenode:port>) <root> [-create <numProcessors>] | <machineInt> ([-addpage id url] | [-addpageifnp id url] | [-deletepage url] | [-addlink fromID url] | [-deletelink fromID])");
1976            return;
1977        }
1978
1979        int i = 0;
1980        NutchFileSystem nfs = NutchFileSystem.parseArgs(argv, i);
1981        File root = new File(argv[i++]);
1982
1983        if ("-create".equals(argv[i])) {
1984            i++;
1985            DistributedWebDBWriter.createDB(nfs, root, Integer.parseInt(argv[i++]));
1986            System.out.println("Created webdb at " + nfs + ", " + root);
1987        } else {
1988            int machineNum = Integer.parseInt(argv[i++]);
1989            String JavaDoc cmd = argv[i++];
1990
1991            if ("-addpage".equals(cmd)) {
1992                MD5Hash md5 = new MD5Hash(argv[i++]);
1993                String JavaDoc url = argv[i++];
1994
1995                DistributedWebDBWriter writer = new DistributedWebDBWriter(nfs, root, machineNum);
1996                Page page = new Page(url, md5);
1997                writer.addPageWithScore(page);
1998                System.out.println("Added page (with score): " + page);
1999                writer.close();
2000            } else if ("-addpageifnp".equals(cmd)) {
2001                MD5Hash md5 = new MD5Hash(argv[i++]);
2002                String JavaDoc url = argv[i++];
2003
2004                DistributedWebDBWriter writer = new DistributedWebDBWriter(nfs, root, machineNum);
2005                try {
2006                    Page page = new Page(url, md5);
2007                    writer.addPageIfNotPresent(page);
2008                    System.out.println("Added page: " + page);
2009                } finally {
2010                    writer.close();
2011                }
2012            } else if ("-deletepage".equals(cmd)) {
2013                String JavaDoc url = argv[i++];
2014                DistributedWebDBWriter writer = new DistributedWebDBWriter(nfs, root, machineNum);
2015
2016                try {
2017                    writer.deletePage(url.trim());
2018                    System.out.println("Deleted item(s)");
2019                } finally {
2020                    writer.close();
2021                }
2022            } else if ("-addlink".equals(cmd)) {
2023                MD5Hash fromID = new MD5Hash(argv[i++]);
2024                String JavaDoc url = argv[i++];
2025                DistributedWebDBWriter writer = new DistributedWebDBWriter(nfs, root, machineNum);
2026
2027                try {
2028                    Link link = new Link(fromID, MD5Hash.digest("randomstring.com").halfDigest(), url, "SomeRandomAnchorText_" + System.currentTimeMillis());
2029                    writer.addLink(link);
2030                    System.out.println("Added link: " + link);
2031                } finally {
2032                    writer.close();
2033                }
2034            } else if ("-deletelink".equals(cmd)) {
2035                MD5Hash fromID = new MD5Hash(argv[i++]);
2036
2037                DistributedWebDBWriter writer = new DistributedWebDBWriter(nfs, root, machineNum);
2038                try {
2039                    writer.deleteLink(fromID);
2040                    System.out.println("Deleted item(s)");
2041                } finally {
2042                    writer.close();
2043                }
2044            } else {
2045                System.out.println("Sorry, no command with name " + cmd);
2046            }
2047        }
2048    }
}
2050
Popular Tags