// Source: net.nutch.tools.SegmentMergeTool (Apache Nutch open-source code),
// as listed on the KickJava "Java API By Example" site.
/* Copyright (c) 2004 The Nutch Organization. All rights reserved. */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
package net.nutch.tools;

import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.logging.Logger;

import net.nutch.fetcher.FetcherOutput;
import net.nutch.indexer.IndexSegment;
import net.nutch.io.MD5Hash;
import net.nutch.fs.*;
import net.nutch.parse.ParseData;
import net.nutch.parse.ParseText;
import net.nutch.protocol.Content;
import net.nutch.segment.SegmentReader;
import net.nutch.segment.SegmentWriter;
import net.nutch.util.LogFormatter;

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.DateField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermDocs;
import org.apache.lucene.index.TermEnum;
/**
 * This class cleans up accumulated segments data, and merges them into a single
 * (or optionally multiple) segment(s), with no duplicates in it.
 *
 * <p>
 * There are no prerequisites for its correct
 * operation except for a set of already fetched segments (they don't have to
 * contain parsed content, only fetcher output is required). This tool does not
 * use DeleteDuplicates, but creates its own "master" index of all pages in all
 * segments. Then it walks sequentially through this index and picks up only
 * the most recent versions of pages for every unique value of url or hash.
 * </p>
 * <p>If some of the input segments are corrupted, this tool will attempt to
 * repair them, using the
 * {@link net.nutch.segment.SegmentReader#fixSegment(NutchFileSystem, File, boolean, boolean, boolean, boolean)} method.</p>
 * <p>The output segment can be optionally split on the fly into several segments of fixed
 * length.</p>
 * <p>
 * The newly created segment(s) can be then optionally indexed, so that they can be
 * either merged with more new segments, or used for searching as they are.
 * </p>
 * <p>
 * Old segments may be optionally removed, because all needed data has already
 * been copied to the new merged segment. NOTE: this tool will also remove all
 * corrupted input segments, which are not useable anyway - however, this option
 * may be dangerous if you inadvertently included non-segment directories as
 * input...</p>
 * <p>
 * You may want to run SegmentMergeTool instead of following the manual procedures,
 * with all options turned on, i.e. to merge segments into the output segment(s),
 * index them, and then delete the original segments data.
 * </p>
 *
 * @author Andrzej Bialecki &lt;ab@getopt.org&gt;
 */

