1 2 3 4 package net.nutch.tools; 5 6 import java.io.*; 7 import java.util.*; 8 import java.net.*; 9 import java.util.logging.*; 10 11 import net.nutch.db.*; 12 import net.nutch.net.*; 13 import net.nutch.io.*; 14 import net.nutch.fs.*; 15 import net.nutch.linkdb.*; 16 import net.nutch.pagedb.*; 17 import net.nutch.fetcher.*; 18 import net.nutch.parse.*; 19 import net.nutch.util.*; 20 21 22 30 public class UpdateDatabaseTool { 31 public static final float NEW_INTERNAL_LINK_FACTOR = 32 NutchConf.getFloat("db.score.link.internal", 1.0f); 33 public static final float NEW_EXTERNAL_LINK_FACTOR = 34 NutchConf.getFloat("db.score.link.external", 1.0f); 35 public static final int MAX_OUTLINKS_PER_PAGE = 36 NutchConf.getInt("db.max.outlinks.per.page", 100); 37 38 public static final boolean IGNORE_INTERNAL_LINKS = 39 NutchConf.getBoolean("db.ignore.internal.links", true); 40 41 42 public static final Logger LOG = 43 LogFormatter.getLogger("net.nutch.tools.UpdateDatabaseTool"); 44 45 private static final int MAX_RETRIES = 2; 46 private static final long MILLISECONDS_PER_DAY = 24 * 60 * 60 * 1000; 47 48 private IWebDBWriter webdb; 49 private int maxCount = 0; 50 private boolean additionsAllowed = true; 51 private Set outlinkSet = new TreeSet(); 53 56 public UpdateDatabaseTool(IWebDBWriter webdb, boolean additionsAllowed, int maxCount) { 57 this.webdb = webdb; 58 this.additionsAllowed = additionsAllowed; 59 this.maxCount = maxCount; 60 } 61 62 67 public void updateForSegment(NutchFileSystem nfs, String directory) 68 throws IOException { 69 ArrayList deleteQueue = new ArrayList(); 70 String fetchDir=new File(directory, FetcherOutput.DIR_NAME).toString(); 71 String parseDir=new File(directory, ParseData.DIR_NAME).toString(); 72 ArrayFile.Reader fetch = null; 73 ArrayFile.Reader parse = null; 74 int count = 0; 75 try { 76 fetch = new ArrayFile.Reader(nfs, fetchDir); 77 parse = new ArrayFile.Reader(nfs, parseDir); 78 FetcherOutput fo = new FetcherOutput(); 79 ParseData pd = new ParseData(); 80 while (fetch.next(fo) != null) { 81 parse.next(pd); 82 83 if ((count % 1000) == 0) { 84 LOG.info("Processing document " + count); 85 } 86 if ((maxCount >= 0) && (count >= maxCount)) { 87 break; 88 } 89 90 FetchListEntry fle = fo.getFetchListEntry(); 91 Page page = fle.getPage(); 92 LOG.fine("Processing " + page.getURL()); 93 if (!fle.getFetch()) { pageContentsUnchanged(fo); 96 } else if (fo.getStatus() == fo.SUCCESS) { if (fo.getMD5Hash().equals(page.getMD5())) { 98 pageContentsUnchanged(fo); } else { 100 pageContentsChanged(fo, pd); } 102 103 } else if (fo.getStatus() == fo.RETRY && 104 page.getRetriesSinceFetch() < MAX_RETRIES) { 105 106 pageRetry(fo); 108 } else { 109 pageGone(fo); } 111 count++; 112 } 113 } catch (EOFException e) { 114 LOG.warning("Unexpected EOF in: " + fetchDir + 115 " at entry #" + count + ". Ignoring."); 116 } finally { 117 if (fetch != null) 118 fetch.close(); 119 if (parse != null) 120 parse.close(); 121 } 122 } 123 124 127 private void pageContentsUnchanged(FetcherOutput fetcherOutput) 128 throws IOException { 129 Page oldPage = fetcherOutput.getFetchListEntry().getPage(); 130 Page newPage = (Page)oldPage.clone(); 131 132 LOG.fine("unchanged"); 133 134 newPage.setNextFetchTime(nextFetch(fetcherOutput)); newPage.setRetriesSinceFetch(0); 137 webdb.addPage(newPage); } 139 140 144 private void pageContentsChanged(FetcherOutput fetcherOutput, 145 ParseData parseData) throws IOException { 146 Page oldPage = fetcherOutput.getFetchListEntry().getPage(); 147 Page newPage = (Page)oldPage.clone(); 148 149 LOG.fine("new contents"); 150 151 newPage.setNextFetchTime(nextFetch(fetcherOutput)); newPage.setMD5(fetcherOutput.getMD5Hash()); newPage.setRetriesSinceFetch(0); 155 Outlink[] outlinks = parseData.getOutlinks(); 163 String sourceHost = getHost(oldPage.getURL().toString()); 164 long sourceDomainID = newPage.computeDomainID(); 165 long nextFetch = nextFetch(fetcherOutput, 0); 166 outlinkSet.clear(); int end = Math.min(outlinks.length, MAX_OUTLINKS_PER_PAGE); 168 for (int i = 0; i < end; i++) { 169 Outlink link = outlinks[i]; 170 String url = link.getToUrl(); 171 172 url = URLFilterFactory.getFilter().filter(url); 173 if (url == null) 174 continue; 175 176 outlinkSet.add(url); 177 178 if (additionsAllowed) { 179 String destHost = getHost(url); 180 boolean internal = destHost == null || destHost.equals(sourceHost); 181 182 try { 183 Link newLink = new Link(newPage.getMD5(), sourceDomainID, url, link.getAnchor()); 191 192 float newScore = oldPage.getScore(); 193 float newNextScore = oldPage.getNextScore(); 194 195 if (internal) { 196 newScore *= NEW_INTERNAL_LINK_FACTOR; 197 newNextScore *= NEW_INTERNAL_LINK_FACTOR; 198 } else { 199 newScore *= NEW_EXTERNAL_LINK_FACTOR; 200 newNextScore *= NEW_EXTERNAL_LINK_FACTOR; 201 } 202 203 Page linkedPage = new Page(url, newScore, newNextScore, nextFetch); 204 205 if (internal && IGNORE_INTERNAL_LINKS) { 206 webdb.addPageIfNotPresent(linkedPage, newLink); 207 } else { 208 webdb.addLink(newLink); 209 webdb.addPageIfNotPresent(linkedPage); 210 } 211 212 } catch (MalformedURLException e) { 213 LOG.fine("skipping " + url + ":" + e); 214 } 215 } 216 } 217 218 newPage.setNumOutlinks(outlinkSet.size()); 236 webdb.addPage(newPage); } 238 239 242 private void pageGone(FetcherOutput fetcherOutput) 243 throws IOException { 244 Page oldPage = fetcherOutput.getFetchListEntry().getPage(); 245 Page newPage = (Page)oldPage.clone(); 246 247 LOG.fine("retry never"); 248 249 newPage.setNextFetchTime(Long.MAX_VALUE); webdb.addPage(newPage); } 252 253 256 private void pageRetry(FetcherOutput fetcherOutput) 257 throws IOException { 258 Page oldPage = fetcherOutput.getFetchListEntry().getPage(); 259 Page newPage = (Page)oldPage.clone(); 260 261 LOG.fine("retry later"); 262 263 newPage.setNextFetchTime(nextFetch(fetcherOutput,1)); newPage.setRetriesSinceFetch 265 (oldPage.getRetriesSinceFetch()+1); 267 webdb.addPage(newPage); } 269 270 273 private long nextFetch(FetcherOutput fo) { 274 return nextFetch(fo, 275 fo.getFetchListEntry().getPage().getFetchInterval()); 276 } 277 278 282 private long nextFetch(FetcherOutput fetcherOutput, int days) { 283 return fetcherOutput.getFetchDate() + (MILLISECONDS_PER_DAY * days); 284 } 285 286 289 private String getHost(String url) { 290 try { 291 return new URL(url).getHost().toLowerCase(); 292 } catch (MalformedURLException e) { 293 return null; 294 } 295 } 296 297 300 public void close() throws IOException { 301 webdb.close(); 302 } 303 304 307 public static void main(String args[]) throws Exception { 308 int segDirStart = -1; 309 int max = -1; 310 boolean additionsAllowed = true; 311 312 String usage = "UpdateDatabaseTool (-local | -ndfs <namenode:port>) [-max N] [-noAdditions] <db> <seg_dir> [ <seg_dir> ... ]"; 313 if (args.length < 2) { 314 System.out.println(usage); 315 return; 316 } 317 318 int i = 0; 319 NutchFileSystem nfs = NutchFileSystem.parseArgs(args, i); 320 for (; i < args.length; i++) { if (args[i].equals("-max")) { max = Integer.parseInt(args[++i]); 323 } else if (args[i].equals("-noAdditions")) { 324 additionsAllowed = false; 325 } else { 326 break; 327 } 328 } 329 330 File root = new File(args[i++]); 331 segDirStart = i; 332 333 if (segDirStart == -1) { 334 System.err.println(usage); 335 System.exit(-1); 336 } 337 338 LOG.info("Updating " + root); 339 340 IWebDBWriter webdb = new WebDBWriter(nfs, root); 341 UpdateDatabaseTool tool = new UpdateDatabaseTool(webdb, additionsAllowed, max); 342 for (i = segDirStart; i < args.length; i++) { 343 String segDir = args[i]; 344 if (segDir != null) { 345 LOG.info("Updating for " + segDir); 346 tool.updateForSegment(nfs, segDir); 347 } 348 } 349 350 LOG.info("Finishing update"); 351 tool.close(); 352 nfs.close(); 353 LOG.info("Update finished"); 354 } 355 } 356 | Popular Tags |