1 2 3 4 package net.nutch.db; 5 6 import java.io.*; 7 import java.util.*; 8 import java.util.logging.*; 9 import java.nio.channels.*; 10 11 import net.nutch.io.*; 12 import net.nutch.fs.*; 13 import net.nutch.util.*; 14 import net.nutch.pagedb.*; 15 import net.nutch.linkdb.*; 16 17 33 public class DistributedWebDBWriter implements IWebDBWriter { 34 static final Logger LOG = LogFormatter.getLogger("net.nutch.db.WebDBWriter"); 35 static final byte CUR_VERSION = 0; 36 static final byte OPEN_COUNTER_VERSION = 0; 37 static final byte CLOSE_COUNTER_VERSION = 0; 38 static final byte MACHINE_INFO_VERSION = 0; 39 40 static int READY_TO_USE = 0xbabecafe; 42 static int IS_COMPLETE = 0xbabe0000; 43 static int WRITE_LOCK_INFO = 0xcafe0000; 44 static long LONG_TIMEOUT = 10 * 1000; 45 46 static final byte ADD_PAGE = 0; 48 static final byte ADD_PAGE_WITH_SCORE = 1; 49 static final byte ADD_PAGE_IFN_PRESENT = 2; 50 static final byte DEL_PAGE = 3; 51 static final int ADD_LINK = 0; 52 static final int DEL_LINK = 1; 53 static final int DEL_SINGLE_LINK = 2; 54 55 static final String PAGES_BY_URL = "pagesByURL"; 57 static final String PAGES_BY_MD5 = "pagesByMD5"; 58 static final String LINKS_BY_URL = "linksByURL"; 59 static final String LINKS_BY_MD5 = "linksByMD5"; 60 static final String STATS_FILE = "stats"; 61 static final String META_SHAREGROUP = "metashare"; 62 static final String METAINFO = "metainfo"; 63 64 static final int NO_OUTLINKS = 0; 66 static final int HAS_OUTLINKS = 1; 67 static final int LINK_INVALID = 2; 68 69 72 public static class PageInstruction implements WritableComparable { 73 byte opcode; 74 boolean hasLink; 75 Page page; 76 Link link; 77 78 80 public PageInstruction() {} 81 82 84 public PageInstruction(Page page, int opcode) { 85 set(page, opcode); 86 } 87 88 90 public PageInstruction(Page page, Link link, int opcode) { 91 set(page, link, opcode); 92 } 93 94 97 public void set(PageInstruction that) { 98 this.opcode = that.opcode; 99 100 if (this.page == null) { 101 this.page = new Page(); 102 } 103 this.page.set(that.page); 104 105 if (this.link == null) { 106 this.link = new Link(); 107 } 108 this.hasLink = that.hasLink; 109 if (this.hasLink) { 110 this.link.set(that.link); 111 } 112 } 113 114 117 public void set(Page page, int opcode) { 118 this.opcode = (byte) opcode; 119 this.page = page; 120 this.hasLink = false; 121 this.link = null; 122 } 123 124 127 public void set(Page page, Link link, int opcode) { 128 this.opcode = (byte) opcode; 129 this.page = page; 130 this.hasLink = true; 131 this.link = link; 132 } 133 134 public int compareTo(Object o) { 138 int pageResult = this.page.compareTo(((PageInstruction) o).page); 139 if (pageResult != 0) { 140 return pageResult; 141 } else { 142 return this.opcode - (((PageInstruction) o).opcode); 143 } 144 } 145 public void write(DataOutput out) throws IOException { 146 out.writeByte(opcode); 147 page.write(out); 148 out.writeByte(hasLink ? 1 : 0); 149 if (hasLink) { 150 link.write(out); 151 } 152 } 153 public void readFields(DataInput in) throws IOException { 154 opcode = in.readByte(); 155 if (page == null) { 156 page = new Page(); 157 } 158 page.readFields(in); 159 160 if (link == null) { 161 link = new Link(); 162 } 163 hasLink = (1 == in.readByte()); 164 if (hasLink) { 165 link.readFields(in); 166 } 167 } 168 public Page getPage() { 169 return page; 170 } 171 public Link getLink() { 172 if (hasLink) { 173 return link; 174 } else { 175 return null; 176 } 177 } 178 public int getInstruction() { 179 return opcode; 180 } 181 182 185 public static class PageComparator extends WritableComparator { 186 private static final Page.Comparator PAGE_COMPARATOR = 187 new Page.Comparator(); 188 189 public PageComparator() { super(PageInstruction.class); } 190 191 192 public int compare(byte[] b1, int s1, int l1, 193 byte[] b2, int s2, int l2) { 194 int opcode1 = b1[s1]; 195 int opcode2 = b2[s2]; 196 int c = PAGE_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1); 197 if (c != 0) 198 return c; 199 return opcode1 - opcode2; 200 } 201 } 202 203 206 public static class UrlComparator extends WritableComparator { 207 private static final Page.UrlComparator PAGE_COMPARATOR = 208 new Page.UrlComparator(); 209 210 public UrlComparator() { super(PageInstruction.class); } 211 212 216 public int compare(WritableComparable a, WritableComparable b) { 217 PageInstruction instructionA = (PageInstruction)a; 218 PageInstruction instructionB = (PageInstruction)b; 219 Page pageA = instructionA.getPage(); 220 Page pageB = instructionB.getPage(); 221 222 int result = pageA.getURL().compareTo(pageB.getURL()); 223 if (result != 0) { 224 return result; 225 } else { 226 return instructionA.opcode - instructionB.opcode; 227 } 228 } 229 230 233 public int compare(byte[] b1, int s1, int l1, 234 byte[] b2, int s2, int l2) { 235 int opcode1 = b1[s1]; 236 int opcode2 = b2[s2]; 237 int c = PAGE_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1); 238 if (c != 0) 239 return c; 240 return opcode1 - opcode2; 241 } 242 } 243 } 244 245 250 public static class PageInstructionWriter { 251 PageInstruction pi = new PageInstruction(); 252 253 255 public PageInstructionWriter() { 256 } 257 258 262 public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Page page, int opcode, Writable val) throws IOException { 263 pi.set(page, opcode); 264 writer.append(pi, val); 265 } 266 267 271 public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Page page, Link link, int opcode, Writable val) throws IOException { 272 pi.set(page, link, opcode); 273 writer.append(pi, val); 274 } 275 } 276 277 282 private static class DeduplicatingPageSequenceReader { 283 SequenceFile.Reader edits; 284 PageInstruction current = new PageInstruction(); 285 UTF8 currentUrl = new UTF8(); 286 boolean haveCurrent; 287 288 290 public DeduplicatingPageSequenceReader(SequenceFile.Reader edits) throws IOException { 291 this.edits = edits; 292 this.haveCurrent = edits.next(current, NullWritable.get()); 293 } 294 295 297 public boolean next(PageInstruction result) throws IOException { 298 if (!haveCurrent) { 299 return false; 300 } 301 302 currentUrl.set(current.getPage().getURL()); 303 result.set(current); 305 do { 306 } while ((haveCurrent = edits.next(current, NullWritable.get())) && 308 currentUrl.compareTo(current.getPage().getURL()) == 0); 309 return true; 310 } 311 } 312 313 314 317 public static class LinkInstruction implements WritableComparable { 318 Link link; 319 int instruction; 320 321 323 public LinkInstruction() { 324 } 325 326 328 public LinkInstruction(Link link, int instruction) { 329 set(link, instruction); 330 } 331 332 335 public void set(LinkInstruction that) { 336 this.instruction = that.instruction; 337 338 if (this.link == null) 339 this.link = new Link(); 340 341 this.link.set(that.link); 342 } 343 344 347 public void set(Link link, int instruction) { 348 this.link = link; 349 this.instruction = instruction; 350 } 351 352 public int compareTo(Object o) { 356 return this.link.compareTo(((LinkInstruction) o).link); 357 } 358 public void write(DataOutput out) throws IOException { 359 out.writeByte(instruction); 360 link.write(out); 361 } 362 public void readFields(DataInput in) throws IOException { 363 this.instruction = in.readByte(); 364 if (link == null) 365 link = new Link(); 366 link.readFields(in); 367 } 368 public Link getLink() { 369 return link; 370 } 371 public int getInstruction() { 372 return instruction; 373 } 374 375 378 public static class MD5Comparator extends WritableComparator { 379 private static final Link.MD5Comparator MD5_COMPARATOR = 380 new Link.MD5Comparator(); 381 382 public MD5Comparator() { super(LinkInstruction.class); } 383 384 public int compare(WritableComparable a, WritableComparable b) { 385 LinkInstruction instructionA = (LinkInstruction)a; 386 LinkInstruction instructionB = (LinkInstruction)b; 387 return instructionA.link.md5Compare(instructionB.link); 388 } 389 390 391 public int compare(byte[] b1, int s1, int l1, 392 byte[] b2, int s2, int l2) { 393 return MD5_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1); 394 } 395 } 396 397 400 public static class UrlComparator extends WritableComparator { 401 private static final Link.UrlComparator URL_COMPARATOR = 402 new Link.UrlComparator(); 403 404 public UrlComparator() { super(LinkInstruction.class); } 405 406 public int compare(WritableComparable a, WritableComparable b) { 407 LinkInstruction instructionA = (LinkInstruction)a; 408 LinkInstruction instructionB = (LinkInstruction)b; 409 return instructionA.link.urlCompare(instructionB.link); 410 411 } 412 413 416 public int compare(byte[] b1, int s1, int l1, 417 byte[] b2, int s2, int l2) { 418 return URL_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1); 419 } 420 } 421 } 422 423 428 public static class LinkInstructionWriter { 429 LinkInstruction li = new LinkInstruction(); 430 431 433 public LinkInstructionWriter() { 434 } 435 436 440 public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Link link, int opcode, Writable val) throws IOException { 441 li.set(link, opcode); 442 writer.append(li, val); 443 } 444 } 445 446 451 class DeduplicatingLinkSequenceReader { 452 Link currentKey = new Link(); 453 LinkInstruction current = new LinkInstruction(); 454 SequenceFile.Reader edits; 455 boolean haveCurrent; 456 457 459 public DeduplicatingLinkSequenceReader(SequenceFile.Reader edits) throws IOException { 460 this.edits = edits; 461 this.haveCurrent = edits.next(current, NullWritable.get()); 462 } 463 464 465 469 public boolean next(LinkInstruction key) throws IOException { 470 if (! haveCurrent) { 471 return false; 472 } 473 474 currentKey.set(current.getLink()); 475 476 do { 477 key.set(current); 478 } while ((haveCurrent = edits.next(current, NullWritable.get())) && 479 currentKey.compareTo(current.getLink()) == 0); 480 return true; 481 } 482 } 483 484 485 495 private abstract class CloseProcessor { 496 String basename; 497 String curDBPart; 498 MapFile.Reader oldDb; 499 EditSectionGroupWriter editWriter; 500 SequenceFile.Sorter sorter; 501 WritableComparator comparator; 502 Class keyClass, valueClass; 503 long itemsWritten = 0; 504 505 508 CloseProcessor(String basename, MapFile.Reader oldDb, EditSectionGroupWriter editWriter, SequenceFile.Sorter sorter, WritableComparator comparator, Class keyClass, Class valueClass, String curDBPart) { 509 this.basename = basename; 510 this.oldDb = oldDb; 511 this.editWriter = editWriter; 512 this.sorter = sorter; 513 this.comparator = comparator; 514 this.keyClass = keyClass; 515 this.valueClass = valueClass; 516 this.curDBPart = curDBPart; 517 } 518 519 526 long closeDown(File workingDir, File outputDir) throws IOException { 527 editWriter.close(); 531 532 File sectionDir = new File(outputDir, "dbsection." + machineNum); 536 File newDbFile = new File(sectionDir, basename); 537 538 EditSectionGroupReader edits = new EditSectionGroupReader(nfs, basename, machineNum, totalMachines); 546 int numEdits = edits.numEdits(); 547 548 if (numEdits != 0) { 550 File mergedEditsFile = new File(sectionDir, "mergedEdits"); 551 edits.mergeSectionComponents(mergedEditsFile); 552 File sortedEditsFile = new File(mergedEditsFile.getPath() + ".sorted"); 553 554 long startSort = System.currentTimeMillis(); 556 sorter.sort(mergedEditsFile.getPath(), sortedEditsFile.getPath()); 557 long endSort = System.currentTimeMillis(); 558 559 LOG.info("Processing " + basename + ": Sorted " + numEdits + " instructions in " + ((endSort - startSort) / 1000.0) + " seconds."); 560 LOG.info("Processing " + basename + ": Sorted " + (numEdits / ((endSort - startSort) / 1000.0)) + " instructions/second"); 561 562 nfs.delete(mergedEditsFile); 564 565 572 SequenceFile.Reader sortedEdits = new SequenceFile.Reader(nfs, sortedEditsFile.getPath()); 574 575 MapFile.Writer newDb = (comparator == null) ? new MapFile.Writer(nfs, newDbFile.getPath(), keyClass, valueClass) : new MapFile.Writer(nfs, newDbFile.getPath(), comparator, valueClass); 577 578 oldDb.reset(); 581 582 long startMerge = System.currentTimeMillis(); 584 mergeEdits(oldDb, sortedEdits, newDb); 585 long endMerge = System.currentTimeMillis(); 586 LOG.info("Processing " + basename + ": Merged to new DB containing " + itemsWritten + " records in " + ((endMerge - startMerge) / 1000.0) + " seconds"); 587 LOG.info("Processing " + basename + ": Merged " + (itemsWritten / ((endMerge - startMerge) / 1000.0)) + " records/second"); 588 589 sortedEdits.close(); 591 newDb.close(); 592 593 nfs.delete(sortedEditsFile); 595 } else { 596 long startCopy = System.currentTimeMillis(); 599 600 File srcSectionDir = new File(oldDbDir, "dbsection." + machineNum); 601 File srcDbFile = new File(srcSectionDir, basename); 602 nfs.rename(srcDbFile, newDbFile); 603 long endCopy = System.currentTimeMillis(); 604 LOG.info("Processing " + basename + ": Copied file (" + srcDbFile.length()+ " bytes) in " + ((endCopy - startCopy) / 1000.0) + " secs."); 605 } 606 607 edits.delete(); 609 return itemsWritten; 610 } 611 612 616 abstract void mergeEdits(MapFile.Reader db, SequenceFile.Reader edits, MapFile.Writer newDb) throws IOException; 617 } 618 619 624 private class PagesByURLProcessor extends CloseProcessor { 625 EditSectionGroupWriter futureEdits; 626 627 631 PagesByURLProcessor(MapFile.Reader db, EditSectionGroupWriter editWriter, EditSectionGroupWriter futureEdits) { 632 super(PAGES_BY_URL, db, editWriter, new SequenceFile.Sorter(nfs, new PageInstruction.UrlComparator(), NullWritable.class), new UTF8.Comparator(), null, Page.class, "PagesByURLPart"); 633 this.futureEdits = futureEdits; 634 } 635 636 639 void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException { 640 DeduplicatingPageSequenceReader edits = new DeduplicatingPageSequenceReader(sortedEdits); 642 WritableComparable readerKey = new UTF8(); 643 Page readerVal = new Page(); 644 PageInstruction editItem = new PageInstruction(); 645 int futureOrdering = 0; 646 647 boolean hasEntries = db.next(readerKey, readerVal); 649 boolean hasEdits = edits.next(editItem); 650 651 while (hasEntries && hasEdits) { 654 int comparison = readerKey.compareTo(editItem.getPage().getURL()); 655 int curInstruction = editItem.getInstruction(); 656 657 if ((curInstruction == ADD_PAGE) || 659 (curInstruction == ADD_PAGE_WITH_SCORE) || 660 (curInstruction == ADD_PAGE_IFN_PRESENT)) { 661 662 if (comparison < 0) { 663 newDb.append(readerKey, readerVal); 666 itemsWritten++; 667 hasEntries = db.next(readerKey, readerVal); 668 } else if (comparison == 0) { 669 if ((curInstruction == ADD_PAGE) || 677 (curInstruction == ADD_PAGE_WITH_SCORE)) { 678 pagesByMD5Edits++; 688 689 Page editItemPage = editItem.getPage(); 702 703 if (curInstruction == ADD_PAGE) { 704 editItemPage.setScore(readerVal.getScore(), readerVal.getNextScore()); 705 } 706 707 piwriter.appendInstructionInfo(futureEdits, editItemPage, ADD_PAGE, NullWritable.get()); 708 709 newDb.append(editItemPage.getURL(), editItemPage); 713 714 if (editItemPage.compareTo(readerVal) != 0) { 725 pagesByMD5Edits++; 726 piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get()); 727 } 728 729 itemsWritten++; 730 731 hasEntries = db.next(readerKey, readerVal); 733 } else { 734 } 739 hasEdits = edits.next(editItem); 741 742 } else if (comparison > 0) { 743 pagesByMD5Edits++; 751 752 if (curInstruction == ADD_PAGE_IFN_PRESENT) { 757 Link editLink = editItem.getLink(); 758 if (editLink != null) { 759 addLink(editLink); 760 } 761 } 762 piwriter.appendInstructionInfo(futureEdits, editItem.getPage(), ADD_PAGE, NullWritable.get()); 763 764 newDb.append(editItem.getPage().getURL(), editItem.getPage()); 767 itemsWritten++; 768 769 hasEdits = edits.next(editItem); 771 } 772 } else if (curInstruction == DEL_PAGE) { 773 if (comparison < 0) { 774 newDb.append(readerKey, readerVal); 777 itemsWritten++; 778 hasEntries = db.next(readerKey, readerVal); 779 } else if (comparison == 0) { 780 pagesByMD5Edits++; 785 piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get()); 786 787 hasEntries = db.next(readerKey, readerVal); 791 792 hasEdits = edits.next(editItem); 794 } else if (comparison > 0) { 795 hasEdits = edits.next(editItem); 798 } 799 } 800 } 801 802 while (! hasEntries && hasEdits) { 804 int curInstruction = editItem.getInstruction(); 805 if (curInstruction == ADD_PAGE || 806 curInstruction == ADD_PAGE_WITH_SCORE || 807 curInstruction == ADD_PAGE_IFN_PRESENT) { 808 811 pagesByMD5Edits++; 813 piwriter.appendInstructionInfo(futureEdits, editItem.getPage(), ADD_PAGE, NullWritable.get()); 814 815 newDb.append(editItem.getPage().getURL(), editItem.getPage()); 817 itemsWritten++; 818 } else if (curInstruction == DEL_PAGE) { 819 } 822 823 hasEdits = edits.next(editItem); 825 } 826 827 while (hasEntries && ! hasEdits) { 830 newDb.append(readerKey, readerVal); 831 itemsWritten++; 832 hasEntries = db.next(readerKey, readerVal); 833 } 834 } 835 } 836 837 842 private class PagesByMD5Processor extends CloseProcessor { 843 845 PagesByMD5Processor(MapFile.Reader db, EditSectionGroupWriter editWriter) { 846 super(PAGES_BY_MD5, db, editWriter, new SequenceFile.Sorter(nfs, new PageInstruction.PageComparator(), NullWritable.class), null, Page.class, NullWritable.class, "PagesByMD5Part"); 847 } 848 849 851 void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException { 852 Page readerItem = new Page(); 854 PageInstruction editItem = new PageInstruction(); 855 856 Page deletedItem = new Page(), lastItem = new Page(); 858 boolean justDeletedItem = false; 859 boolean newReaderItem = false; 860 int itemRepeats = 0; 861 862 boolean hasEntries = db.next(readerItem, NullWritable.get()); 864 boolean hasEdits = sortedEdits.next(editItem, NullWritable.get()); 865 if (hasEntries) { 866 outBuf.reset(); 870 readerItem.write(outBuf); 871 inBuf.reset(outBuf.getData(), outBuf.getLength()); 872 lastItem.readFields(inBuf); 873 itemRepeats = 0; 874 } 875 876 while (hasEdits && hasEntries) { 879 int comparison = readerItem.compareTo(editItem.getPage()); 880 int curInstruction = editItem.getInstruction(); 881 882 if (curInstruction == ADD_PAGE) { 886 if (comparison < 0) { 887 newDb.append(readerItem, NullWritable.get()); 890 itemsWritten++; 891 hasEntries = db.next(readerItem, NullWritable.get()); 892 newReaderItem = true; 893 } else if (comparison == 0) { 894 newDb.append(editItem.getPage(), NullWritable.get()); 905 itemsWritten++; 906 hasEntries = db.next(readerItem, NullWritable.get()); 907 newReaderItem = true; 908 hasEdits = sortedEdits.next(editItem, NullWritable.get()); 909 } else if (comparison > 0) { 910 newDb.append(editItem.getPage(), NullWritable.get()); 913 itemsWritten++; 914 hasEdits = sortedEdits.next(editItem, NullWritable.get()); 915 } 916 } else if (curInstruction == ADD_PAGE_IFN_PRESENT) { 917 throw new IOException("Should never process ADD_PAGE_IFN_PRESENT for the index: " + editItem); 918 } else if (curInstruction == DEL_PAGE) { 919 if (comparison < 0) { 920 newDb.append(readerItem, NullWritable.get()); 923 itemsWritten++; 924 hasEntries = db.next(readerItem, NullWritable.get()); 925 newReaderItem = true; 926 } else if (comparison == 0) { 927 hasEntries = db.next(readerItem, NullWritable.get()); 934 newReaderItem = true; 935 hasEdits = sortedEdits.next(editItem, NullWritable.get()); 936 937 justDeletedItem = true; 939 } else if (comparison > 0) { 940 throw new IOException("An unapplicable DEL_PAGE should never appear during index-merge: " + editItem); 943 } 944 } 945 946 if (newReaderItem) { 954 if (hasEntries && readerItem.getMD5().compareTo(lastItem.getMD5()) == 0) { 958 itemRepeats++; 959 } else { 960 if (justDeletedItem && itemRepeats == 0) { 967 deleteLink(lastItem.getMD5()); 968 } 969 970 outBuf.reset(); 972 readerItem.write(outBuf); 973 inBuf.reset(outBuf.getData(), outBuf.getLength()); 974 lastItem.readFields(inBuf); 975 itemRepeats = 0; 976 } 977 newReaderItem = false; 979 } 980 justDeletedItem = false; 982 } 983 984 while (! hasEntries && hasEdits) { 986 int curInstruction = editItem.getInstruction(); 987 if (curInstruction == ADD_PAGE) { 988 newDb.append(editItem.getPage(), NullWritable.get()); 990 itemsWritten++; 991 } else if (curInstruction == ADD_PAGE_IFN_PRESENT) { 992 throw new IOException("Should never process ADD_PAGE_IFN_PRESENT for the index: " + editItem); 993 } else if (curInstruction == DEL_PAGE) { 994 throw new IOException("An unapplicable DEL_PAGE should never appear during index-merge: " + editItem); 997 } 998 hasEdits = sortedEdits.next(editItem, NullWritable.get()); 999 } 1000 1001 while (hasEntries && ! hasEdits) { 1004 newDb.append(readerItem, NullWritable.get()); 1006 itemsWritten++; 1007 hasEntries = db.next(readerItem, NullWritable.get()); 1008 newReaderItem = true; 1009 } 1010 } 1011 } 1012 1013 1019 private class LinksByMD5Processor extends CloseProcessor { 1020 EditSectionGroupWriter futureEdits; 1021 1022 1024 public LinksByMD5Processor(MapFile.Reader db, EditSectionGroupWriter editWriter, EditSectionGroupWriter futureEdits) { 1025 super(LINKS_BY_MD5, db, editWriter, new SequenceFile.Sorter(nfs, new LinkInstruction.MD5Comparator(), NullWritable.class), new Link.MD5Comparator(), Link.class, NullWritable.class, "LinksByMD5Part"); 1026 this.futureEdits = futureEdits; 1027 } 1028 1029 1033 void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException { 1034 WritableComparator comparator = new Link.MD5Comparator(); 1035 DeduplicatingLinkSequenceReader edits = new DeduplicatingLinkSequenceReader(sortedEdits); 1036 1037 LinkInstruction editItem = new LinkInstruction(); 1039 Link readerItem = new Link(); 1040 1041 boolean hasEntries = db.next(readerItem, NullWritable.get()); 1043 boolean hasEdits = edits.next(editItem); 1044 1045 while (hasEntries && hasEdits) { 1048 int curInstruction = editItem.getInstruction(); 1049 1050 if (curInstruction == ADD_LINK) { 1052 int comparison = comparator.compare(readerItem, editItem.getLink()); 1057 1058 if (comparison < 0) { 1059 newDb.append(readerItem, NullWritable.get()); 1062 itemsWritten++; 1063 hasEntries = db.next(readerItem, NullWritable.get()); 1064 } else if (comparison == 0) { 1065 if (futureEdits != null) { 1067 linksByURLEdits++; 1068 liwriter.appendInstructionInfo(futureEdits, editItem.getLink(), ADD_LINK, NullWritable.get()); 1069 } 1070 1071 newDb.append(editItem.getLink(), NullWritable.get()); 1075 itemsWritten++; 1076 hasEntries = db.next(readerItem, NullWritable.get()); 1077 hasEdits = edits.next(editItem); 1078 } else if (comparison > 0) { 1079 if (futureEdits != null) { 1081 linksByURLEdits++; 1082 liwriter.appendInstructionInfo(futureEdits, editItem.getLink(), ADD_LINK, NullWritable.get()); 1083 } 1084 1085 newDb.append(editItem.getLink(), NullWritable.get()); 1088 itemsWritten++; 1089 hasEdits = edits.next(editItem); 1090 } 1091 } else if ((curInstruction == DEL_LINK) || 1092 (curInstruction == DEL_SINGLE_LINK)) { 1093 int comparison = 0; 1099 if (curInstruction == DEL_LINK) { 1100 comparison = readerItem.getFromID().compareTo(editItem.getLink().getFromID()); 1101 } else { 1102 comparison = readerItem.md5Compare(editItem.getLink()); 1103 } 1104 1105 if (comparison < 0) { 1106 newDb.append(readerItem, NullWritable.get()); 1109 itemsWritten++; 1110 hasEntries = db.next(readerItem, NullWritable.get()); 1111 } else if (comparison == 0) { 1112 if (futureEdits != null) { 1124 linksByURLEdits++; 1125 liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_LINK, NullWritable.get()); 1126 } 1127 1128 hasEntries = db.next(readerItem, NullWritable.get()); 1133 if (curInstruction == DEL_SINGLE_LINK) { 1134 hasEdits = edits.next(editItem); 1135 } 1136 } else if (comparison > 0) { 1137 hasEdits = edits.next(editItem); 1139 } 1140 } 1141 } 1142 1143 while (! hasEntries && hasEdits) { 1145 int curInstruction = editItem.getInstruction(); 1146 1147 if (curInstruction == ADD_LINK) { 1148 if (futureEdits != null) { 1150 linksByURLEdits++; 1151 liwriter.appendInstructionInfo(futureEdits, editItem.getLink(), ADD_LINK, NullWritable.get()); 1152 } 1153 1154 newDb.append(editItem.getLink(), NullWritable.get()); 1156 itemsWritten++; 1157 } else if (curInstruction == DEL_LINK) { 1158 } 1160 hasEdits = edits.next(editItem); 1162 } 1163 1164 while (hasEntries && ! hasEdits) { 1167 newDb.append(readerItem, NullWritable.get()); 1168 itemsWritten++; 1169 hasEntries = db.next(readerItem, NullWritable.get()); 1170 } 1171 } 1172 } 1173 1174 1180 private class TargetTester { 1181 MapFile.Reader pagedb; 1182 boolean hasPage = false; 1183 UTF8 pageURL = null; 1184 Page page = null; 1185 1186 1188 public TargetTester(MapFile.Reader pagedb) throws IOException { 1189 this.pagedb = pagedb; 1190 this.pageURL = new UTF8(); 1191 this.page = new Page(); 1192 this.hasPage = pagedb.next(pageURL, page); 1193 } 1194 1195 1198 public int hasOutlinks(UTF8 curURL) throws IOException { 1199 int returnCode = NO_OUTLINKS; 1200 int comparison = pageURL.compareTo(curURL); 1201 1202 while (hasPage && comparison < 0) { 1203 hasPage = pagedb.next(pageURL, page); 1204 if (hasPage) { 1205 comparison = pageURL.compareTo(curURL); 1206 } 1207 } 1208 1209 if (hasPage) { 1210 if (comparison == 0) { 1211 returnCode = (page.getNumOutlinks() > 0) ? HAS_OUTLINKS : NO_OUTLINKS; 1212 } else if (comparison > 0) { 1213 returnCode = LINK_INVALID; 1220 } 1221 } 1222 return returnCode; 1223 } 1224 1225 1227 public void close() throws IOException { 1228 pagedb.close(); 1229 } 1230 } 1231 1232 1237 private class LinksByURLProcessor extends CloseProcessor { 1238 MapFile.Reader pageDb; 1239 EditSectionGroupWriter futureEdits; 1240 1241 1243 public LinksByURLProcessor(MapFile.Reader db, EditSectionGroupWriter editWriter, MapFile.Reader pageDb, EditSectionGroupWriter futureEdits) { 1244 super(LINKS_BY_URL, db, editWriter, new SequenceFile.Sorter(nfs, new LinkInstruction.UrlComparator(), NullWritable.class), new Link.UrlComparator(), Link.class, NullWritable.class, "LinksByURLPart"); 1245 this.pageDb = pageDb; 1246 this.futureEdits = futureEdits; 1247 } 1248 1249 1251 public long closeDown(File workingDir, File outputDir) throws IOException { 1252 long result = super.closeDown(workingDir, outputDir); 1253 pageDb.close(); 1254 return result; 1255 } 1256 1257 1260 void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException { 1261 WritableComparator comparator = new Link.UrlComparator(); 1262 1263 LinkInstruction editItem = new LinkInstruction(); 1265 Link readerItem = new Link(); 1266 1267 boolean hasEntries = db.next(readerItem, NullWritable.get()); 1269 boolean hasEdits = sortedEdits.next(editItem, NullWritable.get()); 1270 TargetTester targetTester = new TargetTester(pageDb); 1271 1272 while (hasEntries && hasEdits) { 1275 int curInstruction = editItem.getInstruction(); 1276 1277 if (curInstruction == ADD_LINK) { 1278 int comparison = comparator.compare(readerItem, editItem.getLink()); 1283 1284 if (comparison < 0) { 1285 int linkTest = targetTester.hasOutlinks(readerItem.getURL()); 1288 1289 if (linkTest == LINK_INVALID) { 1290 liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get()); 1291 targetOutlinkEdits++; 1292 } else { 1293 boolean oldOutlinkStatus = readerItem.targetHasOutlink(); 1294 boolean newOutlinkStatus = (linkTest == HAS_OUTLINKS); 1295 if (oldOutlinkStatus != newOutlinkStatus) { 1298 readerItem.setTargetHasOutlink(newOutlinkStatus); 1299 liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get()); 1300 targetOutlinkEdits++; 1301 } 1302 newDb.append(readerItem, NullWritable.get()); 1303 itemsWritten++; 1304 } 1305 hasEntries = db.next(readerItem, NullWritable.get()); 1306 } else if (comparison == 0) { 1307 Link editLink = editItem.getLink(); 1311 int linkTest = targetTester.hasOutlinks(editLink.getURL()); 1312 1313 if (linkTest == LINK_INVALID) { 1316 liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get()); 1317 } else { 1318 editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS); 1319 liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get()); 1320 1321 newDb.append(editLink, NullWritable.get()); 1322 itemsWritten++; 1323 } 1324 targetOutlinkEdits++; 1325 1326 hasEntries = db.next(readerItem, NullWritable.get()); 1327 hasEdits = sortedEdits.next(editItem, NullWritable.get()); 1328 } else if (comparison > 0) { 1329 Link editLink = editItem.getLink(); 1332 int linkTest = targetTester.hasOutlinks(editLink.getURL()); 1333 1334 if (linkTest == LINK_INVALID) { 1336 liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get()); 1337 } else { 1338 editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS); 1339 liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get()); 1340 newDb.append(editLink, NullWritable.get()); 1341 itemsWritten++; 1342 } 1343 targetOutlinkEdits++; 1344 1345 hasEdits = sortedEdits.next(editItem, NullWritable.get()); 1346 } 1347 } else if (curInstruction == DEL_LINK) { 1348 int comparison = comparator.compare(readerItem, editItem.getLink()); 1356 1357 if (comparison < 0) { 1358 int linkTest = targetTester.hasOutlinks(readerItem.getURL()); 1361 1362 if (linkTest == LINK_INVALID) { 1364 liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get()); 1365 } else { 1366 readerItem.setTargetHasOutlink(linkTest == HAS_OUTLINKS); 1367 liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get()); 1368 newDb.append(readerItem, NullWritable.get()); 1369 itemsWritten++; 1370 } 1371 targetOutlinkEdits++; 1372 1373 hasEntries = db.next(readerItem, NullWritable.get()); 1374 } else if (comparison == 0) { 1375 hasEntries = db.next(readerItem, NullWritable.get()); 1379 hasEdits = sortedEdits.next(editItem, NullWritable.get()); 1380 } else if (comparison > 0) { 1381 hasEdits = sortedEdits.next(editItem, NullWritable.get()); 1383 } 1384 } 1385 } 1386 1387 while (! hasEntries && hasEdits) { 1389 int curInstruction = editItem.getInstruction(); 1390 1391 if (curInstruction == ADD_LINK) { 1392 1396 Link editLink = editItem.getLink(); 1400 int linkTest = targetTester.hasOutlinks(editLink.getURL()); 1401 if (linkTest == LINK_INVALID) { 1402 liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get()); 1403 } else { 1404 editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS); 1405 liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get()); 1406 newDb.append(editLink, NullWritable.get()); 1407 itemsWritten++; 1408 } 1409 targetOutlinkEdits++; 1410 } else if (curInstruction == DEL_LINK) { 1411 } 1413 hasEdits = sortedEdits.next(editItem, NullWritable.get()); 1415 } 1416 1417 while (hasEntries && ! hasEdits) { 1420 1424 int linkTest = targetTester.hasOutlinks(readerItem.getURL()); 1428 if (linkTest == LINK_INVALID) { 1429 liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get()); 1430 targetOutlinkEdits++; 1431 } else { 1432 boolean oldOutlinkStatus = readerItem.targetHasOutlink(); 1433 boolean newOutlinkStatus = (linkTest == HAS_OUTLINKS); 1434 if (oldOutlinkStatus != newOutlinkStatus) { 1435 readerItem.setTargetHasOutlink(newOutlinkStatus); 1436 liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get()); 1437 targetOutlinkEdits++; 1438 } 1439 1440 newDb.append(readerItem, NullWritable.get()); 1442 itemsWritten++; 1443 } 1444 1445 hasEntries = db.next(readerItem, NullWritable.get()); 1447 } 1448 1449 targetTester.close(); 1450 } 1451 } 1452 1453 1457 public static void createDB(NutchFileSystem nfs, File root, int totalMachines) throws IOException { 1458 File stdDir = new File(root, "standard"); 1462 File machineInfo = new File(stdDir, "machineinfo"); 1463 if (nfs.exists(machineInfo)) { 1464 throw new IOException("Cannot create DistributedWebDB at " + nfs + ", as it already exists."); 1465 } 1466 1467 DataOutputStream out = new DataOutputStream(nfs.create(machineInfo)); 1471 try { 1472 out.write(MACHINE_INFO_VERSION); 1473 out.writeInt(totalMachines); 1474 } finally { 1475 out.close(); 1476 } 1477 1478 for (int i = 0; i < totalMachines; i++) { 1482 File webdbDir = new File(stdDir, "webdb"); 1483 File sectionDir = new File(webdbDir, "dbsection." + i); 1484 File pagesByURLFile = new File(sectionDir, PAGES_BY_URL); 1485 File pagesByMD5File = new File(sectionDir, PAGES_BY_MD5); 1486 File linksByURLFile = new File(sectionDir, LINKS_BY_URL); 1487 File linksByMD5File = new File(sectionDir, LINKS_BY_MD5); 1488 1489 new MapFile.Writer(nfs, pagesByURLFile.getPath(), new UTF8.Comparator(), Page.class).close(); 1494 new MapFile.Writer(nfs, pagesByMD5File.getPath(), new Page.Comparator(), NullWritable.class).close(); 1495 new MapFile.Writer(nfs, linksByURLFile.getPath(), new Link.UrlComparator(), NullWritable.class).close(); 1496 new MapFile.Writer(nfs, linksByMD5File.getPath(), new Link.MD5Comparator(), NullWritable.class).close(); 1497 } 1498 1499 File readyToUse = new File(stdDir, "readyToUse"); 1504 out = new DataOutputStream(nfs.create(readyToUse)); 1505 try { 1506 out.writeInt(READY_TO_USE); } finally { 1508 out.close(); 1509 } 1510 } 1511 1512 PageInstructionWriter piwriter = new PageInstructionWriter(); 1513 LinkInstructionWriter liwriter = new LinkInstructionWriter(); 1514 DataInputBuffer inBuf = new DataInputBuffer(); 1515 DataOutputBuffer outBuf = new DataOutputBuffer(); 1516 1517 NutchFileSystem nfs; 1518 File root, dbDir, oldDbDir, newDbDir, tmpDir; 1519 File localWriteLock, globalWriteLock, closeCounter, openCounter; 1520 1521 EditSectionGroupWriter pagesByURLWriter, pagesByMD5Writer, linksByURLWriter, linksByMD5Writer; 1522 MapFile.Reader pagesByURL, pagesByMD5, linksByURL, linksByMD5; 1523 long pagesByURLEdits = 0, pagesByMD5Edits = 0, linksByURLEdits = 0, linksByMD5Edits = 0, targetOutlinkEdits = 0; 1524 int machineNum, totalMachines; 1525 1526 1527 1530 public DistributedWebDBWriter(NutchFileSystem nfs, File root, int machineNum) throws IOException { 1531 this.nfs = nfs; 1535 this.root = root; 1536 this.machineNum = machineNum; 1537 1538 File stdDir = new File(root, "standard"); 1539 this.dbDir = new File(stdDir, "webdb"); 1540 this.oldDbDir = new File(stdDir, "webdb.old"); 1541 this.newDbDir = new File(stdDir, "webdb.new"); 1542 this.tmpDir = new File(newDbDir, "tmp"); 1543 1544 File readyToUse = new File(stdDir, "readyToUse"); 1548 while (! nfs.exists(readyToUse)) { 1549 try { 1550 Thread.sleep(2000); 1551 } catch (InterruptedException ie) { 1552 } 1553 } 1554 1555 1559 this.localWriteLock = new File(stdDir, "sectionLock." + machineNum); 1562 nfs.lock(localWriteLock, true); 1563 1564 this.globalWriteLock = new File(stdDir, "globalWriteLock"); 1573 1574 this.openCounter = new File(newDbDir, "openCounter"); 1578 this.closeCounter = new File(newDbDir, "closeCounter"); 1579 1580 1581 1585 File machineInfo = new File(stdDir, "machineinfo"); 1587 DataInputStream in = new DataInputStream(nfs.open(machineInfo)); 1588 try { 1589 in.read(); this.totalMachines = in.readInt(); 1591 } finally { 1592 in.close(); 1593 } 1594 1595 nfs.lock(globalWriteLock, true); 1599 1600 1629 1630 int numOpens = 0; 1634 1635 if (nfs.exists(openCounter)) { 1636 in = new DataInputStream(nfs.open(openCounter)); 1637 try { 1638 in.read(); numOpens = in.readInt(); 1640 } finally { 1641 in.close(); 1642 } 1643 } 1644 1645 DataOutputStream out = new DataOutputStream(nfs.create(openCounter, true)); 1647 try { 1648 out.write(OPEN_COUNTER_VERSION); 1649 out.writeInt(numOpens + 1); 1650 } finally { 1651 out.close(); 1652 } 1653 1654 if (numOpens == 0) { 1656 EditSectionGroupWriter.createEditGroup(nfs, dbDir, PAGES_BY_URL, totalMachines, EditSectionGroupWriter.URL_KEYSPACE); 1659 EditSectionGroupWriter.createEditGroup(nfs, dbDir, PAGES_BY_MD5, totalMachines, EditSectionGroupWriter.MD5_KEYSPACE); 1660 EditSectionGroupWriter.createEditGroup(nfs, dbDir, LINKS_BY_URL, totalMachines, EditSectionGroupWriter.URL_KEYSPACE); 1661 EditSectionGroupWriter.createEditGroup(nfs, dbDir, LINKS_BY_MD5, totalMachines, EditSectionGroupWriter.MD5_KEYSPACE); 1662 1663 File dirIsComplete = new File(dbDir, "dbIsComplete"); 1665 nfs.delete(dirIsComplete); 1666 } 1667 1668 File sectionDir = new File(dbDir, "dbsection." + machineNum); 1671 File pagesByURL = new File(sectionDir, PAGES_BY_URL); 1672 File pagesByMD5 = new File(sectionDir, PAGES_BY_MD5); 1673 File linksByURL = new File(sectionDir, LINKS_BY_URL); 1674 File linksByMD5 = new File(sectionDir, LINKS_BY_MD5); 1675 1676 nfs.release(globalWriteLock); 1680 1681 this.pagesByURL = new MapFile.Reader(nfs, pagesByURL.getPath(), new UTF8.Comparator()); 1683 this.pagesByMD5 = new MapFile.Reader(nfs, pagesByMD5.getPath(), new Page.Comparator()); 1684 this.linksByURL = new MapFile.Reader(nfs, linksByURL.getPath(), new Link.UrlComparator()); 1685 this.linksByMD5 = new MapFile.Reader(nfs, linksByMD5.getPath(), new Link.MD5Comparator()); 1686 1687 this.pagesByURLWriter = new EditSectionGroupWriter(nfs, machineNum, totalMachines, PAGES_BY_URL, PageInstruction.class, NullWritable.class, new EditSectionGroupWriter.PageURLExtractor()); 1690 this.pagesByMD5Writer = new EditSectionGroupWriter(nfs, machineNum, totalMachines, PAGES_BY_MD5, PageInstruction.class, NullWritable.class, new EditSectionGroupWriter.PageMD5Extractor()); 1691 this.linksByURLWriter = new EditSectionGroupWriter(nfs, machineNum, totalMachines, LINKS_BY_URL, LinkInstruction.class, NullWritable.class, new EditSectionGroupWriter.LinkURLExtractor()); 1692 this.linksByMD5Writer = new EditSectionGroupWriter(nfs, machineNum, totalMachines, LINKS_BY_MD5, LinkInstruction.class, NullWritable.class, new EditSectionGroupWriter.LinkMD5Extractor()); 1693 } 1694 1695 1698 public synchronized void close() throws IOException { 1699 1705 CloseProcessor pagesByURLProcessor = new PagesByURLProcessor(pagesByURL, pagesByURLWriter, pagesByMD5Writer); 1709 long numPBUItems = pagesByURLProcessor.closeDown(tmpDir, newDbDir); 1710 1711 CloseProcessor pagesByMD5Processor = new PagesByMD5Processor(pagesByMD5, pagesByMD5Writer); 1716 long numPBMItems = pagesByMD5Processor.closeDown(tmpDir, newDbDir); 1717 1718 CloseProcessor linksByMD5Processor = new LinksByMD5Processor(linksByMD5, linksByMD5Writer, linksByURLWriter); 1725 long numLBMItems = linksByMD5Processor.closeDown(tmpDir, newDbDir); 1726 1727 1734 EditSectionGroupWriter targetOutlinkEditsWriter = new EditSectionGroupWriter(nfs, machineNum, totalMachines, LINKS_BY_MD5, LinkInstruction.class, NullWritable.class, new EditSectionGroupWriter.LinkMD5Extractor()); 1737 1738 File newSectionDir = new File(newDbDir, "dbsection." + machineNum); 1740 File newPagesByURL = new File(newSectionDir, PAGES_BY_URL); 1741 1742 CloseProcessor linksByURLProcessor = new LinksByURLProcessor(linksByURL, linksByURLWriter, new MapFile.Reader(nfs, newPagesByURL.getPath(), new UTF8.Comparator()), targetOutlinkEditsWriter); 1743 long numLBUItems = linksByURLProcessor.closeDown(tmpDir, newDbDir); 1744 1745 if (numLBUItems != 0) { 1751 File newLinksByMD5 = new File(newSectionDir, LINKS_BY_MD5); 1762 MapFile.Reader linksByMD5ForStageTwo = new MapFile.Reader(nfs, newLinksByMD5.getPath(), new Link.MD5Comparator()); 1763 1764 File stageTwoDbDir = new File(newDbDir, "stage2.subdir"); 1765 CloseProcessor linksByMD5StageTwoProcessor = new LinksByMD5Processor(linksByMD5ForStageTwo, targetOutlinkEditsWriter, null); 1766 numLBMItems = linksByMD5StageTwoProcessor.closeDown(tmpDir, stageTwoDbDir); 1767 1768 linksByMD5ForStageTwo.close(); 1773 File stageOneLinksByMD5 = new File(newDbDir, LINKS_BY_MD5); 1774 File stageTwoLinksByMD5 = new File(stageTwoDbDir, LINKS_BY_MD5); 1775 nfs.delete(stageOneLinksByMD5); 1776 nfs.rename(stageTwoLinksByMD5, stageOneLinksByMD5); 1777 } 1778 1779 File sectionStats = new File(newSectionDir, STATS_FILE); 1783 DataOutputStream out = new DataOutputStream(nfs.create(sectionStats, true)); 1784 try { 1785 out.write(CUR_VERSION); 1799 out.writeLong(numPBUItems); 1800 out.writeLong(numLBMItems); 1801 } finally { 1802 out.close(); 1803 } 1804 1805 pagesByURL.close(); 1807 pagesByMD5.close(); 1808 linksByMD5.close(); 1809 linksByURL.close(); 1810 1811 1818 nfs.lock(globalWriteLock, true); 1825 1826 int numCloses = 0; 1830 if (nfs.exists(closeCounter)) { 1831 DataInputStream in = new DataInputStream(nfs.open(closeCounter)); 1832 try { 1833 in.read(); numCloses = in.readInt(); 1835 } finally { 1836 in.close(); 1837 } 1838 } 1839 if (numCloses == totalMachines) { 1840 throw new IOException("All the processors have already shut down. Impossible condition!"); 1841 } 1842 1843 out = new DataOutputStream(nfs.create(closeCounter, true)); 1845 try { 1846 out.write(CLOSE_COUNTER_VERSION); 1847 out.writeInt(numCloses + 1); 1848 } finally { 1849 out.close(); 1850 } 1851 1852 if (numCloses == totalMachines - 1) { 1854 for (int i = 0; i < totalMachines; i++) { 1856 new EditSectionGroupReader(nfs, PAGES_BY_URL, i, totalMachines).delete(); 1857 new EditSectionGroupReader(nfs, PAGES_BY_MD5, i, totalMachines).delete(); 1858 new EditSectionGroupReader(nfs, LINKS_BY_URL, i, totalMachines).delete(); 1859 new EditSectionGroupReader(nfs, LINKS_BY_MD5, i, totalMachines).delete(); 1860 } 1861 1862 File dirIsComplete = new File(newDbDir, "dbIsComplete"); 1867 out = new DataOutputStream(nfs.create(dirIsComplete)); 1868 try { 1869 out.writeInt(IS_COMPLETE); } finally { 1871 out.close(); 1872 } 1873 1874 1881 nfs.delete(tmpDir); 1883 1884 nfs.rename(dbDir, oldDbDir); 1886 1887 nfs.rename(newDbDir, dbDir); 1889 1890 nfs.delete(oldDbDir); 1892 } 1893 1894 nfs.release(globalWriteLock); 1896 nfs.release(localWriteLock); 1897 } 1898 1899 1903 1906 public synchronized void addPage(Page page) throws IOException { 1907 pagesByURLEdits++; 1909 piwriter.appendInstructionInfo(pagesByURLWriter, page, ADD_PAGE, NullWritable.get()); 1910 } 1911 1912 1915 public synchronized void addPageWithScore(Page page) throws IOException { 1916 pagesByURLEdits++; 1918 piwriter.appendInstructionInfo(pagesByURLWriter, page, ADD_PAGE_WITH_SCORE, NullWritable.get()); 1919 } 1920 1921 1924 public synchronized void addPageIfNotPresent(Page page) throws IOException { 1925 pagesByURLEdits++; 1927 piwriter.appendInstructionInfo(pagesByURLWriter, page, ADD_PAGE_IFN_PRESENT, NullWritable.get()); 1928 } 1929 1930 1936 public synchronized void addPageIfNotPresent(Page page, Link link) throws IOException { 1937 pagesByURLEdits++; 1939 piwriter.appendInstructionInfo(pagesByURLWriter, page, link, ADD_PAGE_IFN_PRESENT, NullWritable.get()); 1940 } 1941 1942 1945 public synchronized void deletePage(String url) throws IOException { 1946 Page p = new Page(); 1948 p.setURL(url); 1949 pagesByURLEdits++; 1950 piwriter.appendInstructionInfo(pagesByURLWriter, p, DEL_PAGE, NullWritable.get()); 1951 } 1952 1953 1956 public synchronized void addLink(Link lr) throws IOException { 1957 linksByMD5Edits++; 1958 liwriter.appendInstructionInfo(linksByMD5Writer, lr, ADD_LINK, NullWritable.get()); 1959 } 1960 1961 1964 private synchronized void deleteLink(MD5Hash md5) throws IOException { 1965 linksByMD5Edits++; 1966 liwriter.appendInstructionInfo(linksByMD5Writer, new Link(md5, 0, "", ""), DEL_LINK, NullWritable.get()); 1967 } 1968 1969 1973 public static void main(String argv[]) throws FileNotFoundException, IOException { 1974 if (argv.length < 2) { 1975 System.out.println("Usage: java net.nutch.db.DistributedWebDBWriter (-local | -ndfs <namenode:port>) <root> [-create <numProcessors>] | <machineInt> ([-addpage id url] | [-addpageifnp id url] | [-deletepage url] | [-addlink fromID url] | [-deletelink fromID])"); 1976 return; 1977 } 1978 1979 int i = 0; 1980 NutchFileSystem nfs = NutchFileSystem.parseArgs(argv, i); 1981 File root = new File(argv[i++]); 1982 1983 if ("-create".equals(argv[i])) { 1984 i++; 1985 DistributedWebDBWriter.createDB(nfs, root, Integer.parseInt(argv[i++])); 1986 System.out.println("Created webdb at " + nfs + ", " + root); 1987 } else { 1988 int machineNum = Integer.parseInt(argv[i++]); 1989 String cmd = argv[i++]; 1990 1991 if ("-addpage".equals(cmd)) { 1992 MD5Hash md5 = new MD5Hash(argv[i++]); 1993 String url = argv[i++]; 1994 1995 DistributedWebDBWriter writer = new DistributedWebDBWriter(nfs, root, machineNum); 1996 Page page = new Page(url, md5); 1997 writer.addPageWithScore(page); 1998 System.out.println("Added page (with score): " + page); 1999 writer.close(); 2000 } else if ("-addpageifnp".equals(cmd)) { 2001 MD5Hash md5 = new MD5Hash(argv[i++]); 2002 String url = argv[i++]; 2003 2004 DistributedWebDBWriter writer = new DistributedWebDBWriter(nfs, root, machineNum); 2005 try { 2006 Page page = new Page(url, md5); 2007 writer.addPageIfNotPresent(page); 2008 System.out.println("Added page: " + page); 2009 } finally { 2010 writer.close(); 2011 } 2012 } else if ("-deletepage".equals(cmd)) { 2013 String url = argv[i++]; 2014 DistributedWebDBWriter writer = new DistributedWebDBWriter(nfs, root, machineNum); 2015 2016 try { 2017 writer.deletePage(url.trim()); 2018 System.out.println("Deleted item(s)"); 2019 } finally { 2020 writer.close(); 2021 } 2022 } else if ("-addlink".equals(cmd)) { 2023 MD5Hash fromID = new MD5Hash(argv[i++]); 2024 String url = argv[i++]; 2025 DistributedWebDBWriter writer = new DistributedWebDBWriter(nfs, root, machineNum); 2026 2027 try { 2028 Link link = new Link(fromID, MD5Hash.digest("randomstring.com").halfDigest(), url, "SomeRandomAnchorText_" + System.currentTimeMillis()); 2029 writer.addLink(link); 2030 System.out.println("Added link: " + link); 2031 } finally { 2032 writer.close(); 2033 } 2034 } else if ("-deletelink".equals(cmd)) { 2035 MD5Hash fromID = new MD5Hash(argv[i++]); 2036 2037 DistributedWebDBWriter writer = new DistributedWebDBWriter(nfs, root, machineNum); 2038 try { 2039 writer.deleteLink(fromID); 2040 System.out.println("Deleted item(s)"); 2041 } finally { 2042 writer.close(); 2043 } 2044 } else { 2045 System.out.println("Sorry, no command with name " + cmd); 2046 } 2047 } 2048 } 2049} 2050 | Popular Tags |