1 2 3 4 package net.nutch.tools; 5 6 import net.nutch.pagedb.FetchListEntry; 7 import net.nutch.io.*; 8 import net.nutch.fs.*; 9 import net.nutch.util.*; 10 import net.nutch.protocol.*; 11 import net.nutch.parse.*; 12 import net.nutch.plugin.*; 13 14 import net.nutch.fetcher.FetcherOutput; 15 16 import java.io.EOFException ; 17 import java.io.File ; 18 import java.io.DataInput ; 19 import java.io.DataOutput ; 20 import java.io.IOException ; 21 22 import java.util.Properties ; 23 import java.util.logging.*; 24 25 55 56 public class ParseSegment { 57 58 public static final Logger LOG = 59 LogFormatter.getLogger(ParseSegment.class.getName()); 60 61 private int threadCount = NutchConf.getInt("parser.threads.parse", 10); 63 64 private NutchFileSystem nfs; 65 66 private String directory; 68 69 private ArrayFile.Reader fetcherNPReader; 71 private ArrayFile.Reader contentReader; 72 73 private File unsortedFile; 75 private SequenceFile.Writer parserOutputWriter; 76 77 private File sortedFile; 79 80 private boolean dryRun = false; 82 83 private boolean clean = true; 85 86 private long entry = -1; 88 89 private long start; private long bytes; private int pages; private int errors; 95 private ThreadGroup group = new ThreadGroup ("parser"); 97 100 private class ParserThread extends Thread { 101 102 private long myEntry = -1; 104 105 private long t0,t1,t2,t3,t4,t5; 107 108 public ParserThread() { super(group, "myThread"); } 109 110 114 public void run() { 115 116 FetcherOutput fetcherOutput = new FetcherOutput(); 117 Content content = new Content(); 118 119 FetchListEntry fle = null; 120 String url = null; 121 122 while (true) { 123 if (LogFormatter.hasLoggedSevere()) break; 126 t0 = System.currentTimeMillis(); 127 128 try { 129 130 synchronized (ParseSegment.this) { 132 t1 = System.currentTimeMillis(); 133 134 try { 135 if (fetcherNPReader.next(fetcherOutput) == null || 136 contentReader.next(content) == null) 137 return; 138 } catch (EOFException eof) { 139 return; 142 } 143 144 entry++; 145 myEntry = entry; 146 if (LOG.isLoggable(Level.FINE)) 147 LOG.fine("Read in entry "+entry); 148 149 } 157 158 t2 = System.currentTimeMillis(); 159 160 fle = fetcherOutput.getFetchListEntry(); 161 url = fle.getPage().getURL().toString(); 162 163 LOG.fine("parsing " + url); 165 if (!url.equals(content.getUrl())) { 167 LOG.severe("Mismatched entries under " 168 + FetcherOutput.DIR_NAME_NP + " and " + Content.DIR_NAME); 169 continue; 170 } 171 172 if (fetcherOutput.getStatus() == FetcherOutput.SUCCESS || 175 fetcherOutput.getStatus() == FetcherOutput.CANT_PARSE) { 176 handleContent(url, content); 177 synchronized (ParseSegment.this) { 178 pages++; bytes += content.getContent().length; 180 if ((pages % 100) == 0) 181 status(); 182 } 183 } else { 184 logError(url, new ProtocolException("Error at fetch stage")); 186 handleNoContent(ParserOutput.NOFETCH); 187 } 188 189 } catch (ParseException e) { 190 logError(url, e); 191 handleNoContent(ParserOutput.FAILURE); 192 193 } catch (Throwable t) { if (fle != null) { 195 logError(url, t); 196 handleNoContent(ParserOutput.UNKNOWN); 197 } else { 198 LOG.severe("Unexpected exception"); 199 } 200 } 201 } 202 } 203 204 private void logError(String url, Throwable t) { 205 LOG.info("parse of " + url + " failed with: " + t); 206 if (LOG.isLoggable(Level.FINE)) 207 LOG.log(Level.FINE, "stack", t); synchronized (ParseSegment.this) { errors++; 210 } 211 } 212 213 private void handleContent(String url, Content content) 214 throws ParseException { 215 216 String contentType = content.getMetadata().getProperty("Content-Type"); 218 219 if (ParseSegment.this.dryRun) { 220 LOG.info("To be handled as Content-Type: "+contentType); 221 return; 222 } 223 224 Parser parser = ParserFactory.getParser(contentType, url); 225 Parse parse = parser.getParse(content); 226 227 outputPage 228 (new ParseText(parse.getText()), parse.getData(),ParserOutput.SUCCESS); 229 } 230 231 private void handleNoContent(int status) { 232 if (ParseSegment.this.dryRun) { 233 LOG.info("To be handled as no content"); 234 return; 235 } 236 outputPage(new ParseText(""), 237 new ParseData("", new Outlink[0], new Properties ()), 238 status); 239 } 240 241 private void outputPage 242 (ParseText parseText, ParseData parseData, int status) { 243 try { 244 t3 = System.currentTimeMillis(); 245 synchronized (parserOutputWriter) { 246 t4 = System.currentTimeMillis(); 247 parserOutputWriter.append(new LongWritable(myEntry), 248 new ParserOutput(parseData, parseText, status)); 249 t5 = System.currentTimeMillis(); 250 if (LOG.isLoggable(Level.FINE)) 251 LOG.fine("Entry: "+myEntry 252 +" "+parseData.getMetadata().getProperty("Content-Length") 253 +" wait="+(t1-t0) +" read="+(t2-t1) +" parse="+(t3-t2) 254 +" wait="+(t4-t3) +" write="+(t5-t4) +"ms"); 255 } 256 } catch (Throwable t) { 257 LOG.severe("error writing output:" + t.toString()); 258 } 259 } 260 261 } 262 263 266 private class ParserOutput extends VersionedWritable { 267 public static final String DIR_NAME = "parser"; 268 269 private final static byte VERSION = 1; 270 271 public final static byte UNKNOWN = (byte)0; public final static byte SUCCESS = (byte)1; public final static byte FAILURE = (byte)2; public final static byte NOFETCH = (byte)3; 277 private int status; 278 279 private ParseData parseData = new ParseData(); 280 private ParseText parseText = new ParseText(); 281 282 public ParserOutput() {} 283 284 public ParserOutput(ParseData parseData, ParseText parseText, int status) { 285 this.parseData = parseData; 286 this.parseText = parseText; 287 this.status = status; 288 } 289 290 public byte getVersion() { return VERSION; } 291 292 public ParseData getParseData() { 293 return this.parseData; 294 } 295 296 public ParseText getParseText() { 297 return this.parseText; 298 } 299 300 public int getStatus() { 301 return this.status; 302 } 303 304 public final void readFields(DataInput in) throws IOException { 305 super.readFields(in); status = in.readByte(); 307 parseData.readFields(in); 308 parseText.readFields(in); 309 return; 310 } 311 312 public final void write(DataOutput out) throws IOException { 313 super.write(out); out.writeByte(status); 315 parseData.write(out); 316 parseText.write(out); 317 return; 318 } 319 } 320 321 324 public ParseSegment(NutchFileSystem nfs, String directory, boolean dryRun) 325 throws IOException { 326 327 File file; 328 329 this.nfs = nfs; 330 this.directory = directory; 331 this.dryRun = dryRun; 332 333 file = new File (directory, FetcherOutput.DIR_NAME_NP); 335 if (!nfs.exists(file)) 336 throw new IOException ("Directory missing: "+FetcherOutput.DIR_NAME_NP); 337 338 if (dryRun) 339 return; 340 341 file = new File (directory, FetcherOutput.DIR_NAME); 343 if (nfs.exists(file)) { 344 LOG.info("Deleting old "+file.getName()); 345 nfs.delete(file); 346 } 347 348 this.unsortedFile = new File (directory, ParserOutput.DIR_NAME+".unsorted"); 350 if (nfs.exists(this.unsortedFile)) { 351 LOG.info("Deleting old "+this.unsortedFile.getName()); 352 nfs.delete(this.unsortedFile); 353 } 354 355 this.sortedFile = new File (directory, ParserOutput.DIR_NAME+".sorted"); 357 if (nfs.exists(this.sortedFile)) { 358 LOG.info("Deleting old "+this.sortedFile.getName()); 359 nfs.delete(this.sortedFile); 360 } 361 362 file = new File (directory, ParseData.DIR_NAME); 364 if (nfs.exists(file)) { 365 LOG.info("Deleting old "+file.getName()); 366 nfs.delete(file); 367 } 368 369 file = new File (directory, ParseText.DIR_NAME); 371 if (nfs.exists(file)) { 372 LOG.info("Deleting old "+file.getName()); 373 nfs.delete(file); 374 } 375 376 } 377 378 379 public void setThreadCount(int threadCount) { 380 this.threadCount=threadCount; 381 } 382 383 384 public static void setLogLevel(Level level) { 385 LOG.setLevel(level); 386 PluginRepository.LOG.setLevel(level); 387 ParserFactory.LOG.setLevel(level); 388 LOG.info("logging at " + level); 389 } 390 391 392 public void setClean(boolean clean) { 393 this.clean = clean; 394 } 395 396 397 public void status() { 398 long ms = System.currentTimeMillis() - start; 399 LOG.info("status: " 400 + pages + " pages, " 401 + errors + " errors, " 402 + bytes + " bytes, " 403 + ms + " ms"); 404 LOG.info("status: " 405 + (((float)pages)/(ms/1000.0f))+" pages/s, " 406 + (((float)bytes*8/1024)/(ms/1000.0f))+" kb/s, " 407 + (((float)bytes)/pages) + " bytes/page"); 408 } 409 410 411 public void parse() throws IOException , InterruptedException { 412 413 fetcherNPReader = new ArrayFile.Reader 414 (nfs, (new File (directory, FetcherOutput.DIR_NAME_NP)).getPath()); 415 contentReader = new ArrayFile.Reader 416 (nfs, (new File (directory, Content.DIR_NAME)).getPath()); 417 418 if (!this.dryRun) { 419 parserOutputWriter = new SequenceFile.Writer 420 (nfs, unsortedFile.getPath(), LongWritable.class, ParserOutput.class); 421 } 422 423 start = System.currentTimeMillis(); 424 425 for (int i = 0; i < threadCount; i++) { ParserThread thread = new ParserThread(); 427 thread.start(); 428 } 429 430 do { 431 Thread.sleep(1000); 432 433 if (LogFormatter.hasLoggedSevere()) 434 throw new RuntimeException ("SEVERE error logged. Exiting parser."); 435 436 } while (group.activeCount() > 0); 438 fetcherNPReader.close(); 439 contentReader.close(); 440 if (!this.dryRun) 441 parserOutputWriter.close(); 442 443 status(); } 445 446 447 public void sort() throws IOException { 448 449 if (this.dryRun) 450 return; 451 452 LOG.info("Sorting ParserOutput"); 453 454 start = System.currentTimeMillis(); 455 456 SequenceFile.Sorter sorter = new SequenceFile.Sorter 457 (nfs, new LongWritable.Comparator(), ParserOutput.class); 458 459 sorter.sort(unsortedFile.getPath(), sortedFile.getPath()); 460 461 double localSecs = (System.currentTimeMillis() - start) / 1000.0; 462 LOG.info("Sorted: " + (pages+errors) + " entries in " + localSecs + "s, " 463 + ((pages+errors)/localSecs) + " entries/s"); 464 465 if (this.clean) { 466 LOG.info("Deleting intermediate "+unsortedFile.getName()); 467 nfs.delete(unsortedFile); 468 } 469 470 return; 471 } 472 473 477 public void save() throws IOException { 478 479 if (this.dryRun) 480 return; 481 482 LOG.info("Saving ParseData and ParseText separately"); 483 484 start = System.currentTimeMillis(); 485 486 SequenceFile.Reader parserOutputReader 487 = new SequenceFile.Reader(nfs, sortedFile.getPath()); 488 489 ArrayFile.Reader fetcherNPReader = new ArrayFile.Reader(nfs, 490 (new File (directory, FetcherOutput.DIR_NAME_NP)).getPath()); 491 492 ArrayFile.Writer fetcherWriter = new ArrayFile.Writer(nfs, 493 (new File (directory, FetcherOutput.DIR_NAME)).getPath(), 494 FetcherOutput.class); 495 496 ArrayFile.Writer parseDataWriter = new ArrayFile.Writer(nfs, 497 (new File (directory, ParseData.DIR_NAME)).getPath(), ParseData.class); 498 ArrayFile.Writer parseTextWriter = new ArrayFile.Writer(nfs, 499 (new File (directory, ParseText.DIR_NAME)).getPath(), ParseText.class); 500 501 try { 502 LongWritable key = new LongWritable(); 503 ParserOutput val = new ParserOutput(); 504 FetcherOutput fo = new FetcherOutput(); 505 int count = 0; 506 int status; 507 while (parserOutputReader.next(key,val)) { 508 fetcherNPReader.next(fo); 509 if (fetcherNPReader.key() != key.get()) 511 throw new IOException ("Mismatch between entries under " 512 + FetcherOutput.DIR_NAME_NP + " and in " + sortedFile.getName()); 513 switch (val.getStatus()) { 515 case ParserOutput.SUCCESS: 516 fo.setStatus(FetcherOutput.SUCCESS); 517 break; 518 case ParserOutput.UNKNOWN: 519 case ParserOutput.FAILURE: 520 fo.setStatus(FetcherOutput.CANT_PARSE); 521 break; 522 case ParserOutput.NOFETCH: 523 default: 524 } 526 fetcherWriter.append(fo); 527 parseDataWriter.append(val.getParseData()); 528 parseTextWriter.append(val.getParseText()); 529 count++; 530 } 531 if (count != (pages+errors)) 534 throw new IOException ("Missing entries: expect "+(pages+errors) 535 +", but have "+count+" entries instead."); 536 } finally { 537 fetcherNPReader.close(); 538 fetcherWriter.close(); 539 parseDataWriter.close(); 540 parseTextWriter.close(); 541 parserOutputReader.close(); 542 } 543 544 double localSecs = (System.currentTimeMillis() - start) / 1000.0; 545 LOG.info("Saved: " + (pages+errors) + " entries in " + localSecs + "s, " 546 + ((pages+errors)/localSecs) + " entries/s"); 547 548 if (this.clean) { 549 LOG.info("Deleting intermediate "+sortedFile.getName()); 550 nfs.delete(sortedFile); 551 } 552 553 return; 554 } 555 556 557 public static void main(String [] args) throws Exception { 558 int threadCount = -1; 559 boolean showThreadID = false; 560 boolean dryRun = false; 561 String logLevel = "info"; 562 boolean clean = true; 563 String directory = null; 564 565 String usage = "Usage: ParseSegment (-local | -ndfs <namenode:port>) [-threads n] [-showThreadID] [-dryRun] [-logLevel level] [-noClean] dir"; 566 567 if (args.length == 0) { 568 System.err.println(usage); 569 System.exit(-1); 570 } 571 572 NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0); 574 575 for (int i = 0; i < args.length; i++) { 576 if (args[i] == null) { 577 continue; 578 } else if (args[i].equals("-threads")) { 579 threadCount = Integer.parseInt(args[++i]); 580 } else if (args[i].equals("-showThreadID")) { 581 showThreadID = true; 582 } else if (args[i].equals("-dryRun")) { 583 dryRun = true; 584 } else if (args[i].equals("-logLevel")) { 585 logLevel = args[++i]; 586 } else if (args[i].equals("-noClean")) { 587 clean = false; 588 } else { 589 directory = args[i]; 590 } 591 } 592 593 try { 594 595 ParseSegment parseSegment = new ParseSegment(nfs, directory, dryRun); 596 597 parseSegment.setLogLevel 598 (Level.parse((new String (logLevel)).toUpperCase())); 599 600 if (threadCount != -1) 601 parseSegment.setThreadCount(threadCount); 602 if (showThreadID) 603 LogFormatter.setShowThreadIDs(showThreadID); 604 605 parseSegment.setClean(clean); 606 607 parseSegment.parse(); 608 parseSegment.sort(); 609 parseSegment.save(); 610 611 } finally { 612 nfs.close(); 613 } 614 615 } 616 } 617 | Popular Tags |