KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > segment > SegmentReader


1 /* Copyright (c) 2004 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3
4 package net.nutch.segment;
5
6 import java.io.EOFException JavaDoc;
7 import java.io.File JavaDoc;
8 import java.io.FileFilter JavaDoc;
9 import java.io.IOException JavaDoc;
10 import java.io.PrintStream JavaDoc;
11 import java.text.DecimalFormat JavaDoc;
12 import java.text.SimpleDateFormat JavaDoc;
13 import java.util.Date JavaDoc;
14 import java.util.Vector JavaDoc;
15 import java.util.logging.Logger JavaDoc;
16
17 import net.nutch.fetcher.FetcherOutput;
18 import net.nutch.io.ArrayFile;
19 import net.nutch.io.LongWritable;
20 import net.nutch.io.MapFile;
21 import net.nutch.io.SequenceFile;
22 import net.nutch.io.UTF8;
23 import net.nutch.fs.*;
24 import net.nutch.pagedb.FetchListEntry;
25 import net.nutch.parse.ParseData;
26 import net.nutch.parse.ParseText;
27 import net.nutch.protocol.Content;
28 import net.nutch.util.LogFormatter;
29
/**
 * This class holds together all data readers for an existing segment.
 * Some convenience methods are also provided, to read from the segment and
 * to reposition the current pointer.
 *
 * @author Andrzej Bialecki &lt;ab@getopt.org&gt;
 */

