/* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.tools;

import java.io.*;
import java.net.*;
import java.util.*;
import java.text.*;
import java.util.logging.*;

import net.nutch.db.*;
import net.nutch.io.*;
import net.nutch.fs.*;
import net.nutch.util.*;
import net.nutch.pagedb.*;
import net.nutch.linkdb.*;
/**********************************************
 * This class takes an IWebDBReader, computes a relevant subset,
 * and then emits the subset.
 *
 * @author Mike Cafarella
 ***********************************************/
public class FetchListTool {
    public static final Logger LOG = LogFormatter.getLogger("net.nutch.tools.FetchListTool");
    private static String TOP_N_SORTER = "topNSorter";

    private static final long FETCH_GENERATION_DELAY_MS = 7 * 24 * 60 * 60 * 1000;
    private boolean scoreByLinkCount =
      NutchConf.getBoolean("fetchlist.score.by.link.count", false);

    NutchFileSystem nfs;
    File dbDir;
    boolean refetchOnly, anchorOptimize;
    float cutoffScore;
    int seed;

    /**
     * The TableSet class will allocate a given FetchListEntry
     * into one of several ArrayFiles. It chooses which
     * ArrayFile based on a hash of the URL's domain name.
     *
     * It uses a hash of the domain name so that pages are
     * allocated to a random ArrayFile, but same-host pages
     * go to the same file (for efficiency purposes during
     * fetch).
     *
     * Further, within a given file, the FetchListEntry items
     * appear in random order. This is so that we don't
     * hammer the same site over and over again during fetch.
     *
     * Each table should receive a roughly
     * even number of entries, but all URLs for a specific
     * domain name will be found in a single table. If
     * the dataset is weirdly skewed toward large domains,
     * there may be an uneven distribution.
     */
    class TableSet {
        Vector outputPaths = new Vector();
        Vector tables = new Vector();
        long appendCounts[];
        boolean hasAppended = false;
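
        // Illustrative sketch: with 4 tables and seed 0, a page on the
        // (hypothetical) host "www.example.com" is assigned a table exactly
        // as append() does below:
        //
        //   int index = Math.abs(MD5Hash.digest("www.example.com").hashCode() ^ 0) % 4;
        //
        // Every URL on that host yields the same index, so same-host pages
        // always share one ArrayFile.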

        /**
         */
        public TableSet() {
        }

        /**
         * Add a table to the list. Cannot be called
         * after we start appending entries.
         */
        public synchronized boolean add(String outputPath) throws IOException {
            if (hasAppended) {
                return false;
            }

            //
            // Record where the file should go. Then open a
            // SequenceFile.Writer to record the set of items
            // we append to each table.
            //
            outputPaths.add(outputPath);
            tables.add(new SequenceFile.Writer(nfs, outputPath + ".unsorted", MD5Hash.class, FetchListEntry.class));
            return true;
        }

        /**
         * Add FetchListEntry items to one of the tables.
         * Choose the table based on a hash of the URL domain name.
         */
        public synchronized boolean append(FetchListEntry newEntry) throws IOException {
            hasAppended = true;
            if (appendCounts == null) {
                appendCounts = new long[outputPaths.size()];
            }

            Page fetchPage = newEntry.getPage();

            // Extract the hostname from the URL
            String host = null;
            try {
                host = new URL(fetchPage.getURL().toString()).getHost().toLowerCase();
            } catch (MalformedURLException e) {
                // ignore bad URLs
                return false;
            }

            // Figure out which table is getting the item
            MD5Hash hash = MD5Hash.digest(host);
            int index = Math.abs(hash.hashCode() ^ seed) % tables.size();

            // Write it down and return
            SequenceFile.Writer writer = (SequenceFile.Writer) tables.elementAt(index);
            writer.append(fetchPage.getMD5(), newEntry);
            appendCounts[index]++;

            return true;
        }

        /**
         * Close down the TableSet, so there are no more FetchListEntries
         * expected. We now:
         * a) Close down all the SequenceFile.Writer objects.
         * b) Sort each file
         * c) Read each newly-sorted file and copy to an ArrayFile
         */
        public synchronized void close() throws IOException {
            hasAppended = true;

            // A) Close all the SequenceFile.Writers
            for (Enumeration e = tables.elements(); e.hasMoreElements(); ) {
                ((SequenceFile.Writer) e.nextElement()).close();
            }

            // B) Sort the edit-files
            SequenceFile.Sorter sorter = new SequenceFile.Sorter(nfs, new MD5Hash.Comparator(), FetchListEntry.class);

            //
            // Iterate through each unsorted file. Sort it (while
            // measuring the time taken) and upon completion delete
            // the unsorted version.
            //
            long totalEntries = 0;
            double totalTime = 0;
            int i = 0;
            for (Enumeration e = outputPaths.elements(); e.hasMoreElements(); i++) {
                String name = (String) e.nextElement();
                String unsortedName = name + ".unsorted";

                long localStart = System.currentTimeMillis();
                sorter.sort(unsortedName, name + ".sorted");
                long localEnd = System.currentTimeMillis();

                if (appendCounts != null) {
                    double localSecs = ((localEnd - localStart) / 1000.0);
                    LOG.info("Processing " + unsortedName + ": Sorted " + appendCounts[i] + " entries in " + localSecs + " seconds.");
                    LOG.info("Processing " + unsortedName + ": Sorted " + (appendCounts[i] / localSecs) + " entries/second");

                    totalEntries += appendCounts[i];
                    totalTime += localSecs;
                }

                nfs.delete(new File(name + ".unsorted"));
            }

            LOG.info("Overall processing: Sorted " + totalEntries + " entries in " + totalTime + " seconds.");
            LOG.info("Overall processing: Sorted " + (totalEntries / totalTime) + " entries/second");

            // C) Read in each newly-sorted file. Copy to an ArrayFile.
            for (Enumeration e = outputPaths.elements(); e.hasMoreElements(); ) {
                String name = (String) e.nextElement();
                SequenceFile.Reader reader = new SequenceFile.Reader(nfs, name + ".sorted");
                ArrayFile.Writer af = new ArrayFile.Writer(nfs, name, FetchListEntry.class);
                try {
                    MD5Hash key = new MD5Hash();
                    FetchListEntry fle = new FetchListEntry();
                    while (reader.next(key, fle)) {
                        af.append(fle);
                    }
                } finally {
                    af.close();
                    reader.close();
                    nfs.delete(new File(name + ".sorted"));
                }
            }
        }
    }

    /*************************************
     * SortableScore is just a WritableComparable Float!
     *************************************/
    public static class SortableScore implements WritableComparable {
        float score;

        /**
         */
        public SortableScore() {
        }

        /**
         */
        public void set(float score) {
            this.score = score;
        }

        /**
         */
        public float getFloat() {
            return score;
        }

        ////////
        // WritableComparable
        ////////

        /**
         * Sort them in descending order!
         */
        public int compareTo(Object o) {
            SortableScore otherScore = (SortableScore) o;

            if (score < otherScore.score) {
                return 1;
            } else if (score == otherScore.score) {
                return 0;
            } else {
                return -1;
            }
        }
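
        // Illustrative sketch: after a.set(2.0f) and b.set(1.0f),
        // a.compareTo(b) returns -1, so an ascending sort of SortableScore
        // keys yields scores in descending numeric order.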

        /**
         */
        public void write(DataOutput out) throws IOException {
            out.writeFloat(score);
        }

        /**
         */
        public void readFields(DataInput in) throws IOException {
            this.score = in.readFloat();
        }
    }

    /**
     * FetchListTool takes a page db, and emits a RECNO-based
     * subset of it.
     */
    public FetchListTool(NutchFileSystem nfs, File dbDir, boolean refetchOnly, boolean anchorOptimize, float cutoffScore, int seed) throws IOException, FileNotFoundException {
        this.nfs = nfs;
        this.dbDir = dbDir;
        this.refetchOnly = refetchOnly;
        this.anchorOptimize = anchorOptimize;
        this.cutoffScore = cutoffScore;
        this.seed = seed;
    }

    /**
     * Spit out several fetchlists, so that we can fetch across
     * several machines.
     */
    public void emitMultipleLists(File dir, int numLists, long topN, long curTime) throws IOException {
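        // Illustrative layout (hypothetical timestamp): with numLists == 2,
        // this creates dir/20040101120000-0/<FetchListEntry.DIR_NAME> and
        // dir/20040101120000-1/<FetchListEntry.DIR_NAME>, plus a temporary
        // tmp_20040101120000 working directory that is deleted afterwards.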
        //
        // Create tables (and directories) for each fetchlist we want.
        // Add them all to a TableSet object.
        //
        TableSet tables = new TableSet();
        try {
            String datePrefix = getDate();

            File workingDir = new File(dir, "tmp_" + getDate());
            nfs.mkdirs(workingDir);
            try {
                for (int i = 0; i < numLists; i++) {
                    File subdir = new File(dir, datePrefix + "-" + i);
                    nfs.mkdirs(subdir);
                    File file = new File(subdir, FetchListEntry.DIR_NAME);
                    tables.add(file.getPath());
                }

                // Now go through the fetchlist.
                emitFetchList(tables, workingDir, topN, curTime);
            } finally {
                FileUtil.fullyDelete(nfs, workingDir);
            }
        } finally {
            tables.close();
        }
    }

    /**
     * Spit out the fetchlist to a BDB at the indicated filename.
     */
    public void emitFetchList(File segmentDir, long topN, long curTime) throws IOException {
        TableSet tables = new TableSet();
        File workingDir = new File(segmentDir, "tmp_" + getDate());
        nfs.mkdirs(workingDir);
        File subdir = new File(segmentDir, getDate());
        nfs.mkdirs(subdir);

        try {
            tables.add(new File(subdir, FetchListEntry.DIR_NAME).getPath());

            try {
                emitFetchList(tables, workingDir, topN, curTime);
            } finally {
                tables.close();
            }
        } finally {
            FileUtil.fullyDelete(nfs, workingDir);
        }
    }

    private static String getDate() {
        return new SimpleDateFormat("yyyyMMddHHmmss").format
            (new Date(System.currentTimeMillis()));
    }

    /**
     * Emit the fetchlist, with the given TableSet. The TableSet is
     * responsible for actually appending each item to the output file;
     * the items themselves come from this function.
     */
    void emitFetchList(TableSet tables, File workingDir, long topN, long curTime) throws IOException {
        // Iterate through all the Pages, by URL. Iterating
        // through by URL means we can save disk seeks when
        // calling webdb.getLinks(URL).
        //
        // However, we don't really want the output to be in URL-ordered
        // format. We would like the output to be URL-randomized, which
        // an MD5-ordering preserves nicely. But we assume here that
        // TableSet will do that randomizing for us. We just need to
        // make sure we give it a good sampling of our data. (That is,
        // if we are giving TableSet fewer than the max-possible items,
        // we should make sure the items come evenly from all over the
        // db.)
        //
        long count = 0;
        TreeMap anchorTable = new TreeMap();
        Vector unknownDomainLinks = new Vector();

        //
        // Create a comparator that matches the domainIDs for
        // Link objects.
        //
        Comparator domainComparator = new Comparator() {
            public int compare(Object o1, Object o2) {
                Link l1 = (Link) o1;
                Link l2 = (Link) o2;
                if (l1.getDomainID() < l2.getDomainID()) {
                    return -1;
                } else if (l1.getDomainID() == l2.getDomainID()) {
                    return 0;
                } else {
                    return 1;
                }
            }
        };

        //
        // Go through all the pages by URL. Filter the ones
        // we really don't want, and save the others for possible
        // emission.
        //
        SortableScore curScore = new SortableScore();
        File unsortedFile = new File(workingDir, TOP_N_SORTER + ".unsorted");
        SequenceFile.Writer writer = new SequenceFile.Writer(nfs, unsortedFile.getPath(), SortableScore.class, FetchListEntry.class);
        try {
            IWebDBReader webdb = new WebDBReader(nfs, dbDir);
            try {
                for (Enumeration e = webdb.pages(); e.hasMoreElements(); count++) {
                    // Grab the next Page.
                    Page page = (Page) e.nextElement();
                    boolean shouldFetch = true;

                    if (((count % 10000) == 0) && (count != 0)) {
                        LOG.info("Processing page " + count + "...");
                    }

                    //
                    // Don't emit it if the Page's score doesn't meet
                    // our cutoff value
                    //
                    if ((cutoffScore >= 0) && (page.getScore() < cutoffScore)) {
                        continue;
                    }

                    //
                    // If the item is not yet ready to be fetched, move on.
                    //
                    // Also, if getNextFetchTime is set to Long.MAX_VALUE,
                    // then it should never be fetched.
                    //
                    if (page.getNextFetchTime() > curTime ||
                        page.getNextFetchTime() == Long.MAX_VALUE) {
                        continue;
                    }

                    //
                    // If we're in refetchOnly mode, set shouldFetch to FALSE
                    // for any Pages whose URL's MD5 is the same as the
                    // listed MD5. That indicates that no content has been
                    // downloaded in the past.
                    //
                    if (refetchOnly) {
                        MD5Hash urlHash = MD5Hash.digest(page.getURL());
                        if (page.getMD5().equals(urlHash)) {
                            shouldFetch = false;
                        }
                    }

                    //
                    // If anchorOptimize mode is on, AND shouldFetch is
                    // false, then we might apply a further optimization.
                    // Since a non-fetched Page (that is, a URL-only
                    // item) can only be discovered via the incoming
                    // anchor text, we can skip those Pages that have
                    // only *empty* incoming anchor text.
                    //
                    Link inlinks[] = webdb.getLinks(page.getURL());
                    if ((! shouldFetch) && anchorOptimize) {
                        boolean foundUsefulAnchor = false;
                        for (int i = 0; i < inlinks.length; i++) {
                            UTF8 anchorText = inlinks[i].getAnchorText();
                            if ((anchorText != null) &&
                                (anchorText.toString().trim().length() > 0)) {
                                foundUsefulAnchor = true;
                                break;
                            }
                        }
                        if (! foundUsefulAnchor) {
                            continue;
                        }
                    }

                    //
                    // Uniquify identical anchor text strings by source
                    // domain. If the anchor text is identical, and
                    // the domains are identical, then the anchor should
                    // only be included once.
                    //
                    // Links will arrive in the array sorted first by URL,
                    // and then by source-MD5.
                    //
                    int uniqueAnchors = 0;
                    for (int i = 0; i < inlinks.length; i++) {
                        String anchor = inlinks[i].getAnchorText().toString().trim();

                        if (anchor.length() > 0) {
                            if (inlinks[i].getDomainID() == 0) {
                                unknownDomainLinks.add(anchor);
                            } else {
                                Set domainUniqueLinks = (Set) anchorTable.get(anchor);
                                if (domainUniqueLinks == null) {
                                    domainUniqueLinks = new TreeSet(domainComparator);
                                    anchorTable.put(anchor, domainUniqueLinks);
                                }
                                if (domainUniqueLinks.add(inlinks[i])) {
                                    uniqueAnchors++;
                                }
                            }
                        }
                    }

                    //
                    // Finally, collect the incoming anchor text for
                    // the current URL. Step one is to add the incoming
                    // anchors whose links' source-domains are unknown.
                    // (The target, obviously, is the URL we're currently
                    // processing.)
                    //
                    int i = 0;
                    String results[] = new String[uniqueAnchors + unknownDomainLinks.size()];
                    for (Enumeration e2 = unknownDomainLinks.elements(); e2.hasMoreElements(); i++) {
                        results[i] = (String) e2.nextElement();
                    }
                    unknownDomainLinks.clear();

                    //
                    // Step 2, add the anchors that have actually been
                    // uniquified by source-domain.
                    //
                    for (Iterator it = anchorTable.keySet().iterator(); it.hasNext(); ) {
                        String key = (String) it.next();
                        Set domainUniqueLinks = (Set) anchorTable.get(key);

                        for (int j = 0; j < domainUniqueLinks.size(); j++) {
                            results[i++] = key;
                        }
                    }
                    anchorTable.clear();

                    //
                    // Last, add the FetchListEntry to a file so we can
                    // sort by score. Be sure to modify the Page's
                    // fetchtime; this allows us to soon generate
                    // another fetchlist which would not include this
                    // Page. That's helpful because with two distinct
                    // fetchlists, it should be possible to fetch and
                    // perform dbupdate at the same time.
                    //
                    // Optionally set the score by the log of number of
                    // incoming anchors.
                    //
                    curScore.set(scoreByLinkCount ?
                        (float) Math.log(results.length) : page.getScore());
                    page.setNextFetchTime(System.currentTimeMillis() + FETCH_GENERATION_DELAY_MS);
                    writer.append(curScore, new FetchListEntry(shouldFetch, page, results));
                }
            } finally {
                webdb.close();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            writer.close();
        }

        //
        // The next step is to sort the file we created above.
        // After it is sorted, we add the "topN" items to the
        // TableSet.
        //
        File sortedFile = new File(workingDir, TOP_N_SORTER + ".sorted");
        SequenceFile.Sorter topNSorter = new SequenceFile.Sorter(nfs, SortableScore.class, FetchListEntry.class);
        topNSorter.sort(unsortedFile.getPath(), sortedFile.getPath());
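
        // Because SortableScore.compareTo() inverts the usual float ordering,
        // this ascending sort puts the highest-scoring entries first, so the
        // reader below picks up the best topN pages.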

        //
        // Last of all, add the topN items to the table set.
        //
        // This is also where we rewrite the WebDB - we need to do
        // this so we can modify the "date" field. Rewriting the
        // db can be expensive, but it's that modification that will
        // allow us to interleave fetching and db-update.
        //
        WebDBWriter dbwriter = new WebDBWriter(nfs, dbDir);
        try {
            SequenceFile.Reader reader = new SequenceFile.Reader(nfs, sortedFile.getPath());
            try {
                SortableScore key = new SortableScore();
                FetchListEntry value = new FetchListEntry();
                while (topN > 0 && reader.next(key, value)) {
                    tables.append(value);
                    topN--;

                    //
                    // Modify the Page in the webdb so that its date
                    // is set forward a week. This way, we can
                    // generate two consecutive different fetchlists
                    // without an intervening update. So, we generate
                    // lists A and B, and start fetching A. Upon
                    // completion, we use A to update the db, and start
                    // fetching B. This way we have simultaneous
                    // dbupdate and page fetch, which should double
                    // our throughput.
                    //
                    dbwriter.addPage(value.getPage());
                }
            } finally {
                reader.close();
            }
        } finally {
            dbwriter.close();
        }
    }

    /**
     * Generate a fetchlist from the pagedb and linkdb
     */
    public static void main(String argv[]) throws IOException, FileNotFoundException {
        if (argv.length < 2) {
            System.out.println("Usage: FetchListTool (-local | -ndfs <namenode:port>) <db> <segment_dir> [-refetchonly] [-anchoroptimize linkdb] [-topN N] [-cutoff cutoffscore] [-numFetchers numFetchers] [-adddays numDays]");
            return;
        }
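
        // Hypothetical invocation (paths invented for illustration):
        //   FetchListTool -local ./db ./segments -topN 1000000 -numFetchers 4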

        //
        // Required args
        //
        int i = 0;
        NutchFileSystem nfs = NutchFileSystem.parseArgs(argv, i);
        File dbDir = new File(argv[i++]);
        File segmentDir = new File(argv[i++]);
        long curTime = System.currentTimeMillis();

        //
        // Optional args
        //
        boolean refetchOnly = false, anchorOptimize = false;
        long topN = Long.MAX_VALUE;
        float cutoffScore = -1.0f;
        int numFetchers = 1;
        int seed = new Random().nextInt();

        try {
            for (; i < argv.length; i++) {
                if ("-refetchonly".equals(argv[i])) {
                    refetchOnly = true;
                } else if ("-anchoroptimize".equals(argv[i])) {
                    anchorOptimize = true;
                } else if ("-topN".equals(argv[i])) {
                    if (i+1 < argv.length) {
                        topN = Long.parseLong(argv[i+1]);
                        i++;
                    } else {
                        System.out.println("No argument present for -topN");
                        return;
                    }
                } else if ("-cutoff".equals(argv[i])) {
                    if (i+1 < argv.length) {
                        cutoffScore = Float.parseFloat(argv[i+1]);
                        i++;
                    } else {
                        System.out.println("No argument present for -cutoff");
                        return;
                    }
                } else if ("-numFetchers".equals(argv[i])) {
                    if (i+1 < argv.length) {
                        numFetchers = Integer.parseInt(argv[i+1]);
                        i++;
                    } else {
                        System.out.println("No argument present for -numFetchers");
                        return;
                    }
                } else if ("-adddays".equals(argv[i])) {
                    if (i+1 < argv.length) {
                        long numDays = Integer.parseInt(argv[i+1]);
                        curTime += numDays * 1000L * 60 * 60 * 24;
                        i++;
                    } else {
                        System.out.println("No argument present for -adddays");
                        return;
                    }
                }
            }
        } catch (NumberFormatException nfe) {
            System.out.println("Badly-formatted number: " + nfe);
            return;
        }

        //
        // Check that args are consistent
        //
        if (anchorOptimize && !refetchOnly) {
            System.out.println("Tool cannot use -anchoroptimize option without -refetchonly option as well.");
            return;
        }

        //
        // Finally, start things up.
        //
        LOG.info("FetchListTool started");
        if (topN != Long.MAX_VALUE) {
            LOG.info("topN:" + topN);
        }
        if (cutoffScore >= 0) {
            LOG.info("cutoffscore:" + cutoffScore);
        }
        if (numFetchers > 1) {
            LOG.info("seed:" + seed);
        }

        FetchListTool flt = new FetchListTool(nfs, dbDir, refetchOnly, anchorOptimize, cutoffScore, seed);
        if (numFetchers > 1) {
            flt.emitMultipleLists(segmentDir, numFetchers, topN, curTime);
        } else {
            flt.emitFetchList(segmentDir, topN, curTime);
        }
        nfs.close();
        LOG.info("FetchListTool completed");
    }
}