| 1 2 3 4 package net.nutch.db; 5 6 import java.io.*; 7 import java.util.*; 8 import java.util.logging.*; 9 import java.nio.channels.*; 10 11 import net.nutch.io.*; 12 import net.nutch.fs.*; 13 import net.nutch.util.*; 14 import net.nutch.pagedb.*; 15 import net.nutch.linkdb.*; 16 17 33 public class DistributedWebDBWriter implements IWebDBWriter { 34 static final Logger LOG = LogFormatter.getLogger("net.nutch.db.WebDBWriter"); 35 static final byte CUR_VERSION = 0; 36 static final byte OPEN_COUNTER_VERSION = 0; 37 static final byte CLOSE_COUNTER_VERSION = 0; 38 static final byte MACHINE_INFO_VERSION = 0; 39 40 static int READY_TO_USE = 0xbabecafe; 42 static int IS_COMPLETE = 0xbabe0000; 43 static int WRITE_LOCK_INFO = 0xcafe0000; 44 static long LONG_TIMEOUT = 10 * 1000; 45 46 static final byte ADD_PAGE = 0; 48 static final byte ADD_PAGE_WITH_SCORE = 1; 49 static final byte ADD_PAGE_IFN_PRESENT = 2; 50 static final byte DEL_PAGE = 3; 51 static final int ADD_LINK = 0; 52 static final int DEL_LINK = 1; 53 static final int DEL_SINGLE_LINK = 2; 54 55 static final String PAGES_BY_URL = "pagesByURL"; 57 static final String PAGES_BY_MD5 = "pagesByMD5"; 58 static final String LINKS_BY_URL = "linksByURL"; 59 static final String LINKS_BY_MD5 = "linksByMD5"; 60 static final String STATS_FILE = "stats"; 61 static final String META_SHAREGROUP = "metashare"; 62 static final String METAINFO = "metainfo"; 63 64 static final int NO_OUTLINKS = 0; 66 static final int HAS_OUTLINKS = 1; 67 static final int LINK_INVALID = 2; 68 69 72 public static class PageInstruction implements WritableComparable { 73 byte opcode; 74 boolean hasLink; 75 Page page; 76 Link link; 77 78 80 public PageInstruction() {} 81 82 84 public PageInstruction(Page page, int opcode) { 85 set(page, opcode); 86 } 87 88 90 public PageInstruction(Page page, Link link, int opcode) { 91 set(page, link, opcode); 92 } 93 94 97 public void set(PageInstruction that) { 98 this.opcode = that.opcode; 99 100 if (this.page == null) { 101 this.page = new Page(); 102 } 103 this.page.set(that.page); 104 105 if (this.link == null) { 106 this.link = new Link(); 107 } 108 this.hasLink = that.hasLink; 109 if (this.hasLink) { 110 this.link.set(that.link); 111 } 112 } 113 114 117 public void set(Page page, int opcode) { 118 this.opcode = (byte) opcode; 119 this.page = page; 120 this.hasLink = false; 121 this.link = null; 122 } 123 124 127 public void set(Page page, Link link, int opcode) { 128 this.opcode = (byte) opcode; 129 this.page = page; 130 this.hasLink = true; 131 this.link = link; 132 } 133 134 public int compareTo(Object o) { 138 int pageResult = this.page.compareTo(((PageInstruction) o).page); 139 if (pageResult != 0) { 140 return pageResult; 141 } else { 142 return this.opcode - (((PageInstruction) o).opcode); 143 } 144 } 145 public void write(DataOutput out) throws IOException { 146 out.writeByte(opcode); 147 page.write(out); 148 out.writeByte(hasLink ? 1 : 0); 149 if (hasLink) { 150 link.write(out); 151 } 152 } 153 public void readFields(DataInput in) throws IOException { 154 opcode = in.readByte(); 155 if (page == null) { 156 page = new Page(); 157 } 158 page.readFields(in); 159 160 if (link == null) { 161 link = new Link(); 162 } 163 hasLink = (1 == in.readByte()); 164 if (hasLink) { 165 link.readFields(in); 166 } 167 } 168 public Page getPage() { 169 return page; 170 } 171 public Link getLink() { 172 if (hasLink) { 173 return link; 174 } else { 175 return null; 176 } 177 } 178 public int getInstruction() { 179 return opcode; 180 } 181 182 185 public static class PageComparator extends WritableComparator { 186 private static final Page.Comparator PAGE_COMPARATOR = 187 new Page.Comparator(); 188 189 public PageComparator() { super(PageInstruction.class); } 190 191 192 public int compare(byte[] b1, int s1, int l1, 193 byte[] b2, int s2, int l2) { 194 int opcode1 = b1[s1]; 195 int opcode2 = b2[s2]; 196 int c = PAGE_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1); 197 if (c != 0) 198 return c; 199 return opcode1 - opcode2; 200 } 201 } 202 203 206 public static class UrlComparator extends WritableComparator { 207 private static final Page.UrlComparator PAGE_COMPARATOR = 208 new Page.UrlComparator(); 209 210 public UrlComparator() { super(PageInstruction.class); } 211 212 216 public int compare(WritableComparable a, WritableComparable b) { 217 PageInstruction instructionA = (PageInstruction)a; 218 PageInstruction instructionB = (PageInstruction)b; 219 Page pageA = instructionA.getPage(); 220 Page pageB = instructionB.getPage(); 221 222 int result = pageA.getURL().compareTo(pageB.getURL()); 223 if (result != 0) { 224 return result; 225 } else { 226 return instructionA.opcode - instructionB.opcode; 227 } 228 } 229 230 233 public int compare(byte[] b1, int s1, int l1, 234 byte[] b2, int s2, int l2) { 235 int opcode1 = b1[s1]; 236 int opcode2 = b2[s2]; 237 int c = PAGE_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1); 238 if (c != 0) 239 return c; 240 return opcode1 - opcode2; 241 } 242 } 243 } 244 245 250 public static class PageInstructionWriter { 251 PageInstruction pi = new PageInstruction(); 252 253 255 public PageInstructionWriter() { 256 } 257 258 262 public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Page page, int opcode, Writable val) throws IOException { 263 pi.set(page, opcode); 264 writer.append(pi, val); 265 } 266 267 271 public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Page page, Link link, int opcode, Writable val) throws IOException { 272 pi.set(page, link, opcode); 273 writer.append(pi, val); 274 } 275 } 276 277 282 private static class DeduplicatingPageSequenceReader { 283 SequenceFile.Reader edits; 284 PageInstruction current = new PageInstruction(); 285 UTF8 currentUrl = new UTF8(); 286 boolean haveCurrent; 287 288 290 public DeduplicatingPageSequenceReader(SequenceFile.Reader edits) throws IOException { 291 this.edits = edits; 292 this.haveCurrent = edits.next(current, NullWritable.get()); 293 } 294 295 297 public boolean next(PageInstruction result) throws IOException { 298 if (!haveCurrent) { 299 return false; 300 } 301 302 currentUrl.set(current.getPage().getURL()); 303 result.set(current); 305 do { 306 } while ((haveCurrent = edits.next(current, NullWritable.get())) && 308 currentUrl.compareTo(current.getPage().getURL()) == 0); 309 return true; 310 } 311 } 312 313 314 317 public static class LinkInstruction implements WritableComparable { 318 Link link; 319 int instruction; 320 321 323 public LinkInstruction() { 324 } 325 326 328 public LinkInstruction(Link link, int instruction) { 329 set(link, instruction); 330 } 331 332 335 public void set(LinkInstruction that) { 336 this.instruction = that.instruction; 337 338 if (this.link == null) 339 this.link = new Link(); 340 341 this.link.set(that.link); 342 } 343 344 347 public void set(Link link, int instruction) { 348 this.link = link; 349 this.instruction = instruction; 350 } 351 352 public int compareTo(Object o) { 356 return this.link.compareTo(((LinkInstruction) o).link); 357 } 358 public void write(DataOutput out) throws IOException { 359 out.writeByte(instruction); 360 link.write(out); 361 } 362 public void readFields(DataInput in) throws IOException { 363 this.instruction = in.readByte(); 364 if (link == null) 365 link = new Link(); 366 link.readFields(in); 367 } 368 public Link getLink() { 369 return link; 370 } 371 public int getInstruction() { 372 return instruction; 373 } 374 375 378 public static class MD5Comparator extends WritableComparator { 379 private static final Link.MD5Comparator MD5_COMPARATOR = 380 new Link.MD5Comparator(); 381 382 public MD5Comparator() { super(LinkInstruction.class); } 383 384 public int compare(WritableComparable a, WritableComparable b) { 385 LinkInstruction instructionA = (LinkInstruction)a; 386 LinkInstruction instructionB = (LinkInstruction)b; 387 return instructionA.link.md5Compare(instructionB.link); 388 } 389 390 391 public int compare(byte[] b1, int s1, int l1, 392 byte[] b2, int s2, int l2) { 393 return MD5_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1); 394 } 395 } 396 397 400 public static class UrlComparator extends WritableComparator { 401 private static final Link.UrlComparator URL_COMPARATOR = 402 new Link.UrlComparator(); 403 404 public UrlComparator() { super(LinkInstruction.class); } 405 406 public int compare(WritableComparable a, WritableComparable b) { 407 LinkInstruction instructionA = (LinkInstruction)a; 408 LinkInstruction instructionB = (LinkInstruction)b; 409 return instructionA.link.urlCompare(instructionB.link); 410 411 } 412 413 416 public int compare(byte[] b1, int s1, int l1, 417 byte[] b2, int s2, int l2) { 418 return URL_COMPARATOR.compare(b1, s1+1, l1-1, b2, s2+1, l2-1); 419 } 420 } 421 } 422 423 428 public static class LinkInstructionWriter { 429 LinkInstruction li = new LinkInstruction(); 430 431 433 public LinkInstructionWriter() { 434 } 435 436 440 public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Link link, int opcode, Writable val) throws IOException { 441 li.set(link, opcode); 442 writer.append(li, val); 443 } 444 } 445 446 451 class DeduplicatingLinkSequenceReader { 452 Link currentKey = new Link(); 453 LinkInstruction current = new LinkInstruction(); 454 SequenceFile.Reader edits; 455 boolean haveCurrent; 456 457 459 public DeduplicatingLinkSequenceReader(SequenceFile.Reader edits) throws IOException { 460 this.edits = edits; 461 this.haveCurrent = edits.next(current, NullWritable.get()); 462 } 463 464 465 469 public boolean next(LinkInstruction key) throws IOException { 470 if (! haveCurrent) { 471 return false; 472 } 473 474 currentKey.set(current.getLink()); 475 476 do { 477 key.set(current); 478 } while ((haveCurrent = edits.next(current, NullWritable.get())) && 479 currentKey.compareTo(current.getLink()) == 0); 480 return true; 481 } 482 } 483 484 485 495 private abstract class CloseProcessor { 496 String basename; 497 String curDBPart; 498 MapFile.Reader oldDb; 499 EditSectionGroupWriter editWriter; 500 SequenceFile.Sorter sorter; 501 WritableComparator comparator; 502 Class keyClass, valueClass; 503 long itemsWritten = 0; 504 505 508 CloseProcessor(String basename, MapFile.Reader oldDb, EditSectionGroupWriter editWriter, SequenceFile.Sorter sorter, WritableComparator comparator, Class keyClass, Class valueClass, String curDBPart) { 509 this.basename = basename; 510 this.oldDb = oldDb; 511 this.editWriter = editWriter; 512 this.sorter = sorter; 513 this.comparator = comparator; 514 this.keyClass = keyClass; 515 this.valueClass = valueClass; 516 this.curDBPart = curDBPart; 517 } 518 519 526 long closeDown(File workingDir, File outputDir) throws IOException { 527 editWriter.close(); 531 532 File sectionDir = new File(outputDir, "dbsection." + machineNum); 536 File newDbFile = new File(sectionDir, basename); 537 538 EditSectionGroupReader edits = new EditSectionGroupReader(nfs, basename, machineNum, totalMachines); 546 int numEdits = edits.numEdits(); 547 548 if (numEdits != 0) { 550 File mergedEditsFile = new File(sectionDir, "mergedEdits"); 551 edits.mergeSectionComponents(mergedEditsFile); 552 File sortedEditsFile = new File(mergedEditsFile.getPath() + ".sorted"); 553 554 long startSort = System.currentTimeMillis(); 556 sorter.sort(mergedEditsFile.getPath(), sortedEditsFile.getPath()); 557 long endSort = System.currentTimeMillis(); 558 559 LOG.info("Processing " + basename + ": Sorted " + numEdits + " instructions in " + ((endSort - startSort) / 1000.0) + " seconds."); 560 LOG.info("Processing " + basename + ": Sorted " + (numEdits / ((endSort - startSort) / 1000.0)) + " instructions/second"); 561 562 nfs.delete(mergedEditsFile); 564 565 572 SequenceFile.Reader sortedEdits = new SequenceFile.Reader(nfs, sortedEditsFile.getPath()); 574 575 MapFile.Writer newDb = (comparator == null) ? new MapFile.Writer(nfs, newDbFile.getPath(), keyClass, valueClass) : new MapFile.Writer(nfs, newDbFile.getPath(), comparator, valueClass); 577 578 oldDb.reset(); 581 582 long startMerge = System.currentTimeMillis(); 584 mergeEdits(oldDb, sortedEdits, newDb); 585 long endMerge = System.currentTimeMillis(); 586 LOG.info("Processing " + basename + ": Merged to new DB containing " + itemsWritten + " records in " + ((endMerge - startMerge) / 1000.0) + " seconds"); 587 LOG.info("Processing " + basename + ": Merged " + (itemsWritten / ((endMerge - startMerge) / 1000.0)) + " records/second"); 588 589 sortedEdits.close(); 591 newDb.close(); 592 593 nfs.delete(sortedEditsFile); 595 } else { 596 long startCopy = System.currentTimeMillis(); 599 600 File srcSectionDir = new File(oldDbDir, "dbsection." + machineNum); 601 File srcDbFile = new File(srcSectionDir, basename); 602 nfs.rename(srcDbFile, newDbFile); 603 long endCopy = System.currentTimeMillis(); 604 LOG.info("Processing " + basename + ": Copied file (" + srcDbFile.length()+ " bytes) in " + ((endCopy - startCopy) / 1000.0) + " secs."); 605 } 606 607 edits.delete(); 609 return itemsWritten; 610 } 611 612 616 abstract void mergeEdits(MapFile.Reader db, SequenceFile.Reader edits, MapFile.Writer newDb) throws IOException; 617 } 618 619 624 private class PagesByURLProcessor extends CloseProcessor { 625 EditSectionGroupWriter futureEdits; 626 627 631 PagesByURLProcessor(MapFile.Reader db, EditSectionGroupWriter editWriter, EditSectionGroupWriter futureEdits) { 632 super(PAGES_BY_URL, db, editWriter, new SequenceFile.Sorter(nfs, new PageInstruction.UrlComparator(), NullWritable.class), new UTF8.Comparator(), null, Page.class, "PagesByURLPart"); 633 this.futureEdits = futureEdits; 634 } 635 636 639 void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException { 640 DeduplicatingPageSequenceReader edits = new DeduplicatingPageSequenceReader(sortedEdits); 642 WritableComparable readerKey = new UTF8(); 643 Page readerVal = new Page(); 644 PageInstruction editItem = new PageInstruction(); 645 int futureOrdering = 0; 646 647 boolean hasEntries = db.next(readerKey, readerVal); 649 boolean hasEdits = edits.next(editItem); 650 651 while (hasEntries && hasEdits) { 654 int comparison = readerKey.compareTo(editItem.getPage().getURL()); 655 int curInstruction = editItem.getInstruction(); 656 657 if ((curInstruction == ADD_PAGE) || 659 (curInstruction == ADD_PAGE_WITH_SCORE) || 660 (curInstruction == ADD_PAGE_IFN_PRESENT)) { 661 662 if (comparison < 0) { 663 newDb.append(readerKey, readerVal); 666 itemsWritten++; 667 hasEntries = db.next(readerKey, readerVal); 668 } else if (comparison == 0) { 669 if ((curInstruction == ADD_PAGE) || 677 (curInstruction == ADD_PAGE_WITH_SCORE)) { 678 pagesByMD5Edits++; 688 689 Page editItemPage = editItem.getPage(); 702 703 if (curInstruction == ADD_PAGE) { 704 editItemPage.setScore(readerVal.getScore(), readerVal.getNextScore()); 705 } 706 707 piwriter.appendInstructionInfo(futureEdits, editItemPage, ADD_PAGE, NullWritable.get()); 708 709 newDb.append(editItemPage.getURL(), editItemPage); 713 714 if (editItemPage.compareTo(readerVal) != 0) { 725 pagesByMD5Edits++; 726 piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get()); 727 } 728 729 itemsWritten++; 730 731 hasEntries = db.next(readerKey, readerVal); 733 } else { 734 } 739 hasEdits = edits.next(editItem); 741 742 } else if (comparison > 0) { 743 pagesByMD5Edits++; 751 752 if (curInstruction == ADD_PAGE_IFN_PRESENT) { 757 Link editLink = editItem.getLink(); 758 if (editLink != null) { 759 addLink(editLink); 760 } 761 } 762 piwriter.appendInstructionInfo(futureEdits, editItem.getPage(), ADD_PAGE, NullWritable.get()); 763 764 newDb.append(editItem.getPage().getURL(), editItem.getPage()); 767 itemsWritten++; 768 769 hasEdits = edits.next(editItem); 771 } 772 } else if (curInstruction == DEL_PAGE) { 773 if (comparison < 0) { 774 newDb.append(readerKey, readerVal); 777 itemsWritten++; 778 hasEntries = db.next(readerKey, readerVal); 779 } else if (comparison == 0) { 780 pagesByMD5Edits++; 785 piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get()); 786 787 hasEntries = db.next(readerKey, readerVal); 791 792 hasEdits = edits.next(editItem); 794 } else if (comparison > 0) { 795 hasEdits = edits.next(editItem); 798 } 799 } 800 } 801 802 while (! hasEntries && hasEdits) { 804 int curInstruction = editItem.getInstruction(); 805 if (curInstruction == ADD_PAGE || 806 curInstruction == ADD_PAGE_WITH_SCORE || 807 &
|