/* Copyright (c) 2003 The Nutch Organization. All rights reserved.    */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.tools;

import java.io.*;
import java.util.*;
import java.util.logging.*;

import net.nutch.io.*;
import net.nutch.db.*;
import net.nutch.fs.*;
import net.nutch.net.*;
import net.nutch.util.*;
import net.nutch.linkdb.*;
import net.nutch.pagedb.*;
import net.nutch.fetcher.*;

/***************************************
 * DistributedAnalysisTool performs link analysis by reading
 * exclusively from an IWebDBReader, and writing to
 * an IWebDBWriter.
 *
 * This tool can be used in phases via the command line
 * to compute the LinkAnalysis score across many machines.
 *
 * For a single iteration of LinkAnalysis, you must have:
 *
 * 1) An "initRound" step that writes down how the work should be
 * divided. This outputs a "dist" directory which must be made
 * available to later steps. It requires the input db directory.
 *
 * 2) As many simultaneous "computeRound" steps as you like, but this
 * number must be determined in step 1. Each step may be run
 * on different machines, or on the same, or however you like.
 * It requires the "db" and "dist" directories (or copies) as
 * inputs. Each run will output an "instructions file".
 *
 * 3) A "completeRound" step, which integrates the results of all the
 * many "computeRound" steps. It writes to a "db" directory. It
 * assumes that all the instructions files have been gathered into
 * a single "dist" input directory. If you're running everything
 * on a single filesystem, this will happen easily. If not, then
 * you will have to gather the files by hand (or with a script).
 *
 * For more iterations, repeat steps 1 - 3!
 *
 * @author Mike Cafarella
 ***************************************/
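//
// A rough sketch of one full iteration, pieced together from the usage string
// in main() below. The "-local" flag, the two-worker split, and the "dist"
// and "db" directory names are illustrative choices, not requirements; a
// controlling script would normally drive these steps:
//
//   java net.nutch.tools.DistributedAnalysisTool -local -initRound 2 dist db
//   java net.nutch.tools.DistributedAnalysisTool -local -computeRound 0 dist db
//   java net.nutch.tools.DistributedAnalysisTool -local -computeRound 1 dist db
//   (gather the dist/scoreEdits.*.sorted files into one dist directory if the
//    computeRound workers ran on separate filesystems)
//   java net.nutch.tools.DistributedAnalysisTool -local -completeRound dist db
//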

public class DistributedAnalysisTool {
    final private static String ASSIGN_FILE_PREFIX = "assignment";
    final private static String SCORE_EDITS_FILE_PREFIX = "scoreEdits";
    final private static String ASSIGN_COMPLETE = "assignComplete";

    final private static float DEFAULT_SCORE = 0.15f;
    final private static float DECAY_VALUE = 0.85f;

    public static final Logger LOG = LogFormatter.getLogger("net.nutch.tools.DistributedAnalysisTool");

    /**
     * The EditSet inner class represents all of the sorted edits
     * files we must process. The edit-loop can repeatedly ask
     * an EditSet for the "next item", and the EditSet will
     * seamlessly deal with opening and closing files.
     */

    class EditSet {
        File distDir;
        int numEditFiles;
        int curEditFile;
        SequenceFile.Reader curReader;

        /**
         * The "distDir" is where we find all the edit files.
         * The "numEditFiles" is how many we can expect to find there.
         */
        public EditSet(File distDir, int numEditFiles) throws IOException {
            this.distDir = distDir;
            this.numEditFiles = numEditFiles;
            this.curEditFile = 0;
            getNextReader();
        }

        /**
         * Get the next item for reading, closing and opening
         * files if necessary. Return false if there are no
         * more items to return.
         */
        public synchronized boolean next(Writable key, Writable val) throws IOException {
            //
            // Open the next input stream if necessary
            //
            if (curReader == null) {
                getNextReader();
                // Assume each edits-file has at least one entry in it.
                if (curReader == null) {
                    return false;
                }
            }
            return curReader.next(key, val);
        }

        /**
         * Advance to the next edit-file reader, closing the
         * previous one if necessary.
         */
        private void getNextReader() throws IOException {
            if (curReader != null) {
                curReader.close();
            }

            if (curEditFile < numEditFiles) {
                curReader = new SequenceFile.Reader(nfs, new File(distDir, SCORE_EDITS_FILE_PREFIX + "." + curEditFile + ".sorted").getPath());
                LOG.info("Opened stream to file " + curEditFile);
                curEditFile++;
            }
        }

        /**
         * Close the current reader, if any, and mark the set as exhausted.
         */
        public synchronized void close() throws IOException {
            if (curReader != null) {
                curReader.close();
            }
            curEditFile = numEditFiles;
        }
    }

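    //
    // A minimal usage sketch of the EditSet above, assumed from its methods
    // and mirroring how completeRound() drives it; the variable names are
    // illustrative only:
    //
    //   EditSet edits = new EditSet(distDir, numEditFiles);
    //   UTF8 url = new UTF8();
    //   ScoreValue contribution = new ScoreValue();
    //   while (edits.next(url, contribution)) {
    //       // ... apply the contribution for url ...
    //   }
    //   edits.close();
    //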
    /**
     * This is a Writable pair of floats (a score and a next-score).
     * We need this so we can store score edits in a SequenceFile.
     */
    class ScoreValue implements Writable {
        float score;
        float nextScore;

        /**
         * Create an empty ScoreValue.
         */
        public ScoreValue() {
        }

        /**
         * Set the score contribution.
         */
        public void setScore(float f) {
            this.score = f;
        }

        /**
         * Set the next-score contribution.
         */
        public void setNextScore(float f) {
            this.nextScore = f;
        }

        /**
         * Return the score contribution.
         */
        public float score() {
            return score;
        }

        /**
         * Return the next-score contribution.
         */
        public float nextScore() {
            return nextScore;
        }

        /**
         * Write both floats to the output.
         */
        public void write(DataOutput out) throws IOException {
            out.writeFloat(score);
            out.writeFloat(nextScore);
        }

        /**
         * Read both floats from the input.
         */
        public void readFields(DataInput in) throws IOException {
            this.score = in.readFloat();
            this.nextScore = in.readFloat();
        }
    }

    NutchFileSystem nfs;
    File dbDir;

    /**
     * Build the tool from the filesystem and the webdb directory
     * it should operate on.
     */
    public DistributedAnalysisTool(NutchFileSystem nfs, File dbDir) throws IOException, FileNotFoundException {
        this.nfs = nfs;
        this.dbDir = dbDir;
    }

    /**
     * This method prepares the ground for a set of processes
     * to distribute a round of LinkAnalysis work. It writes out
     * the "assignments" to a directory. This directory must be
     * made accessible to all the processes. (It may be mounted by
     * all of them, or copied to all of them.)
     *
     * This is run by a single process, and it is run first.
     */

    public boolean initRound(int numProcesses, File distDir) throws IOException {
        //
        // The distDir must be empty or non-existent.
        //
        if ((nfs.exists(distDir) && nfs.isFile(distDir)) ||
            (nfs.exists(distDir) && (nfs.listFiles(distDir).length != 0))) {
            LOG.severe("Must be an empty or non-existent dir: " + distDir);
            return false;
        }
        if (! nfs.exists(distDir)) {
            nfs.mkdirs(distDir);
        }

        //
        // Figure out how many db items we have, and how many
        // processes they are allocated to.
        //
        long startPages[] = new long[numProcesses];
        long totalPages = 0;
        IWebDBReader reader = new WebDBReader(nfs, dbDir);
        try {
            totalPages = reader.numPages();
        } finally {
            reader.close();
        }
        long chunkSize = totalPages / numProcesses;
        long pagesProcessedSoFar = 0;

        //
        // From zero to the 2nd-to-last item, assign a
        // chunk's worth of pages. The value at each index
        // indicates the start page for that process.
        //
        startPages[0] = 0;
        for (int i = 1; i < numProcesses; i++) {
            startPages[i] = startPages[i-1] + chunkSize;
        }

        //
        // Emit the assignments for the processes
        //
        try {
            // Write out each file
            for (int i = 0; i < numProcesses; i++) {
                DataOutputStream out = new DataOutputStream(new BufferedOutputStream(nfs.create(new File(distDir, ASSIGN_FILE_PREFIX + "." + i))));
                try {
                    // Start page
                    out.writeLong(startPages[i]);

                    // How many pages to process
                    if (i != numProcesses - 1) {
                        out.writeLong(chunkSize);
                    } else {
                        // in last index, make up for remainders
                        out.writeLong(totalPages - ((numProcesses - 1) * chunkSize));
                    }
                } finally {
                    out.close();
                }
            }

            //
            // Write a file that indicates we finished correctly.
            // This makes it easier for controlling scripts to
            // check whether this process completed.
            //
            // It also holds some overall instruction information,
            // so we can do some error-checking at complete-time.
            //
            File completeFile = new File(distDir, ASSIGN_COMPLETE);
            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(nfs.create(completeFile)));
            try {
                out.writeInt(numProcesses);
                out.writeLong(totalPages);

                // Compute extents
                long extent[] = new long[numProcesses];
                for (int i = 0; i < numProcesses - 1; i++) {
                    extent[i] = chunkSize * (i + 1);
                }
                extent[numProcesses-1] = totalPages - (chunkSize * (numProcesses - 1));

                // Emit extents
                for (int i = 0; i < extent.length; i++) {
                    out.writeLong(extent[i]);
                }
            } finally {
                out.close();
            }
            return true;
        } catch (IOException ex) {
            LOG.severe(ex.toString());
            LOG.severe("Sorry, could not finish assignments");
        }
        return false;
    }

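    //
    // On-disk layout produced by initRound() above, summarized from the code
    // (an internal bookkeeping format, noted here only to make computeRound()
    // and completeRound() easier to follow):
    //
    //   assignment.<i>  : long startPage, long pageCount for worker i
    //   assignComplete  : int numProcesses, long totalPages, followed by
    //                     numProcesses longs of extent bookkeeping
    //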
    /**
     * This method is invoked by one of the many processes involved
     * in LinkAnalysis. There will be many of these running at the
     * same time. That's OK, though, since there's no locking
     * that has to go on between them.
     *
     * This computes the LinkAnalysis score for a given region
     * of the database. It writes its ID, the region params, and
     * the scores-to-be-written into a flat file. This file is
     * labelled according to its processId, and is found inside distDir.
     */

    public void computeRound(int processId, File distDir) throws IOException {
        File assignFile = new File(distDir, ASSIGN_FILE_PREFIX + "." + processId);

        long startIndex = 0, extent = 0;
        DataInputStream in = new DataInputStream(new BufferedInputStream(nfs.open(assignFile)));
        try {
            startIndex = in.readLong();
            extent = in.readLong();
        } finally {
            in.close();
        }

        LOG.info("Start at: " + startIndex);
        LOG.info("Extent: " + extent);

        //
        // Open scoreEdits file for this process. Write down
        // all the score-edits we want to perform.
        //
        File scoreEdits = new File(distDir, SCORE_EDITS_FILE_PREFIX + "." + processId);
        SequenceFile.Writer scoreWriter = new SequenceFile.Writer(nfs, scoreEdits.getPath() + ".unsorted", UTF8.class, ScoreValue.class);

        //
        // Go through the appropriate WebDB range, and figure out
        // next scores
        //
        try {
            // Iterate through all items in the webdb, in MD5 order
            long curIndex = 0;
            ScoreValue score = new ScoreValue();
            IWebDBReader reader = new WebDBReader(nfs, dbDir);
            try {
                for (Enumeration e = reader.pagesByMD5(); e.hasMoreElements(); curIndex++) {
                    //
                    // Find our starting place
                    //
                    if (curIndex < startIndex) {
                        e.nextElement();
                        continue;
                    }

                    //
                    // Bail if we've been here too long
                    //
                    if (curIndex - startIndex > extent) {
                        break;
                    }

                    //
                    // OK, do some analysis!
                    //
                    Page curPage = (Page) e.nextElement();
                    Link outLinks[] = reader.getLinks(curPage.getMD5());
                    int targetOutlinkers = 0;
                    for (int i = 0; i < outLinks.length; i++) {
                        if (outLinks[i].targetHasOutlink()) {
                            targetOutlinkers++;
                        }
                    }

                    //
                    // For our purposes here, assume every Page
                    // has an inlink, even though that might not
                    // really be true. It's close enough.
                    //

                    //
                    // In case there's no previous nextScore, grab
                    // score as an approximation.
                    //
                    float curNextScore = curPage.getNextScore();
                    if (outLinks.length > 0 && curNextScore == 0.0f) {
                        curNextScore = curPage.getScore();
                    }

                    //
                    // Compute contributions
                    //
                    float contributionForAll = (outLinks.length > 0) ? (curNextScore / outLinks.length) : 0.0f;
                    float contributionForOutlinkers = (targetOutlinkers > 0) ? (curNextScore / targetOutlinkers) : 0.0f;
                    for (int i = 0; i < outLinks.length; i++) {
                        // emit the target URL and the contribution
                        score.setScore(contributionForAll);
                        score.setNextScore(outLinks[i].targetHasOutlink() ? contributionForOutlinkers : 0.0f);
                        scoreWriter.append(outLinks[i].getURL(), score);
                    }

                    if (((curIndex - startIndex) % 5000) == 0) {
                        LOG.info("Pages consumed: " + (curIndex - startIndex) + " (at index " + curIndex + ")");
                    }
                }
            } finally {
                reader.close();
            }
        } finally {
            scoreWriter.close();
        }

        // Now sort the resulting score-edits file
        SequenceFile.Sorter sorter = new SequenceFile.Sorter(nfs, new UTF8.Comparator(), ScoreValue.class);
        sorter.sort(scoreEdits.getPath() + ".unsorted", scoreEdits.getPath() + ".sorted");
        nfs.delete(new File(scoreEdits.getPath() + ".unsorted"));
    }

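    //
    // A worked example of the contribution arithmetic in computeRound() above,
    // with made-up numbers: a page whose nextScore is 0.5 and which links to
    // A, B, and C, where only A and B themselves have outlinks, emits
    // contributionForAll = 0.5 / 3 ~= 0.167 and contributionForOutlinkers =
    // 0.5 / 2 = 0.25, so the score-edits written are roughly
    // A -> (0.167, 0.25), B -> (0.167, 0.25), C -> (0.167, 0.0).
    //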
    /**
     * This method collates and executes all the instructions
     * computed by the many executors of computeRound(). It
     * figures out what to write by looking at all the flat
     * files found in the distDir. These files are labelled
     * according to the processes that filled them. This method
     * will check to make sure all those files are present
     * before starting work.
     *
     * If the processors are distributed, you might have to
     * copy all the instruction files to a single distDir before
     * starting this method.
     *
     * Of course, this method is executed on only one process.
     * It is run last.
     */

    public void completeRound(File distDir, File scoreFile) throws IOException {
        //
        // Load the overall assignment file, so we can
        // see how many processes we have and how many
        // operations each should include
        //
        int numProcesses = 0;
        long totalPages = 0;
        long extent[] = null;
        File overall = new File(distDir, ASSIGN_COMPLETE);
        DataInputStream in = new DataInputStream(new BufferedInputStream(nfs.open(overall)));
        try {
            numProcesses = in.readInt();
            totalPages = in.readLong();
            extent = new long[numProcesses];
            for (int i = 0; i < numProcesses; i++) {
                extent[i] = in.readLong();
            }
        } finally {
            in.close();
            in = null;
        }

        //
        // Go through each instructions file we have, and
        // apply each one to the webdb.
        //
        ScoreStats scoreStats = new ScoreStats();
        IWebDBReader reader = new WebDBReader(nfs, dbDir);
        IWebDBWriter writer = new WebDBWriter(nfs, dbDir);
        EditSet editSet = new EditSet(distDir, numProcesses);
        try {
            int count = 0;
            UTF8 curEditURL = new UTF8();
            ScoreValue curContribution = new ScoreValue();
            boolean hasEdit = editSet.next(curEditURL, curContribution);

            //
            // Go through all the Pages, in URL-sort order.
            // We also read from the score-edit file in URL-sort order.
            //
            for (Enumeration e = reader.pages(); e.hasMoreElements(); count++) {
                Page curPage = (Page) e.nextElement();
                if (! hasEdit) {
                    break;
                }

                //
                // Apply the current score-edit to the db item,
                // if appropriate
                //
                int comparison = curPage.getURL().compareTo(curEditURL);
                float newScore = 0.0f, newNextScore = 0.0f;
                if (comparison < 0) {
                    // Fine. The edit applies to a Page we will
                    // hit later. Ignore it, and move onto the next
                    // Page. This should only happen with Pages
                    // that have no incoming links, which are necessarily
                    // special-case Pages.
                    //
                    // However, that means the Page's score should
                    // be set to the minimum possible, as we have no
                    // incoming links.
                    newScore = (1 - DECAY_VALUE);
                    newNextScore = (1 - DECAY_VALUE);
                } else if (comparison > 0) {
                    // Error! We should never hit this situation.
                    // It means we have a score-edit for an item
                    // that's not found in the database!
                    throw new IOException("Impossible situation. There is a score-edit for " + curEditURL + ", which comes after the current Page " + curPage.getURL());
                } else {
                    //
                    // The only really interesting case is when the
                    // score-edit and the curPage are the same.
                    //
                    // Sum all the contributions
                    while (hasEdit && curPage.getURL().compareTo(curEditURL) == 0) {
                        newScore += curContribution.score();
                        newNextScore += curContribution.nextScore();
                        hasEdit = editSet.next(curEditURL, curContribution);
                    }

                    newScore = (1 - DECAY_VALUE) + (DECAY_VALUE * newScore);
                    newNextScore = (1 - DECAY_VALUE) + (DECAY_VALUE * newNextScore);
                }

                // Finally, assign it.
                curPage.setScore(newScore, newNextScore);
                writer.addPageWithScore(curPage);
                scoreStats.addScore(newScore);
                if ((count % 5000) == 0) {
                    LOG.info("Pages written: " + count);
                }
            }
            LOG.info("Pages encountered: " + count);
            LOG.info("Target pages from init(): " + totalPages);
        } finally {
            reader.close();
            editSet.close();
            writer.close();
        }

        //
        // Emit the score distribution info
        //
        if (scoreFile.exists()) {
            scoreFile.delete();
        }
        PrintStream pout = new PrintStream(new BufferedOutputStream(nfs.create(scoreFile)));
        try {
            scoreStats.emitDistribution(pout);
        } finally {
            pout.close();
        }

        //
        // Delete all the distributed overhead files
        //
        FileUtil.fullyDelete(nfs, distDir);
    }

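    //
    // A worked example of the score combination in completeRound() above,
    // with made-up numbers: if the summed contributions for a page come to
    // newScore = 0.4, the final value is (1 - DECAY_VALUE) + DECAY_VALUE * 0.4
    // = 0.15 + 0.85 * 0.4 = 0.49, while a page that receives no edits at all
    // bottoms out at 1 - DECAY_VALUE = 0.15 (the same value as DEFAULT_SCORE).
    //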
    /**
     * Kick off the link analysis. Submit the locations of the
     * Webdb and the number of iterations.
     *
     * DAT -initRound <n_processes> <dist_dir> <db_dir>
     * DAT -computeRound <process_id> <dist_dir> <db_dir>
     * DAT -completeRound <dist_dir> <db_dir>
     */

    public static void main(String argv[]) throws IOException {
        if (argv.length < 2) {
            System.out.println("usage: java net.nutch.tools.DistributedAnalysisTool (-local | -ndfs <namenode:port>) -initRound|-computeRound|-completeRound (numProcesses | processId) <dist_dir> <db_dir>");
            return;
        }

        String command = null;
        int numProcesses = 0, processId = 0, numIterations = 0;
        File distDir = null, dbDir = null;

        NutchFileSystem nfs = NutchFileSystem.parseArgs(argv, 0);
        for (int i = 0; i < argv.length; i++) {
            if ("-initRound".equals(argv[i])) {
                command = argv[i];
                numProcesses = Integer.parseInt(argv[i+1]);
                distDir = new File(argv[i+2]);
                dbDir = new File(argv[i+3]);
                i += 3;
            } else if ("-computeRound".equals(argv[i])) {
                command = argv[i];
                processId = Integer.parseInt(argv[i+1]);
                distDir = new File(argv[i+2]);
                dbDir = new File(argv[i+3]);
                i += 3;
            } else if ("-completeRound".equals(argv[i])) {
                command = argv[i];
                distDir = new File(argv[i+1]);
                dbDir = new File(argv[i+2]);
                i += 2;
            }
        }

        System.out.println("Started at " + new Date(System.currentTimeMillis()));
        try {
            DistributedAnalysisTool dat =
                new DistributedAnalysisTool(nfs, dbDir);
            if ("-initRound".equals(command)) {
                dat.initRound(numProcesses, distDir);
            } else if ("-computeRound".equals(command)) {
                dat.computeRound(processId, distDir);
            } else if ("-completeRound".equals(command)) {
                dat.completeRound(distDir, new File(dbDir, "linkstats.txt"));
            } else {
                System.out.println("No directive.");
            }
        } finally {
            System.out.println("Finished at " + new Date(System.currentTimeMillis()));
            nfs.close();
        }
    }
}