KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > tools > UpdateDatabaseTool


1 /* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3
4 package net.nutch.tools;
5
6 import java.io.*;
7 import java.util.*;
8 import java.net.*;
9 import java.util.logging.*;
10
11 import net.nutch.db.*;
12 import net.nutch.net.*;
13 import net.nutch.io.*;
14 import net.nutch.fs.*;
15 import net.nutch.linkdb.*;
16 import net.nutch.pagedb.*;
17 import net.nutch.fetcher.*;
18 import net.nutch.parse.*;
19 import net.nutch.util.*;
20
21
22 /*****************************************************
23  * This class takes the output of the fetcher and updates the page and link
24  * DBs accordingly. Eventually, as the database scales, this will be broken into
25  * several phases, each consuming and emitting batch files, but, for now, we're
26  * doing it all here.
27  *
28  * @author Doug Cutting
29  *****************************************************/

30 public class UpdateDatabaseTool {
31     public static final float NEW_INTERNAL_LINK_FACTOR =
32       NutchConf.getFloat("db.score.link.internal", 1.0f);
33     public static final float NEW_EXTERNAL_LINK_FACTOR =
34       NutchConf.getFloat("db.score.link.external", 1.0f);
35     public static final int MAX_OUTLINKS_PER_PAGE =
36       NutchConf.getInt("db.max.outlinks.per.page", 100);
37
38     public static final boolean IGNORE_INTERNAL_LINKS =
39       NutchConf.getBoolean("db.ignore.internal.links", true);
40
41
42     public static final Logger LOG =
43       LogFormatter.getLogger("net.nutch.tools.UpdateDatabaseTool");
44
45     private static final int MAX_RETRIES = 2;
46     private static final long MILLISECONDS_PER_DAY = 24 * 60 * 60 * 1000;
47
48     private IWebDBWriter webdb;
49     private int maxCount = 0;
50     private boolean additionsAllowed = true;
51     private Set outlinkSet = new TreeSet(); // used in Page attr calculations
52

53     /**
54      * Take in the WebDBWriter, instantiated elsewhere.
55      */

56     public UpdateDatabaseTool(IWebDBWriter webdb, boolean additionsAllowed, int maxCount) {
57         this.webdb = webdb;
58         this.additionsAllowed = additionsAllowed;
59         this.maxCount = maxCount;
60     }
61
62     /**
63      * Iterate through items in the FetcherOutput. For each one,
64      * determine whether the pages need to be added to the webdb,
65      * or what fields need to be changed.
66      */

67     public void updateForSegment(NutchFileSystem nfs, String JavaDoc directory)
68         throws IOException {
69         ArrayList deleteQueue = new ArrayList();
70         String JavaDoc fetchDir=new File(directory, FetcherOutput.DIR_NAME).toString();
71         String JavaDoc parseDir=new File(directory, ParseData.DIR_NAME).toString();
72         ArrayFile.Reader fetch = null;
73         ArrayFile.Reader parse = null;
74         int count = 0;
75         try {
76           fetch = new ArrayFile.Reader(nfs, fetchDir);
77           parse = new ArrayFile.Reader(nfs, parseDir);
78           FetcherOutput fo = new FetcherOutput();
79           ParseData pd = new ParseData();
80           while (fetch.next(fo) != null) {
81             parse.next(pd);
82
83             if ((count % 1000) == 0) {
84                 LOG.info("Processing document " + count);
85             }
86             if ((maxCount >= 0) && (count >= maxCount)) {
87               break;
88             }
89
90             FetchListEntry fle = fo.getFetchListEntry();
91             Page page = fle.getPage();
92             LOG.fine("Processing " + page.getURL());
93             if (!fle.getFetch()) { // didn't fetch
94
pageContentsUnchanged(fo); // treat as unchanged
95

96             } else if (fo.getStatus() == fo.SUCCESS) { // fetch succeed
97
if (fo.getMD5Hash().equals(page.getMD5())) {
98                 pageContentsUnchanged(fo); // contents unchanged
99
} else {
100                 pageContentsChanged(fo, pd); // contents changed
101
}
102
103             } else if (fo.getStatus() == fo.RETRY &&
104                        page.getRetriesSinceFetch() < MAX_RETRIES) {
105
106               pageRetry(fo); // retry later
107

108             } else {
109               pageGone(fo); // give up: page is gone
110
}
111             count++;
112           }
113         } catch (EOFException e) {
114           LOG.warning("Unexpected EOF in: " + fetchDir +
115                       " at entry #" + count + ". Ignoring.");
116         } finally {
117           if (fetch != null)
118             fetch.close();
119           if (parse != null)
120             parse.close();
121         }
122     }
123
124     /**
125      * There's been no change: update date & retries only
126      */

127     private void pageContentsUnchanged(FetcherOutput fetcherOutput)
128         throws IOException {
129         Page oldPage = fetcherOutput.getFetchListEntry().getPage();
130         Page newPage = (Page)oldPage.clone();
131
132         LOG.fine("unchanged");
133
134         newPage.setNextFetchTime(nextFetch(fetcherOutput)); // set next fetch
135
newPage.setRetriesSinceFetch(0); // zero retries
136

137         webdb.addPage(newPage); // update record in db
138
}
139     
140     /**
141      * We've encountered new content, so update md5, etc.
142      * Also insert the new outlinks into the link DB
143      */

144     private void pageContentsChanged(FetcherOutput fetcherOutput,
145                                      ParseData parseData) throws IOException {
146       Page oldPage = fetcherOutput.getFetchListEntry().getPage();
147       Page newPage = (Page)oldPage.clone();
148
149       LOG.fine("new contents");
150
151       newPage.setNextFetchTime(nextFetch(fetcherOutput)); // set next fetch
152
newPage.setMD5(fetcherOutput.getMD5Hash()); // update md5
153
newPage.setRetriesSinceFetch(0); // zero retries
154

155       // Go through all the outlinks from this page, and add to
156
// the LinkDB.
157
//
158
// If the replaced page is the last ref to its MD5, then
159
// its outlinks must be removed. The WebDBWriter will
160
// handle that, upon page-replacement.
161
//
162
Outlink[] outlinks = parseData.getOutlinks();
163       String JavaDoc sourceHost = getHost(oldPage.getURL().toString());
164       long sourceDomainID = newPage.computeDomainID();
165       long nextFetch = nextFetch(fetcherOutput, 0);
166       outlinkSet.clear(); // Use a hashtable to uniquify the links
167
int end = Math.min(outlinks.length, MAX_OUTLINKS_PER_PAGE);
168       for (int i = 0; i < end; i++) {
169         Outlink link = outlinks[i];
170         String JavaDoc url = link.getToUrl();
171
172         url = URLFilterFactory.getFilter().filter(url);
173         if (url == null)
174           continue;
175
176         outlinkSet.add(url);
177         
178         if (additionsAllowed) {
179             String JavaDoc destHost = getHost(url);
180             boolean internal = destHost == null || destHost.equals(sourceHost);
181
182             try {
183                 //
184
// If it is an in-site link, then we only add a Link if
185
// the Page is also added. So we pass it to addPageIfNotPresent().
186
//
187
// If it is not an in-site link, then we always add the link.
188
// We then conditionally add the Page with addPageIfNotPresent().
189
//
190
Link newLink = new Link(newPage.getMD5(), sourceDomainID, url, link.getAnchor());
191
192                 float newScore = oldPage.getScore();
193                 float newNextScore = oldPage.getNextScore();
194
195                 if (internal) {
196                   newScore *= NEW_INTERNAL_LINK_FACTOR;
197                   newNextScore *= NEW_INTERNAL_LINK_FACTOR;
198                 } else {
199                   newScore *= NEW_EXTERNAL_LINK_FACTOR;
200                   newNextScore *= NEW_EXTERNAL_LINK_FACTOR;
201                 }
202
203                 Page linkedPage = new Page(url, newScore, newNextScore, nextFetch);
204
205                 if (internal && IGNORE_INTERNAL_LINKS) {
206                   webdb.addPageIfNotPresent(linkedPage, newLink);
207                 } else {
208                   webdb.addLink(newLink);
209                   webdb.addPageIfNotPresent(linkedPage);
210                 }
211
212             } catch (MalformedURLException e) {
213                 LOG.fine("skipping " + url + ":" + e);
214             }
215         }
216       }
217
218       // Calculate the number of different outlinks here.
219
// We use the outlinkSet TreeSet so that we count only
220
// the unique links leaving the Page. The WebDB will
221
// only store one value for each (fromID,toURL) pair
222
//
223
// Store the value with the Page, to speed up later
224
// Link Analysis computation.
225
//
226
// NOTE: This value won't necessarily even match what's
227
// in the linkdb! That's OK! It's more important that
228
// this number be a "true count" of the outlinks from
229
// the page in question, than the value reflect what's
230
// actually in our db. (There are a number of reasons,
231
// mainly space economy, to avoid placing URLs in our db.
232
// These reasons slightly pervert the "true out count".)
233
//
234
newPage.setNumOutlinks(outlinkSet.size()); // Store # outlinks
235

236       webdb.addPage(newPage); // update record in db
237
}
238
239     /**
240      * Keep the page, but never re-fetch it.
241      */

242     private void pageGone(FetcherOutput fetcherOutput)
243         throws IOException {
244         Page oldPage = fetcherOutput.getFetchListEntry().getPage();
245         Page newPage = (Page)oldPage.clone();
246
247         LOG.fine("retry never");
248
249         newPage.setNextFetchTime(Long.MAX_VALUE); // never refetch
250
webdb.addPage(newPage); // update record in db
251
}
252
253     /**
254      * Update with new retry count and date
255      */

256     private void pageRetry(FetcherOutput fetcherOutput)
257         throws IOException {
258         Page oldPage = fetcherOutput.getFetchListEntry().getPage();
259         Page newPage = (Page)oldPage.clone();
260
261         LOG.fine("retry later");
262
263         newPage.setNextFetchTime(nextFetch(fetcherOutput,1)); // wait a day
264
newPage.setRetriesSinceFetch
265             (oldPage.getRetriesSinceFetch()+1); // increment retries
266

267         webdb.addPage(newPage); // update record in db
268
}
269
270     /**
271      * Compute the next fetchtime for the Page.
272      */

273     private long nextFetch(FetcherOutput fo) {
274         return nextFetch(fo,
275                          fo.getFetchListEntry().getPage().getFetchInterval());
276     }
277
278     /**
279      * Compute the next fetchtime, from this moment, with the given
280      * number of days.
281      */

282     private long nextFetch(FetcherOutput fetcherOutput, int days) {
283       return fetcherOutput.getFetchDate() + (MILLISECONDS_PER_DAY * days);
284     }
285
286     /**
287      * Parse the hostname from a URL and return it.
288      */

289     private String JavaDoc getHost(String JavaDoc url) {
290       try {
291         return new URL(url).getHost().toLowerCase();
292       } catch (MalformedURLException e) {
293         return null;
294       }
295     }
296
297     /**
298      * Shut everything down.
299      */

300     public void close() throws IOException {
301         webdb.close();
302     }
303
304     /**
305      * Create the UpdateDatabaseTool, and pass in a WebDBWriter.
306      */

307     public static void main(String JavaDoc args[]) throws Exception JavaDoc {
308       int segDirStart = -1;
309       int max = -1;
310       boolean additionsAllowed = true;
311
312       String JavaDoc usage = "UpdateDatabaseTool (-local | -ndfs <namenode:port>) [-max N] [-noAdditions] <db> <seg_dir> [ <seg_dir> ... ]";
313       if (args.length < 2) {
314           System.out.println(usage);
315           return;
316       }
317
318       int i = 0;
319       NutchFileSystem nfs = NutchFileSystem.parseArgs(args, i);
320       for (; i < args.length; i++) { // parse command line
321
if (args[i].equals("-max")) { // found -max option
322
max = Integer.parseInt(args[++i]);
323         } else if (args[i].equals("-noAdditions")) {
324           additionsAllowed = false;
325         } else {
326             break;
327         }
328       }
329
330       File root = new File(args[i++]);
331       segDirStart = i;
332
333       if (segDirStart == -1) {
334         System.err.println(usage);
335         System.exit(-1);
336       }
337       
338       LOG.info("Updating " + root);
339
340       IWebDBWriter webdb = new WebDBWriter(nfs, root);
341       UpdateDatabaseTool tool = new UpdateDatabaseTool(webdb, additionsAllowed, max);
342       for (i = segDirStart; i < args.length; i++) {
343         String JavaDoc segDir = args[i];
344         if (segDir != null) {
345             LOG.info("Updating for " + segDir);
346             tool.updateForSegment(nfs, segDir);
347         }
348       }
349
350       LOG.info("Finishing update");
351       tool.close();
352       nfs.close();
353       LOG.info("Update finished");
354     }
355 }
356
Popular Tags