package net.nutch.tools;

import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;

import net.nutch.fetcher.FetcherOutput;
import net.nutch.indexer.IndexSegment;
import net.nutch.io.MD5Hash;
import net.nutch.fs.*;
import net.nutch.parse.ParseData;
import net.nutch.parse.ParseText;
import net.nutch.protocol.Content;
import net.nutch.segment.SegmentReader;
import net.nutch.segment.SegmentWriter;
import net.nutch.util.LogFormatter;

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.DateField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermDocs;
import org.apache.lucene.index.TermEnum;

/**
 * Merges several input segments into one or more output segments, removing
 * duplicate entries along the way.
 *
 * <p>The tool works in stages (see {@link SegmentMergeStatus}):
 * <ol>
 *   <li>open all readable input segments (unreadable ones are skipped);</li>
 *   <li>build a temporary Lucene "master index" with one document per input
 *       record, carrying the record's location ("sd" = segment|offset), the
 *       MD5 of its URL ("uh"), the MD5 of its content ("ch") and its fetch
 *       date ("time");</li>
 *   <li>merge sub-indexes if more than one was created;</li>
 *   <li>deduplicate: for every group of documents sharing a URL hash or a
 *       content hash, keep only the entry with the most recent fetch date;</li>
 *   <li>copy the surviving records into the output segment(s);</li>
 *   <li>optionally index the output segment(s) and delete the inputs.</li>
 * </ol>
 */
public class SegmentMergeTool implements Runnable {

  public static final Logger LOG = LogFormatter.getLogger("net.nutch.tools.SegmentMergeTool");

  /** Progress is logged every LOG_STEP processed records. */
  public static int LOG_STEP = 20000;

  /** Maximum number of documents per master sub-index before rolling over. */
  public static int INDEX_SIZE = 250000;
  public static int INDEX_MERGE_FACTOR = 30;
  public static int INDEX_MIN_MERGE_DOCS = 100;

  private NutchFileSystem nfs = null;
  private File[] segments = null;
  private int stage = SegmentMergeStatus.STAGE_OPENING;
  private long totalRecords = 0L;
  private long processedRecords = 0L;
  private long start = 0L;
  // Maximum records per output segment; Long.MAX_VALUE means "single segment".
  private long maxCount = Long.MAX_VALUE;
  private File output = null;
  private List segdirs = null;
  private List allsegdirs = null;
  private boolean runIndexer = false;
  private boolean delSegs = false;
  // Maps segment directory name -> open SegmentReader for that segment.
  private HashMap readers = new HashMap();

  /**
   * Creates a merge tool.
   *
   * @param nfs filesystem on which segments and output live
   * @param segments input segment directories
   * @param output directory that will contain the output segment(s);
   *        created if it does not exist
   * @param maxCount maximum records per output segment; values &lt;= 0 mean
   *        unlimited (a single output segment)
   * @param runIndexer if true, index each output segment when done
   * @param delSegs if true, delete the input segments when done
   * @throws Exception if the output path exists but is not a directory
   */
  public SegmentMergeTool(NutchFileSystem nfs, File[] segments, File output,
          long maxCount, boolean runIndexer, boolean delSegs) throws Exception {
    this.nfs = nfs;
    this.segments = segments;
    this.runIndexer = runIndexer;
    this.delSegs = delSegs;
    if (maxCount > 0) this.maxCount = maxCount;
    allsegdirs = Arrays.asList(segments);
    this.output = output;
    if (nfs.exists(output)) {
      if (!nfs.isDirectory(output))
        throw new Exception("Output is not a directory: " + output);
    } else nfs.mkdirs(output);
  }

