package net.nutch.segment;

import java.io.File;
import java.io.FileFilter;
import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;

import net.nutch.fs.*;
import net.nutch.fetcher.FetcherOutput;
import net.nutch.parse.ParseData;
import net.nutch.parse.ParseText;
import net.nutch.protocol.Content;
import net.nutch.util.LogFormatter;

/**
 * Copies the entries of one or more input segments into one or more newly
 * created output segments under a single output directory.
 *
 * <p>Input segments that cannot be opened are skipped with a warning. When a
 * maximum entry count is given, the output is split into several segments,
 * each holding at most that many entries.
 */
public class SegmentSlicer implements Runnable {
  public static final Logger LOG = LogFormatter.getLogger("net.nutch.segment.SegmentSlicer");

  /** Number of written entries between two progress log messages. */
  public static int LOG_STEP = 20000;

  private final NutchFileSystem nfs;
  private final File[] input;
  private final File output;
  private final boolean withContent;
  private final boolean withParseData;
  private final boolean withParseText;
  private final boolean autoFix;
  /** Maximum number of entries per output segment. */
  private final long maxCount;

  /**
   * Creates a slicer. Nothing is read or written until {@link #run()} is called.
   *
   * @param nfs filesystem handed to the segment readers and writers
   * @param input input segment directories
   * @param output directory under which the new output segment(s) are created
   * @param withContent passed to the segment readers/writers; {@code false} when
   *        the command line gave {@code -nocontent}
   * @param withParseText passed to the segment readers/writers; {@code false} when
   *        the command line gave {@code -noparsetext}
   * @param withParseData passed to the segment readers/writers; {@code false} when
   *        the command line gave {@code -noparsedata}
   * @param autoFix passed to the segment readers; set by the {@code -fix} option
   * @param maxCount maximum number of entries per output segment; values
   *        {@code <= 0} mean "no limit" ({@code Long.MAX_VALUE})
   */
  public SegmentSlicer(NutchFileSystem nfs, File[] input, File output,
          boolean withContent, boolean withParseText, boolean withParseData,
          boolean autoFix, long maxCount) {
    this.nfs = nfs;
    this.input = input;
    this.output = output;
    this.withContent = withContent;
    this.withParseData = withParseData;
    this.withParseText = withParseText;
    this.autoFix = autoFix;
    this.maxCount = maxCount > 0 ? maxCount : Long.MAX_VALUE;
  }

  /**
   * Opens every readable input segment, then appends their entries, in input
   * order, to the output segment(s). Unexpected errors abort the run and are
   * logged; they are never propagated to the caller.
   */
  public void run() {
    long start = System.currentTimeMillis();
    Vector readers = new Vector();
    long total = 0L;
    boolean parsed = true;
    for (int i = 0; i < input.length; i++) {
      SegmentReader sr;
      try {
        sr = new SegmentReader(nfs, input[i], withContent, withParseText, withParseData, autoFix);
      } catch (Exception e) {
        // Name the offending segment; the bare exception message may be null.
        LOG.warning("Skipping segment " + input[i] + ": " + e.getMessage());
        continue;
      }
      total += sr.size;
      parsed = parsed && sr.isParsed;
      readers.add(sr);
    }
    LOG.info("Input: " + total + " entries in " + readers.size() + " segments.");
    if (!parsed) {
      LOG.warning(" - some input segments are non-parsed, forcing non-parsed output!");
    }

    // These four objects are reused for every entry.
    FetcherOutput fo = new FetcherOutput();
    Content co = new Content();
    ParseData pd = new ParseData();
    ParseText pt = new ParseText();
    long outputCnt = 0L;
    int segCnt = 1;
    // Readers [0, closedReaders) have already had close() called on them.
    int closedReaders = 0;
    // Non-null only while an output segment is open and still needs closing.
    SegmentWriter sw = null;
    File outDir = new File(output, SegmentWriter.getNewSegmentName());
    LOG.info("Writing output in " + output);
    try {
      LOG.info(" - starting first output segment in " + outDir.getName());
      sw = new SegmentWriter(nfs, outDir, true, parsed, withContent, withParseText, withParseData);
      long stepStart = System.currentTimeMillis();
      for (int i = 0; i < readers.size(); i++) {
        SegmentReader sr = (SegmentReader) readers.get(i);
        try {
          for (long k = 0L; k < sr.size; k++) {
            try {
              if (!sr.next(fo, co, pt, pd)) {
                break;
              }
            } catch (Throwable t) {
              // A read error abandons the rest of this segment only.
              LOG.warning(" - error reading entry #" + k + " from " + sr.segmentDir.getName());
              break;
            }
            // Roll over only when there really is another entry to write, so
            // that an input size which is an exact multiple of maxCount does
            // not leave an empty trailing segment behind.
            if (outputCnt > 0 && outputCnt % maxCount == 0) {
              SegmentWriter full = sw;
              sw = null;
              full.close();
              outDir = new File(output, SegmentWriter.getNewSegmentName());
              segCnt++;
              LOG.info(" - starting next output segment in " + outDir.getName());
              sw = new SegmentWriter(nfs, outDir, true, parsed, withContent, withParseText, withParseData);
            }
            sw.append(fo, co, pt, pd);
            outputCnt++;
            // LOG_STEP is publicly mutable; guard against a zero/negative step.
            if (LOG_STEP > 0 && outputCnt % LOG_STEP == 0) {
              long now = System.currentTimeMillis();
              LOG.info(" Processed " + outputCnt + " entries (" +
                      (float) LOG_STEP / (float) (now - stepStart) * 1000.0f + " rec/s)");
              stepStart = System.currentTimeMillis();
            }
          }
        } finally {
          // Count first, so a failing close() is not retried in the cleanup below.
          closedReaders++;
          sr.close();
        }
      }
      SegmentWriter last = sw;
      sw = null;
      last.close();
      long elapsedMs = System.currentTimeMillis() - start;
      // Divide in floating point: integer division by 1000 truncated sub-second
      // runs to zero seconds and produced an infinite/NaN rate.
      float eps = elapsedMs > 0 ? (float) outputCnt * 1000f / (float) elapsedMs : 0f;
      LOG.info("DONE segment slicing, INPUT: " + total + " -> OUTPUT: " + outputCnt + " entries in "
              + segCnt + " segment(s), " + ((float) elapsedMs / 1000f) + " s (" + eps + " entries/sec).");
    } catch (Throwable t) {
      LOG.log(Level.SEVERE,
              "Unexpected error " + t.getMessage() + ", aborting at " + outputCnt + " output entries.", t);
    } finally {
      // Only does work after an aborted run: release whatever is still open.
      closeQuietly(sw);
      for (int i = closedReaders; i < readers.size(); i++) {
        closeQuietly((SegmentReader) readers.get(i));
      }
    }
  }

