KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > archive > crawler > frontier > RecoveryJournal


/* RecoveryJournal
 *
 * $Id: RecoveryJournal.java,v 1.32.6.1 2007/01/13 01:31:23 stack-sf Exp $
 *
 * Created on Jul 20, 2004
 *
 * Copyright (C) 2004 Internet Archive.
 *
 * This file is part of the Heritrix web crawler (crawler.archive.org).
 *
 * Heritrix is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * any later version.
 *
 * Heritrix is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser Public License for more details.
 *
 * You should have received a copy of the GNU Lesser Public License
 * along with Heritrix; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */

package org.archive.crawler.frontier;

import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
import it.unimi.dsi.mg4j.util.MutableString;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.commons.httpclient.URIException;
import org.archive.crawler.datamodel.CandidateURI;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.framework.Frontier;
import org.archive.net.UURI;
import org.archive.net.UURIFactory;
import org.archive.util.ArchiveUtils;
55
56 /**
57  * Helper class for managing a simple Frontier change-events journal which is
58  * useful for recovering from crawl problems.
59  *
60  * By replaying the journal into a new Frontier, its state (at least with
61  * respect to URIs alreadyIncluded and in pending queues) will match that of the
62  * original Frontier, allowing a pseudo-resume of a previous crawl, at least as
63  * far as URI visitation/coverage is concerned.
64  *
65  * @author gojomo
66  */