  /** Snapshot of the tool's progress, as returned by {@link #getStatus()}. */
  public static class SegmentMergeStatus {
    public static final int STAGE_OPENING = 0;
    public static final int STAGE_MASTERIDX = 1;
    public static final int STAGE_MERGEIDX = 2;
    public static final int STAGE_DEDUP = 3;
    public static final int STAGE_WRITING = 4;
    public static final int STAGE_INDEXING = 5;
    public static final int STAGE_DELETING = 6;
    /** Human-readable names for the STAGE_* constants, indexed by stage. */
    public static final String[] stages = {
      "opening input segments",
      "creating master index",
      "merging sub-indexes",
      "deduplicating",
      "writing output segment(s)",
      "indexing output segment(s)",
      "deleting input segments"
    };
    public int stage;
    public File[] inputSegments;
    public long startTime, curTime;
    public long totalRecords;
    public long processedRecords;

    public SegmentMergeStatus() {}

    public SegmentMergeStatus(int stage, File[] inputSegments, long startTime,
            long totalRecords, long processedRecords) {
      this.stage = stage;
      this.inputSegments = inputSegments;
      this.startTime = startTime;
      this.curTime = System.currentTimeMillis();
      this.totalRecords = totalRecords;
      this.processedRecords = processedRecords;
    }
  }

  /** Returns the current progress of a running merge. */
  public SegmentMergeStatus getStatus() {
    SegmentMergeStatus status = new SegmentMergeStatus(stage, segments, start,
            totalRecords, processedRecords);
    return status;
  }