  /** Closes a reader during cleanup, logging (not propagating) any failure. */
  private static void closeQuietly(SegmentReader reader) {
    if (reader == null) {
      return;
    }
    try {
      reader.close();
    } catch (Throwable t) {
      LOG.warning(" - error closing input segment " + reader.segmentDir.getName() + ": " + t);
    }
  }

  /** Closes a writer during cleanup, logging (not propagating) any failure. */
  private static void closeQuietly(SegmentWriter writer) {
    if (writer == null) {
      return;
    }
    try {
      writer.close();
    } catch (Throwable t) {
      LOG.warning(" - error closing output segment: " + t);
    }
  }

  /**
   * Command-line entry point; see {@link #usage()} for the accepted arguments.
   *
   * @param args command-line arguments
   * @throws Exception if the filesystem arguments cannot be processed
   */
  public static void main(String[] args) throws Exception {
    if (args.length == 0) {
      usage();
      return;
    }
    String segDir = null;
    String outDir = null;
    Vector dirs = new Vector();
    boolean fix = false;
    long maxCount = Long.MAX_VALUE;
    boolean withParseText = true;
    boolean withParseData = true;
    boolean withContent = true;
    // NOTE(review): parseArgs presumably nulls out the filesystem arguments it
    // consumes, which is why null entries are skipped below - confirm.
    NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0);
    for (int i = 0; i < args.length; i++) {
      String arg = args[i];
      if (arg == null) {
        continue;
      }
      if (arg.equals("-noparsetext")) {
        withParseText = false;
      } else if (arg.equals("-noparsedata")) {
        withParseData = false;
      } else if (arg.equals("-nocontent")) {
        withContent = false;
      } else if (arg.equals("-fix")) {
        fix = true;
      } else if (arg.equals("-dir") || arg.equals("-o") || arg.equals("-max")) {
        // These options take a value; fail cleanly if it is missing instead
        // of running off the end of the argument array.
        if (i + 1 >= args.length || args[i + 1] == null) {
          LOG.severe("Missing value for option " + arg);
          usage();
          return;
        }
        String value = args[++i];
        if (arg.equals("-dir")) {
          segDir = value;
        } else if (arg.equals("-o")) {
          outDir = value;
        } else {
          try {
            maxCount = Long.parseLong(value);
          } catch (NumberFormatException e) {
            LOG.warning("Invalid count '" + value + "', setting to Long.MAX_VALUE.");
            maxCount = Long.MAX_VALUE;
          }
        }
      } else {
        // Anything that is not an option is an input segment directory.
        dirs.add(new File(arg));
      }
    }
    if (outDir == null) {
      LOG.severe("Missing output path.");
      usage();
      return;
    }
    if (segDir != null) {
      File sDir = new File(segDir);
      if (!sDir.exists() || !sDir.isDirectory()) {
        LOG.warning("Invalid path: " + sDir);
      } else {
        // Every sub-directory of the -dir argument is treated as a segment.
        File[] files = sDir.listFiles(new FileFilter() {
          public boolean accept(File f) {
            return f.isDirectory();
          }
        });
        if (files != null) {
          for (int i = 0; i < files.length; i++) {
            dirs.add(files[i]);
          }
        }
      }
    }
    if (dirs.size() == 0) {
      LOG.severe("No input segment dirs.");
      usage();
      return;
    }
    File[] input = (File[]) dirs.toArray(new File[0]);
    File output = new File(outDir);
    SegmentSlicer slicer = new SegmentSlicer(nfs, input, output,
            withContent, withParseText, withParseData, fix, maxCount);
    slicer.run();
  }

  /** Prints the command-line synopsis to standard error. */
  private static void usage() {
    System.err.println("SegmentSlicer (-local | -ndfs <namenode:port>) -o outputDir [-max count] [-fix] [-nocontent] [-noparsedata] [-noparsetext] (-dir segments | seg1 seg2 ...)");
    System.err.println("\tNOTE: at least one segment dir name is required, or '-dir' option.");
    System.err.println("\t outputDir is always required.");
    System.err.println("\t-o outputDir\toutput directory for segments");
    System.err.println("\t-max count\t(optional) output multiple segments, each with maximum 'count' entries");
    System.err.println("\t-fix\t\t(optional) automatically fix corrupted segments");
    System.err.println("\t-nocontent\t(optional) ignore content data");
    System.err.println("\t-noparsedata\t(optional) ignore parse_data data");
    System.err.println("\t-noparsetext\t(optional) ignore parse_text data");
    System.err.println("\t-dir segments\tdirectory containing multiple segments");
    System.err.println("\tseg1 seg2 ...\tsegment directories\n");
  }
}