| 1 2 3 4 package net.nutch.db; 5 6 import java.io.*; 7 import java.util.*; 8 import java.util.logging.*; 9 10 import net.nutch.io.*; 11 import net.nutch.fs.*; 12 import net.nutch.util.*; 13 import net.nutch.pagedb.*; 14 import net.nutch.linkdb.*; 15 16 32 public class WebDBWriter implements IWebDBWriter { 33 static final Logger LOG = LogFormatter.getLogger("net.nutch.db.WebDBWriter"); 34 static final byte CUR_VERSION = 0; 35 36 static final byte ADD_PAGE = 0; 38 static final byte ADD_PAGE_WITH_SCORE = 1; 39 static final byte ADD_PAGE_IFN_PRESENT = 2; 40 static final byte DEL_PAGE = 3; 41 static final int ADD_LINK = 0; 42 static final int DEL_LINK = 1; 43 static final int DEL_SINGLE_LINK = 2; 44 45 static final String PAGES_BY_URL = "pagesByURL"; 47 static final String PAGES_BY_MD5 = "pagesByMD5"; 48 static final String LINKS_BY_URL = "linksByURL"; 49 static final String LINKS_BY_MD5 = "linksByMD5"; 50 static final String STATS_FILE = "stats"; 51 52 static final int NO_OUTLINKS = 0; 54 static final int HAS_OUTLINKS = 1; 55 static final int LINK_INVALID = 2; 56 57 60 public static class PageInstruction implements WritableComparable { 61 byte opcode; 62 boolean hasLink; 63 Page page; 64 Link link; 65 66 68 public PageInstruction() {} 69 70 72 public PageInstruction(Page page, int opcode) { 73 set(page, opcode); 74 } 75 76 78 public PageInstruction(Page page, Link link, int opcode) { 79 set(page, link, opcode); 80 } 81 82 85 public void set(PageInstruction that) { 86 this.opcode = that.opcode; 87 88 if (this.page == null) { 89 this.page = new Page(); 90 } 91 this.page.set(that.page); 92 93 if (this.link == null) { 94 this.link = new Link(); 95 } 96 this.hasLink = that.hasLink; 97 if (this.hasLink) { 98 this.link.set(that.link); 99 } 100 } 101 102 105 public void set(Page page, int opcode) { 106 this.opcode = (byte) opcode; 107 this.page = page; 108 this.hasLink = false; 109 this.link = null; 110 } 111 112 115 public void set(Page page, Link link, int opcode) { 116 this.opcode = (byte) opcode; 117 this.page = page; 118 this.hasLink = true; 119 this.link = link; 120 } 121 122 public int compareTo(Object o) { 126 int pageResult = this.page.compareTo(((PageInstruction) o).page); 127 if (pageResult != 0) { 128 return pageResult; 129 } else { 130 return this.opcode - (((PageInstruction) o).opcode); 131 } 132 } 133 public void write(DataOutput out) throws IOException { 134 out.writeByte(opcode); 135 page.write(out); 136 out.writeByte(hasLink ? 1 : 0); 137 if (hasLink) { 138 link.write(out); 139 } 140 } 141 public void readFields(DataInput in) throws IOException { 142 opcode = in.readByte(); 143 if (page == null) { 144 page = new Page(); 145 } 146 page.readFields(in); 147 148 if (link == null) { 149 link = new Link(); 150 } 151 hasLink = (1 == in.readByte()); 152 if (hasLink) { 153 link.readFields(in); 154 } 155 } 156 public Page getPage() { 157 return page; 158 } 159 public Link getLink() { 160 if (hasLink) { 161 return link; 162 } else { 163 return null; 164 } 165 } 166 public int getInstruction() { 167 return opcode; 168 } 169 170 173 public static class PageComparator extends WritableComparator { 174 private static final Page.Comparator PAGE_COMPARATOR = 175 new Page.Comparator(); 176 177 public PageComparator() { super(PageInstruction.class); } 178 179 180 public int compare(byte[] b1, int s1, int l1, 181 byte[] b2, int s2, int l2) { 182 int opcode1 = b1[s1]; 183 int opcode2 = b2[s2]; 184 int c = PAGE_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1); 185 if (c != 0) 186 return c; 187 return opcode1 - opcode2; 188 } 189 } 190 191 194 public static class UrlComparator extends WritableComparator { 195 private static final Page.UrlComparator PAGE_COMPARATOR = 196 new Page.UrlComparator(); 197 198 public UrlComparator() { super(PageInstruction.class); } 199 200 204 public int compare(WritableComparable a, WritableComparable b) { 205 PageInstruction instructionA = (PageInstruction)a; 206 PageInstruction instructionB = (PageInstruction)b; 207 Page pageA = instructionA.getPage(); 208 Page pageB = instructionB.getPage(); 209 210 int result = pageA.getURL().compareTo(pageB.getURL()); 211 if (result != 0) { 212 return result; 213 } else { 214 return instructionA.opcode - instructionB.opcode; 215 } 216 } 217 218 221 public int compare(byte[] b1, int s1, int l1, 222 byte[] b2, int s2, int l2) { 223 int opcode1 = b1[s1]; 224 int opcode2 = b2[s2]; 225 int c = PAGE_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1); 226 if (c != 0) 227 return c; 228 return opcode1 - opcode2; 229 } 230 } 231 } 232 233 238 public static class PageInstructionWriter { 239 PageInstruction pi = new PageInstruction(); 240 241 243 public PageInstructionWriter() { 244 } 245 246 250 public synchronized void appendInstructionInfo(SequenceFile.Writer writer, Page page, int opcode, Writable val) throws IOException { 251 pi.set(page, opcode); 252 writer.append(pi, val); 253 } 254 255 259 public synchronized void appendInstructionInfo(SequenceFile.Writer writer, Page page, Link link, int opcode, Writable val) throws IOException { 260 pi.set(page, link, opcode); 261 writer.append(pi, val); 262 } 263 } 264 265 270 private static class DeduplicatingPageSequenceReader { 271 SequenceFile.Reader edits; 272 PageInstruction current = new PageInstruction(); 273 UTF8 currentUrl = new UTF8(); 274 boolean haveCurrent; 275 276 278 public DeduplicatingPageSequenceReader(SequenceFile.Reader edits) throws IOException { 279 this.edits = edits; 280 this.haveCurrent = edits.next(current, NullWritable.get()); 281 } 282 283 285 public boolean next(PageInstruction result) throws IOException { 286 if (!haveCurrent) { 287 return false; 288 } 289 290 currentUrl.set(current.getPage().getURL()); 291 result.set(current); 293 do { 294 } while ((haveCurrent = edits.next(current, NullWritable.get())) && 296 currentUrl.compareTo(current.getPage().getURL()) == 0); 297 return true; 298 } 299 } 300 301 302 305 public static class LinkInstruction implements WritableComparable { 306 Link link; 307 int instruction; 308 309 311 public LinkInstruction() { 312 } 313 314 316 public LinkInstruction(Link link, int instruction) { 317 set(link, instruction); 318 } 319 320 323 public void set(LinkInstruction that) { 324 this.instruction = that.instruction; 325 326 if (this.link == null) 327 this.link = new Link(); 328 329 this.link.set(that.link); 330 } 331 332 335 public void set(Link link, int instruction) { 336 this.link = link; 337 this.instruction = instruction; 338 } 339 340 public int compareTo(Object o) { 344 return this.link.compareTo(((LinkInstruction) o).link); 345 } 346 public void write(DataOutput out) throws IOException { 347 out.writeByte(instruction); 348 link.write(out); 349 } 350 public void readFields(DataInput in) throws IOException { 351 this.instruction = in.readByte(); 352 if (link == null) 353 link = new Link(); 354 link.readFields(in); 355 } 356 public Link getLink() { 357 return link; 358 } 359 public int getInstruction() { 360 return instruction; 361 } 362 363 366 public static class MD5Comparator extends WritableComparator { 367 private static final Link.MD5Comparator MD5_COMPARATOR = 368 new Link.MD5Comparator(); 369 370 public MD5Comparator() { super(LinkInstruction.class); } 371 372 public int compare(WritableComparable a, WritableComparable b) { 373 LinkInstruction instructionA = (LinkInstruction)a; 374 LinkInstruction instructionB = (LinkInstruction)b; 375 return instructionA.link.md5Compare(instructionB.link); 376 } 377 378 379 public int compare(byte[] b1, int s1, int l1, 380 byte[] b2, int s2, int l2) { 381 return MD5_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1); 382 } 383 } 384 385 388 public static class UrlComparator extends WritableComparator { 389 private static final Link.UrlComparator URL_COMPARATOR = 390 new Link.UrlComparator(); 391 392 public UrlComparator() { super(LinkInstruction.class); } 393 394 public int compare(WritableComparable a, WritableComparable b) { 395 LinkInstruction instructionA = (LinkInstruction)a; 396 LinkInstruction instructionB = (LinkInstruction)b; 397 return instructionA.link.urlCompare(instructionB.link); 398 399 } 400 401 404 public int compare(byte[] b1, int s1, int l1, 405 byte[] b2, int s2, int l2) { 406 return URL_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1); 407 } 408 } 409 } 410 411 416 public static class LinkInstructionWriter { 417 LinkInstruction li = new LinkInstruction(); 418 419 421 public LinkInstructionWriter() { 422 } 423 424 428 public synchronized void appendInstructionInfo(SequenceFile.Writer writer, Link link, int opcode, Writable val) throws IOException { 429 li.set(link, opcode); 430 writer.append(li, val); 431 } 432 } 433 434 439 class DeduplicatingLinkSequenceReader { 440 Link currentKey = new Link(); 441 LinkInstruction current = new LinkInstruction(); 442 SequenceFile.Reader edits; 443 boolean haveCurrent; 444 445 447 public DeduplicatingLinkSequenceReader(SequenceFile.Reader edits) throws IOException { 448 this.edits = edits; 449 this.haveCurrent = edits.next(current, NullWritable.get()); 450 } 451 452 453 457 public boolean next(LinkInstruction key) throws IOException { 458 if (! haveCurrent) { 459 return false; 460 } 461 462 currentKey.set(current.getLink()); 463 464 do { 465 key.set(current); 466 } while ((haveCurrent = edits.next(current, NullWritable.get())) && 467 currentKey.compareTo(current.getLink()) == 0); 468 return true; 469 } 470 } 471 472 473 483 private abstract class CloseProcessor { 484 String basename; 485 MapFile.Reader oldDb; 486 SequenceFile.Writer editWriter; 487 SequenceFile.Sorter sorter; 488 WritableComparator comparator; 489 Class keyClass, valueClass; 490 long itemsWritten = 0; 491 492 495 CloseProcessor(String basename, MapFile.Reader oldDb, SequenceFile.Writer editWriter, SequenceFile.Sorter sorter, WritableComparator comparator, Class keyClass, Class valueClass) { 496 this.basename = basename; 497 this.oldDb = oldDb; 498 this.editWriter = editWriter; 499 this.sorter = sorter; 500 this.comparator = comparator; 501 this.keyClass = keyClass; 502 this.valueClass = valueClass; 503 } 504 505 512 long closeDown(File workingDir, File outputDir, long numEdits) throws IOException { 513 File editsFile = new File(workingDir, basename + ".out"); 514 File newDbFile = new File(outputDir, basename); 515 File sortedEditsFile = new File(editsFile.getPath() + ".sorted"); 516 editWriter.close(); 517 518 if (numEdits != 0) { 520 long startSort = System.currentTimeMillis(); 522 sorter.sort(editsFile.getPath(), sortedEditsFile.getPath()); 523 long endSort = System.currentTimeMillis(); 525 LOG.info("Processing " + basename + ": Sorted " + numEdits + " instructions in " + ((endSort - startSort) / 1000.0) + " seconds."); 526 LOG.info("Processing " + basename + ": Sorted " + (numEdits / ((endSort - startSort) / 1000.0)) + " instructions/second"); 527 528 fs.delete(editsFile); 530 fs.rename(sortedEditsFile, editsFile); 531 532 SequenceFile.Reader sortedEdits = new SequenceFile.Reader(fs, editsFile.getPath()); 534 535 MapFile.Writer newDb = (comparator == null) ? new MapFile.Writer(fs, newDbFile.getPath(), keyClass, valueClass) : new MapFile.Writer(fs, newDbFile.getPath(), comparator, valueClass); 537 538 oldDb.reset(); 541 542 long startMerge = System.currentTimeMillis(); 544 mergeEdits(oldDb, sortedEdits, newDb); 545 long endMerge = System.currentTimeMillis(); 546 LOG.info("Processing " + basename + ": Merged to new DB containing " + itemsWritten + " records in " + ((endMerge - startMerge) / 1000.0) + " seconds"); 547 LOG.info("Processing " + basename + ": Merged " + (itemsWritten / ((endMerge - startMerge) / 1000.0)) + " records/second"); 548 549 sortedEdits.close(); 551 newDb.close(); 552 } else { 553 long startCopy = System.currentTimeMillis(); 556 File curFile = new File(dbFile, basename); 557 FileUtil.recursiveCopy(fs, curFile, newDbFile); 558 long endCopy = System.currentTimeMillis(); 559 560 LOG.info("Processing " + basename + ": Copied file (" + newDbFile.length()+ " bytes) in " + ((endCopy - startCopy) / 1000.0) + " secs."); 561 } 562 563 fs.delete(editsFile); 565 566 return itemsWritten; 567 } 568 569 573 abstract void mergeEdits(MapFile.Reader db, SequenceFile.Reader edits, MapFile.Writer newDb) throws IOException; 574 } 575 576 581 private class PagesByURLProcessor extends CloseProcessor { 582 SequenceFile.Writer futureEdits; 583 584 588 PagesByURLProcessor(MapFile.Reader db, SequenceFile.Writer editWriter, SequenceFile.Writer futureEdits) { 589 super(PAGES_BY_URL, db, editWriter, new SequenceFile.Sorter(fs, new PageInstruction.UrlComparator(), NullWritable.class), new UTF8.Comparator(), null, Page.class); 590 this.futureEdits = futureEdits; 591 } 592 593 596 void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException { 597 DeduplicatingPageSequenceReader edits = new DeduplicatingPageSequenceReader(sortedEdits); 599 WritableComparable readerKey = new UTF8(); 600 Page readerVal = new Page(); 601 PageInstruction editItem = new PageInstruction(); 602 int futureOrdering = 0; 603 604 boolean hasEntries = db.next(readerKey, readerVal); 606 boolean hasEdits = edits.next(editItem); 607 608 while (hasEntries && hasEdits) { 611 int comparison = readerKey.compareTo(editItem.getPage().getURL()); 612 int curInstruction = editItem.getInstruction(); 613 614 if ((curInstruction == ADD_PAGE) || 616 (curInstruction == ADD_PAGE_WITH_SCORE) || 617 (curInstruction == ADD_PAGE_IFN_PRESENT)) { 618 619 if (comparison < 0) { 620 newDb.append(readerKey, readerVal); 623 itemsWritten++; 624 hasEntries = db.next(readerKey, readerVal); 625 } else if (comparison == 0) { 626 if ((curInstruction == ADD_PAGE) || 634 (curInstruction == ADD_PAGE_WITH_SCORE)) { 635 pagesByMD5Edits++; 645 646 Page editItemPage = editItem.getPage(); 659 660 if (curInstruction == ADD_PAGE) { 661 editItemPage.setScore(readerVal.getScore(), readerVal.getNextScore()); 662 } 663 664 piwriter.appendInstructionInfo(futureEdits, editItemPage, ADD_PAGE, NullWritable.get()); 665 666 newDb.append(editItemPage.getURL(), editItemPage); 670 671 if (editItemPage.compareTo(readerVal) != 0) { 682 pagesByMD5Edits++; 683 piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get()); 684 } 685 686 itemsWritten++; 687 688 hasEntries = db.next(readerKey, readerVal); 690 } else { 691 } 696 hasEdits = edits.next(editItem); 698 699 } else if (comparison > 0) { 700 pagesByMD5Edits++; 708 709 if (curInstruction == ADD_PAGE_IFN_PRESENT) { 714 Link editLink = editItem.getLink(); 715 if (editLink != null) { 716 addLink(editLink); 717 } 718 } 719 piwriter.appendInstructionInfo(futureEdits, editItem.getPage(), ADD_PAGE, NullWritable.get()); 720 721 newDb.append(editItem.getPage().getURL(), editItem.getPage()); 724 itemsWritten++; 725 726 hasEdits = edits.next(editItem); 728 } 729 } else if (curInstruction == DEL_PAGE) { 730 if (comparison < 0) { 731 newDb.append(readerKey, readerVal); 734 itemsWritten++; 735 hasEntries = db.next(readerKey, readerVal); 736 } else if (comparison == 0) { 737 pagesByMD5Edits++; 742 piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get()); 743 744 hasEntries = db.next(readerKey, readerVal); 748 749 hasEdits = edits.next(editItem); 751 } else if (comparison > 0) { 752 hasEdits = edits.next(editItem); 755 } 756 } 757 } 758 759 while (! hasEntries && hasEdits) { 761 int curInstruction = editItem.getInstruction(); 762 if (curInstruction == ADD_PAGE || 763 curInstruction == ADD_PAGE_WITH_SCORE || 764 curInstruction == ADD_PAGE_IFN_PRESENT) { 765 768 pagesByMD5Edits++; 770 771 if (curInstruction == ADD_PAGE_IFN_PRESENT) { 776 Link editLink = editItem.getLink(); 777 if (editLink != null) { 778 addLink(editLink); 779 } 780 } 781 piwriter.appendInstructionInfo(futureEdits, editItem.getPage(), ADD_PAGE, NullWritable.get()); 782 783 newDb.append(editItem.getPage().getURL(), editItem.getPage()); 785 itemsWritten++; 786 } else if (curInstruction == DEL_PAGE) { 787 } 790 791 hasEdits = edits.next(editItem); 793 } 794 795 while (hasEntries && ! hasEdits) { 798 newDb.append(readerKey, readerVal); 799 itemsWritten++; 800 hasEntries = db.next(readerKey, readerVal); 801 } 802 } 803 } 804 805 810 private class PagesByMD5Processor extends CloseProcessor { 811 813 PagesByMD5Processor(MapFile.Reader db, SequenceFile.Writer editWriter) { 814 super(PAGES_BY_MD5, db, editWriter, new SequenceFile.Sorter(fs, new PageInstruction.PageComparator(),
|