package org.archive.crawler.frontier;

import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
import it.unimi.dsi.mg4j.util.MutableString;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.commons.httpclient.URIException;
import org.archive.crawler.datamodel.CandidateURI;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.framework.Frontier;
import org.archive.net.UURI;
import org.archive.net.UURIFactory;
import org.archive.util.ArchiveUtils;

/**
 * Gzip-compressed text journal of frontier events.
 *
 * <p>The journal records URIs added, emitted, rescheduled and finished
 * (success or failure). Each event is written as {@code "\n" + prefix + data},
 * so the file begins with an empty line. A {@link #LOG_TIMESTAMP} line is
 * interleaved every {@link #TIMESTAMP_INTERVAL} events.
 *
 * <p>The static {@link #importRecoverLog} replays such a journal into a
 * {@link Frontier}. Only {@link #F_SUCCESS}, {@link #F_FAILURE} and
 * {@link #F_ADD} lines are interpreted during recovery; the other prefixes
 * are written but ignored on replay.
 *
 * <p>Writers synchronize on this instance.
 */
public class RecoveryJournal
implements FrontierJournal {
    private static final Logger LOGGER = Logger.getLogger(
        RecoveryJournal.class.getName());

    /** Prefix of a line written by {@link #added}: URI, path-from-seed, via. */
    public final static String F_ADD = "F+ ";
    /** Prefix of a line written by {@link #emitted}. */
    public final static String F_EMIT = "Fe ";
    /** Prefix of a line written by {@link #rescheduled}. */
    public final static String F_RESCHEDULE = "Fr ";
    /** Prefix of a line written by the {@code finishedSuccess} methods. */
    public final static String F_SUCCESS = "Fs ";
    /** Prefix of a line written by the {@code finishedFailure} methods. */
    public final static String F_FAILURE = "Ff ";
    /** Prefix of a line written by {@link #seriousError}. */
    public final static String LOG_ERROR = "E ";
    /** Prefix of the periodic timestamp line written by {@link #noteLine}. */
    public final static String LOG_TIMESTAMP = "T ";
    /**
     * A timestamp line is written after every this-many journal lines.
     * NOTE(review): probably meant to be static; left as an instance field
     * because changing a public field's static-ness breaks binary compatibility.
     */
    public final int TIMESTAMP_INTERVAL = 10000;

    /** Recovery logs progress every this-many journal lines read. */
    private final static int PROGRESS_INTERVAL = 1000000;

    /**
     * Once roughly this many URIs have been queued by the background queue
     * recovery, {@link #importRecoverLog} returns to its caller while
     * recovery continues.
     */
    private static final long ENOUGH_TO_START_CRAWLING = 100000;

    /** Open journal writer, or {@code null} once {@link #close} has run. */
    private Writer out = null;

    /** Number of event lines written so far (drives timestamp lines). */
    private long lines = 0;

    /** Suffix appended to journal file names; also used to detect gzip input. */
    public static final String GZIP_SUFFIX = ".gz";

    /** The journal file being written. */
    private File gzipFile = null;

    /**
     * Reusable buffer for {@link #added}. The initial capacity is a size guess
     * for a typical F+ line (newline, prefix, URI, separators, path, via).
     */
    private MutableString accumulatingBuffer =
        new MutableString(1 + F_ADD.length() +
            128 +
            1 +
            8 +
            1 +
            128);

    /**
     * Creates (or truncates) the journal file {@code path/filename.gz} and
     * opens it for writing.
     *
     * @param path directory in which to create the journal
     * @param filename journal base name; {@link #GZIP_SUFFIX} is appended
     * @throws IOException if the file cannot be created or opened
     */
    public RecoveryJournal(String path, String filename)
    throws IOException {
        this.gzipFile = new File(path, filename + GZIP_SUFFIX);
        this.out = initialize(gzipFile);
    }

    /**
     * Opens {@code f} (truncating any existing content) as a gzip-compressing
     * character writer.
     */
    private Writer initialize(final File f)
    throws FileNotFoundException, IOException {
        FileOutputStream fos = new FileOutputStream(f);
        try {
            return new OutputStreamWriter(new GZIPOutputStream(
                new FastBufferedOutputStream(fos)));
        } catch (IOException e) {
            // The GZIPOutputStream constructor may throw; don't leak the
            // underlying file descriptor when it does.
            closeQuietly(fos);
            throw e;
        }
    }

    /**
     * Journals that {@code curi} was added, as
     * {@code F+ <uri> <pathFromSeed> <flattenedVia>}.
     */
    public synchronized void added(CrawlURI curi) {
        accumulatingBuffer.length(0);
        this.accumulatingBuffer.append(F_ADD).
            append(curi.toString()).
            append(" ").
            append(curi.getPathFromSeed()).
            append(" ").
            append(curi.flattenVia());
        writeLine(accumulatingBuffer);
    }

    /** Journals a successful finish of {@code curi}. */
    public void finishedSuccess(CrawlURI curi) {
        finishedSuccess(curi.toString());
    }

    /** Journals a successful finish of {@code uuri}. */
    public void finishedSuccess(UURI uuri) {
        finishedSuccess(uuri.toString());
    }

    /** Writes an {@link #F_SUCCESS} line for the given URI string. */
    protected void finishedSuccess(String uuri) {
        writeLine(F_SUCCESS, uuri);
    }

    /** Journals that {@code curi} was emitted. */
    public void emitted(CrawlURI curi) {
        writeLine(F_EMIT, curi.toString());
    }

    /** Journals a failed finish of {@code curi}. */
    public void finishedFailure(CrawlURI curi) {
        finishedFailure(curi.toString());
    }

    /** Journals a failed finish of {@code uuri}. */
    public void finishedFailure(UURI uuri) {
        finishedFailure(uuri.toString());
    }

    /** Writes an {@link #F_FAILURE} line for the given URI string. */
    public void finishedFailure(String u) {
        writeLine(F_FAILURE, u);
    }

    /** Journals that {@code curi} was rescheduled. */
    public void rescheduled(CrawlURI curi) {
        writeLine(F_RESCHEDULE, curi.toString());
    }

    /** Writes one event line; a no-op once the journal is closed. */
    private synchronized void writeLine(String string) {
        writeLine(string, "");
    }

    /**
     * Writes one event line consisting of {@code s1} followed by {@code s2}.
     * A no-op once the journal is closed (previously this threw a
     * NullPointerException, unlike the MutableString variant).
     */
    private synchronized void writeLine(String s1, String s2) {
        if (this.out == null) {
            return;
        }
        try {
            this.out.write("\n");
            this.out.write(s1);
            this.out.write(s2);
            noteLine();
        } catch (IOException e) {
            LOGGER.log(Level.WARNING,
                "failed writing to recovery journal " + gzipFile, e);
        }
    }

    /** Writes one event line from a mutable buffer; no-op once closed. */
    private synchronized void writeLine(MutableString mstring) {
        if (this.out == null) {
            return;
        }
        try {
            this.out.write("\n");
            mstring.write(out);
            noteLine();
        } catch (IOException e) {
            LOGGER.log(Level.WARNING,
                "failed writing to recovery journal " + gzipFile, e);
        }
    }

    /**
     * Counts a written line and, every {@link #TIMESTAMP_INTERVAL} lines,
     * appends a {@link #LOG_TIMESTAMP} line. Callers hold this instance's lock.
     */
    private void noteLine() throws IOException {
        lines++;
        if (lines % TIMESTAMP_INTERVAL == 0) {
            out.write("\n");
            out.write(LOG_TIMESTAMP);
            out.write(ArchiveUtils.getLog14Date());
        }
    }

    /**
     * Replays a recovery journal into {@code frontier}.
     *
     * <p>The first pass runs synchronously. It passes every
     * {@link #F_SUCCESS} URI (and, when {@code retainFailures} is set, every
     * {@link #F_FAILURE} URI) to {@code frontier.considerIncluded()}, and
     * re-journals it to the frontier's own journal if it has one.
     *
     * <p>The second pass re-schedules every {@link #F_ADD} URI. It runs on a
     * background thread. This method returns once about
     * {@link #ENOUGH_TO_START_CRAWLING} URIs have been queued, or when that
     * thread finishes or fails, whichever comes first.
     *
     * @param source journal file (gzipped if its name ends in {@code .gz})
     * @param frontier frontier to restore into
     * @param retainFailures whether failed URIs are also treated as included
     * @throws IOException if the first pass cannot read {@code source}
     * @throws IllegalArgumentException if {@code source} is null
     */
    public static void importRecoverLog(final File source,
            final Frontier frontier, final boolean retainFailures)
    throws IOException {
        if (source == null) {
            throw new IllegalArgumentException("Passed source file is null.");
        }
        LOGGER.info("recovering frontier completion state from " + source);

        // Pass 1 (this thread): completion state. Presumably this ensures the
        // queue pass below does not re-schedule already-finished URIs.
        final long lines =
            importCompletionInfoFromLog(source, frontier, retainFailures);

        LOGGER.info("finished completion state; recovering queues from " +
            source);

        // Pass 2 (background): re-queue added URIs. We wait only until
        // "enough" are queued, so crawling can begin early.
        final CountDownLatch recoveredEnough = new CountDownLatch(1);
        new Thread(new Runnable() {
            public void run() {
                importQueuesFromLog(source, frontier, lines, recoveredEnough);
            }
        }, "queuesRecoveryThread").start();

        try {
            recoveredEnough.await();
        } catch (InterruptedException e) {
            // Restore the interrupt status for our caller; queue recovery
            // keeps running in the background thread.
            Thread.currentThread().interrupt();
            LOGGER.log(Level.WARNING,
                "interrupted while waiting for queue recovery from " + source,
                e);
        }
    }

    /**
     * First recovery pass: marks finished URIs as included.
     *
     * @return number of journal lines read
     */
    private static long importCompletionInfoFromLog(File source,
            Frontier frontier, boolean retainFailures) throws IOException {
        BufferedInputStream is = getBufferedInput(source);
        MutableString read = new MutableString(UURI.MAX_URL_LENGTH);
        long lines = 0;
        try {
            while (readLine(is, read)) {
                lines++;
                boolean wasSuccess = read.startsWith(F_SUCCESS);
                if (wasSuccess
                        || (retainFailures && read.startsWith(F_FAILURE))) {
                    // F_SUCCESS and F_FAILURE prefixes have the same length.
                    String s = read.subSequence(F_SUCCESS.length(),
                        read.length()).toString();
                    try {
                        UURI u = UURIFactory.getInstance(s);
                        frontier.considerIncluded(u);
                        if (wasSuccess) {
                            if (frontier.getFrontierJournal() != null) {
                                frontier.getFrontierJournal().
                                    finishedSuccess(u);
                            }
                        } else {
                            if (frontier.getFrontierJournal() != null) {
                                frontier.getFrontierJournal().
                                    finishedFailure(u);
                            }
                        }
                    } catch (URIException e) {
                        LOGGER.log(Level.WARNING,
                            "skipping unparseable URI in journal: " + s, e);
                    }
                }
                if ((lines % PROGRESS_INTERVAL) == 0) {
                    LOGGER.info(
                        "at line " + lines
                        + " alreadyIncluded count = " +
                        frontier.discoveredUriCount());
                }
            }
        } catch (EOFException e) {
            // Keep whatever was read before the stream ended. Presumably this
            // is a journal left truncated by a crash.
            LOGGER.info("journal " + source + " ended unexpectedly after line "
                + lines + "; continuing with what was read");
        } finally {
            is.close();
        }
        return lines;
    }

    /**
     * Reads one line into {@code read}, which is cleared first. The line ends
     * at {@code '\n'}, at {@code '\r'}, or at {@code "\r\n"} (consumed as a
     * single terminator).
     *
     * <p>NOTE(review): each byte is widened directly to a char (i.e. decoded
     * as ISO-8859-1). The journal is written with the platform default
     * charset, so non-ASCII content may not round-trip. This assumes journal
     * content is ASCII; confirm.
     *
     * @return false only when end of stream is reached with nothing read
     */
    private static boolean readLine(BufferedInputStream is, MutableString read)
    throws IOException {
        read.length(0);
        int c = is.read();
        while ((c != -1) && c != '\n' && c != '\r') {
            read.append((char) c);
            c = is.read();
        }
        if (c == -1 && read.length() == 0) {
            return false;
        }
        if (c == '\r') {
            // Swallow the '\n' of a CRLF pair. The previous code checked
            // for "\n\r" instead, so every CRLF produced an empty extra line.
            is.mark(1);
            if (is.read() != '\n') {
                is.reset();
            }
        }
        return true;
    }

    /**
     * Second recovery pass (runs on its own thread): re-schedules every
     * {@link #F_ADD} URI.
     *
     * <p>{@code enough} is counted down once about
     * {@link #ENOUGH_TO_START_CRAWLING} URIs have been queued. It is also
     * always counted down on exit, including when an unchecked exception
     * escapes, so the waiting caller can never hang.
     *
     * @param lines total journal lines from the first pass (progress only)
     */
    private static void importQueuesFromLog(File source, Frontier frontier,
            long lines, CountDownLatch enough) {
        try {
            MutableString read = new MutableString(UURI.MAX_URL_LENGTH);
            long queuedAtStart = frontier.queuedUriCount();
            long qLines = 0;
            try {
                BufferedInputStream is = getBufferedInput(source);
                try {
                    while (readLine(is, read)) {
                        qLines++;
                        if (read.startsWith(F_ADD)) {
                            try {
                                scheduleAddedLine(read, frontier);
                            } catch (URIException e) {
                                LOGGER.log(Level.WARNING,
                                    "skipping unparseable URI in journal line: "
                                    + read, e);
                            }
                            long queuedDuringRecovery =
                                frontier.queuedUriCount() - queuedAtStart;
                            // Threshold rather than exact-modulo test, so it
                            // cannot be stepped over; countDown() on a
                            // released latch is a no-op anyway.
                            if (enough.getCount() > 0
                                    && queuedDuringRecovery + 1
                                        >= ENOUGH_TO_START_CRAWLING) {
                                enough.countDown();
                            }
                        }
                        if ((qLines % PROGRESS_INTERVAL) == 0) {
                            LOGGER.info(
                                "through line "
                                + qLines + "/" + lines
                                + " queued count = " +
                                frontier.queuedUriCount());
                        }
                    }
                } catch (EOFException e) {
                    // Presumably a truncated journal; keep what was read.
                    LOGGER.info("journal " + source
                        + " ended unexpectedly after line " + qLines);
                } finally {
                    is.close();
                }
            } catch (IOException e) {
                LOGGER.log(Level.WARNING,
                    "error recovering queues from " + source, e);
            }
            LOGGER.info("finished recovering frontier from " + source + " "
                + qLines + " lines processed");
        } finally {
            enough.countDown();
        }
    }

    /**
     * Parses one {@code F+ <uri> [<pathFromSeed> [<via> [<viaContext>]]]}
     * line and schedules the resulting CandidateURI. A line with no URI
     * field is logged and skipped.
     */
    private static void scheduleAddedLine(MutableString line,
            Frontier frontier) throws URIException, IOException {
        CharSequence[] args = splitOnSpaceRuns(line);
        if (args.length < 2) {
            LOGGER.warning("skipping malformed journal line: " + line);
            return;
        }
        UURI u = UURIFactory.getInstance(args[1].toString());
        String pathFromSeed = (args.length > 2) ? args[2].toString() : "";
        UURI via = (args.length > 3)
            ? UURIFactory.getInstance(args[3].toString())
            : null;
        String viaContext = (args.length > 4) ? args[4].toString() : "";
        frontier.schedule(new CandidateURI(u, pathFromSeed, via, viaContext));
    }

    /**
     * Splits {@code read} at runs of one or more spaces. A leading space
     * produces an empty first segment; trailing spaces produce no empty
     * last segment.
     */
    private static CharSequence[] splitOnSpaceRuns(CharSequence read) {
        int lastStart = 0;
        List<CharSequence> segs = new ArrayList<CharSequence>(5);
        for (int i = 0; i < read.length(); i++) {
            if (read.charAt(i) == ' ') {
                segs.add(read.subSequence(lastStart, i));
                i++;
                while (i < read.length() && read.charAt(i) == ' ') {
                    i++;
                }
                // i now indexes the first non-space character (or the end);
                // the loop's i++ safely skips it since it cannot be a space.
                lastStart = i;
            }
        }
        if (lastStart < read.length()) {
            segs.add(read.subSequence(lastStart, read.length()));
        }
        return segs.toArray(new CharSequence[segs.size()]);
    }

    /**
     * Opens {@code source} as a buffered character reader (platform default
     * charset). The file is gunzipped when its name ends in
     * {@link #GZIP_SUFFIX}.
     */
    public static BufferedReader getBufferedReader(File source)
    throws IOException {
        return new BufferedReader(
            new InputStreamReader(openPossiblyGzipped(source)));
    }

    /**
     * Opens {@code source} as a buffered byte stream. The file is gunzipped
     * when its name ends in {@link #GZIP_SUFFIX}.
     */
    public static BufferedInputStream getBufferedInput(File source)
    throws IOException {
        return new BufferedInputStream(openPossiblyGzipped(source));
    }

    /**
     * Opens {@code source}, wrapping it in a GZIPInputStream when its
     * (lower-cased) name ends in {@link #GZIP_SUFFIX}. The file descriptor
     * is closed if the gzip header cannot be read.
     */
    private static InputStream openPossiblyGzipped(File source)
    throws IOException {
        boolean isGzipped = source.getName().toLowerCase().
            endsWith(GZIP_SUFFIX);
        FileInputStream fis = new FileInputStream(source);
        if (!isGzipped) {
            return fis;
        }
        try {
            return new GZIPInputStream(fis);
        } catch (IOException e) {
            closeQuietly(fis);
            throw e;
        }
    }

    /** Closes {@code c}, ignoring failure; used when another error is already propagating. */
    private static void closeQuietly(Closeable c) {
        try {
            c.close();
        } catch (IOException ignored) {
            // The primary exception is more informative; don't mask it.
        }
    }

    /**
     * Flushes and closes the journal. Later writes are silently dropped.
     * Safe to call more than once.
     */
    public synchronized void close() {
        if (this.out == null) {
            return;
        }
        Writer w = this.out;
        // Mark closed even if flush/close fails, so writers stop using a
        // broken stream.
        this.out = null;
        try {
            w.flush();
        } catch (IOException e) {
            LOGGER.log(Level.WARNING,
                "failed flushing recovery journal " + gzipFile, e);
        } finally {
            try {
                w.close();
            } catch (IOException e) {
                LOGGER.log(Level.WARNING,
                    "failed closing recovery journal " + gzipFile, e);
            }
        }
    }

    /** Journals an {@link #LOG_ERROR} line with the current timestamp. */
    public void seriousError(String err) {
        writeLine("\n" + LOG_ERROR + ArchiveUtils.getLog14Date() + " " + err);
    }

    /**
     * Rotates the journal. The current file is closed, renamed to
     * {@code <name>.<checkpointDir name>}, and a fresh journal is started
     * under the original name. Does nothing if the journal is closed or its
     * file is missing.
     *
     * @throws IOException if the rename fails (the journal is then left
     *     closed rather than truncating the un-rotated file), or if the new
     *     journal cannot be opened
     */
    public synchronized void checkpoint(final File checkpointDir)
    throws IOException {
        if (this.out == null || !this.gzipFile.exists()) {
            return;
        }
        close();
        File rotated = new File(this.gzipFile.getParentFile(),
            this.gzipFile.getName() + "." + checkpointDir.getName());
        if (!this.gzipFile.renameTo(rotated)) {
            // Re-initializing now would truncate the existing journal and
            // lose everything recorded so far.
            throw new IOException("failed to rename recovery journal "
                + this.gzipFile + " to " + rotated
                + "; journal left closed to preserve its contents");
        }
        this.out = initialize(this.gzipFile);
    }
}