72 public class SegmentMergeTool implements Runnable JavaDoc {
73
74   public static final Logger JavaDoc LOG = LogFormatter.getLogger("net.nutch.tools.SegmentMergeTool");
75
76   /** Log progress update every LOG_STEP items. */
77   public static int LOG_STEP = 20000;
78   /** Temporary de-dup index size. Larger indexes tend to slow down indexing.
79    * Too many indexes slow down the subsequent index merging. It's a tradeoff value...
80    */

81   public static int INDEX_SIZE = 250000;
82   public static int INDEX_MERGE_FACTOR = 30;
83   public static int INDEX_MIN_MERGE_DOCS = 100;
84   
85   private NutchFileSystem nfs = null;
86   private File JavaDoc[] segments = null;
87   private int stage = SegmentMergeStatus.STAGE_OPENING;
88   private long totalRecords = 0L;
89   private long processedRecords = 0L;
90   private long start = 0L;
91   private long maxCount = Long.MAX_VALUE;
92   private File JavaDoc output = null;
93   private List JavaDoc segdirs = null;
94   private List JavaDoc allsegdirs = null;
95   private boolean runIndexer = false;
96   private boolean delSegs = false;
97   private HashMap JavaDoc readers = new HashMap JavaDoc();
98
99   /**
100    * Create a SegmentMergeTool.
101    * @param nfs filesystem
102    * @param segments list of input segments
103    * @param output output directory, where output segments will be created
104    * @param maxCount maximum number of records per output segment. If this
105    * value is 0, then the default value {@link Long#MAX_VALUE} is used.
106    * @param runIndexer run indexer on output segment(s)
107    * @param delSegs delete input segments when finished
108    * @throws Exception
109    */

110   public SegmentMergeTool(NutchFileSystem nfs, File JavaDoc[] segments, File JavaDoc output, long maxCount, boolean runIndexer, boolean delSegs) throws Exception JavaDoc {
111     this.nfs = nfs;
112     this.segments = segments;
113     this.runIndexer = runIndexer;
114     this.delSegs = delSegs;
115     if (maxCount > 0) this.maxCount = maxCount;
116     allsegdirs = Arrays.asList(segments);
117     this.output = output;
118     if (nfs.exists(output)) {
119       if (!nfs.isDirectory(output))
120         throw new Exception JavaDoc("Output is not a directory: " + output);
121     } else nfs.mkdirs(output);
122   }
123
124   public static class SegmentMergeStatus {
125     public static final int STAGE_OPENING = 0;
126     public static final int STAGE_MASTERIDX = 1;
127     public static final int STAGE_MERGEIDX = 2;
128     public static final int STAGE_DEDUP = 3;
129     public static final int STAGE_WRITING = 4;
130     public static final int STAGE_INDEXING = 5;
131     public static final int STAGE_DELETING = 6;
132     public static final String JavaDoc[] stages = {
133             "opening input segments",
134             "creating master index",
135             "merging sub-indexes",
136             "deduplicating",
137             "writing output segment(s)",
138             "indexing output segment(s)",
139             "deleting input segments"
140     };
141     public int stage;
142     public File JavaDoc[] inputSegments;
143     public long startTime, curTime;
144     public long totalRecords;
145     public long processedRecords;
146     
147     public SegmentMergeStatus() {};
148     
149     public SegmentMergeStatus(int stage, File JavaDoc[] inputSegments, long startTime,
150             long totalRecords, long processedRecords) {
151       this.stage = stage;
152       this.inputSegments = inputSegments;
153       this.startTime = startTime;
154       this.curTime = System.currentTimeMillis();
155       this.totalRecords = totalRecords;
156       this.processedRecords = processedRecords;
157     }
158   }
159   
160   public SegmentMergeStatus getStatus() {
161     SegmentMergeStatus status = new SegmentMergeStatus(stage, segments, start,
162             totalRecords, processedRecords);
163     return status;
164   }
165   
166   /** Run the tool, periodically reporting progress. */
167   public void run() {
168     start = System.currentTimeMillis();
169     stage = SegmentMergeStatus.STAGE_OPENING;
170     long delta;
171     LOG.info("* Opening " + allsegdirs.size() + " segments:");
172     try {
173       segdirs = new ArrayList JavaDoc();
174       // open all segments
175
for (int i = 0; i < allsegdirs.size(); i++) {
176         File JavaDoc dir = (File JavaDoc) allsegdirs.get(i);
177         SegmentReader sr = null;
178         try {
179           // try to autofix it if corrupted...
180
sr = new SegmentReader(nfs, dir, true);
181         } catch (Exception JavaDoc e) {
182           // this segment is hosed beyond repair, don't use it
183
continue;
184         }
185         segdirs.add(dir);
186         totalRecords += sr.size;
187         LOG.info(" - segment " + dir.getName() + ": " + sr.size + " records.");
188         readers.put(dir.getName(), sr);
189       }
190       long total = totalRecords;
191       LOG.info("* TOTAL " + total + " input records in " + segdirs.size() + " segments.");
192       LOG.info("* Creating master index...");
193       stage = SegmentMergeStatus.STAGE_MASTERIDX;
194       // XXX Note that Lucene indexes don't work with NutchFileSystem for now.
195
// XXX For now always assume LocalFileSystem here...
196
Vector JavaDoc masters = new Vector JavaDoc();
197       File JavaDoc fsmtIndexDir = new File JavaDoc(output, ".fastmerge_index");
198       File JavaDoc masterDir = new File JavaDoc(fsmtIndexDir, "0");
199       if (!masterDir.mkdirs()) {
200         LOG.severe("Could not create a master index dir: " + masterDir);
201         return;
202       }
203       masters.add(masterDir);
204       IndexWriter iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
205       iw.setUseCompoundFile(false);
206       iw.mergeFactor = INDEX_MERGE_FACTOR;
207       iw.minMergeDocs = INDEX_MIN_MERGE_DOCS;
208       long s1 = System.currentTimeMillis();
209       Iterator JavaDoc it = readers.values().iterator();
210       processedRecords = 0L;
211       delta = System.currentTimeMillis();
212       while (it.hasNext()) {
213         SegmentReader sr = (SegmentReader) it.next();
214         String JavaDoc name = sr.segmentDir.getName();
215         FetcherOutput fo = new FetcherOutput();
216         for (long i = 0; i < sr.size; i++) {
217           try {
218             if (!sr.get(i, fo, null, null, null)) break;
219
220             Document doc = new Document();
221             doc.add(new Field("sd", name + "|" + i, true, false, false));
222             doc.add(new Field("uh", MD5Hash.digest(fo.getUrl().toString()).toString(), true, true, false));
223             doc.add(new Field("ch", fo.getMD5Hash().toString(), true, true, false));
224             doc.add(new Field("time", DateField.timeToString(fo.getFetchDate()), true, false, false));
225             iw.addDocument(doc);
226             processedRecords++;
227             if (processedRecords > 0 && (processedRecords % LOG_STEP == 0)) {
228               LOG.info(" Processed " + processedRecords + " records (" +
229                       (float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
230               delta = System.currentTimeMillis();
231             }
232             if (processedRecords > 0 && (processedRecords % INDEX_SIZE == 0)) {
233               iw.optimize();
234               iw.close();
235               LOG.info(" - creating next subindex...");
236               masterDir = new File JavaDoc(fsmtIndexDir, "" + masters.size());
237               if (!masterDir.mkdirs()) {
238                 LOG.severe("Could not create a master index dir: " + masterDir);
239                 return;
240               }
241               masters.add(masterDir);
242               iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
243               iw.setUseCompoundFile(false);
244               iw.mergeFactor = INDEX_MERGE_FACTOR;
245               iw.minMergeDocs = INDEX_MIN_MERGE_DOCS;
246             }
247           } catch (Throwable JavaDoc t) {
248             // we can assume the data is invalid from now on - break here
249
LOG.info(" - segment " + name + " truncated to " + (i + 1) + " records");
250             break;
251           }
252         }
253       }
254       iw.optimize();
255       LOG.info("* Creating index took " + (System.currentTimeMillis() - s1) + " ms");
256       s1 = System.currentTimeMillis();
257       // merge all other indexes using the latest IndexWriter (still open):
258
if (masters.size() > 1) {
259         LOG.info(" - merging subindexes...");
260         stage = SegmentMergeStatus.STAGE_MERGEIDX;
261         IndexReader[] ireaders = new IndexReader[masters.size() - 1];
262         for (int i = 0; i < masters.size() - 1; i++) ireaders[i] = IndexReader.open((File JavaDoc)masters.get(i));
263         iw.addIndexes(ireaders);
264         for (int i = 0; i < masters.size() - 1; i++) {
265           ireaders[i].close();
266           FileUtil.fullyDelete((File JavaDoc)masters.get(i));
267         }
268       }
269       iw.close();
270       LOG.info("* Optimizing index took " + (System.currentTimeMillis() - s1) + " ms");
271       LOG.info("* Removing duplicate entries...");
272       stage = SegmentMergeStatus.STAGE_DEDUP;
273       IndexReader ir = IndexReader.open(masterDir);
274       int i = 0;
275       long cnt = 0L;
276       processedRecords = 0L;
277       s1 = System.currentTimeMillis();
278       delta = s1;
279       TermEnum te = ir.terms();
280       while(te.next()) {
281         Term t = te.term();
282         if (t == null) continue;
283         if (!(t.field().equals("ch") || t.field().equals("uh"))) continue;
284         cnt++;
285         processedRecords = cnt / 2;
286         if (cnt > 0 && (cnt % (LOG_STEP * 2) == 0)) {
287           LOG.info(" Processed " + processedRecords + " records (" +
288                   (float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
289           delta = System.currentTimeMillis();
290         }
291         // Enumerate all docs with the same URL hash or content hash
292
TermDocs td = ir.termDocs(t);
293         if (td == null) continue;
294         int id = -1;
295         String JavaDoc time = null;
296         Document doc = null;
297         // Keep only the latest version of the document with
298
// the same hash (url or content). Note: even if the content
299
// hash is identical, other metadata may be different, so even
300
// in this case it makes sense to keep the latest version.
301
while (td.next()) {
302           int docid = td.doc();
303           if (!ir.isDeleted(docid)) {
304             doc = ir.document(docid);
305             if (time == null) {
306               time = doc.get("time");
307               id = docid;
308               continue;
309             }
310             String JavaDoc dtime = doc.get("time");
311             // "time" is a DateField, and can be compared lexicographically
312
if (dtime.compareTo(time) > 0) {
313               if (id != -1) {
314                 ir.delete(id);
315               }
316               time = dtime;
317               id = docid;
318             } else {
319               ir.delete(docid);
320             }
321           }
322         }
323       }
324       //
325
// keep the IndexReader open...
326
//
327

328       LOG.info("* Deduplicating took " + (System.currentTimeMillis() - s1) + " ms");
329       stage = SegmentMergeStatus.STAGE_WRITING;
330       processedRecords = 0L;
331       Vector JavaDoc outDirs = new Vector JavaDoc();
332       File JavaDoc outDir = new File JavaDoc(output, SegmentWriter.getNewSegmentName());
333       outDirs.add(outDir);
334       LOG.info("* Merging all segments into " + output.getName());
335       s1 = System.currentTimeMillis();
336       delta = s1;
337       nfs.mkdirs(outDir);
338       SegmentWriter sw = new SegmentWriter(nfs, outDir, true);
339       LOG.fine(" - opening first output segment in " + outDir.getName());
340       FetcherOutput fo = new FetcherOutput();
341       Content co = new Content();
342       ParseText pt = new ParseText();
343       ParseData pd = new ParseData();
344       int outputCnt = 0;
345       for (int n = 0; n < ir.maxDoc(); n++) {
346         if (ir.isDeleted(n)) {
347           //System.out.println("-del");
348
continue;
349         }
350         Document doc = ir.document(n);
351         String JavaDoc segDoc = doc.get("sd");
352         int idx = segDoc.indexOf('|');
353         String JavaDoc segName = segDoc.substring(0, idx);
354         String JavaDoc docName = segDoc.substring(idx + 1);
355         SegmentReader sr = (SegmentReader) readers.get(segName);
356         long docid;
357         try {
358           docid = Long.parseLong(docName);
359         } catch (Exception JavaDoc e) {
360           continue;
361         }
362         try {
363           // get data from the reader
364
sr.get(docid, fo, co, pt, pd);
365         } catch (Throwable JavaDoc thr) {
366           // don't break the loop, because only one of the segments
367
// may be corrupted...
368
LOG.fine(" - corrupt record no. " + docid + " in segment " + sr.segmentDir.getName() + " - skipping.");
369           continue;
370         }
371         sw.append(fo, co, pt, pd);
372         outputCnt++;
373         processedRecords++;
374         if (processedRecords > 0 && (processedRecords % LOG_STEP == 0)) {
375           LOG.info(" Processed " + processedRecords + " records (" +
376                   (float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
377           delta = System.currentTimeMillis();
378         }
379         if (processedRecords % maxCount == 0) {
380           sw.close();
381           outDir = new File JavaDoc(output, SegmentWriter.getNewSegmentName());
382           LOG.fine(" - starting next output segment in " + outDir.getName());
383           nfs.mkdirs(outDir);
384           sw = new SegmentWriter(nfs, outDir, true);
385           outDirs.add(outDir);
386         }
387       }
388       LOG.info("* Merging took " + (System.currentTimeMillis() - s1) + " ms");
389       ir.close();
390       sw.close();
391       FileUtil.fullyDelete(fsmtIndexDir);
392       for (Iterator JavaDoc iter = readers.keySet().iterator(); iter.hasNext();) {
393         SegmentReader sr = (SegmentReader) readers.get(iter.next());
394         sr.close();
395       }
396       if (runIndexer) {
397         stage = SegmentMergeStatus.STAGE_INDEXING;
398         totalRecords = outDirs.size();
399         processedRecords = 0L;
400         LOG.info("* Creating new segment index(es)...");
401         File JavaDoc workingDir = new File JavaDoc(output, "indexsegment-workingdir");
402         for (int k = 0; k < outDirs.size(); k++) {
403           processedRecords++;
404           if (workingDir.exists()) {
405               FileUtil.fullyDelete(workingDir);
406           }
407           IndexSegment indexer = new IndexSegment(nfs, Integer.MAX_VALUE,
408                   (File JavaDoc)outDirs.get(k), workingDir);
409           indexer.indexPages();
410           FileUtil.fullyDelete(workingDir);
411         }
412       }
413       if (delSegs) {
414         // This deletes also all corrupt segments, which are
415
// unusable anyway
416
stage = SegmentMergeStatus.STAGE_DELETING;
417         totalRecords = allsegdirs.size();
418         processedRecords = 0L;
419         LOG.info("* Deleting old segments...");
420         for (int k = 0; k < allsegdirs.size(); k++) {
421           processedRecords++;
422           FileUtil.fullyDelete((File JavaDoc) allsegdirs.get(k));
423         }
424       }
425       delta = System.currentTimeMillis() - start;
426       float eps = (float) total / (float) (delta / 1000);
427       LOG.info("Finished SegmentMergeTool: INPUT: " + total + " -> OUTPUT: " + outputCnt + " entries in "
428               + ((float) delta / 1000f) + " s (" + eps + " entries/sec).");
429     } catch (Exception JavaDoc e) {
430       e.printStackTrace();
431       LOG.severe(e.getMessage());
432     }
433   }
434
435   public static void main(String JavaDoc[] args) throws Exception JavaDoc {
436     if (args.length < 1) {
437       System.err.println("Too few arguments.\n");
438       usage();
439       System.exit(-1);
440     }
441     NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0);
442     boolean runIndexer = false;
443     boolean delSegs = false;
444     long maxCount = Long.MAX_VALUE;
445     String JavaDoc segDir = null;
446     File JavaDoc output = null;
447     Vector JavaDoc dirs = new Vector JavaDoc();
448     for (int i = 0; i < args.length; i++) {
449       if (args[i] == null) continue;
450       if (args[i].equals("-o")) {
451         if (args.length > i + 1) {
452           output = new File JavaDoc(args[++i]);
453           continue;
454         } else {
455           LOG.severe("Required value of '-o' argument missing.\n");
456           usage();
457           return;
458         }
459       } else if (args[i].equals("-i")) {
460         runIndexer = true;
461       } else if (args[i].equals("-cm")) {
462         LOG.warning("'-cm' option obsolete - ignored.");
463       } else if (args[i].equals("-max")) {
464         String JavaDoc cnt = args[++i];
465         try {
466           maxCount = Long.parseLong(cnt);
467         } catch (Exception JavaDoc e) {
468           LOG.warning("Invalid count '" + cnt + "', setting to Long.MAX_VALUE.");
469         }
470       } else if (args[i].equals("-ds")) {
471         delSegs = true;
472       } else if (args[i].equals("-dir")) {
473         segDir = args[++i];
474       } else dirs.add(new File JavaDoc(args[i]));
475     }
476     if (segDir != null) {
477       File JavaDoc sDir = new File JavaDoc(segDir);
478       if (!sDir.exists() || !sDir.isDirectory()) {
479         LOG.warning("Invalid path: " + sDir);
480       } else {
481         File JavaDoc[] files = sDir.listFiles(new FileFilter JavaDoc() {
482           public boolean accept(File JavaDoc f) {
483             return f.isDirectory();
484           }
485         });
486         if (files != null && files.length > 0) {
487           for (int i = 0; i < files.length; i++) dirs.add(files[i]);
488         }
489       }
490     }
491     if (dirs.size() == 0) {
492       LOG.severe("No input segments.");
493       return;
494     }
495     if (output == null) output = (File JavaDoc)dirs.get(0);
496     SegmentMergeTool st = new SegmentMergeTool(nfs, (File JavaDoc[])dirs.toArray(new File JavaDoc[0]),
497             output, maxCount, runIndexer, delSegs);
498     st.run();
499   }
500
501   private static void usage() {
502     System.err.println("SegmentMergeTool (-local | -nfs ...) (-dir <input_segments_dir> | seg1 seg2 ...) [-o <output_segments_dir>] [-max count] [-i] [-ds]");
503     System.err.println("\t-dir <input_segments_dir>\tpath to directory containing input segments");
504     System.err.println("\tseg1 seg2 seg3\t\tindividual paths to input segments");
505     System.err.println("\t-o <output_segment_dir>\t(optional) path to directory which will\n\t\t\t\tcontain output segment(s).\n\t\t\tNOTE: If not present, the original segments path will be used.");
506     System.err.println("\t-max count\t(optional) output multiple segments, each with maximum 'count' entries");
507     System.err.println("\t-i\t\t(optional) index the output segment when finished merging.");
508     System.err.println("\t-ds\t\t(optional) delete the original input segments when finished.");
509     System.err.println();
510   }
511 }
// End of net.nutch.tools.SegmentMergeTool