67 public class RecoveryJournal
68 implements FrontierJournal {
69     private static final Logger JavaDoc LOGGER = Logger.getLogger(
70             RecoveryJournal.class.getName());
71     
72     public final static String JavaDoc F_ADD = "F+ ";
73     public final static String JavaDoc F_EMIT = "Fe ";
74     public final static String JavaDoc F_RESCHEDULE = "Fr ";
75     public final static String JavaDoc F_SUCCESS = "Fs ";
76     public final static String JavaDoc F_FAILURE = "Ff ";
77     public final static String JavaDoc LOG_ERROR = "E ";
78     public final static String JavaDoc LOG_TIMESTAMP = "T ";
79     public final int TIMESTAMP_INTERVAL = 10000; // timestamp every this many lines
80

81     // show recovery progress every this many lines
82
private final static int PROGRESS_INTERVAL = 1000000;
83     
84     // once this many URIs are queued during recovery, allow
85
// crawl to begin, while enqueuing of other URIs from log
86
// continues in background
87
private static final long ENOUGH_TO_START_CRAWLING = 100000;
88     /**
89      * Stream on which we record frontier events.
90      */

91     private Writer JavaDoc out = null;
92
93     private long lines = 0;
94     
95     public static final String JavaDoc GZIP_SUFFIX = ".gz";
96     
97     /**
98      * File we're writing recovery to.
99      * Keep a reference in case we want to rotate it off.
100      */

101     private File JavaDoc gzipFile = null;
102     
103     /**
104      * Allocate a buffer for accumulating lines to write and reuse it.
105      */

106     private MutableString accumulatingBuffer =
107         new MutableString(1 + F_ADD.length() +
108                 128 /*curi.toString().length()*/ +
109                 1 +
110                 8 /*curi.getPathFromSeed().length()*/ +
111                 1 +
112                 128 /*curi.flattenVia().length()*/);
113
114     
115     /**
116      * Create a new recovery journal at the given location
117      *
118      * @param path Directory to make the recovery journal in.
119      * @param filename Name to use for recovery journal file.
120      * @throws IOException
121      */

122     public RecoveryJournal(String JavaDoc path, String JavaDoc filename)
123     throws IOException JavaDoc {
124         this.gzipFile = new File JavaDoc(path, filename + GZIP_SUFFIX);
125         this.out = initialize(gzipFile);
126     }
127     
128     private Writer JavaDoc initialize (final File JavaDoc f)
129     throws FileNotFoundException JavaDoc, IOException JavaDoc {
130         return new OutputStreamWriter JavaDoc(new GZIPOutputStream JavaDoc(
131             new FastBufferedOutputStream(new FileOutputStream JavaDoc(f))));
132     }
133
134     public synchronized void added(CrawlURI curi) {
135         accumulatingBuffer.length(0);
136         this.accumulatingBuffer.append(F_ADD).
137             append(curi.toString()).
138             append(" ").
139             append(curi.getPathFromSeed()).
140             append(" ").
141             append(curi.flattenVia());
142         writeLine(accumulatingBuffer);
143     }
144
145     public void finishedSuccess(CrawlURI curi) {
146         finishedSuccess(curi.toString());
147     }
148     
149     public void finishedSuccess(UURI uuri) {
150         finishedSuccess(uuri.toString());
151     }
152     
153     protected void finishedSuccess(String JavaDoc uuri) {
154         writeLine(F_SUCCESS, uuri);
155     }
156
157     public void emitted(CrawlURI curi) {
158         writeLine(F_EMIT, curi.toString());
159
160     }
161
162     public void finishedFailure(CrawlURI curi) {
163         finishedFailure(curi.toString());
164     }
165     
166     public void finishedFailure(UURI uuri) {
167         finishedFailure(uuri.toString());
168     }
169     
170     public void finishedFailure(String JavaDoc u) {
171         writeLine(F_FAILURE, u);
172     }
173
174     public void rescheduled(CrawlURI curi) {
175         writeLine(F_RESCHEDULE, curi.toString());
176     }
177
178     private synchronized void writeLine(String JavaDoc string) {
179         try {
180             this.out.write("\n");
181             this.out.write(string);
182             noteLine();
183         } catch (IOException JavaDoc e) {
184             e.printStackTrace();
185         }
186     }
187
188     private synchronized void writeLine(String JavaDoc s1, String JavaDoc s2) {
189         try {
190             this.out.write("\n");
191             this.out.write(s1);
192             this.out.write(s2);
193             noteLine();
194         } catch (IOException JavaDoc e) {
195             e.printStackTrace();
196         }
197     }
198     
199     private synchronized void writeLine(MutableString mstring) {
200         if (this.out == null) {
201             return;
202         }
203         try {
204             this.out.write("\n");
205             mstring.write(out);
206             noteLine();
207         } catch (IOException JavaDoc e) {
208             e.printStackTrace();
209         }
210     }
211     
212     /**
213      * @throws IOException
214      */

215     private void noteLine() throws IOException JavaDoc {
216         lines++;
217         if(lines % TIMESTAMP_INTERVAL == 0) {
218             out.write("\n");
219             out.write(LOG_TIMESTAMP);
220             out.write(ArchiveUtils.getLog14Date());
221         }
222     }
223     
224     /**
225      * Utility method for scanning a recovery journal and applying it to
226      * a Frontier.
227      *
228      * @param source Recover log path.
229      * @param frontier Frontier reference.
230      * @param retainFailures
231      * @throws IOException
232      *
233      * @see org.archive.crawler.framework.Frontier#importRecoverLog(String, boolean)
234      */

235     public static void importRecoverLog(final File JavaDoc source,
236         final Frontier frontier, final boolean retainFailures)
237     throws IOException JavaDoc {
238         if (source == null) {
239             throw new IllegalArgumentException JavaDoc("Passed source file is null.");
240         }
241         LOGGER.info("recovering frontier completion state from "+source);
242         
243         // first, fill alreadyIncluded with successes (and possibly failures),
244
// and count the total lines
245
final int lines =
246             importCompletionInfoFromLog(source, frontier, retainFailures);
247         
248         LOGGER.info("finished completion state; recovering queues from " +
249             source);
250
251         // now, re-add anything that was in old frontier and not already
252
// registered as finished. Do this in a separate thread that signals
253
// this thread once ENOUGH_TO_START_CRAWLING URIs have been queued.
254
final CountDownLatch JavaDoc recoveredEnough = new CountDownLatch JavaDoc(1);
255         new Thread JavaDoc(new Runnable JavaDoc() {
256             public void run() {
257                 importQueuesFromLog(source, frontier, lines, recoveredEnough);
258             }
259         }, "queuesRecoveryThread").start();
260         
261         try {
262             // wait until at least ENOUGH_TO_START_CRAWLING URIs queued
263
recoveredEnough.await();
264         } catch (InterruptedException JavaDoc e) {
265             // TODO Auto-generated catch block
266
e.printStackTrace();
267         }
268     }
269     
270     /**
271      * Import just the SUCCESS (and possibly FAILURE) URIs from the given
272      * recovery log into the frontier as considered included.
273      *
274      * @param source recovery log file to use
275      * @param frontier frontier to update
276      * @param retainFailures whether failure ('Ff') URIs should count as done
277      * @return number of lines in recovery log (for reference)
278      * @throws IOException
279      */

280     private static int importCompletionInfoFromLog(File JavaDoc source,
281             Frontier frontier, boolean retainFailures) throws IOException JavaDoc {
282         // Scan log for all 'Fs' lines: add as 'alreadyIncluded'
283
BufferedInputStream JavaDoc is = getBufferedInput(source);
284         // create MutableString of good starting size (will grow if necessary)
285
MutableString read = new MutableString(UURI.MAX_URL_LENGTH);
286         int lines = 0;
287         try {
288             while (readLine(is,read)) {
289                 lines++;
290                 boolean wasSuccess = read.startsWith(F_SUCCESS);
291                 if (wasSuccess
292                         || (retainFailures && read.startsWith(F_FAILURE))) {
293                     // retrieve first (only) URL on line
294
String JavaDoc s = read.subSequence(3,read.length()).toString();
295                     try {
296                         UURI u = UURIFactory.getInstance(s);
297                         frontier.considerIncluded(u);
298                         if(wasSuccess) {
299                             if (frontier.getFrontierJournal() != null) {
300                                 frontier.getFrontierJournal().
301                                     finishedSuccess(u);
302                             }
303                         } else {
304                             // carryforward failure, in case future recovery
305
// wants to no retain them as finished
306
if (frontier.getFrontierJournal() != null) {
307                                 frontier.getFrontierJournal().
308                                     finishedFailure(u);
309                             }
310                         }
311                     } catch (URIException e) {
312                         e.printStackTrace();
313                     }
314                 }
315                 if((lines%PROGRESS_INTERVAL)==0) {
316                     // every 1 million lines, print progress
317
LOGGER.info(
318                             "at line " + lines
319                             + " alreadyIncluded count = " +
320                             frontier.discoveredUriCount());
321                 }
322             }
323         } catch (EOFException JavaDoc e) {
324             // expected in some uncleanly-closed recovery logs; ignore
325
} finally {
326             is.close();
327         }
328         return lines;
329     }
330
331     /**
332      * Read a line from the given bufferedinputstream into the MutableString.
333      * Return true if a line was read; false if EOF.
334      *
335      * @param is
336      * @param read
337      * @return True if we read a line.
338      * @throws IOException
339      */

340     private static boolean readLine(BufferedInputStream JavaDoc is, MutableString read)
341     throws IOException JavaDoc {
342         read.length(0);
343         int c = is.read();
344         while((c!=-1)&&c!='\n'&&c!='\r') {
345             read.append((char)c);
346             c = is.read();
347         }
348         if(c==-1 && read.length()==0) {
349             // EOF and none read; return false
350
return false;
351         }
352         if(c=='\n') {
353             // consume LF following CR, if present
354
is.mark(1);
355             if(is.read()!='\r') {
356                 is.reset();
357             }
358         }
359         // a line (possibly blank) was read
360
return true;
361     }
362
363     /**
364      * Import all ADDs from given recovery log into the frontier's queues
365      * (excepting those the frontier drops as already having been included)
366      *
367      * @param source recovery log file to use
368      * @param frontier frontier to update
369      * @param lines total lines noted in recovery log earlier
370      * @param enough latch signalling 'enough' URIs queued to begin crawling
371      */

372     private static void importQueuesFromLog(File JavaDoc source, Frontier frontier,
373             int lines, CountDownLatch JavaDoc enough) {
374         BufferedInputStream JavaDoc is;
375         // create MutableString of good starting size (will grow if necessary)
376
MutableString read = new MutableString(UURI.MAX_URL_LENGTH);
377         long queuedAtStart = frontier.queuedUriCount();
378         long queuedDuringRecovery = 0;
379         int qLines = 0;
380         
381         try {
382             // Scan log for all 'F+' lines: if not alreadyIncluded, schedule for
383
// visitation
384
is = getBufferedInput(source);
385             try {
386                 while (readLine(is,read)) {
387                     qLines++;
388                     if (read.startsWith(F_ADD)) {
389                         UURI u;
390                         CharSequence JavaDoc args[] = splitOnSpaceRuns(read);
391                         try {
392                             u = UURIFactory.getInstance(args[1].toString());
393                             String JavaDoc pathFromSeed = (args.length > 2)?
394                                 args[2].toString() : "";
395                             UURI via = (args.length > 3)?
396                                 UURIFactory.getInstance(args[3].toString()):
397                                 null;
398                             String JavaDoc viaContext = (args.length > 4)?
399                                     args[4].toString(): "";
400                             CandidateURI caUri = new CandidateURI(u,
401                                     pathFromSeed, via, viaContext);
402                             frontier.schedule(caUri);
403                             
404                             queuedDuringRecovery =
405                                 frontier.queuedUriCount() - queuedAtStart;
406                             if(((queuedDuringRecovery + 1) %
407                                     ENOUGH_TO_START_CRAWLING) == 0) {
408                                 enough.countDown();
409                             }
410                         } catch (URIException e) {
411                             e.printStackTrace();
412                         }
413                     }
414                     if((qLines%PROGRESS_INTERVAL)==0) {
415                         // every 1 million lines, print progress
416
LOGGER.info(
417                                 "through line "
418                                 + qLines + "/" + lines
419                                 + " queued count = " +
420                                 frontier.queuedUriCount());
421                     }
422                 }
423             } catch (EOFException JavaDoc e) {
424                 // no problem: untidy end of recovery journal
425
} finally {
426                     is.close();
427             }
428         } catch (IOException JavaDoc e) {
429             // TODO Auto-generated catch block
430
e.printStackTrace();
431         }
432         LOGGER.info("finished recovering frontier from "+source+" "
433                 +qLines+" lines processed");
434         enough.countDown();
435     }
436
437     /**
438      * Return an array of the subsequences of the passed-in sequence,
439      * split on space runs.
440      *
441      * @param read
442      * @return CharSequence.
443      */

444     private static CharSequence JavaDoc[] splitOnSpaceRuns(CharSequence JavaDoc read) {
445         int lastStart = 0;
446         ArrayList JavaDoc<CharSequence JavaDoc> segs = new ArrayList JavaDoc<CharSequence JavaDoc>(5);
447         int i;
448         for(i=0;i<read.length();i++) {
449             if (read.charAt(i)==' ') {
450                 segs.add(read.subSequence(lastStart,i));
451                 i++;
452                 while(i < read.length() && read.charAt(i)==' ') {
453                     // skip any space runs
454
i++;
455                 }
456                 lastStart = i;
457             }
458         }
459         if(lastStart<read.length()) {
460             segs.add(read.subSequence(lastStart,i));
461         }
462         return (CharSequence JavaDoc[]) segs.toArray(new CharSequence JavaDoc[segs.size()]);
463     }
464
465     /**
466      * @param source
467      * @return Recover log buffered reader.
468      * @throws IOException
469      */

470     public static BufferedReader JavaDoc getBufferedReader(File JavaDoc source)
471     throws IOException JavaDoc {
472         boolean isGzipped = source.getName().toLowerCase().
473             endsWith(GZIP_SUFFIX);
474         FileInputStream JavaDoc fis = new FileInputStream JavaDoc(source);
475         return new BufferedReader JavaDoc(isGzipped?
476             new InputStreamReader JavaDoc(new GZIPInputStream JavaDoc(fis)):
477             new InputStreamReader JavaDoc(fis));
478     }
479
480     /**
481      * Get a BufferedInputStream on the recovery file given.
482      *
483      * @param source file to open
484      * @return Recover log buffered input stream.
485      * @throws IOException
486      */

487     public static BufferedInputStream JavaDoc getBufferedInput(File JavaDoc source)
488     throws IOException JavaDoc {
489         boolean isGzipped = source.getName().toLowerCase().
490             endsWith(GZIP_SUFFIX);
491         FileInputStream JavaDoc fis = new FileInputStream JavaDoc(source);
492         return isGzipped ? new BufferedInputStream JavaDoc(new GZIPInputStream JavaDoc(fis))
493                 : new BufferedInputStream JavaDoc(fis);
494     }
495     
496     /**
497      * Flush and close the underlying IO objects.
498      */

499     public void close() {
500         if (this.out == null) {
501             return;
502         }
503         try {
504             this.out.flush();
505             this.out.close();
506             this.out = null;
507         } catch (IOException JavaDoc e) {
508             e.printStackTrace();
509         }
510     }
511
512     public void seriousError(String JavaDoc err) {
513         writeLine("\n"+LOG_ERROR+ArchiveUtils.getLog14Date()+" "+err);
514     }
515
516     public synchronized void checkpoint(final File JavaDoc checkpointDir)
517     throws IOException JavaDoc {
518         if (this.out == null || !this.gzipFile.exists()) {
519             return;
520         }
521         close();
522         // Rename gzipFile with the checkpoint name as suffix.
523
this.gzipFile.renameTo(new File JavaDoc(this.gzipFile.getParentFile(),
524                 this.gzipFile.getName() + "." + checkpointDir.getName()));
525         // Open new gzip file.
526
this.out = initialize(this.gzipFile);
527     }
528 }
529
Popular Tags