1 2 3 4 package net.nutch.tools; 5 6 import java.io.*; 7 import java.util.*; 8 import java.util.logging.*; 9 10 import net.nutch.io.*; 11 import net.nutch.db.*; 12 import net.nutch.fs.*; 13 import net.nutch.net.*; 14 import net.nutch.util.*; 15 import net.nutch.linkdb.*; 16 import net.nutch.pagedb.*; 17 import net.nutch.fetcher.*; 18 19 50 public class DistributedAnalysisTool { 51 final private static String ASSIGN_FILE_PREFIX = "assignment"; 52 final private static String SCORE_EDITS_FILE_PREFIX = "scoreEdits"; 53 final private static String ASSIGN_COMPLETE = "assignComplete"; 54 55 final private static float DEFAULT_SCORE = 0.15f; 56 final private static float DECAY_VALUE = 0.85f; 57 58 public static final Logger LOG = LogFormatter.getLogger("net.nutch.tools.DistributedAnalysisTool"); 59 60 66 class EditSet { 67 File distDir; 68 int numEditFiles; 69 int curEditFile; 70 SequenceFile.Reader curReader; 71 72 76 public EditSet(File distDir, int numEditFiles) throws IOException { 77 this.distDir = distDir; 78 this.numEditFiles = numEditFiles; 79 this.curEditFile = 0; 80 getNextReader(); 81 } 82 83 88 public synchronized boolean next(Writable key, Writable val) throws IOException { 89 if (curReader == null) { 93 getNextReader(); 94 if (curReader == null) { 96 return false; 97 } 98 } 99 return curReader.next(key, val); 100 } 101 102 105 private void getNextReader() throws IOException { 106 if (curReader != null) { 107 curReader.close(); 108 } 109 110 if (curEditFile < numEditFiles) { 111 curReader = new SequenceFile.Reader(nfs, new File(distDir, SCORE_EDITS_FILE_PREFIX + "." + curEditFile + ".sorted").getPath()); 112 LOG.info("Opened stream to file " + curEditFile); 113 curEditFile++; 114 } 115 } 116 117 119 public synchronized void close() throws IOException { 120 if (curReader != null) { 121 curReader.close(); 122 } 123 curEditFile = numEditFiles; 124 } 125 } 126 127 131 class ScoreValue implements Writable { 132 float score; 133 float nextScore; 134 135 137 public ScoreValue() { 138 } 139 141 public void setScore(float f) { 142 this.score = f; 143 } 144 146 public void setNextScore(float f) { 147 this.nextScore = f; 148 } 149 150 152 public float score() { 153 return score; 154 } 155 157 public float nextScore() { 158 return nextScore; 159 } 160 161 163 public void write(DataOutput out) throws IOException { 164 out.writeFloat(score); 165 out.writeFloat(nextScore); 166 } 167 168 170 public void readFields(DataInput in) throws IOException { 171 this.score = in.readFloat(); 172 this.nextScore = in.readFloat(); 173 } 174 } 175 176 NutchFileSystem nfs; 177 File dbDir; 178 179 182 public DistributedAnalysisTool(NutchFileSystem nfs, File dbDir) throws IOException, FileNotFoundException { 183 this.nfs = nfs; 184 this.dbDir = dbDir; 185 } 186 187 196 public boolean initRound(int numProcesses, File distDir) throws IOException { 197 if ((nfs.exists(distDir) && nfs.isFile(distDir)) || 201 (nfs.exists(distDir) && (nfs.listFiles(distDir).length != 0))) { 202 LOG.severe("Must be an empty or non-existent dir: " + distDir); 203 return false; 204 } 205 if (! nfs.exists(distDir)) { 206 nfs.mkdirs(distDir); 207 } 208 209 long startPages[] = new long[numProcesses]; 214 long totalPages = 0; 215 IWebDBReader reader = new WebDBReader(nfs, dbDir); 216 try { 217 totalPages = reader.numPages(); 218 } finally { 219 reader.close(); 220 } 221 long chunkSize = totalPages / numProcesses; 222 long pagesProcessedSoFar = 0; 223 224 startPages[0] = 0; 230 for (int i = 1; i < numProcesses; i++) { 231 startPages[i] = startPages[i-1] + chunkSize; 232 } 233 234 try { 238 for (int i = 0; i < numProcesses; i++) { 240 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(nfs.create(new File(distDir, ASSIGN_FILE_PREFIX + "." + i)))); 241 try { 242 out.writeLong(startPages[i]); 244 245 if (i != numProcesses - 1) { 247 out.writeLong(chunkSize); 248 } else { 249 out.writeLong(totalPages - ((numProcesses - 1) * chunkSize)); 251 } 252 } finally { 253 out.close(); 254 } 255 } 256 257 File completeFile = new File(distDir, "assignComplete"); 266 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(nfs.create(completeFile))); 267 try { 268 out.writeInt(numProcesses); 269 out.writeLong(totalPages); 270 271 long extent[] = new long[numProcesses]; 273 for (int i = 0; i < numProcesses - 1; i++) { 274 extent[i] = chunkSize * (i + 1); 275 } 276 extent[numProcesses-1] = totalPages - (chunkSize * (numProcesses - 1)); 277 278 for (int i = 0; i < extent.length; i++) { 280 out.writeLong(extent[i]); 281 } 282 } finally { 283 out.close(); 284 } 285 return true; 286 } catch (IOException ex) { 287 LOG.severe(ex.toString()); 288 LOG.severe("Sorry, could not finish assignments"); 289 } 290 return false; 291 } 292 293 304 public void computeRound(int processId, File distDir) throws IOException { 305 File assignFile = new File(distDir, ASSIGN_FILE_PREFIX + "." + processId); 306 307 long startIndex = 0, extent = 0; 308 DataInputStream in = new DataInputStream(new BufferedInputStream(nfs.open(assignFile))); 309 try { 310 startIndex = in.readLong(); 311 extent = in.readLong(); 312 } finally { 313 in.close(); 314 } 315 316 LOG.info("Start at: "+ startIndex); 317 LOG.info("Extent: "+ extent); 318 319 File scoreEdits = new File(distDir, SCORE_EDITS_FILE_PREFIX + "." + processId); 324 SequenceFile.Writer scoreWriter = new SequenceFile.Writer(nfs, scoreEdits.getPath() + ".unsorted", UTF8.class, ScoreValue.class); 325 326 try { 331 long curIndex = 0; 333 ScoreValue score = new ScoreValue(); 334 IWebDBReader reader = new WebDBReader(nfs, dbDir); 335 try { 336 for (Enumeration e = reader.pagesByMD5(); e.hasMoreElements(); curIndex++) { 337 if (curIndex < startIndex) { 341 e.nextElement(); 342 continue; 343 } 344 345 if (curIndex - startIndex > extent) { 349 break; 350 } 351 352 Page curPage = (Page) e.nextElement(); 356 Link outLinks[] = reader.getLinks(curPage.getMD5()); 357 int targetOutlinkers = 0; 358 for (int i = 0; i < outLinks.length; i++) { 359 if (outLinks[i].targetHasOutlink()) { 360 targetOutlinkers++; 361 } 362 } 363 364 370 float curNextScore = curPage.getNextScore(); 375 if (outLinks.length > 0 && curNextScore == 0.0f) { 376 curNextScore = curPage.getScore(); 377 } 378 379 float contributionForAll = (outLinks.length > 0) ? (curNextScore / outLinks.length) : 0.0f; 383 float contributionForOutlinkers = (targetOutlinkers > 0) ? (curNextScore / targetOutlinkers) : 0.0f; 384 for (int i = 0; i < outLinks.length; i++) { 385 score.setScore(contributionForAll); 387 score.setNextScore(outLinks[i].targetHasOutlink() ? contributionForOutlinkers : 0.0f); 388 scoreWriter.append(outLinks[i].getURL(), score); 389 } 390 391 if (((curIndex - startIndex) % 5000) == 0) { 392 LOG.info("Pages consumed: " + (curIndex - startIndex) + " (at index " + curIndex + ")"); 393 } 394 } 395 } finally { 396 reader.close(); 397 } 398 } finally { 399 scoreWriter.close(); 400 } 401 402 SequenceFile.Sorter sorter = new SequenceFile.Sorter(nfs, new UTF8.Comparator(), ScoreValue.class); 404 sorter.sort(scoreEdits.getPath() + ".unsorted", scoreEdits.getPath() + ".sorted"); 405 nfs.delete(new File(scoreEdits.getPath() + ".unsorted")); 406 } 407 408 409 425 public void completeRound(File distDir, File scoreFile) throws IOException { 426 int numProcesses = 0; 432 long totalPages = 0; 433 long extent[] = null; 434 File overall = new File(distDir, "assignComplete"); 435 DataInputStream in = new DataInputStream(new BufferedInputStream(nfs.open(overall))); 436 try { 437 numProcesses = in.readInt(); 438 totalPages = in.readLong(); 439 extent = new long[numProcesses]; 440 for (int i = 0; i < numProcesses; i++) { 441 extent[i] = in.readLong(); 442 } 443 } finally { 444 in.close(); 445 in = null; 446 } 447 448 ScoreStats scoreStats = new ScoreStats(); 453 IWebDBReader reader = new WebDBReader(nfs, dbDir); 454 IWebDBWriter writer = new WebDBWriter(nfs, dbDir); 455 EditSet editSet = new EditSet(distDir, numProcesses); 456 try { 457 int count = 0; 458 UTF8 curEditURL = new UTF8(); 459 ScoreValue curContribution = new ScoreValue(); 460 boolean hasEdit = editSet.next(curEditURL, curContribution); 461 462 for (Enumeration e = reader.pages(); e.hasMoreElements(); count++) { 467 Page curPage = (Page) e.nextElement(); 468 if (! hasEdit) { 469 break; 470 } 471 472 int comparison = curPage.getURL().compareTo(curEditURL); 477 float newScore = 0.0f, newNextScore = 0.0f; 478 if (comparison < 0) { 479 newScore = (1 - DECAY_VALUE); 489 newNextScore = (1 - DECAY_VALUE); 490 } else if (comparison > 0) { 491 throw new IOException("Impossible situation. There is a score-edit for " + curEditURL + ", which comes after the current Page " + curPage.getURL()); 495 } else { 496 while (hasEdit && curPage.getURL().compareTo(curEditURL) == 0) { 502 newScore += curContribution.score(); 503 newNextScore += curContribution.nextScore(); 504 hasEdit = editSet.next(curEditURL, curContribution); 505 } 506 507 newScore = (1 - DECAY_VALUE) + (DECAY_VALUE * newScore); 508 newNextScore = (1 - DECAY_VALUE) + (DECAY_VALUE * newNextScore); 509 } 510 511 curPage.setScore(newScore, newNextScore); 513 writer.addPageWithScore(curPage); 514 scoreStats.addScore(newScore); 515 if ((count % 5000) == 0) { 516 LOG.info("Pages written: " + count); 517 } 518 } 519 LOG.info("Pages encountered: " + count); 520 LOG.info("Target pages from init(): " + totalPages); 521 } finally { 522 reader.close(); 523 editSet.close(); 524 writer.close(); 525 } 526 527 if (scoreFile.exists()) { 531 scoreFile.delete(); 532 } 533 PrintStream pout = new PrintStream(new BufferedOutputStream(nfs.create(scoreFile))); 534 try { 535 scoreStats.emitDistribution(pout); 536 } finally { 537 pout.close(); 538 } 539 540 FileUtil.fullyDelete(nfs, distDir); 544 } 545 546 554 public static void main(String argv[]) throws IOException { 555 if (argv.length < 2) { 556 System.out.println("usage: java net.nutch.tools.DistributedAnalysisTool (-local | -ndfs <namenode:port>) -initRound|-computeRound|-completeRound (numProcesses | processId) <dist_dir> <db_dir>"); 557 return; 558 } 559 560 String command = null; 561 int numProcesses = 0, processId = 0, numIterations = 0; 562 File distDir = null, dbDir = null; 563 564 NutchFileSystem nfs = NutchFileSystem.parseArgs(argv, 0); 565 for (int i = 0; i < argv.length; i++) { 566 if ("-initRound".equals(argv[i])) { 567 command = argv[i]; 568 numProcesses = Integer.parseInt(argv[i+1]); 569 distDir = new File(argv[i+2]); 570 dbDir = new File(argv[i+3]); 571 i+=3; 572 } else if ("-computeRound".equals(argv[i])) { 573 command = argv[i]; 574 processId = Integer.parseInt(argv[i+1]); 575 distDir = new File(argv[i+2]); 576 dbDir = new File(argv[i+3]); 577 i+=3; 578 } else if ("-completeRound".equals(argv[i])) { 579 command = argv[i]; 580 distDir = new File(argv[i+1]); 581 dbDir = new File(argv[i+2]); 582 i+=2; 583 } 584 } 585 586 System.out.println("Started at " + new Date(System.currentTimeMillis())); 587 try { 588 DistributedAnalysisTool dat = 589 new DistributedAnalysisTool(nfs, dbDir); 590 if ("-initRound".equals(command)) { 591 dat.initRound(numProcesses, distDir); 592 } else if ("-computeRound".equals(command)) { 593 dat.computeRound(processId, distDir); 594 } else if ("-completeRound".equals(command)) { 595 dat.completeRound(distDir, new File(dbDir, "linkstats.txt")); 596 } else { 597 System.out.println("No directive."); 598 } 599 } finally { 600 System.out.println("Finished at " + new Date(System.currentTimeMillis())); 601 nfs.close(); 602 } 603 } 604 } 605 | Popular Tags |