  /** Runs the full merge; see the class comment for the stage sequence. */
  public void run() {
    start = System.currentTimeMillis();
    stage = SegmentMergeStatus.STAGE_OPENING;
    long delta;
    LOG.info("* Opening " + allsegdirs.size() + " segments:");
    try {
      segdirs = new ArrayList();
      // Open all input segments; unreadable segments are skipped (best effort),
      // but the failure is logged so operators know a segment was dropped.
      for (int i = 0; i < allsegdirs.size(); i++) {
        File dir = (File) allsegdirs.get(i);
        SegmentReader sr = null;
        try {
          sr = new SegmentReader(nfs, dir, true);
        } catch (Exception e) {
          LOG.warning(" - skipping unreadable segment " + dir.getName() + ": " + e);
          continue;
        }
        segdirs.add(dir);
        totalRecords += sr.size;
        LOG.info(" - segment " + dir.getName() + ": " + sr.size + " records.");
        readers.put(dir.getName(), sr);
      }
      long total = totalRecords;
      LOG.info("* TOTAL " + total + " input records in " + segdirs.size() + " segments.");
      LOG.info("* Creating master index...");
      stage = SegmentMergeStatus.STAGE_MASTERIDX;
      // Directories of the master sub-indexes created so far.
      Vector masters = new Vector();
      File fsmtIndexDir = new File(output, ".fastmerge_index");
      File masterDir = new File(fsmtIndexDir, "0");
      if (!masterDir.mkdirs()) {
        LOG.severe("Could not create a master index dir: " + masterDir);
        return;
      }
      masters.add(masterDir);
      IndexWriter iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
      iw.setUseCompoundFile(false);
      iw.mergeFactor = INDEX_MERGE_FACTOR;
      iw.minMergeDocs = INDEX_MIN_MERGE_DOCS;
      long s1 = System.currentTimeMillis();
      Iterator it = readers.values().iterator();
      processedRecords = 0L;
      delta = System.currentTimeMillis();
      while (it.hasNext()) {
        SegmentReader sr = (SegmentReader) it.next();
        String name = sr.segmentDir.getName();
        FetcherOutput fo = new FetcherOutput();
        for (long i = 0; i < sr.size; i++) {
          try {
            if (!sr.get(i, fo, null, null, null)) break;

            // One index document per record: "sd" locates the record, "uh"
            // and "ch" are the dedup keys, "time" breaks ties (keep latest).
            Document doc = new Document();
            doc.add(new Field("sd", name + "|" + i, true, false, false));
            doc.add(new Field("uh", MD5Hash.digest(fo.getUrl().toString()).toString(), true, true, false));
            doc.add(new Field("ch", fo.getMD5Hash().toString(), true, true, false));
            doc.add(new Field("time", DateField.timeToString(fo.getFetchDate()), true, false, false));
            iw.addDocument(doc);
            processedRecords++;
            if (processedRecords > 0 && (processedRecords % LOG_STEP == 0)) {
              LOG.info(" Processed " + processedRecords + " records (" +
                      (float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
              delta = System.currentTimeMillis();
            }
            // Roll over to a fresh sub-index every INDEX_SIZE documents to
            // keep individual indexes manageable; they are merged later.
            if (processedRecords > 0 && (processedRecords % INDEX_SIZE == 0)) {
              iw.optimize();
              iw.close();
              LOG.info(" - creating next subindex...");
              masterDir = new File(fsmtIndexDir, "" + masters.size());
              if (!masterDir.mkdirs()) {
                LOG.severe("Could not create a master index dir: " + masterDir);
                return;
              }
              masters.add(masterDir);
              iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
              iw.setUseCompoundFile(false);
              iw.mergeFactor = INDEX_MERGE_FACTOR;
              iw.minMergeDocs = INDEX_MIN_MERGE_DOCS;
            }
          } catch (Throwable t) {
            // A corrupt record ends this segment's contribution; remaining
            // records are skipped. NOTE(review): the reported count of
            // (i + 1) may overstate the indexed records by one when the
            // failure happened before addDocument — preserved as-is.
            LOG.info(" - segment " + name + " truncated to " + (i + 1) + " records");
            break;
          }
        }
      }
      iw.optimize();
      LOG.info("* Creating index took " + (System.currentTimeMillis() - s1) + " ms");
      s1 = System.currentTimeMillis();
      // If more than one sub-index was created, merge all earlier ones into
      // the last (still open) writer, then discard them.
      if (masters.size() > 1) {
        LOG.info(" - merging subindexes...");
        stage = SegmentMergeStatus.STAGE_MERGEIDX;
        IndexReader[] ireaders = new IndexReader[masters.size() - 1];
        for (int i = 0; i < masters.size() - 1; i++) ireaders[i] = IndexReader.open((File) masters.get(i));
        iw.addIndexes(ireaders);
        for (int i = 0; i < masters.size() - 1; i++) {
          ireaders[i].close();
          FileUtil.fullyDelete((File) masters.get(i));
        }
      }
      iw.close();
      LOG.info("* Optimizing index took " + (System.currentTimeMillis() - s1) + " ms");
      LOG.info("* Removing duplicate entries...");
      stage = SegmentMergeStatus.STAGE_DEDUP;
      IndexReader ir = IndexReader.open(masterDir);
      long cnt = 0L;
      processedRecords = 0L;
      s1 = System.currentTimeMillis();
      delta = s1;
      // Walk every term of the dedup fields. Each record contributes one "uh"
      // and one "ch" term, hence processedRecords = cnt / 2.
      TermEnum te = ir.terms();
      while (te.next()) {
        Term t = te.term();
        if (t == null) continue;
        if (!(t.field().equals("ch") || t.field().equals("uh"))) continue;
        cnt++;
        processedRecords = cnt / 2;
        if (cnt > 0 && (cnt % (LOG_STEP * 2) == 0)) {
          LOG.info(" Processed " + processedRecords + " records (" +
                  (float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
          delta = System.currentTimeMillis();
        }
        TermDocs td = ir.termDocs(t);
        if (td == null) continue;
        // Among all docs sharing this URL/content hash, keep the one with the
        // lexicographically greatest "time" (DateField strings sort
        // chronologically); delete all others.
        int id = -1;
        String time = null;
        Document doc = null;
        while (td.next()) {
          int docid = td.doc();
          if (!ir.isDeleted(docid)) {
            doc = ir.document(docid);
            if (time == null) {
              time = doc.get("time");
              id = docid;
              continue;
            }
            String dtime = doc.get("time");
            if (dtime.compareTo(time) > 0) {
              if (id != -1) {
                ir.delete(id);
              }
              time = dtime;
              id = docid;
            } else {
              ir.delete(docid);
            }
          }
        }
        td.close();  // release the term's posting enumerator
      }
      te.close();
      LOG.info("* Deduplicating took " + (System.currentTimeMillis() - s1) + " ms");
      stage = SegmentMergeStatus.STAGE_WRITING;
      processedRecords = 0L;
      Vector outDirs = new Vector();
      File outDir = new File(output, SegmentWriter.getNewSegmentName());
      outDirs.add(outDir);
      LOG.info("* Merging all segments into " + output.getName());
      s1 = System.currentTimeMillis();
      delta = s1;
      nfs.mkdirs(outDir);
      SegmentWriter sw = new SegmentWriter(nfs, outDir, true);
      LOG.fine(" - opening first output segment in " + outDir.getName());
      FetcherOutput fo = new FetcherOutput();
      Content co = new Content();
      ParseText pt = new ParseText();
      ParseData pd = new ParseData();
      int outputCnt = 0;
      // Copy every surviving (non-deleted) record into the output segment(s).
      for (int n = 0; n < ir.maxDoc(); n++) {
        if (ir.isDeleted(n)) {
          continue;
        }
        Document doc = ir.document(n);
        // "sd" is "<segmentName>|<recordOffset>".
        String segDoc = doc.get("sd");
        int idx = segDoc.indexOf('|');
        String segName = segDoc.substring(0, idx);
        String docName = segDoc.substring(idx + 1);
        SegmentReader sr = (SegmentReader) readers.get(segName);
        long docid;
        try {
          docid = Long.parseLong(docName);
        } catch (Exception e) {
          continue;
        }
        try {
          sr.get(docid, fo, co, pt, pd);
        } catch (Throwable thr) {
          LOG.fine(" - corrupt record no. " + docid + " in segment " + sr.segmentDir.getName() + " - skipping.");
          continue;
        }
        sw.append(fo, co, pt, pd);
        outputCnt++;
        processedRecords++;
        if (processedRecords > 0 && (processedRecords % LOG_STEP == 0)) {
          LOG.info(" Processed " + processedRecords + " records (" +
                  (float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
          delta = System.currentTimeMillis();
        }
        // Start a new output segment every maxCount records (never triggers
        // for the default maxCount of Long.MAX_VALUE).
        if (processedRecords % maxCount == 0) {
          sw.close();
          outDir = new File(output, SegmentWriter.getNewSegmentName());
          LOG.fine(" - starting next output segment in " + outDir.getName());
          nfs.mkdirs(outDir);
          sw = new SegmentWriter(nfs, outDir, true);
          outDirs.add(outDir);
        }
      }
      LOG.info("* Merging took " + (System.currentTimeMillis() - s1) + " ms");
      ir.close();
      sw.close();
      FileUtil.fullyDelete(fsmtIndexDir);
      for (Iterator iter = readers.keySet().iterator(); iter.hasNext();) {
        SegmentReader sr = (SegmentReader) readers.get(iter.next());
        sr.close();
      }
      if (runIndexer) {
        stage = SegmentMergeStatus.STAGE_INDEXING;
        totalRecords = outDirs.size();
        processedRecords = 0L;
        LOG.info("* Creating new segment index(es)...");
        File workingDir = new File(output, "indexsegment-workingdir");
        for (int k = 0; k < outDirs.size(); k++) {
          processedRecords++;
          if (workingDir.exists()) {
            FileUtil.fullyDelete(workingDir);
          }
          IndexSegment indexer = new IndexSegment(nfs, Integer.MAX_VALUE,
                  (File) outDirs.get(k), workingDir);
          indexer.indexPages();
          FileUtil.fullyDelete(workingDir);
        }
      }
      if (delSegs) {
        // This deletes ALL input segments, including any that failed to open.
        stage = SegmentMergeStatus.STAGE_DELETING;
        totalRecords = allsegdirs.size();
        processedRecords = 0L;
        LOG.info("* Deleting old segments...");
        for (int k = 0; k < allsegdirs.size(); k++) {
          processedRecords++;
          FileUtil.fullyDelete((File) allsegdirs.get(k));
        }
      }
      delta = System.currentTimeMillis() - start;
      float eps = (float) total / (float) (delta / 1000);
      LOG.info("Finished SegmentMergeTool: INPUT: " + total + " -> OUTPUT: " + outputCnt + " entries in "
              + ((float) delta / 1000f) + " s (" + eps + " entries/sec).");
    } catch (Exception e) {
      // Log message AND stack trace through the logger (getMessage() alone
      // may be null and printStackTrace() bypasses log handlers).
      LOG.log(Level.SEVERE, "SegmentMergeTool failed: " + e, e);
    }
  }

  /** Command-line entry point; see {@link #usage()} for the syntax. */
  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Too few arguments.\n");
      usage();
      System.exit(-1);
    }
    NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0);
    boolean runIndexer = false;
    boolean delSegs = false;
    long maxCount = Long.MAX_VALUE;
    String segDir = null;
    File output = null;
    Vector dirs = new Vector();
    for (int i = 0; i < args.length; i++) {
      if (args[i] == null) continue;
      if (args[i].equals("-o")) {
        if (args.length > i + 1) {
          output = new File(args[++i]);
          continue;
        } else {
          LOG.severe("Required value of '-o' argument missing.\n");
          usage();
          return;
        }
      } else if (args[i].equals("-i")) {
        runIndexer = true;
      } else if (args[i].equals("-cm")) {
        LOG.warning("'-cm' option obsolete - ignored.");
      } else if (args[i].equals("-max")) {
        // Bounds-checked like '-o': a trailing '-max' must not crash.
        if (args.length > i + 1) {
          String cnt = args[++i];
          try {
            maxCount = Long.parseLong(cnt);
          } catch (Exception e) {
            LOG.warning("Invalid count '" + cnt + "', setting to Long.MAX_VALUE.");
          }
        } else {
          LOG.severe("Required value of '-max' argument missing.\n");
          usage();
          return;
        }
      } else if (args[i].equals("-ds")) {
        delSegs = true;
      } else if (args[i].equals("-dir")) {
        if (args.length > i + 1) {
          segDir = args[++i];
        } else {
          LOG.severe("Required value of '-dir' argument missing.\n");
          usage();
          return;
        }
      } else dirs.add(new File(args[i]));
    }
    if (segDir != null) {
      File sDir = new File(segDir);
      if (!sDir.exists() || !sDir.isDirectory()) {
        LOG.warning("Invalid path: " + sDir);
      } else {
        // Every subdirectory of -dir is treated as an input segment.
        File[] files = sDir.listFiles(new FileFilter() {
          public boolean accept(File f) {
            return f.isDirectory();
          }
        });
        if (files != null && files.length > 0) {
          for (int i = 0; i < files.length; i++) dirs.add(files[i]);
        }
      }
    }
    if (dirs.size() == 0) {
      LOG.severe("No input segments.");
      return;
    }
    // Without -o, output into the first input segment's path.
    if (output == null) output = (File) dirs.get(0);
    SegmentMergeTool st = new SegmentMergeTool(nfs, (File[]) dirs.toArray(new File[0]),
            output, maxCount, runIndexer, delSegs);
    st.run();
  }

  /** Prints command-line usage to stderr. */
  private static void usage() {
    System.err.println("SegmentMergeTool (-local | -nfs ...) (-dir <input_segments_dir> | seg1 seg2 ...) [-o <output_segments_dir>] [-max count] [-i] [-ds]");
    System.err.println("\t-dir <input_segments_dir>\tpath to directory containing input segments");
    System.err.println("\tseg1 seg2 seg3\t\tindividual paths to input segments");
    System.err.println("\t-o <output_segment_dir>\t(optional) path to directory which will\n\t\t\t\tcontain output segment(s).\n\t\t\tNOTE: If not present, the original segments path will be used.");
    System.err.println("\t-max count\t(optional) output multiple segments, each with maximum 'count' entries");
    System.err.println("\t-i\t\t(optional) index the output segment when finished merging.");
    System.err.println("\t-ds\t\t(optional) delete the original input segments when finished.");
    System.err.println();
  }
}