package net.nutch.indexer;

import net.nutch.pagedb.*;
import net.nutch.linkdb.*;
import net.nutch.fetcher.*;
import net.nutch.parse.*;
import net.nutch.analysis.NutchDocumentAnalyzer;
import net.nutch.db.*;
import net.nutch.io.*;
import net.nutch.fs.*;
import net.nutch.segment.SegmentReader;
import net.nutch.util.*;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;

import java.util.logging.*;
import java.util.*;
import java.io.*;

/**
 * Builds a Lucene index for the pages of a single fetched segment.
 * The index is written into an "index" subdirectory of the segment
 * (staged in a local working directory first), and an {@link #DONE_NAME}
 * marker file is created once indexing completes.
 */
public class IndexSegment {
  /** Marker file written when a segment has been fully indexed. */
  public static final String DONE_NAME = "index.done";
  public static final Logger LOG =
    LogFormatter.getLogger("net.nutch.index.IndexSegment");

  /** Progress is logged every LOG_STEP records. */
  public static int LOG_STEP = 20000;

  // If true, each document's boost is scaled by log(e + inlink count).
  private boolean boostByLinkCount =
    NutchConf.getBoolean("indexer.boost.by.link.count", false);

  private float scorePower = NutchConf.getFloat("indexer.score.power", 0.5f);
  private int maxFieldLength = NutchConf.getInt("indexer.max.tokens", 10000);
  private NutchFileSystem nfs;            // filesystem holding the segment
  private long maxDocs = Long.MAX_VALUE;  // cap on the number of records indexed
  private File srcDir;                    // segment directory to index
  private File localWorkingDir;           // local scratch space for index building

  /**
   * @param nfs             filesystem the segment lives on
   * @param maxDocs         maximum number of records to index
   * @param srcDir          segment directory to index
   * @param localWorkingDir local directory used while building the index
   */
  public IndexSegment(NutchFileSystem nfs, long maxDocs, File srcDir,
                      File localWorkingDir) {
    this.nfs = nfs;
    this.maxDocs = maxDocs;
    this.srcDir = srcDir;
    this.localWorkingDir = localWorkingDir;
  }

  /** Overrides the configured exponent applied to page scores for boosting. */
  public void setScorePower(float power) { scorePower = power; }

  /**
   * Indexes the pages of the segment, optimizes the index, moves it onto
   * the (possibly remote) filesystem, and writes the {@link #DONE_NAME}
   * marker file.
   *
   * @throws IOException if the segment has already been indexed
   * @throws Exception   on any other indexing failure
   */
  public void indexPages() throws Exception {
    // Refuse to index the same segment twice.
    File doneFile = new File(srcDir, DONE_NAME);
    if (nfs.exists(doneFile)) {
      throw new IOException("already indexed: " + doneFile + " exists");
    }

    // Build the index locally, then move it to the NFS when complete.
    File outputIndex = new File(srcDir, "index");
    File tmpOutputIndex = new File(localWorkingDir, "index");
    File localOutput = nfs.startLocalOutput(outputIndex, tmpOutputIndex);

    IndexWriter writer =
      new IndexWriter(localOutput, new NutchDocumentAnalyzer(), true);
    writer.mergeFactor = 50;
    writer.minMergeDocs = 50;
    writer.maxFieldLength = maxFieldLength;
    writer.setUseCompoundFile(false);
    writer.setSimilarity(new NutchSimilarity());

    SegmentReader sr = null;
    long start = System.currentTimeMillis();
    long delta = start;
    long curTime, total = 0;
    long count = 0;
    try {
      try {
        LOG.info("* Opening segment " + srcDir.getName());
        sr = new SegmentReader(nfs, srcDir, false, true, true, true);

        total = sr.size;

        String segmentName = srcDir.getCanonicalFile().getName();
        FetcherOutput fetcherOutput = new FetcherOutput();
        ParseText parseText = new ParseText();
        ParseData parseData = new ParseData();
        LOG.info("* Indexing segment " + srcDir.getName());

        maxDocs = Math.min(sr.size, maxDocs);
        for (count = 0; count < maxDocs; count++) {
          if (!sr.get(count, fetcherOutput, null, parseText, parseData))
            continue;

          // Only successfully fetched pages are indexed.
          if (fetcherOutput.getStatus() != FetcherOutput.SUCCESS) {
            continue;
          }

          Parse parse = new ParseImpl(parseText.getText(), parseData);

          Document doc = makeDocument(segmentName, count, fetcherOutput, parse);

          // Plugins add the searchable fields.
          doc = IndexingFilters.filter(doc, parse, fetcherOutput);

          writer.addDocument(doc);
          if (count > 0 && count % LOG_STEP == 0) {
            curTime = System.currentTimeMillis();
            LOG.info(" Processed " + count + " records (" +
                     ((float)LOG_STEP * 1000.0f / (float)(curTime - delta)) +
                     " rec/s)");
            delta = curTime;
          }
        }
      } catch (EOFException e) {
        // A truncated segment is tolerated: keep what was indexed so far.
        LOG.warning("Unexpected EOF in: " + srcDir +
                    " at entry #" + count + ". Ignoring.");
      } finally {
        // Guard against a SegmentReader constructor failure (sr still null),
        // which previously threw an NPE here that masked the real exception.
        if (sr != null) {
          sr.close();
        }
      }
      LOG.info("* Optimizing index...");
      writer.optimize();
    } finally {
      // Always release the index writer (and its lock), even on failure;
      // previously it leaked when indexing threw a non-EOF exception.
      writer.close();
    }

    LOG.info("* Moving index to NFS if needed...");
    nfs.completeLocalOutput(outputIndex, tmpOutputIndex);

    // Mark the segment as indexed.
    OutputStream out = nfs.create(doneFile);
    out.close();

    delta = System.currentTimeMillis() - start;
    // Float math throughout: the original integer division (delta / 1000)
    // was 0 for sub-second runs, making the rate print as Infinity.
    float eps = (delta == 0) ? 0.0f : ((float) count * 1000.0f / (float) delta);
    LOG.info("DONE indexing segment " + srcDir.getName() + ": total " + total +
             " records in " + ((float) delta / 1000f) + " s (" + eps + " rec/s).");
  }

