|                                                                                                              1
 2
 3
 4   package net.nutch.tools;
 5
 6   import java.io.*;
 7   import java.util.*;
 8   import java.net.*;
 9   import java.util.logging.*;
 10
 11  import net.nutch.db.*;
 12  import net.nutch.net.*;
 13  import net.nutch.io.*;
 14  import net.nutch.fs.*;
 15  import net.nutch.linkdb.*;
 16  import net.nutch.pagedb.*;
 17  import net.nutch.fetcher.*;
 18  import net.nutch.parse.*;
 19  import net.nutch.util.*;
 20
 21
 22
 30  public class UpdateDatabaseTool {
 31      public static final float NEW_INTERNAL_LINK_FACTOR =
 32        NutchConf.getFloat("db.score.link.internal", 1.0f);
 33      public static final float NEW_EXTERNAL_LINK_FACTOR =
 34        NutchConf.getFloat("db.score.link.external", 1.0f);
 35      public static final int MAX_OUTLINKS_PER_PAGE =
 36        NutchConf.getInt("db.max.outlinks.per.page", 100);
 37
 38      public static final boolean IGNORE_INTERNAL_LINKS =
 39        NutchConf.getBoolean("db.ignore.internal.links", true);
 40
 41
 42      public static final Logger LOG =
 43        LogFormatter.getLogger("net.nutch.tools.UpdateDatabaseTool");
 44
 45      private static final int MAX_RETRIES = 2;
 46      private static final long MILLISECONDS_PER_DAY = 24 * 60 * 60 * 1000;
 47
 48      private IWebDBWriter webdb;
 49      private int maxCount = 0;
 50      private boolean additionsAllowed = true;
 51      private Set outlinkSet = new TreeSet();
 53
 56      public UpdateDatabaseTool(IWebDBWriter webdb, boolean additionsAllowed, int maxCount) {
 57          this.webdb = webdb;
 58          this.additionsAllowed = additionsAllowed;
 59          this.maxCount = maxCount;
 60      }
 61
 62
 67      public void updateForSegment(NutchFileSystem nfs, String
  directory) 68          throws IOException {
 69          ArrayList deleteQueue = new ArrayList();
 70          String
  fetchDir=new File(directory, FetcherOutput.DIR_NAME).toString(); 71          String
  parseDir=new File(directory, ParseData.DIR_NAME).toString(); 72          ArrayFile.Reader fetch = null;
 73          ArrayFile.Reader parse = null;
 74          int count = 0;
 75          try {
 76            fetch = new ArrayFile.Reader(nfs, fetchDir);
 77            parse = new ArrayFile.Reader(nfs, parseDir);
 78            FetcherOutput fo = new FetcherOutput();
 79            ParseData pd = new ParseData();
 80            while (fetch.next(fo) != null) {
 81              parse.next(pd);
 82
 83              if ((count % 1000) == 0) {
 84                  LOG.info("Processing document " + count);
 85              }
 86              if ((maxCount >= 0) && (count >= maxCount)) {
 87                break;
 88              }
 89
 90              FetchListEntry fle = fo.getFetchListEntry();
 91              Page page = fle.getPage();
 92              LOG.fine("Processing " + page.getURL());
 93              if (!fle.getFetch()) {                              pageContentsUnchanged(fo);
 96              } else if (fo.getStatus() == fo.SUCCESS) {               if (fo.getMD5Hash().equals(page.getMD5())) {
 98                  pageContentsUnchanged(fo);                      } else {
 100                 pageContentsChanged(fo, pd);                    }
 102
 103             } else if (fo.getStatus() == fo.RETRY &&
 104                        page.getRetriesSinceFetch() < MAX_RETRIES) {
 105
 106               pageRetry(fo);
 108             } else {
 109               pageGone(fo);                                   }
 111             count++;
 112           }
 113         } catch (EOFException e) {
 114           LOG.warning("Unexpected EOF in: " + fetchDir +
 115                       " at entry #" + count + ".  Ignoring.");
 116         } finally {
 117           if (fetch != null)
 118             fetch.close();
 119           if (parse != null)
 120             parse.close();
 121         }
 122     }
 123
 124
 127     private void pageContentsUnchanged(FetcherOutput fetcherOutput)
 128         throws IOException {
 129         Page oldPage = fetcherOutput.getFetchListEntry().getPage();
 130         Page newPage = (Page)oldPage.clone();
 131
 132         LOG.fine("unchanged");
 133
 134         newPage.setNextFetchTime(nextFetch(fetcherOutput));         newPage.setRetriesSinceFetch(0);
 137         webdb.addPage(newPage);                           }
 139
 140
 144     private void pageContentsChanged(FetcherOutput fetcherOutput,
 145                                      ParseData parseData) throws IOException {
 146       Page oldPage = fetcherOutput.getFetchListEntry().getPage();
 147       Page newPage = (Page)oldPage.clone();
 148
 149       LOG.fine("new contents");
 150
 151       newPage.setNextFetchTime(nextFetch(fetcherOutput));       newPage.setMD5(fetcherOutput.getMD5Hash());         newPage.setRetriesSinceFetch(0);
 155                                                 Outlink[] outlinks = parseData.getOutlinks();
 163       String
  sourceHost = getHost(oldPage.getURL().toString()); 164       long sourceDomainID = newPage.computeDomainID();
 165       long nextFetch = nextFetch(fetcherOutput, 0);
 166       outlinkSet.clear();        int end = Math.min(outlinks.length, MAX_OUTLINKS_PER_PAGE);
 168       for (int i = 0; i < end; i++) {
 169         Outlink link = outlinks[i];
 170         String
  url = link.getToUrl(); 171
 172         url = URLFilterFactory.getFilter().filter(url);
 173         if (url == null)
 174           continue;
 175
 176         outlinkSet.add(url);
 177
 178         if (additionsAllowed) {
 179             String
  destHost = getHost(url); 180             boolean internal = destHost == null || destHost.equals(sourceHost);
 181
 182             try {
 183                                                                                                                                 Link newLink = new Link(newPage.getMD5(), sourceDomainID, url, link.getAnchor());
 191
 192                 float newScore = oldPage.getScore();
 193                 float newNextScore = oldPage.getNextScore();
 194
 195                 if (internal) {
 196                   newScore *= NEW_INTERNAL_LINK_FACTOR;
 197                   newNextScore *= NEW_INTERNAL_LINK_FACTOR;
 198                 } else {
 199                   newScore *= NEW_EXTERNAL_LINK_FACTOR;
 200                   newNextScore *= NEW_EXTERNAL_LINK_FACTOR;
 201                 }
 202
 203                 Page linkedPage = new Page(url, newScore, newNextScore, nextFetch);
 204
 205                 if (internal && IGNORE_INTERNAL_LINKS) {
 206                   webdb.addPageIfNotPresent(linkedPage, newLink);
 207                 } else {
 208                   webdb.addLink(newLink);
 209                   webdb.addPageIfNotPresent(linkedPage);
 210                 }
 211
 212             } catch (MalformedURLException e) {
 213                 LOG.fine("skipping " + url + ":" + e);
 214             }
 215         }
 216       }
 217
 218                                                                                                       newPage.setNumOutlinks(outlinkSet.size());
 236       webdb.addPage(newPage);                         }
 238
 239
 242     private void pageGone(FetcherOutput fetcherOutput)
 243         throws IOException {
 244         Page oldPage = fetcherOutput.getFetchListEntry().getPage();
 245         Page newPage = (Page)oldPage.clone();
 246
 247         LOG.fine("retry never");
 248
 249         newPage.setNextFetchTime(Long.MAX_VALUE);         webdb.addPage(newPage);                       }
 252
 253
 256     private void pageRetry(FetcherOutput fetcherOutput)
 257         throws IOException {
 258         Page oldPage = fetcherOutput.getFetchListEntry().getPage();
 259         Page newPage = (Page)oldPage.clone();
 260
 261         LOG.fine("retry later");
 262
 263         newPage.setNextFetchTime(nextFetch(fetcherOutput,1));         newPage.setRetriesSinceFetch
 265             (oldPage.getRetriesSinceFetch()+1);
 267         webdb.addPage(newPage);                           }
 269
 270
 273     private long nextFetch(FetcherOutput fo) {
 274         return nextFetch(fo,
 275                          fo.getFetchListEntry().getPage().getFetchInterval());
 276     }
 277
 278
 282     private long nextFetch(FetcherOutput fetcherOutput, int days) {
 283       return fetcherOutput.getFetchDate() + (MILLISECONDS_PER_DAY * days);
 284     }
 285
 286
 289     private String
  getHost(String  url) { 290       try {
 291         return new URL(url).getHost().toLowerCase();
 292       } catch (MalformedURLException e) {
 293         return null;
 294       }
 295     }
 296
 297
 300     public void close() throws IOException {
 301         webdb.close();
 302     }
 303
 304
 307     public static void main(String
  args[]) throws Exception  { 308       int segDirStart = -1;
 309       int max = -1;
 310       boolean additionsAllowed = true;
 311
 312       String
  usage = "UpdateDatabaseTool (-local | -ndfs <namenode:port>) [-max N] [-noAdditions] <db> <seg_dir> [ <seg_dir> ... ]"; 313       if (args.length < 2) {
 314           System.out.println(usage);
 315           return;
 316       }
 317
 318       int i = 0;
 319       NutchFileSystem nfs = NutchFileSystem.parseArgs(args, i);
 320       for (; i < args.length; i++) {             if (args[i].equals("-max")) {                max = Integer.parseInt(args[++i]);
 323         } else if (args[i].equals("-noAdditions")) {
 324           additionsAllowed = false;
 325         } else {
 326             break;
 327         }
 328       }
 329
 330       File root = new File(args[i++]);
 331       segDirStart = i;
 332
 333       if (segDirStart == -1) {
 334         System.err.println(usage);
 335         System.exit(-1);
 336       }
 337
 338       LOG.info("Updating " + root);
 339
 340       IWebDBWriter webdb = new WebDBWriter(nfs, root);
 341       UpdateDatabaseTool tool = new UpdateDatabaseTool(webdb, additionsAllowed, max);
 342       for (i = segDirStart; i < args.length; i++) {
 343         String
  segDir = args[i]; 344         if (segDir != null) {
 345             LOG.info("Updating for " + segDir);
 346             tool.updateForSegment(nfs, segDir);
 347         }
 348       }
 349
 350       LOG.info("Finishing update");
 351       tool.close();
 352       nfs.close();
 353       LOG.info("Update finished");
 354     }
 355 }
 356
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |