KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > tools > ParseSegment


1 /* Copyright (c) 2004 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3
4 package net.nutch.tools;
5
6 import net.nutch.pagedb.FetchListEntry;
7 import net.nutch.io.*;
8 import net.nutch.fs.*;
9 import net.nutch.util.*;
10 import net.nutch.protocol.*;
11 import net.nutch.parse.*;
12 import net.nutch.plugin.*;
13
14 import net.nutch.fetcher.FetcherOutput;
15
16 import java.io.EOFException JavaDoc;
17 import java.io.File JavaDoc;
18 import java.io.DataInput JavaDoc;
19 import java.io.DataOutput JavaDoc;
20 import java.io.IOException JavaDoc;
21
22 import java.util.Properties JavaDoc;
23 import java.util.logging.*;
24
25 /**
26  * Parse contents in one segment.
27  *
28  * <p>
29  * It assumes, under given segment, existence of ./fetcher_output/,
30  * which is typically generated after a non-parsing fetcher run
31  * (i.e., fetcher is started with option -noParsing).
32  *
33  * <p> Contents in one segment are parsed and saved in these steps:
34  * <li> (1) ./fetcher_output/ and ./content/ are looped together
35  * (possibly by multiple ParserThreads), and content is parsed for each entry.
36  * The entry number and resultant ParserOutput are saved in ./parser.unsorted.
37  * <li> (2) ./parser.unsorted is sorted by entry number, result saved as
38  * ./parser.sorted.
39  * <li> (3) ./parser.sorted and ./fetcher_output/ are looped together.
40  * At each entry, ParserOutput is split into ParseData and ParseText,
41  * which are saved in ./parse_data/ and ./parse_text/ respectively. Also
42  * updated is FetcherOutput with parsing status, which is saved in ./fetcher/.
43  *
44  * <p> In the end, ./fetcher/ should be identical to one resulted from
45  * fetcher run WITHOUT option -noParsing.
46  *
47  * <p> By default, intermediates ./parser.unsorted and ./parser.sorted
48  * are removed at the end, unless option -noClean is used. However
49  * ./fetcher_output/ is kept intact.
50  *
51  * <p> Check Fetcher.java and FetcherOutput.java for further discussion.
52  *
53  * @author John Xing
54  */

