1 2 3 4 package net.nutch.fetcher; 5 6 import java.io.IOException ; 7 import java.io.File ; 8 import java.util.Properties ; 9 10 import net.nutch.pagedb.FetchListEntry; 11 import net.nutch.io.*; 12 import net.nutch.db.*; 13 import net.nutch.fs.*; 14 import net.nutch.util.*; 15 import net.nutch.protocol.*; 16 import net.nutch.parse.*; 17 import net.nutch.plugin.*; 18 19 import java.util.logging.*; 20 21 31 public class Fetcher { 32 33 public static final Logger LOG = 34 LogFormatter.getLogger("net.nutch.fetcher.Fetcher"); 35 36 static { 37 if (NutchConf.getBoolean("fetcher.verbose", false)) { 38 setLogLevel(Level.FINE); 39 } 40 } 41 42 private ArrayFile.Reader fetchList; private ArrayFile.Writer fetcherWriter; private ArrayFile.Writer contentWriter; 45 private ArrayFile.Writer parseTextWriter; 46 private ArrayFile.Writer parseDataWriter; 47 48 private String name; private long start; private long bytes; private int pages; private int errors; 54 private boolean parsing = true; 56 private int threadCount = NutchConf.getInt("fetcher.threads.fetch", 10); 58 59 private static final String THREAD_GROUP_NAME = "fetcher"; 63 64 private ThreadGroup group = new ThreadGroup (THREAD_GROUP_NAME); 66 private int atCompletion = 0; 68 69 72 private class FetcherThread extends Thread { 73 74 public FetcherThread(String name) { super(group, name); } 75 76 81 public void run() { 82 83 FetchListEntry fle = new FetchListEntry(); 84 85 while (true) { 86 if (LogFormatter.hasLoggedSevere()) break; 89 String url = null; 90 try { 91 92 if (fetchList.next(fle) == null) 93 break; 94 95 url = fle.getPage().getURL().toString(); 96 97 if (!fle.getFetch()) { if (LOG.isLoggable(Level.FINE)) 99 LOG.fine("not fetching " + url); 100 handleNoFetch(fle, FetcherOutput.SUCCESS); 101 continue; 102 } 103 104 LOG.info("fetching " + url); 106 Protocol protocol = ProtocolFactory.getProtocol(url); 107 Content content = protocol.getContent(url); 108 109 handleFetch(url, fle, content); 110 111 synchronized (Fetcher.this) { pages++; 113 bytes += content.getContent().length; 114 if ((pages % 100) == 0) { status(); 116 } 117 } 118 } catch (ResourceGone e) { logError(url, fle, e); 120 handleNoFetch(fle, FetcherOutput.NOT_FOUND); 121 122 127 } catch (RetryLater e) { logError(url, fle, e); 129 handleNoFetch(fle, FetcherOutput.RETRY); 130 131 } catch (ProtocolException e) { logError(url, fle, e); 133 handleNoFetch(fle, FetcherOutput.RETRY); 134 135 } catch (Throwable t) { if (fle != null) { 137 logError(url, fle, t); handleNoFetch(fle, FetcherOutput.RETRY); 139 } 140 } 141 } 142 143 synchronized (Fetcher.this) { 146 atCompletion++; 147 if (atCompletion == threadCount) { 148 try { 149 PluginRepository.getInstance().finalize(); 150 } catch (java.lang.Throwable t) { 151 } 153 } 154 } 155 return; 156 } 157 158 private void logError(String url, FetchListEntry fle, Throwable t) { 159 LOG.info("fetch of " + url + " failed with: " + t); 160 LOG.log(Level.FINE, "stack", t); synchronized (Fetcher.this) { errors++; 163 } 164 } 165 166 private void handleFetch(String url, FetchListEntry fle, Content content) { 167 if (!Fetcher.this.parsing) { 168 outputPage(new FetcherOutput(fle, MD5Hash.digest(content.getContent()), 169 FetcherOutput.SUCCESS), 170 content, null, null); 171 return; 172 } 173 174 try { 175 String contentType = content.getContentType(); 176 Parser parser = ParserFactory.getParser(contentType, url); 177 Parse parse = parser.getParse(content); 178 outputPage(new FetcherOutput(fle, MD5Hash.digest(content.getContent()), 179 FetcherOutput.SUCCESS), 180 content, new ParseText(parse.getText()), parse.getData()); 181 } catch (ParseException e) { 182 LOG.info("fetch okay, but can't parse " + url + ", reason: " 187 + e.getMessage()); 188 outputPage(new FetcherOutput(fle, MD5Hash.digest(content.getContent()), 189 FetcherOutput.CANT_PARSE), 190 content, new ParseText(""), 191 new ParseData("", new Outlink[0], new Properties ())); 192 } 193 } 194 195 private void handleNoFetch(FetchListEntry fle, int status) { 196 String url = fle.getPage().getURL().toString(); 197 MD5Hash hash = MD5Hash.digest(url); 198 199 if (Fetcher.this.parsing) { 200 outputPage(new FetcherOutput(fle, hash, status), 201 new Content(url, url, new byte[0], "", new Properties ()), 202 new ParseText(""), 203 new ParseData("", new Outlink[0], new Properties ())); 204 } else { 205 outputPage(new FetcherOutput(fle, hash, status), 206 new Content(url, url, new byte[0], "", new Properties ()), 207 null, null); 208 } 209 } 210 211 private void outputPage(FetcherOutput fo, Content content, 212 ParseText text, ParseData parseData) { 213 try { 214 synchronized (fetcherWriter) { 215 fetcherWriter.append(fo); 216 contentWriter.append(content); 217 if (Fetcher.this.parsing) { 218 parseTextWriter.append(text); 219 parseDataWriter.append(parseData); 220 } 221 } 222 } catch (Throwable t) { 223 LOG.severe("error writing output:" + t.toString()); 224 } 225 } 226 227 } 228 229 public Fetcher(NutchFileSystem nfs, String directory, boolean parsing) 230 throws IOException { 231 232 this.parsing = parsing; 233 234 fetchList = new ArrayFile.Reader 236 (nfs, new File (directory, FetchListEntry.DIR_NAME).toString()); 237 if (this.parsing) { 238 fetcherWriter = new ArrayFile.Writer 239 (nfs, new File (directory, FetcherOutput.DIR_NAME).toString(), 240 FetcherOutput.class); 241 } else { 242 fetcherWriter = new ArrayFile.Writer 243 (nfs, new File (directory, FetcherOutput.DIR_NAME_NP).toString(), 244 FetcherOutput.class); 245 } 246 contentWriter = new ArrayFile.Writer 247 (nfs, new File (directory, Content.DIR_NAME).toString(), Content.class); 248 if (this.parsing) { 249 parseTextWriter = new ArrayFile.Writer(nfs, 250 new File (directory, ParseText.DIR_NAME).toString(), ParseText.class); 251 parseDataWriter = new ArrayFile.Writer(nfs, 252 new File (directory, ParseData.DIR_NAME).toString(), ParseData.class); 253 } 254 name = new File (directory).getName(); 255 } 256 257 258 public void setThreadCount(int threadCount) { 259 this.threadCount=threadCount; 260 } 261 262 263 public static void setLogLevel(Level level) { 264 LOG.setLevel(level); 265 PluginRepository.LOG.setLevel(level); 266 ParserFactory.LOG.setLevel(level); 267 LOG.info("logging at " + level); 268 } 269 270 271 public void run() throws IOException , InterruptedException { 272 start = System.currentTimeMillis(); 273 for (int i = 0; i < threadCount; i++) { FetcherThread thread = new FetcherThread(THREAD_GROUP_NAME+i); 275 thread.start(); 276 } 277 278 int pages0 = pages; int errors0 = errors; long bytes0 = bytes; 286 287 while (true) { 288 Thread.sleep(1000); 289 290 if (LogFormatter.hasLoggedSevere()) 291 throw new RuntimeException ("SEVERE error logged. Exiting fetcher."); 292 293 int n = group.activeCount(); 294 Thread [] list = new Thread [n]; 295 group.enumerate(list); 296 boolean noMoreFetcherThread = true; for (int i = 0; i < n; i++) { 298 if (list[i] == null) continue; 300 String name = list[i].getName(); 301 if (name.startsWith(THREAD_GROUP_NAME)) noMoreFetcherThread = false; 303 if (LOG.isLoggable(Level.FINE)) 304 LOG.fine(list[i].toString()); 305 } 306 if (noMoreFetcherThread) { 307 if (LOG.isLoggable(Level.FINE)) 308 LOG.fine("number of active threads: "+n); 309 if (pages == pages0 && errors == errors0 && bytes == bytes0) 310 break; 311 status(); 312 pages0 = pages; errors0 = errors; bytes0 = bytes; 313 } 314 } 315 316 fetchList.close(); fetcherWriter.close(); 318 contentWriter.close(); 319 if (this.parsing) { 320 parseTextWriter.close(); 321 parseDataWriter.close(); 322 } 323 324 } 325 326 public static class FetcherStatus { 327 private String name; 328 private long startTime, curTime; 329 private int pageCount, errorCount; 330 private long byteCount; 331 332 340 public FetcherStatus(String name, long start, int pages, int errors, long bytes) { 341 this.name = name; 342 this.startTime = start; 343 this.curTime = System.currentTimeMillis(); 344 this.pageCount = pages; 345 this.errorCount = errors; 346 this.byteCount = bytes; 347 } 348 349 public String getName() {return name;} 350 public long getStartTime() {return startTime;} 351 public long getCurTime() {return curTime;} 352 public long getElapsedTime() {return curTime - startTime;} 353 public int getPageCount() {return pageCount;} 354 public int getErrorCount() {return errorCount;} 355 public long getByteCount() {return byteCount;} 356 357 public String toString() { 358 return "status: segment " + name + ", " 359 + pageCount + " pages, " 360 + errorCount + " errors, " 361 + byteCount + " bytes, " 362 + (curTime - startTime) + " ms"; 363 } 364 } 365 366 public synchronized FetcherStatus getStatus() { 367 return new FetcherStatus(name, start, pages, errors, bytes); 368 } 369 370 371 public synchronized void status() { 372 FetcherStatus status = getStatus(); 373 LOG.info(status.toString()); 374 LOG.info("status: " 375 + (((float)status.getPageCount())/(status.getElapsedTime()/1000.0f))+" pages/s, " 376 + (((float)status.getByteCount()*8/1024)/(status.getElapsedTime()/1000.0f))+" kb/s, " 377 + (((float)status.getByteCount())/status.getPageCount()) + " bytes/page"); 378 } 379 380 381 public static void main(String [] args) throws Exception { 382 int threadCount = -1; 383 long delay = -1; 384 String logLevel = "info"; 385 boolean parsing = true; 386 boolean showThreadID = false; 387 String directory = null; 388 389 String usage = "Usage: Fetcher (-local | -ndfs <namenode:port>) [-logLevel level] [-noParsing] [-showThreadID] [-threads n] <dir>"; 390 391 if (args.length == 0) { 392 System.err.println(usage); 393 System.exit(-1); 394 } 395 396 int i = 0; 397 NutchFileSystem nfs = NutchFileSystem.parseArgs(args, i); 398 for (; i < args.length; i++) { if (args[i] == null) { 400 continue; 401 } else if (args[i].equals("-threads")) { threadCount = Integer.parseInt(args[++i]); 403 } else if (args[i].equals("-logLevel")) { 404 logLevel = args[++i]; 405 } else if (args[i].equals("-noParsing")) { 406 parsing = false; 407 } else if (args[i].equals("-showThreadID")) { 408 showThreadID = true; 409 } else directory = args[i]; 411 } 412 413 Fetcher fetcher = new Fetcher(nfs, directory, parsing); if (threadCount != -1) { fetcher.setThreadCount(threadCount); 416 } 417 418 fetcher.setLogLevel(Level.parse(logLevel.toUpperCase())); 420 421 if (showThreadID) { 422 LogFormatter.setShowThreadIDs(showThreadID); 423 } 424 425 try { 426 fetcher.run(); } finally { 428 nfs.close(); 429 } 430 431 } 432 } 433 | Popular Tags |