  /**
   * Builds the base Lucene document for one fetched page.  Indexing filter
   * plugins add the searchable fields; this adds bookkeeping fields and
   * sets the document boost.
   *
   * @param segmentName name of the segment the page came from
   * @param docNo       position of the page within the segment
   * @param fo          fetcher output for the page
   * @param parse       parsed content (consumed later by indexing filters)
   * @return a document carrying "docNo", "segment", "digest" and "boost"
   */
  private Document makeDocument(String segmentName, long docNo,
                                FetcherOutput fo, Parse parse) {

    Document doc = new Document();

    // Doc number (stored in hex) plus segment name together locate the
    // page's data for a search hit.
    doc.add(Field.UnIndexed("docNo", Long.toString(docNo, 16)));
    doc.add(Field.UnIndexed("segment", segmentName));

    // Content checksum, used for deduplication.
    doc.add(Field.UnIndexed("digest", fo.getMD5Hash().toString()));

    // Boost = page score raised to scorePower, optionally scaled by the
    // log of the number of incoming anchors.
    float boost = fo.getFetchListEntry().getPage().getScore();
    boost = (float)Math.pow(boost, scorePower);
    if (boostByLinkCount)
      boost *= (float)Math.log(Math.E + fo.getAnchors().length);
    doc.setBoost(boost);

    // Store the boost so it can be inspected later.
    doc.add(Field.UnIndexed("boost", Float.toString(boost)));

    return doc;
  }

  /** Command-line driver: indexes one segment directory. */
  public static void main(String[] args) throws Exception {
    String usage = "IndexSegment (-local | -ndfs <namenode:port>) " +
                   "<segment_directory> [-max <count>] [-dir <workingdir>]";
    if (args.length == 0) {
      System.err.println("Usage: " + usage);
      return;
    }

    NutchFileSystem nfs = NutchFileSystem.parseArgs(args, 0);
    try {
      int maxDocs = Integer.MAX_VALUE;
      File srcDir = null;
      File workingDir = new File(new File("").getCanonicalPath());
      for (int i = 0; i < args.length; i++) {
        if (args[i] != null) {
          if (args[i].equals("-max")) {
            i++;
            maxDocs = Integer.parseInt(args[i]);
          } else if (args[i].equals("-dir")) {
            i++;
            workingDir = new File(new File(args[i]).getCanonicalPath());
          } else {
            srcDir = new File(args[i]);
          }
        }
      }

      // No segment directory given: fail with usage instead of an NPE below.
      if (srcDir == null) {
        System.err.println("Usage: " + usage);
        return;
      }

      // Work in a private subdirectory, cleared before and after the run.
      workingDir = new File(workingDir, "indexsegment-workingdir");
      if (workingDir.exists()) {
        FileUtil.fullyDelete(workingDir);
      }
      IndexSegment indexer = new IndexSegment(nfs, maxDocs, srcDir, workingDir);
      LOG.info("indexing segment: " + srcDir);
      indexer.indexPages();
      LOG.info("done indexing");
      FileUtil.fullyDelete(workingDir);
    } finally {
      nfs.close();
    }
  }
}