37 public class SegmentReader {
38   public static final Logger JavaDoc LOG = LogFormatter.getLogger("net.nutch.segment.SegmentReader");
39   
40   public ArrayFile.Reader fetcherReader;
41   public ArrayFile.Reader contentReader;
42   public ArrayFile.Reader parseTextReader;
43   public ArrayFile.Reader parseDataReader;
44   public boolean isParsed = false;
45
46   /**
47    * The time when fetching of this segment started, as recorded
48    * in fetcher output data.
49    */

50   public long started = 0L;
51   /**
52    * The time when fetching of this segment finished, as recorded
53    * in fetcher output data.
54    */

55   public long finished = 0L;
56   public long size = 0L;
57   private long key = -1L;
58
59   
60   public File JavaDoc segmentDir;
61   public NutchFileSystem nfs;
62
63   /**
64    * Open a segment for reading. If the segment is corrupted, do not attempt to fix it.
65    * @param dir directory containing segment data
66    * @throws Exception
67    */

68   public SegmentReader(File JavaDoc dir) throws Exception JavaDoc {
69     this(new LocalFileSystem(), dir, true, true, true, false);
70   }
71   
72   /**
73    * Open a segment for reading. If segment is corrupted, do not attempt to fix it.
74    * @param nfs filesystem
75    * @param dir directory containing segment data
76    * @throws Exception
77    */

78   public SegmentReader(NutchFileSystem nfs, File JavaDoc dir) throws Exception JavaDoc {
79     this(nfs, dir, true, true, true, false);
80   }
81   
82   /**
83    * Open a segment for reading.
84    * @param dir directory containing segment data
85    * @param autoFix if true, and the segment is corrupted, attempt to
86    * fix errors and try to open it again. If the segment is corrupted, and
87    * autoFix is false, or it was not possible to correct errors, an Exception is
88    * thrown.
89    * @throws Exception
90    */

91   public SegmentReader(File JavaDoc dir, boolean autoFix) throws Exception JavaDoc {
92     this(new LocalFileSystem(), dir, true, true, true, autoFix);
93   }
94   
95   /**
96    * Open a segment for reading.
97    * @param nfs filesystem
98    * @param dir directory containing segment data
99    * @param autoFix if true, and the segment is corrupted, attempt to
100    * fix errors and try to open it again. If the segment is corrupted, and
101    * autoFix is false, or it was not possible to correct errors, an Exception is
102    * thrown.
103    * @throws Exception
104    */

105   public SegmentReader(NutchFileSystem nfs, File JavaDoc dir, boolean autoFix) throws Exception JavaDoc {
106     this(nfs, dir, true, true, true, autoFix);
107   }
108   
109   /**
110    * Open a segment for reading. When a segment is open, its total size is checked
111    * and cached in this class - however, only by actually reading entries one can
112    * be sure about the exact number of valid, non-corrupt entries.
113    *
114    * <p>If the segment was created with no-parse option (see {@link FetcherOutput#DIR_NAME_NP})
115    * then automatically withParseText and withParseData will be forced to false.</p>
116    *
117    * @param nfs NutchFileSystem to use
118    * @param dir directory containing segment data
119    * @param withContent if true, read Content, otherwise ignore it
120    * @param withParseText if true, read ParseText, otherwise ignore it
121    * @param withParseData if true, read ParseData, otherwise ignore it
122    * @param autoFix if true, and the segment is corrupt, try to automatically fix it.
123    * If this parameter is false, and the segment is corrupt, or fixing was unsuccessful,
124    * and Exception is thrown.
125    * @throws Exception
126    */

127   public SegmentReader(NutchFileSystem nfs, File JavaDoc dir,
128           boolean withContent, boolean withParseText, boolean withParseData,
129           boolean autoFix) throws Exception JavaDoc {
130     isParsed = isParsedSegment(nfs, dir);
131     if (!isParsed) {
132       withParseText = false;
133       withParseData = false;
134     }
135     try {
136       init(nfs, dir, withContent, withParseText, withParseData);
137     } catch (Exception JavaDoc e) {
138       boolean ok = false;
139       if (autoFix) {
140         // corrupt segment, attempt to fix
141
ok = fixSegment(nfs, dir, withContent, withParseText, withParseData, false);
142       }
143       if (ok)
144         init(nfs, dir, withContent, withParseText, withParseData);
145       else throw new Exception JavaDoc("Segment " + dir + " is corrupted.");
146     }
147   }
148
149   public static boolean isParsedSegment(NutchFileSystem nfs, File JavaDoc segdir) throws Exception JavaDoc {
150     boolean res;
151     File JavaDoc foDir = new File JavaDoc(segdir, FetcherOutput.DIR_NAME);
152     if (nfs.exists(foDir) && nfs.isDirectory(foDir)) return true;
153     foDir = new File JavaDoc(segdir, FetcherOutput.DIR_NAME_NP);
154     if (nfs.exists(foDir) && nfs.isDirectory(foDir)) return false;
155     throw new Exception JavaDoc("Missing or invalid '" + FetcherOutput.DIR_NAME + "' or '"
156             + FetcherOutput.DIR_NAME_NP + "' directory in " + segdir);
157   }
158   
159   /**
160    * Attempt to fix a partially corrupted segment. Currently this means just
161    * fixing broken MapFile's, using {@link MapFile#fix(NutchFileSystem, File, Class, Class, boolean)}
162    * method.
163    * @param nfs filesystem
164    * @param dir segment directory
165    * @param withContent if true, fix content, otherwise ignore it
166    * @param withParseText if true, fix parse_text, otherwise ignore it
167    * @param withParseData if true, fix parse_data, otherwise ignore it
168    * @param dryrun if true, only show what would be done without performing any actions
169    * @return
170    */

171   public static boolean fixSegment(NutchFileSystem nfs, File JavaDoc dir,
172           boolean withContent, boolean withParseText, boolean withParseData,
173           boolean dryrun) {
174     String JavaDoc dr = "";
175     if (dryrun) dr = "[DRY RUN] ";
176     File JavaDoc fetcherOutput = null;
177     File JavaDoc content = new File JavaDoc(dir, Content.DIR_NAME);
178     File JavaDoc parseData = new File JavaDoc(dir, ParseData.DIR_NAME);
179     File JavaDoc parseText = new File JavaDoc(dir, ParseText.DIR_NAME);
180     long cnt = 0L;
181     try {
182       if (isParsedSegment(nfs, dir)) {
183         fetcherOutput = new File JavaDoc(dir, FetcherOutput.DIR_NAME);
184       } else {
185         fetcherOutput = new File JavaDoc(dir, FetcherOutput.DIR_NAME_NP);
186         withParseText = false;
187         withParseData = false;
188       }
189       cnt = MapFile.fix(nfs, fetcherOutput, LongWritable.class, FetcherOutput.class, dryrun);
190       if (cnt != -1) LOG.info(dr + " - fixed " + fetcherOutput.getName());
191       if (withContent) {
192         cnt = MapFile.fix(nfs, content, LongWritable.class, Content.class, dryrun);
193         if (cnt != -1) LOG.info(dr + " - fixed " + content.getName());
194       }
195       if (withParseData) {
196         cnt = MapFile.fix(nfs, parseData, LongWritable.class, ParseData.class, dryrun);
197         if (cnt != -1) LOG.info(dr + " - fixed " + parseData.getName());
198       }
199       if (withParseText) {
200         cnt = MapFile.fix(nfs, parseText, LongWritable.class, ParseText.class, dryrun);
201         if (cnt != -1) LOG.info(dr + " - fixed " + parseText.getName());
202       }
203       LOG.info(dr + "Finished fixing " + dir.getName());
204       return true;
205     } catch (Throwable JavaDoc t) {
206       LOG.warning(dr + "Unable to fix segment " + dir.getName() + ": " + t.getMessage());
207       return false;
208     }
209   }
210
211   private void init(NutchFileSystem nfs, File JavaDoc dir,
212           boolean withContent, boolean withParseText, boolean withParseData) throws Exception JavaDoc {
213     segmentDir = dir;
214     this.nfs = nfs;
215     if (isParsed) {
216       fetcherReader = new ArrayFile.Reader(nfs, new File JavaDoc(dir, FetcherOutput.DIR_NAME).toString());
217     } else {
218       fetcherReader = new ArrayFile.Reader(nfs, new File JavaDoc(dir, FetcherOutput.DIR_NAME_NP).toString());
219     }
220     if (withContent) contentReader = new ArrayFile.Reader(nfs, new File JavaDoc(dir, Content.DIR_NAME).toString());
221     if (withParseText) parseTextReader = new ArrayFile.Reader(nfs, new File JavaDoc(dir, ParseText.DIR_NAME).toString());
222     if (withParseData) parseDataReader = new ArrayFile.Reader(nfs, new File JavaDoc(dir, ParseData.DIR_NAME).toString());
223     // count the number of valid entries.
224
// XXX We assume that all other data files contain the
225
// XXX same number of valid entries - which is not always
226
// XXX true if Fetcher crashed in the middle of update.
227
// XXX One should check for this later, when actually
228
// XXX reading the entries.
229
FetcherOutput fo = new FetcherOutput();
230     fetcherReader.next(fo);
231     started = fo.getFetchDate();
232     LongWritable w = new LongWritable();
233     w.set(++size);
234     try {
235       while (fetcherReader.seek(w)) {
236         w.set(++size);
237       }
238     } catch (Throwable JavaDoc eof) {
239       // the file is truncated - probably due to a crashed fetcher.
240
// Use just the part that we can...
241
LOG.warning(" - data in segment " + dir + " is corrupt, using only " + size + " entries.");
242     }
243     // go back until you get a good entry
244
boolean ok = false;
245     int back = 0;
246     do {
247       try {
248         fetcherReader.seek(size - 2 - back);
249         fetcherReader.next(fo);
250         ok = true;
251       } catch (Throwable JavaDoc t) {
252         back++;
253       }
254     } while (!ok && back < 10);
255     if (back >= 10)
256       throw new Exception JavaDoc(" - fetcher output is unreadable");
257     if (back > 0) LOG.warning(" - fetcher output truncated by " + back + " to " + size);
258     size = size - back;
259     finished = fo.getFetchDate();
260     // reposition to the start
261
fetcherReader.reset();
262   }
263
264   /**
265    * Get a specified entry from the segment. Note: even if some of the storage objects
266    * are null, but if respective readers are open a seek(n) operation will be performed
267    * anyway, to ensure that the whole entry is valid.
268    *
269    * @param n position of the entry
270    * @param fo storage for FetcherOutput data. Must not be null.
271    * @param co storage for Content data, or null.
272    * @param pt storage for ParseText data, or null.
273    * @param pd storage for ParseData data, or null.
274    * @return true if all requested data successfuly read, false otherwise
275    * @throws IOException
276    */

277   public synchronized boolean get(long n, FetcherOutput fo, Content co,
278           ParseText pt, ParseData pd) throws IOException JavaDoc {
279     //XXX a trivial implementation would be to do the following:
280
//XXX seek(n);
281
//XXX return next(fo, co, pt, pd);
282
//XXX However, get(long, Writable) may be more optimized
283
boolean valid = true;
284     if (fetcherReader.get(n, fo) == null) valid = false;
285     if (contentReader != null) {
286       if (co != null) {
287         if (contentReader.get(n, co) == null) valid = false;
288       } else contentReader.seek(n);
289     }
290     if (parseTextReader != null) {
291       if (pt != null) {
292         if (parseTextReader.get(n, pt) == null) valid = false;
293       } else parseTextReader.seek(n);
294     }
295     if (parseDataReader != null) {
296       if (pd != null) {
297         if (parseDataReader.get(n, pd) == null) valid = false;
298       } else parseDataReader.seek(n);
299     }
300     key = n;
301     return valid;
302   }
303   
304   private Content _co = new Content();
305   private ParseText _pt = new ParseText();
306   private ParseData _pd = new ParseData();
307   
308   /** Read values from all open readers. Note: even if some of the storage objects
309    * are null, but if respective readers are open, an underlying next() operation will
310    * be performed for all streams anyway, to ensure that the whole entry is valid.
311    */

312   public synchronized boolean next(FetcherOutput fo, Content co,
313           ParseText pt, ParseData pd) throws IOException JavaDoc {
314     boolean valid = true;
315     Content rco = (co == null) ? _co : co;
316     ParseText rpt = (pt == null) ? _pt : pt;
317     ParseData rpd = (pd == null) ? _pd : pd;
318     if (fetcherReader.next(fo) == null) valid = false;
319     if (contentReader != null)
320       if (contentReader.next(rco) == null) valid = false;
321     if (parseTextReader != null)
322       if (parseTextReader.next(rpt) == null) valid = false;
323     if (parseDataReader != null)
324       if (parseDataReader.next(rpd) == null) valid = false;
325     key++;
326     return valid;
327   }
328   
329   /** Seek to a position in all readers. */
330   public synchronized void seek(long n) throws IOException JavaDoc {
331     fetcherReader.seek(n);
332     if (contentReader != null) contentReader.seek(n);
333     if (parseTextReader != null) parseTextReader.seek(n);
334     if (parseDataReader != null) parseDataReader.seek(n);
335     key = n;
336   }
337
338   /** Return the current key position. */
339   public long key() {
340     return key;
341   }
342
343   /** Reset all readers. */
344   public synchronized void reset() throws IOException JavaDoc {
345     fetcherReader.reset();
346     if (contentReader != null) contentReader.reset();
347     if (parseTextReader != null) parseTextReader.reset();
348     if (parseDataReader != null) parseDataReader.reset();
349   }
350
351   /** Close all readers. */
352   public synchronized void close() {
353     try {
354       fetcherReader.close();
355     } catch (Exception JavaDoc e) {};
356     if (contentReader != null) try {
357       contentReader.close();
358     } catch (Exception JavaDoc e) {};
359     if (parseTextReader != null) try {
360       parseTextReader.close();
361     } catch (Exception JavaDoc e) {};
362     if (parseDataReader != null) try {
363       parseDataReader.close();
364     } catch (Exception JavaDoc e) {};
365   }
366   
367   /**
368    * Dump the segment's content in human-readable format.
369    * @param sorted if true, sort segment entries by URL (ascending). If false,
370    * output entries in the order they occur in the segment.
371    * @param output where to dump to
372    * @throws Exception
373    */

374   public synchronized void dump(boolean sorted, PrintStream JavaDoc output) throws Exception JavaDoc {
375     reset();
376     FetcherOutput fo = new FetcherOutput();
377     Content co = new Content();
378     ParseData pd = new ParseData();
379     ParseText pt = new ParseText();
380     long recNo = 0L;
381     if (!sorted) {
382       while(next(fo, co, pt, pd)) {
383         output.println("Recno:: " + recNo++);
384         output.println("FetcherOutput::\n" + fo.toString());
385         if (contentReader != null)
386           output.println("Content::\n" + co.toString());
387         if (parseDataReader != null)
388           output.println("ParseData::\n" + pd.toString());
389         if (parseTextReader != null)
390           output.println("ParseText::\n" + pt.toString());
391         output.println("");
392       }
393     } else {
394       File JavaDoc unsortedFile = new File JavaDoc(segmentDir, ".unsorted");
395       File JavaDoc sortedFile = new File JavaDoc(segmentDir, ".sorted");
396       nfs.delete(unsortedFile);
397       nfs.delete(sortedFile);
398       SequenceFile.Writer seqWriter = new SequenceFile.Writer(nfs,
399               unsortedFile.toString(), UTF8.class, LongWritable.class);
400       FetchListEntry fle;
401       LongWritable rec = new LongWritable();
402       UTF8 url = new UTF8();
403       String JavaDoc urlString;
404       while (fetcherReader.next(fo) != null) {
405         fle = fo.getFetchListEntry();
406         urlString = fle.getPage().getURL().toString();
407         rec.set(recNo);
408         url.set(urlString);
409         seqWriter.append(url, rec);
410         recNo++;
411       }
412       seqWriter.close();
413       // sort the SequenceFile
414
long start = System.currentTimeMillis();
415
416       SequenceFile.Sorter sorter = new SequenceFile.Sorter(nfs,
417               new UTF8.Comparator(), LongWritable.class);
418
419       sorter.sort(unsortedFile.toString(), sortedFile.toString());
420
421       float localSecs = (System.currentTimeMillis() - start) / 1000.0f;
422       LOG.info(" - sorted: " + recNo + " entries in " + localSecs + "s, "
423         + (recNo/localSecs) + " entries/s");
424
425       nfs.delete(unsortedFile);
426       SequenceFile.Reader seqReader = new SequenceFile.Reader(nfs, sortedFile.toString());
427       while (seqReader.next(url, rec)) {
428         recNo = rec.get();
429         get(recNo, fo, co, pt, pd);
430         output.println("Recno:: " + recNo++);
431         output.println("FetcherOutput::\n" + fo.toString());
432         if (contentReader != null)
433           output.println("Content::\n" + co.toString());
434         if (parseDataReader != null)
435           output.println("ParseData::\n" + pd.toString());
436         if (parseTextReader != null)
437           output.println("ParseText::\n" + pt.toString());
438         output.println("");
439       }
440       seqReader.close();
441       nfs.delete(sortedFile);
442     }
443   }
444
445   /** Command-line wrapper. Run without arguments to see usage help. */
446   public static void main(String JavaDoc[] args) throws Exception JavaDoc {
447     if (args.length == 0) {
448       usage();
449       return;
450     }
451     SegmentReader reader = null;
452     NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0);
453     String JavaDoc segDir = null;
454     Vector JavaDoc dirs = new Vector JavaDoc();
455     boolean fix = false;
456     boolean list = false;
457     boolean dump = false;
458     boolean sorted = false;
459     boolean withParseText = true;
460     boolean withParseData = true;
461     boolean withContent = true;
462     for (int i = 0; i < args.length; i++) {
463       if (args[i] != null) {
464         if (args[i].equals("-noparsetext")) withParseText = false;
465         else if (args[i].equals("-noparsedata")) withParseData = false;
466         else if (args[i].equals("-nocontent")) withContent = false;
467         else if (args[i].equals("-fix")) fix = true;
468         else if (args[i].equals("-dump")) dump = true;
469         else if (args[i].equals("-dumpsort")) {
470           dump = true;
471           sorted = true;
472         } else if (args[i].equals("-list")) list = true;
473         else if (args[i].equals("-dir")) segDir = args[++i];
474         else dirs.add(new File JavaDoc(args[i]));
475       }
476     }
477     if (segDir != null) {
478       File JavaDoc sDir = new File JavaDoc(segDir);
479       if (!sDir.exists() || !sDir.isDirectory()) {
480         LOG.warning("Invalid path: " + sDir);
481       } else {
482         File JavaDoc[] files = sDir.listFiles(new FileFilter JavaDoc() {
483           public boolean accept(File JavaDoc f) {
484             return f.isDirectory();
485           }
486         });
487         if (files != null && files.length > 0) {
488           for (int i = 0; i < files.length; i++) dirs.add(files[i]);
489         }
490       }
491     }
492     if (dirs.size() == 0) {
493       LOG.severe("No input segment dirs.");
494       usage();
495       return;
496     }
497     long total = 0L;
498     int cnt = 0;
499     SimpleDateFormat JavaDoc sdf = new SimpleDateFormat JavaDoc("yyyyMMdd'-'HH:mm:ss");
500     DecimalFormat JavaDoc df = new DecimalFormat JavaDoc("########");
501     df.setParseIntegerOnly(true);
502     if (list)
503       LOG.info("PARSED?\tSTARTED\t\t\tFINISHED\t\tCOUNT\tDIR NAME");
504     for (int i = 0; i < dirs.size(); i++) {
505       File JavaDoc dir = (File JavaDoc)dirs.get(i);
506       try {
507         reader = new SegmentReader(nfs, dir,
508               withContent, withParseText, withParseData, fix);
509         if (list) {
510           LOG.info(reader.isParsed +
511                   "\t" + sdf.format(new Date JavaDoc(reader.started)) +
512                   "\t" + sdf.format(new Date JavaDoc(reader.finished)) +
513                   "\t" + df.format(reader.size) +
514                   "\t" + dir);
515         }
516         total += reader.size;
517         cnt++;
518         if (dump) reader.dump(sorted, System.out);
519       } catch (Throwable JavaDoc t) {
520         LOG.warning(t.getMessage());
521       }
522     }
523     if (list)
524       LOG.info("TOTAL: " + total + " entries in " + cnt + " segments.");
525   }
526   
527   private static void usage() {
528     System.err.println("SegmentReader [-fix] [-dump] [-dumpsort] [-list] [-nocontent] [-noparsedata] [-noparsetext] (-dir segments | seg1 seg2 ...)");
529     System.err.println("\tNOTE: at least one segment dir name is required, or '-dir' option.");
530     System.err.println("\t-fix\t\tautomatically fix corrupted segments");
531     System.err.println("\t-dump\t\tdump segment data in human-readable format");
532     System.err.println("\t-dumpsort\tdump segment data in human-readable format, sorted by URL");
533     System.err.println("\t-list\t\tprint useful information about segments");
534     System.err.println("\t-nocontent\tignore content data");
535     System.err.println("\t-noparsedata\tignore parse_data data");
536     System.err.println("\t-nocontent\tignore parse_text data");
537     System.err.println("\t-dir segments\tdirectory containing multiple segments");
538     System.err.println("\tseg1 seg2 ...\tsegment directories\n");
539   }
540 }
541
Popular Tags