KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > fetcher > Fetcher


1 /* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3
4 package net.nutch.fetcher;
5
6 import java.io.IOException JavaDoc;
7 import java.io.File JavaDoc;
8 import java.util.Properties JavaDoc;
9
10 import net.nutch.pagedb.FetchListEntry;
11 import net.nutch.io.*;
12 import net.nutch.db.*;
13 import net.nutch.fs.*;
14 import net.nutch.util.*;
15 import net.nutch.protocol.*;
16 import net.nutch.parse.*;
17 import net.nutch.plugin.*;
18
19 import java.util.logging.*;
20
21 /**
22  * The fetcher. Most of the work is done by plugins.
23  *
24  * <p>
25  * Note by John Xing: As of 20041022, option -noParsing is introduced.
26  * Without this option, fetcher behaves the old way, i.e., it not only
27  * crawls but also parses content. With option -noParsing, fetcher
28  * does crawl only. Use ParseSegment.java to parse fetched contents.
29  * Check FetcherOutput.java and ParseSegment.java for further description.
30  */

31 public class Fetcher {
32
33   public static final Logger LOG =
34     LogFormatter.getLogger("net.nutch.fetcher.Fetcher");
35
36   static {
37     if (NutchConf.getBoolean("fetcher.verbose", false)) {
38       setLogLevel(Level.FINE);
39     }
40   }
41
42   private ArrayFile.Reader fetchList; // the input
43
private ArrayFile.Writer fetcherWriter; // the output
44
private ArrayFile.Writer contentWriter;
45   private ArrayFile.Writer parseTextWriter;
46   private ArrayFile.Writer parseDataWriter;
47
48   private String JavaDoc name; // name of the segment
49
private long start; // start time of fetcher run
50
private long bytes; // total bytes fetched
51
private int pages; // total pages fetched
52
private int errors; // total pages errored
53

54   private boolean parsing = true; // whether do parsing
55

56   private int threadCount = // max number of threads
57
NutchConf.getInt("fetcher.threads.fetch", 10);
58
59   // All threads (FetcherThread or thread started by it) belong to
60
// group "fetcher". Each FetcherThread is named as "fetcherXX",
61
// where XX is the order it's started.
62
private static final String JavaDoc THREAD_GROUP_NAME = "fetcher";
63
64   private ThreadGroup JavaDoc group = new ThreadGroup JavaDoc(THREAD_GROUP_NAME); // our group
65

66   // count of FetcherThreads that are through the loop and just about to return
67
private int atCompletion = 0;
68
69   /********************************************
70    * Fetcher thread
71    ********************************************/

72   private class FetcherThread extends Thread JavaDoc {
73
74     public FetcherThread(String JavaDoc name) { super(group, name); }
75
76     /**
77      * This thread keeps looping, grabbing an item off the list
78      * of URLs to be fetched (in a thread-safe way). It checks
79      * whether the URL is OK to download. If so, we do it.
80      */

81     public void run() {
82
83       FetchListEntry fle = new FetchListEntry();
84
85       while (true) {
86         if (LogFormatter.hasLoggedSevere()) // something bad happened
87
break; // exit
88

89         String JavaDoc url = null;
90         try {
91
92           if (fetchList.next(fle) == null)
93             break;
94
95           url = fle.getPage().getURL().toString();
96
97           if (!fle.getFetch()) { // should we fetch this page?
98
if (LOG.isLoggable(Level.FINE))
99               LOG.fine("not fetching " + url);
100             handleNoFetch(fle, FetcherOutput.SUCCESS);
101             continue;
102           }
103
104           LOG.info("fetching " + url); // fetch the page
105

106           Protocol protocol = ProtocolFactory.getProtocol(url);
107           Content content = protocol.getContent(url);
108
109           handleFetch(url, fle, content);
110
111           synchronized (Fetcher.this) { // update status
112
pages++;
113             bytes += content.getContent().length;
114             if ((pages % 100) == 0) { // show status every 100pp
115
status();
116             }
117           }
118         } catch (ResourceGone e) { // don't retry
119
logError(url, fle, e);
120           handleNoFetch(fle, FetcherOutput.NOT_FOUND);
121
122         // dealt with in handleFetch() below
123
//} catch (ParseException e) { // don't retry
124
// logError(url, fle, e);
125
// handleNoFetch(fle, FetcherOutput.CANT_PARSE);
126

127         } catch (RetryLater e) { // explicit retry
128
logError(url, fle, e);
129           handleNoFetch(fle, FetcherOutput.RETRY);
130
131         } catch (ProtocolException e) { // implicit retry
132
logError(url, fle, e);
133           handleNoFetch(fle, FetcherOutput.RETRY);
134
135         } catch (Throwable JavaDoc t) { // an unchecked exception
136
if (fle != null) {
137             logError(url, fle, t); // retry?
138
handleNoFetch(fle, FetcherOutput.RETRY);
139           }
140         }
141       }
142
143       // Explicitly invoke shutDown() for all possible plugins.
144
// Done by the FetcherThread finished the last.
145
synchronized (Fetcher.this) {
146         atCompletion++;
147         if (atCompletion == threadCount) {
148           try {
149             PluginRepository.getInstance().finalize();
150           } catch (java.lang.Throwable JavaDoc t) {
151             // do nothing
152
}
153         }
154       }
155       return;
156     }
157
158     private void logError(String JavaDoc url, FetchListEntry fle, Throwable JavaDoc t) {
159       LOG.info("fetch of " + url + " failed with: " + t);
160       LOG.log(Level.FINE, "stack", t); // stack trace
161
synchronized (Fetcher.this) { // record failure
162
errors++;
163       }
164     }
165
166     private void handleFetch(String JavaDoc url, FetchListEntry fle, Content content) {
167       if (!Fetcher.this.parsing) {
168         outputPage(new FetcherOutput(fle, MD5Hash.digest(content.getContent()),
169                                     FetcherOutput.SUCCESS),
170                 content, null, null);
171         return;
172       }
173
174       try {
175         String JavaDoc contentType = content.getContentType();
176         Parser parser = ParserFactory.getParser(contentType, url);
177         Parse parse = parser.getParse(content);
178         outputPage(new FetcherOutput(fle, MD5Hash.digest(content.getContent()),
179                                     FetcherOutput.SUCCESS),
180                 content, new ParseText(parse.getText()), parse.getData());
181       } catch (ParseException e) {
182         // 20041026, xing
183
// If fetching succeeds, but parsing fails, content should be saved
184
// so that we can try to parse again in separate pass, possibly
185
// using better/alternative parser.
186
LOG.info("fetch okay, but can't parse " + url + ", reason: "
187           + e.getMessage());
188         outputPage(new FetcherOutput(fle, MD5Hash.digest(content.getContent()),
189                                     FetcherOutput.CANT_PARSE),
190                 content, new ParseText(""),
191                 new ParseData("", new Outlink[0], new Properties JavaDoc()));
192       }
193     }
194
195     private void handleNoFetch(FetchListEntry fle, int status) {
196       String JavaDoc url = fle.getPage().getURL().toString();
197       MD5Hash hash = MD5Hash.digest(url);
198
199       if (Fetcher.this.parsing) {
200         outputPage(new FetcherOutput(fle, hash, status),
201                    new Content(url, url, new byte[0], "", new Properties JavaDoc()),
202                    new ParseText(""),
203                    new ParseData("", new Outlink[0], new Properties JavaDoc()));
204       } else {
205         outputPage(new FetcherOutput(fle, hash, status),
206                    new Content(url, url, new byte[0], "", new Properties JavaDoc()),
207                    null, null);
208       }
209     }
210       
211     private void outputPage(FetcherOutput fo, Content content,
212                             ParseText text, ParseData parseData) {
213       try {
214         synchronized (fetcherWriter) {
215           fetcherWriter.append(fo);
216           contentWriter.append(content);
217           if (Fetcher.this.parsing) {
218             parseTextWriter.append(text);
219             parseDataWriter.append(parseData);
220           }
221         }
222       } catch (Throwable JavaDoc t) {
223         LOG.severe("error writing output:" + t.toString());
224       }
225     }
226                                        
227   }
228             
229   public Fetcher(NutchFileSystem nfs, String JavaDoc directory, boolean parsing)
230     throws IOException JavaDoc {
231
232     this.parsing = parsing;
233
234     // Set up in/out streams
235
fetchList = new ArrayFile.Reader
236       (nfs, new File JavaDoc(directory, FetchListEntry.DIR_NAME).toString());
237     if (this.parsing) {
238       fetcherWriter = new ArrayFile.Writer
239         (nfs, new File JavaDoc(directory, FetcherOutput.DIR_NAME).toString(),
240         FetcherOutput.class);
241     } else {
242       fetcherWriter = new ArrayFile.Writer
243         (nfs, new File JavaDoc(directory, FetcherOutput.DIR_NAME_NP).toString(),
244         FetcherOutput.class);
245     }
246     contentWriter = new ArrayFile.Writer
247       (nfs, new File JavaDoc(directory, Content.DIR_NAME).toString(), Content.class);
248     if (this.parsing) {
249       parseTextWriter = new ArrayFile.Writer(nfs,
250         new File JavaDoc(directory, ParseText.DIR_NAME).toString(), ParseText.class);
251       parseDataWriter = new ArrayFile.Writer(nfs,
252         new File JavaDoc(directory, ParseData.DIR_NAME).toString(), ParseData.class);
253     }
254     name = new File JavaDoc(directory).getName();
255   }
256
257   /** Set thread count */
258   public void setThreadCount(int threadCount) {
259     this.threadCount=threadCount;
260   }
261
262   /** Set the logging level. */
263   public static void setLogLevel(Level level) {
264     LOG.setLevel(level);
265     PluginRepository.LOG.setLevel(level);
266     ParserFactory.LOG.setLevel(level);
267     LOG.info("logging at " + level);
268   }
269
270   /** Runs the fetcher. */
271   public void run() throws IOException JavaDoc, InterruptedException JavaDoc {
272     start = System.currentTimeMillis();
273     for (int i = 0; i < threadCount; i++) { // spawn threads
274
FetcherThread thread = new FetcherThread(THREAD_GROUP_NAME+i);
275       thread.start();
276     }
277
278     // Quit monitoring if all FetcherThreads are gone.
279
// There could still be other threads, which may well be runaway threads
280
// started by external libs via FetcherThreads and it is generally safe
281
// to ignore them because our main FetcherThreads have finished their jobs.
282
// In fact we are a little more cautious here by making sure
283
// there is no more outstanding page fetches via monitoring
284
// changes of pages, errors and bytes.
285
int pages0 = pages; int errors0 = errors; long bytes0 = bytes;
286   
287     while (true) {
288       Thread.sleep(1000);
289
290       if (LogFormatter.hasLoggedSevere())
291         throw new RuntimeException JavaDoc("SEVERE error logged. Exiting fetcher.");
292
293       int n = group.activeCount();
294       Thread JavaDoc[] list = new Thread JavaDoc[n];
295       group.enumerate(list);
296       boolean noMoreFetcherThread = true; // assumption
297
for (int i = 0; i < n; i++) {
298         // this thread may have gone away in the meantime
299
if (list[i] == null) continue;
300         String JavaDoc name = list[i].getName();
301         if (name.startsWith(THREAD_GROUP_NAME)) // prove it
302
noMoreFetcherThread = false;
303         if (LOG.isLoggable(Level.FINE))
304           LOG.fine(list[i].toString());
305       }
306       if (noMoreFetcherThread) {
307         if (LOG.isLoggable(Level.FINE))
308           LOG.fine("number of active threads: "+n);
309         if (pages == pages0 && errors == errors0 && bytes == bytes0)
310           break;
311         status();
312         pages0 = pages; errors0 = errors; bytes0 = bytes;
313       }
314     }
315
316     fetchList.close(); // close databases
317
fetcherWriter.close();
318     contentWriter.close();
319     if (this.parsing) {
320       parseTextWriter.close();
321       parseDataWriter.close();
322     }
323
324   }
325   
326   public static class FetcherStatus {
327     private String JavaDoc name;
328     private long startTime, curTime;
329     private int pageCount, errorCount;
330     private long byteCount;
331     
332     /**
333      * FetcherStatus encapsulates a snapshot of the Fetcher progress status.
334      * @param name short name of the segment being processed
335      * @param start the time in millisec. this fetcher was started
336      * @param pages number of pages fetched
337      * @param errors number of fetching errors
338      * @param bytes number of bytes fetched
339      */

340     public FetcherStatus(String JavaDoc name, long start, int pages, int errors, long bytes) {
341       this.name = name;
342       this.startTime = start;
343       this.curTime = System.currentTimeMillis();
344       this.pageCount = pages;
345       this.errorCount = errors;
346       this.byteCount = bytes;
347     }
348     
349     public String JavaDoc getName() {return name;}
350     public long getStartTime() {return startTime;}
351     public long getCurTime() {return curTime;}
352     public long getElapsedTime() {return curTime - startTime;}
353     public int getPageCount() {return pageCount;}
354     public int getErrorCount() {return errorCount;}
355     public long getByteCount() {return byteCount;}
356     
357     public String JavaDoc toString() {
358       return "status: segment " + name + ", "
359         + pageCount + " pages, "
360         + errorCount + " errors, "
361         + byteCount + " bytes, "
362         + (curTime - startTime) + " ms";
363     }
364   }
365   
366   public synchronized FetcherStatus getStatus() {
367     return new FetcherStatus(name, start, pages, errors, bytes);
368   }
369
370   /** Display the status of the fetcher run. */
371   public synchronized void status() {
372     FetcherStatus status = getStatus();
373     LOG.info(status.toString());
374     LOG.info("status: "
375              + (((float)status.getPageCount())/(status.getElapsedTime()/1000.0f))+" pages/s, "
376              + (((float)status.getByteCount()*8/1024)/(status.getElapsedTime()/1000.0f))+" kb/s, "
377              + (((float)status.getByteCount())/status.getPageCount()) + " bytes/page");
378   }
379
380   /** Run the fetcher. */
381   public static void main(String JavaDoc[] args) throws Exception JavaDoc {
382     int threadCount = -1;
383     long delay = -1;
384     String JavaDoc logLevel = "info";
385     boolean parsing = true;
386     boolean showThreadID = false;
387     String JavaDoc directory = null;
388
389     String JavaDoc usage = "Usage: Fetcher (-local | -ndfs <namenode:port>) [-logLevel level] [-noParsing] [-showThreadID] [-threads n] <dir>";
390
391     if (args.length == 0) {
392       System.err.println(usage);
393       System.exit(-1);
394     }
395       
396     int i = 0;
397     NutchFileSystem nfs = NutchFileSystem.parseArgs(args, i);
398     for (; i < args.length; i++) { // parse command line
399
if (args[i] == null) {
400           continue;
401       } else if (args[i].equals("-threads")) { // found -threads option
402
threadCount = Integer.parseInt(args[++i]);
403       } else if (args[i].equals("-logLevel")) {
404         logLevel = args[++i];
405       } else if (args[i].equals("-noParsing")) {
406         parsing = false;
407       } else if (args[i].equals("-showThreadID")) {
408         showThreadID = true;
409       } else // root is required parameter
410
directory = args[i];
411     }
412
413     Fetcher fetcher = new Fetcher(nfs, directory, parsing);// make a Fetcher
414
if (threadCount != -1) { // set threadCount option
415
fetcher.setThreadCount(threadCount);
416     }
417
418     // set log level
419
fetcher.setLogLevel(Level.parse(logLevel.toUpperCase()));
420
421     if (showThreadID) {
422       LogFormatter.setShowThreadIDs(showThreadID);
423     }
424     
425     try {
426       fetcher.run(); // run the Fetcher
427
} finally {
428       nfs.close();
429     }
430
431   }
432 }
433
Popular Tags