55
56 public class ParseSegment {
57
58   public static final Logger LOG =
59     LogFormatter.getLogger(ParseSegment.class.getName());
60
61   private int threadCount = // max number of threads
62
NutchConf.getInt("parser.threads.parse", 10);
63
64   private NutchFileSystem nfs;
65
66   // segment dir
67
private String JavaDoc directory;
68
69   // readers for FetcherOutput (no-parsing) and Content
70
private ArrayFile.Reader fetcherNPReader;
71   private ArrayFile.Reader contentReader;
72
73   // SequenceFile (unsorted) for ParserOutput
74
private File JavaDoc unsortedFile;
75   private SequenceFile.Writer parserOutputWriter;
76
77   // SequenceFile (sorted) for ParserOutput
78
private File JavaDoc sortedFile;
79
80   // whether dryRun only (i.e., no real parsing is done)
81
private boolean dryRun = false;
82
83   // whether clean intermediate files
84
private boolean clean = true;
85
86   // entry (record number) in fetcherNPReader (same in contentReader)
87
private long entry = -1;
88
89   // for stats
90
private long start; // start time
91
private long bytes; // total bytes parsed
92
private int pages; // total pages parsed
93
private int errors; // total pages errored
94

95   private ThreadGroup JavaDoc group = new ThreadGroup JavaDoc("parser"); // our thread group
96

97   /**
98    * Inner class ParserThread
99    */

100   private class ParserThread extends Thread JavaDoc {
101
102     // current entry that this thread is parsing
103
private long myEntry = -1;
104
105     // for detailed stats
106
private long t0,t1,t2,t3,t4,t5;
107
108     public ParserThread() { super(group, "myThread"); }
109
110     /**
111      * This thread participates in looping through
112      * entries of FetcherOutput and Content
113      */

114     public void run() {
115
116       FetcherOutput fetcherOutput = new FetcherOutput();
117       Content content = new Content();
118
119       FetchListEntry fle = null;
120       String JavaDoc url = null;
121
122       while (true) {
123         if (LogFormatter.hasLoggedSevere()) // something bad happened
124
break; // exit
125

126         t0 = System.currentTimeMillis();
127
128         try {
129
130           // must be read in order! thus synchronize threads.
131
synchronized (ParseSegment.this) {
132             t1 = System.currentTimeMillis();
133
134             try {
135               if (fetcherNPReader.next(fetcherOutput) == null ||
136                 contentReader.next(content) == null)
137               return;
138             } catch (EOFException JavaDoc eof) {
139               // only partial data available, stop this thread,
140
// other threads will be stopped also.
141
return;
142             }
143
144             entry++;
145             myEntry = entry;
146             if (LOG.isLoggable(Level.FINE))
147               LOG.fine("Read in entry "+entry);
148
149             // safe guard against mismatched files
150
//if (entry != fetcherNPReader.key() ||
151
// entry != contentReader.key()) {
152
// LOG.severe("Mismatched entries under "
153
// + FetcherOutput.DIR_NAME_NP + " and " + Content.DIR_NAME);
154
// continue;
155
//}
156
}
157
158           t2 = System.currentTimeMillis();
159
160           fle = fetcherOutput.getFetchListEntry();
161           url = fle.getPage().getURL().toString();
162
163           LOG.fine("parsing " + url); // parse the page
164

165           // safe guard against mismatched files
166
if (!url.equals(content.getUrl())) {
167             LOG.severe("Mismatched entries under "
168               + FetcherOutput.DIR_NAME_NP + " and " + Content.DIR_NAME);
169             continue;
170           }
171
172           // if fetch was successful or
173
// previously unable to parse (so try again)
174
if (fetcherOutput.getStatus() == FetcherOutput.SUCCESS ||
175               fetcherOutput.getStatus() == FetcherOutput.CANT_PARSE) {
176             handleContent(url, content);
177             synchronized (ParseSegment.this) {
178               pages++; // record successful parse
179
bytes += content.getContent().length;
180               if ((pages % 100) == 0)
181                 status();
182             }
183           } else {
184             // errored at fetch step
185
logError(url, new ProtocolException("Error at fetch stage"));
186             handleNoContent(ParserOutput.NOFETCH);
187           }
188
189         } catch (ParseException e) {
190           logError(url, e);
191           handleNoContent(ParserOutput.FAILURE);
192
193         } catch (Throwable JavaDoc t) { // an unchecked exception
194
if (fle != null) {
195             logError(url, t);
196             handleNoContent(ParserOutput.UNKNOWN);
197           } else {
198             LOG.severe("Unexpected exception");
199           }
200         }
201       }
202     }
203
204     private void logError(String JavaDoc url, Throwable JavaDoc t) {
205       LOG.info("parse of " + url + " failed with: " + t);
206       if (LOG.isLoggable(Level.FINE))
207         LOG.log(Level.FINE, "stack", t); // stack trace
208
synchronized (ParseSegment.this) { // record failure
209
errors++;
210       }
211     }
212
213     private void handleContent(String JavaDoc url, Content content)
214       throws ParseException {
215
216       //String contentType = content.getContentType();
217
String JavaDoc contentType = content.getMetadata().getProperty("Content-Type");
218
219       if (ParseSegment.this.dryRun) {
220         LOG.info("To be handled as Content-Type: "+contentType);
221         return;
222       }
223
224       Parser parser = ParserFactory.getParser(contentType, url);
225       Parse parse = parser.getParse(content);
226
227       outputPage
228         (new ParseText(parse.getText()), parse.getData(),ParserOutput.SUCCESS);
229     }
230
231     private void handleNoContent(int status) {
232       if (ParseSegment.this.dryRun) {
233         LOG.info("To be handled as no content");
234         return;
235       }
236       outputPage(new ParseText(""),
237                  new ParseData("", new Outlink[0], new Properties JavaDoc()),
238                  status);
239     }
240       
241     private void outputPage
242       (ParseText parseText, ParseData parseData, int status) {
243       try {
244         t3 = System.currentTimeMillis();
245         synchronized (parserOutputWriter) {
246           t4 = System.currentTimeMillis();
247           parserOutputWriter.append(new LongWritable(myEntry),
248             new ParserOutput(parseData, parseText, status));
249           t5 = System.currentTimeMillis();
250           if (LOG.isLoggable(Level.FINE))
251             LOG.fine("Entry: "+myEntry
252               +" "+parseData.getMetadata().getProperty("Content-Length")
253               +" wait="+(t1-t0) +" read="+(t2-t1) +" parse="+(t3-t2)
254               +" wait="+(t4-t3) +" write="+(t5-t4) +"ms");
255         }
256       } catch (Throwable JavaDoc t) {
257         LOG.severe("error writing output:" + t.toString());
258       }
259     }
260
261   }
262
263   /**
264    * Inner class ParserOutput: ParseData + ParseText + status
265    */

266   private class ParserOutput extends VersionedWritable {
267     public static final String JavaDoc DIR_NAME = "parser";
268
269     private final static byte VERSION = 1;
270
271     // could be more detailed
272
public final static byte UNKNOWN = (byte)0; // unknown problem in parsing
273
public final static byte SUCCESS = (byte)1; // parsing succeeded
274
public final static byte FAILURE = (byte)2; // parsing failed
275
public final static byte NOFETCH = (byte)3; // fetch was not a SUCCESS
276

277     private int status;
278
279     private ParseData parseData = new ParseData();
280     private ParseText parseText = new ParseText();
281
282     public ParserOutput() {}
283     
284     public ParserOutput(ParseData parseData, ParseText parseText, int status) {
285       this.parseData = parseData;
286       this.parseText = parseText;
287       this.status = status;
288     }
289
290     public byte getVersion() { return VERSION; }
291
292     public ParseData getParseData() {
293       return this.parseData;
294     }
295
296     public ParseText getParseText() {
297       return this.parseText;
298     }
299
300     public int getStatus() {
301       return this.status;
302     }
303
304     public final void readFields(DataInput JavaDoc in) throws IOException JavaDoc {
305       super.readFields(in); // check version
306
status = in.readByte();
307       parseData.readFields(in);
308       parseText.readFields(in);
309       return;
310     }
311
312     public final void write(DataOutput JavaDoc out) throws IOException JavaDoc {
313       super.write(out); // write version
314
out.writeByte(status);
315       parseData.write(out);
316       parseText.write(out);
317       return;
318     }
319   }
320             
321   /**
322    * ParseSegment constructor
323    */

324   public ParseSegment(NutchFileSystem nfs, String JavaDoc directory, boolean dryRun)
325     throws IOException JavaDoc {
326
327     File JavaDoc file;
328
329     this.nfs = nfs;
330     this.directory = directory;
331     this.dryRun = dryRun;
332
333     // FetcherOutput.DIR_NAME_NP must exist
334
file = new File JavaDoc(directory, FetcherOutput.DIR_NAME_NP);
335     if (!nfs.exists(file))
336       throw new IOException JavaDoc("Directory missing: "+FetcherOutput.DIR_NAME_NP);
337
338     if (dryRun)
339       return;
340
341     // clean old FetcherOutput.DIR_NAME
342
file = new File JavaDoc(directory, FetcherOutput.DIR_NAME);
343     if (nfs.exists(file)) {
344       LOG.info("Deleting old "+file.getName());
345       nfs.delete(file);
346     }
347
348     // clean old unsortedFile
349
this.unsortedFile = new File JavaDoc(directory, ParserOutput.DIR_NAME+".unsorted");
350     if (nfs.exists(this.unsortedFile)) {
351       LOG.info("Deleting old "+this.unsortedFile.getName());
352       nfs.delete(this.unsortedFile);
353     }
354
355     // clean old sortedFile
356
this.sortedFile = new File JavaDoc(directory, ParserOutput.DIR_NAME+".sorted");
357     if (nfs.exists(this.sortedFile)) {
358       LOG.info("Deleting old "+this.sortedFile.getName());
359       nfs.delete(this.sortedFile);
360     }
361
362     // clean old ParseData.DIR_NAME
363
file = new File JavaDoc(directory, ParseData.DIR_NAME);
364     if (nfs.exists(file)) {
365       LOG.info("Deleting old "+file.getName());
366       nfs.delete(file);
367     }
368
369     // clean old ParseText.DIR_NAME
370
file = new File JavaDoc(directory, ParseText.DIR_NAME);
371     if (nfs.exists(file)) {
372       LOG.info("Deleting old "+file.getName());
373       nfs.delete(file);
374     }
375
376   }
377
378   /** Set thread count */
379   public void setThreadCount(int threadCount) {
380     this.threadCount=threadCount;
381   }
382
383   /** Set the logging level. */
384   public static void setLogLevel(Level level) {
385     LOG.setLevel(level);
386     PluginRepository.LOG.setLevel(level);
387     ParserFactory.LOG.setLevel(level);
388     LOG.info("logging at " + level);
389   }
390
391   /** Set if clean intermediates. */
392   public void setClean(boolean clean) {
393     this.clean = clean;
394   }
395
396   /** Display the status of the parser run. */
397   public void status() {
398     long ms = System.currentTimeMillis() - start;
399     LOG.info("status: "
400              + pages + " pages, "
401              + errors + " errors, "
402              + bytes + " bytes, "
403              + ms + " ms");
404     LOG.info("status: "
405              + (((float)pages)/(ms/1000.0f))+" pages/s, "
406              + (((float)bytes*8/1024)/(ms/1000.0f))+" kb/s, "
407              + (((float)bytes)/pages) + " bytes/page");
408   }
409
410   /** Parse contents by multiple threads and save as unsorted ParserOutput */
411   public void parse() throws IOException JavaDoc, InterruptedException JavaDoc {
412
413     fetcherNPReader = new ArrayFile.Reader
414       (nfs, (new File JavaDoc(directory, FetcherOutput.DIR_NAME_NP)).getPath());
415     contentReader = new ArrayFile.Reader
416       (nfs, (new File JavaDoc(directory, Content.DIR_NAME)).getPath());
417
418     if (!this.dryRun) {
419       parserOutputWriter = new SequenceFile.Writer
420         (nfs, unsortedFile.getPath(), LongWritable.class, ParserOutput.class);
421     }
422
423     start = System.currentTimeMillis();
424
425     for (int i = 0; i < threadCount; i++) { // spawn threads
426
ParserThread thread = new ParserThread();
427       thread.start();
428     }
429
430     do {
431       Thread.sleep(1000);
432
433       if (LogFormatter.hasLoggedSevere())
434         throw new RuntimeException JavaDoc("SEVERE error logged. Exiting parser.");
435
436     } while (group.activeCount() > 0); // wait for threads to finish
437

438     fetcherNPReader.close();
439     contentReader.close();
440     if (!this.dryRun)
441       parserOutputWriter.close();
442
443     status(); // print final status
444
}
445
446   /** Sort ParserOutput */
447   public void sort() throws IOException JavaDoc {
448
449     if (this.dryRun)
450       return;
451
452     LOG.info("Sorting ParserOutput");
453
454     start = System.currentTimeMillis();
455
456     SequenceFile.Sorter sorter = new SequenceFile.Sorter
457       (nfs, new LongWritable.Comparator(), ParserOutput.class);
458
459     sorter.sort(unsortedFile.getPath(), sortedFile.getPath());
460
461     double localSecs = (System.currentTimeMillis() - start) / 1000.0;
462     LOG.info("Sorted: " + (pages+errors) + " entries in " + localSecs + "s, "
463       + ((pages+errors)/localSecs) + " entries/s");
464
465     if (this.clean) {
466       LOG.info("Deleting intermediate "+unsortedFile.getName());
467       nfs.delete(unsortedFile);
468     }
469
470     return;
471   }
472
473   /**
474    * Split sorted ParserOutput into ParseData and ParseText,
475    * and generate new FetcherOutput with updated status
476    */

477   public void save() throws IOException JavaDoc {
478
479     if (this.dryRun)
480       return;
481
482     LOG.info("Saving ParseData and ParseText separately");
483
484     start = System.currentTimeMillis();
485
486     SequenceFile.Reader parserOutputReader
487       = new SequenceFile.Reader(nfs, sortedFile.getPath());
488
489     ArrayFile.Reader fetcherNPReader = new ArrayFile.Reader(nfs,
490       (new File JavaDoc(directory, FetcherOutput.DIR_NAME_NP)).getPath());
491
492     ArrayFile.Writer fetcherWriter = new ArrayFile.Writer(nfs,
493       (new File JavaDoc(directory, FetcherOutput.DIR_NAME)).getPath(),
494       FetcherOutput.class);
495
496     ArrayFile.Writer parseDataWriter = new ArrayFile.Writer(nfs,
497       (new File JavaDoc(directory, ParseData.DIR_NAME)).getPath(), ParseData.class);
498     ArrayFile.Writer parseTextWriter = new ArrayFile.Writer(nfs,
499       (new File JavaDoc(directory, ParseText.DIR_NAME)).getPath(), ParseText.class);
500
501     try {
502       LongWritable key = new LongWritable();
503       ParserOutput val = new ParserOutput();
504       FetcherOutput fo = new FetcherOutput();
505       int count = 0;
506       int status;
507       while (parserOutputReader.next(key,val)) {
508         fetcherNPReader.next(fo);
509         // safe guarding
510
if (fetcherNPReader.key() != key.get())
511           throw new IOException JavaDoc("Mismatch between entries under "
512             + FetcherOutput.DIR_NAME_NP + " and in " + sortedFile.getName());
513         // reset status in fo (FetcherOutput), using status in ParserOutput
514
switch (val.getStatus()) {
515         case ParserOutput.SUCCESS:
516           fo.setStatus(FetcherOutput.SUCCESS);
517           break;
518         case ParserOutput.UNKNOWN:
519         case ParserOutput.FAILURE:
520           fo.setStatus(FetcherOutput.CANT_PARSE);
521           break;
522         case ParserOutput.NOFETCH:
523         default:
524           // do not reset
525
}
526         fetcherWriter.append(fo);
527         parseDataWriter.append(val.getParseData());
528         parseTextWriter.append(val.getParseText());
529         count++;
530       }
531       // safe guard! make sure there are identical entries
532
// in (fetcher, content) and in (parseData, parseText)
533
if (count != (pages+errors))
534         throw new IOException JavaDoc("Missing entries: expect "+(pages+errors)
535           +", but have "+count+" entries instead.");
536     } finally {
537       fetcherNPReader.close();
538       fetcherWriter.close();
539       parseDataWriter.close();
540       parseTextWriter.close();
541       parserOutputReader.close();
542     }
543
544     double localSecs = (System.currentTimeMillis() - start) / 1000.0;
545     LOG.info("Saved: " + (pages+errors) + " entries in " + localSecs + "s, "
546       + ((pages+errors)/localSecs) + " entries/s");
547
548     if (this.clean) {
549       LOG.info("Deleting intermediate "+sortedFile.getName());
550       nfs.delete(sortedFile);
551     }
552
553     return;
554   }
555
556   /** main method */
557   public static void main(String JavaDoc[] args) throws Exception JavaDoc {
558     int threadCount = -1;
559     boolean showThreadID = false;
560     boolean dryRun = false;
561     String JavaDoc logLevel = "info";
562     boolean clean = true;
563     String JavaDoc directory = null;
564
565     String JavaDoc usage = "Usage: ParseSegment (-local | -ndfs <namenode:port>) [-threads n] [-showThreadID] [-dryRun] [-logLevel level] [-noClean] dir";
566
567     if (args.length == 0) {
568       System.err.println(usage);
569       System.exit(-1);
570     }
571       
572     // parse command line
573
NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0);
574
575     for (int i = 0; i < args.length; i++) {
576       if (args[i] == null) {
577           continue;
578       } else if (args[i].equals("-threads")) {
579         threadCount = Integer.parseInt(args[++i]);
580       } else if (args[i].equals("-showThreadID")) {
581         showThreadID = true;
582       } else if (args[i].equals("-dryRun")) {
583         dryRun = true;
584       } else if (args[i].equals("-logLevel")) {
585         logLevel = args[++i];
586       } else if (args[i].equals("-noClean")) {
587         clean = false;
588       } else {
589         directory = args[i];
590       }
591     }
592
593     try {
594
595       ParseSegment parseSegment = new ParseSegment(nfs, directory, dryRun);
596
597       parseSegment.setLogLevel
598         (Level.parse((new String JavaDoc(logLevel)).toUpperCase()));
599
600       if (threadCount != -1)
601         parseSegment.setThreadCount(threadCount);
602       if (showThreadID)
603         LogFormatter.setShowThreadIDs(showThreadID);
604
605       parseSegment.setClean(clean);
606
607       parseSegment.parse();
608       parseSegment.sort();
609       parseSegment.save();
610
611     } finally {
612       nfs.close();
613     }
614
615   }
616 }
617
Popular Tags