1 2 3 4 package net.nutch.segment; 5 6 import java.io.EOFException ; 7 import java.io.File ; 8 import java.io.FileFilter ; 9 import java.io.IOException ; 10 import java.io.PrintStream ; 11 import java.text.DecimalFormat ; 12 import java.text.SimpleDateFormat ; 13 import java.util.Date ; 14 import java.util.Vector ; 15 import java.util.logging.Logger ; 16 17 import net.nutch.fetcher.FetcherOutput; 18 import net.nutch.io.ArrayFile; 19 import net.nutch.io.LongWritable; 20 import net.nutch.io.MapFile; 21 import net.nutch.io.SequenceFile; 22 import net.nutch.io.UTF8; 23 import net.nutch.fs.*; 24 import net.nutch.pagedb.FetchListEntry; 25 import net.nutch.parse.ParseData; 26 import net.nutch.parse.ParseText; 27 import net.nutch.protocol.Content; 28 import net.nutch.util.LogFormatter; 29 30 37 public class SegmentReader { 38 public static final Logger LOG = LogFormatter.getLogger("net.nutch.segment.SegmentReader"); 39 40 public ArrayFile.Reader fetcherReader; 41 public ArrayFile.Reader contentReader; 42 public ArrayFile.Reader parseTextReader; 43 public ArrayFile.Reader parseDataReader; 44 public boolean isParsed = false; 45 46 50 public long started = 0L; 51 55 public long finished = 0L; 56 public long size = 0L; 57 private long key = -1L; 58 59 60 public File segmentDir; 61 public NutchFileSystem nfs; 62 63 68 public SegmentReader(File dir) throws Exception { 69 this(new LocalFileSystem(), dir, true, true, true, false); 70 } 71 72 78 public SegmentReader(NutchFileSystem nfs, File dir) throws Exception { 79 this(nfs, dir, true, true, true, false); 80 } 81 82 91 public SegmentReader(File dir, boolean autoFix) throws Exception { 92 this(new LocalFileSystem(), dir, true, true, true, autoFix); 93 } 94 95 105 public SegmentReader(NutchFileSystem nfs, File dir, boolean autoFix) throws Exception { 106 this(nfs, dir, true, true, true, autoFix); 107 } 108 109 127 public SegmentReader(NutchFileSystem nfs, File dir, 128 boolean withContent, boolean withParseText, boolean withParseData, 129 boolean autoFix) throws Exception { 130 isParsed = isParsedSegment(nfs, dir); 131 if (!isParsed) { 132 withParseText = false; 133 withParseData = false; 134 } 135 try { 136 init(nfs, dir, withContent, withParseText, withParseData); 137 } catch (Exception e) { 138 boolean ok = false; 139 if (autoFix) { 140 ok = fixSegment(nfs, dir, withContent, withParseText, withParseData, false); 142 } 143 if (ok) 144 init(nfs, dir, withContent, withParseText, withParseData); 145 else throw new Exception ("Segment " + dir + " is corrupted."); 146 } 147 } 148 149 public static boolean isParsedSegment(NutchFileSystem nfs, File segdir) throws Exception { 150 boolean res; 151 File foDir = new File (segdir, FetcherOutput.DIR_NAME); 152 if (nfs.exists(foDir) && nfs.isDirectory(foDir)) return true; 153 foDir = new File (segdir, FetcherOutput.DIR_NAME_NP); 154 if (nfs.exists(foDir) && nfs.isDirectory(foDir)) return false; 155 throw new Exception ("Missing or invalid '" + FetcherOutput.DIR_NAME + "' or '" 156 + FetcherOutput.DIR_NAME_NP + "' directory in " + segdir); 157 } 158 159 171 public static boolean fixSegment(NutchFileSystem nfs, File dir, 172 boolean withContent, boolean withParseText, boolean withParseData, 173 boolean dryrun) { 174 String dr = ""; 175 if (dryrun) dr = "[DRY RUN] "; 176 File fetcherOutput = null; 177 File content = new File (dir, Content.DIR_NAME); 178 File parseData = new File (dir, ParseData.DIR_NAME); 179 File parseText = new File (dir, ParseText.DIR_NAME); 180 long cnt = 0L; 181 try { 182 if (isParsedSegment(nfs, dir)) { 183 fetcherOutput = new File (dir, FetcherOutput.DIR_NAME); 184 } else { 185 fetcherOutput = new File (dir, FetcherOutput.DIR_NAME_NP); 186 withParseText = false; 187 withParseData = false; 188 } 189 cnt = MapFile.fix(nfs, fetcherOutput, LongWritable.class, FetcherOutput.class, dryrun); 190 if (cnt != -1) LOG.info(dr + " - fixed " + fetcherOutput.getName()); 191 if (withContent) { 192 cnt = MapFile.fix(nfs, content, LongWritable.class, Content.class, dryrun); 193 if (cnt != -1) LOG.info(dr + " - fixed " + content.getName()); 194 } 195 if (withParseData) { 196 cnt = MapFile.fix(nfs, parseData, LongWritable.class, ParseData.class, dryrun); 197 if (cnt != -1) LOG.info(dr + " - fixed " + parseData.getName()); 198 } 199 if (withParseText) { 200 cnt = MapFile.fix(nfs, parseText, LongWritable.class, ParseText.class, dryrun); 201 if (cnt != -1) LOG.info(dr + " - fixed " + parseText.getName()); 202 } 203 LOG.info(dr + "Finished fixing " + dir.getName()); 204 return true; 205 } catch (Throwable t) { 206 LOG.warning(dr + "Unable to fix segment " + dir.getName() + ": " + t.getMessage()); 207 return false; 208 } 209 } 210 211 private void init(NutchFileSystem nfs, File dir, 212 boolean withContent, boolean withParseText, boolean withParseData) throws Exception { 213 segmentDir = dir; 214 this.nfs = nfs; 215 if (isParsed) { 216 fetcherReader = new ArrayFile.Reader(nfs, new File (dir, FetcherOutput.DIR_NAME).toString()); 217 } else { 218 fetcherReader = new ArrayFile.Reader(nfs, new File (dir, FetcherOutput.DIR_NAME_NP).toString()); 219 } 220 if (withContent) contentReader = new ArrayFile.Reader(nfs, new File (dir, Content.DIR_NAME).toString()); 221 if (withParseText) parseTextReader = new ArrayFile.Reader(nfs, new File (dir, ParseText.DIR_NAME).toString()); 222 if (withParseData) parseDataReader = new ArrayFile.Reader(nfs, new File (dir, ParseData.DIR_NAME).toString()); 223 FetcherOutput fo = new FetcherOutput(); 230 fetcherReader.next(fo); 231 started = fo.getFetchDate(); 232 LongWritable w = new LongWritable(); 233 w.set(++size); 234 try { 235 while (fetcherReader.seek(w)) { 236 w.set(++size); 237 } 238 } catch (Throwable eof) { 239 LOG.warning(" - data in segment " + dir + " is corrupt, using only " + size + " entries."); 242 } 243 boolean ok = false; 245 int back = 0; 246 do { 247 try { 248 fetcherReader.seek(size - 2 - back); 249 fetcherReader.next(fo); 250 ok = true; 251 } catch (Throwable t) { 252 back++; 253 } 254 } while (!ok && back < 10); 255 if (back >= 10) 256 throw new Exception (" - fetcher output is unreadable"); 257 if (back > 0) LOG.warning(" - fetcher output truncated by " + back + " to " + size); 258 size = size - back; 259 finished = fo.getFetchDate(); 260 fetcherReader.reset(); 262 } 263 264 277 public synchronized boolean get(long n, FetcherOutput fo, Content co, 278 ParseText pt, ParseData pd) throws IOException { 279 boolean valid = true; 284 if (fetcherReader.get(n, fo) == null) valid = false; 285 if (contentReader != null) { 286 if (co != null) { 287 if (contentReader.get(n, co) == null) valid = false; 288 } else contentReader.seek(n); 289 } 290 if (parseTextReader != null) { 291 if (pt != null) { 292 if (parseTextReader.get(n, pt) == null) valid = false; 293 } else parseTextReader.seek(n); 294 } 295 if (parseDataReader != null) { 296 if (pd != null) { 297 if (parseDataReader.get(n, pd) == null) valid = false; 298 } else parseDataReader.seek(n); 299 } 300 key = n; 301 return valid; 302 } 303 304 private Content _co = new Content(); 305 private ParseText _pt = new ParseText(); 306 private ParseData _pd = new ParseData(); 307 308 312 public synchronized boolean next(FetcherOutput fo, Content co, 313 ParseText pt, ParseData pd) throws IOException { 314 boolean valid = true; 315 Content rco = (co == null) ? _co : co; 316 ParseText rpt = (pt == null) ? _pt : pt; 317 ParseData rpd = (pd == null) ? _pd : pd; 318 if (fetcherReader.next(fo) == null) valid = false; 319 if (contentReader != null) 320 if (contentReader.next(rco) == null) valid = false; 321 if (parseTextReader != null) 322 if (parseTextReader.next(rpt) == null) valid = false; 323 if (parseDataReader != null) 324 if (parseDataReader.next(rpd) == null) valid = false; 325 key++; 326 return valid; 327 } 328 329 330 public synchronized void seek(long n) throws IOException { 331 fetcherReader.seek(n); 332 if (contentReader != null) contentReader.seek(n); 333 if (parseTextReader != null) parseTextReader.seek(n); 334 if (parseDataReader != null) parseDataReader.seek(n); 335 key = n; 336 } 337 338 339 public long key() { 340 return key; 341 } 342 343 344 public synchronized void reset() throws IOException { 345 fetcherReader.reset(); 346 if (contentReader != null) contentReader.reset(); 347 if (parseTextReader != null) parseTextReader.reset(); 348 if (parseDataReader != null) parseDataReader.reset(); 349 } 350 351 352 public synchronized void close() { 353 try { 354 fetcherReader.close(); 355 } catch (Exception e) {}; 356 if (contentReader != null) try { 357 contentReader.close(); 358 } catch (Exception e) {}; 359 if (parseTextReader != null) try { 360 parseTextReader.close(); 361 } catch (Exception e) {}; 362 if (parseDataReader != null) try { 363 parseDataReader.close(); 364 } catch (Exception e) {}; 365 } 366 367 374 public synchronized void dump(boolean sorted, PrintStream output) throws Exception { 375 reset(); 376 FetcherOutput fo = new FetcherOutput(); 377 Content co = new Content(); 378 ParseData pd = new ParseData(); 379 ParseText pt = new ParseText(); 380 long recNo = 0L; 381 if (!sorted) { 382 while(next(fo, co, pt, pd)) { 383 output.println("Recno:: " + recNo++); 384 output.println("FetcherOutput::\n" + fo.toString()); 385 if (contentReader != null) 386 output.println("Content::\n" + co.toString()); 387 if (parseDataReader != null) 388 output.println("ParseData::\n" + pd.toString()); 389 if (parseTextReader != null) 390 output.println("ParseText::\n" + pt.toString()); 391 output.println(""); 392 } 393 } else { 394 File unsortedFile = new File (segmentDir, ".unsorted"); 395 File sortedFile = new File (segmentDir, ".sorted"); 396 nfs.delete(unsortedFile); 397 nfs.delete(sortedFile); 398 SequenceFile.Writer seqWriter = new SequenceFile.Writer(nfs, 399 unsortedFile.toString(), UTF8.class, LongWritable.class); 400 FetchListEntry fle; 401 LongWritable rec = new LongWritable(); 402 UTF8 url = new UTF8(); 403 String urlString; 404 while (fetcherReader.next(fo) != null) { 405 fle = fo.getFetchListEntry(); 406 urlString = fle.getPage().getURL().toString(); 407 rec.set(recNo); 408 url.set(urlString); 409 seqWriter.append(url, rec); 410 recNo++; 411 } 412 seqWriter.close(); 413 long start = System.currentTimeMillis(); 415 416 SequenceFile.Sorter sorter = new SequenceFile.Sorter(nfs, 417 new UTF8.Comparator(), LongWritable.class); 418 419 sorter.sort(unsortedFile.toString(), sortedFile.toString()); 420 421 float localSecs = (System.currentTimeMillis() - start) / 1000.0f; 422 LOG.info(" - sorted: " + recNo + " entries in " + localSecs + "s, " 423 + (recNo/localSecs) + " entries/s"); 424 425 nfs.delete(unsortedFile); 426 SequenceFile.Reader seqReader = new SequenceFile.Reader(nfs, sortedFile.toString()); 427 while (seqReader.next(url, rec)) { 428 recNo = rec.get(); 429 get(recNo, fo, co, pt, pd); 430 output.println("Recno:: " + recNo++); 431 output.println("FetcherOutput::\n" + fo.toString()); 432 if (contentReader != null) 433 output.println("Content::\n" + co.toString()); 434 if (parseDataReader != null) 435 output.println("ParseData::\n" + pd.toString()); 436 if (parseTextReader != null) 437 output.println("ParseText::\n" + pt.toString()); 438 output.println(""); 439 } 440 seqReader.close(); 441 nfs.delete(sortedFile); 442 } 443 } 444 445 446 public static void main(String [] args) throws Exception { 447 if (args.length == 0) { 448 usage(); 449 return; 450 } 451 SegmentReader reader = null; 452 NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0); 453 String segDir = null; 454 Vector dirs = new Vector (); 455 boolean fix = false; 456 boolean list = false; 457 boolean dump = false; 458 boolean sorted = false; 459 boolean withParseText = true; 460 boolean withParseData = true; 461 boolean withContent = true; 462 for (int i = 0; i < args.length; i++) { 463 if (args[i] != null) { 464 if (args[i].equals("-noparsetext")) withParseText = false; 465 else if (args[i].equals("-noparsedata")) withParseData = false; 466 else if (args[i].equals("-nocontent")) withContent = false; 467 else if (args[i].equals("-fix")) fix = true; 468 else if (args[i].equals("-dump")) dump = true; 469 else if (args[i].equals("-dumpsort")) { 470 dump = true; 471 sorted = true; 472 } else if (args[i].equals("-list")) list = true; 473 else if (args[i].equals("-dir")) segDir = args[++i]; 474 else dirs.add(new File (args[i])); 475 } 476 } 477 if (segDir != null) { 478 File sDir = new File (segDir); 479 if (!sDir.exists() || !sDir.isDirectory()) { 480 LOG.warning("Invalid path: " + sDir); 481 } else { 482 File [] files = sDir.listFiles(new FileFilter () { 483 public boolean accept(File f) { 484 return f.isDirectory(); 485 } 486 }); 487 if (files != null && files.length > 0) { 488 for (int i = 0; i < files.length; i++) dirs.add(files[i]); 489 } 490 } 491 } 492 if (dirs.size() == 0) { 493 LOG.severe("No input segment dirs."); 494 usage(); 495 return; 496 } 497 long total = 0L; 498 int cnt = 0; 499 SimpleDateFormat sdf = new SimpleDateFormat ("yyyyMMdd'-'HH:mm:ss"); 500 DecimalFormat df = new DecimalFormat ("########"); 501 df.setParseIntegerOnly(true); 502 if (list) 503 LOG.info("PARSED?\tSTARTED\t\t\tFINISHED\t\tCOUNT\tDIR NAME"); 504 for (int i = 0; i < dirs.size(); i++) { 505 File dir = (File )dirs.get(i); 506 try { 507 reader = new SegmentReader(nfs, dir, 508 withContent, withParseText, withParseData, fix); 509 if (list) { 510 LOG.info(reader.isParsed + 511 "\t" + sdf.format(new Date (reader.started)) + 512 "\t" + sdf.format(new Date (reader.finished)) + 513 "\t" + df.format(reader.size) + 514 "\t" + dir); 515 } 516 total += reader.size; 517 cnt++; 518 if (dump) reader.dump(sorted, System.out); 519 } catch (Throwable t) { 520 LOG.warning(t.getMessage()); 521 } 522 } 523 if (list) 524 LOG.info("TOTAL: " + total + " entries in " + cnt + " segments."); 525 } 526 527 private static void usage() { 528 System.err.println("SegmentReader [-fix] [-dump] [-dumpsort] [-list] [-nocontent] [-noparsedata] [-noparsetext] (-dir segments | seg1 seg2 ...)"); 529 System.err.println("\tNOTE: at least one segment dir name is required, or '-dir' option."); 530 System.err.println("\t-fix\t\tautomatically fix corrupted segments"); 531 System.err.println("\t-dump\t\tdump segment data in human-readable format"); 532 System.err.println("\t-dumpsort\tdump segment data in human-readable format, sorted by URL"); 533 System.err.println("\t-list\t\tprint useful information about segments"); 534 System.err.println("\t-nocontent\tignore content data"); 535 System.err.println("\t-noparsedata\tignore parse_data data"); 536 System.err.println("\t-nocontent\tignore parse_text data"); 537 System.err.println("\t-dir segments\tdirectory containing multiple segments"); 538 System.err.println("\tseg1 seg2 ...\tsegment directories\n"); 539 } 540 } 541 | Popular Tags |