KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > archive > crawler > frontier > AbstractFrontier


1 /* AbstractFrontier
2  *
3  * $Id: AbstractFrontier.java,v 1.60 2006/08/04 18:06:28 stack-sf Exp $
4  *
5  * Created on Aug 17, 2004
6  *
7  * Copyright (C) 2004 Internet Archive.
8  *
9  * This file is part of the Heritrix web crawler (crawler.archive.org).
10  *
11  * Heritrix is free software; you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser Public License as published by
13  * the Free Software Foundation; either version 2.1 of the License, or
14  * any later version.
15  *
16  * Heritrix is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Lesser Public License for more details.
20  *
21  * You should have received a copy of the GNU Lesser Public License
22  * along with Heritrix; if not, write to the Free Software
23  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24  */

25 package org.archive.crawler.frontier;
26
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;

import javax.management.AttributeNotFoundException;

import org.apache.commons.httpclient.HttpStatus;
import org.archive.crawler.datamodel.CandidateURI;
import org.archive.crawler.datamodel.CoreAttributeConstants;
import org.archive.crawler.datamodel.CrawlHost;
import org.archive.crawler.datamodel.CrawlOrder;
import org.archive.crawler.datamodel.CrawlServer;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.FetchStatusCodes;
import org.archive.crawler.event.CrawlStatusListener;
import org.archive.crawler.framework.CrawlController;
import org.archive.crawler.framework.Frontier;
import org.archive.crawler.framework.ToeThread;
import org.archive.crawler.framework.exceptions.EndedException;
import org.archive.crawler.framework.exceptions.FatalConfigurationException;
import org.archive.crawler.settings.ModuleType;
import org.archive.crawler.settings.RegularExpressionConstraint;
import org.archive.crawler.settings.SimpleType;
import org.archive.crawler.settings.Type;
import org.archive.crawler.url.Canonicalizer;
import org.archive.net.UURI;
import org.archive.util.ArchiveUtils;
64
65 /**
66  * Shared facilities for Frontier implementations.
67  *
68  * @author gojomo
69  */

70 public abstract class AbstractFrontier extends ModuleType
71 implements CrawlStatusListener, Frontier, FetchStatusCodes,
72         CoreAttributeConstants, Serializable JavaDoc {
73     private static final Logger JavaDoc logger = Logger
74             .getLogger(AbstractFrontier.class.getName());
75
76     protected transient CrawlController controller;
77
78     /** ordinal numbers to assign to created CrawlURIs */
79     protected long nextOrdinal = 1;
80
81     /** should the frontier hold any threads asking for URIs? */
82     protected boolean shouldPause = false;
83
84     /**
85      * should the frontier send an EndedException to any threads asking for
86      * URIs?
87      */

88     protected transient boolean shouldTerminate = false;
89
90     /**
91      * how many multiples of last fetch elapsed time to wait before recontacting
92      * same server
93      */

94     public final static String JavaDoc ATTR_DELAY_FACTOR = "delay-factor";
95
96     protected final static Float JavaDoc DEFAULT_DELAY_FACTOR = new Float JavaDoc(5);
97
98     /**
99      * always wait this long after one completion before recontacting same
100      * server, regardless of multiple
101      */

102     public final static String JavaDoc ATTR_MIN_DELAY = "min-delay-ms";
103
104     // 3 secs.
105
protected final static Integer JavaDoc DEFAULT_MIN_DELAY = new Integer JavaDoc(3000);
106
107     /** never wait more than this long, regardless of multiple */
108     public final static String JavaDoc ATTR_MAX_DELAY = "max-delay-ms";
109
110     // 30 secs
111
protected final static Integer JavaDoc DEFAULT_MAX_DELAY = new Integer JavaDoc(30000);
112
113     /** number of hops of embeds (ERX) to bump to front of host queue */
114     public final static String JavaDoc ATTR_PREFERENCE_EMBED_HOPS =
115         "preference-embed-hops";
116
117     protected final static Integer JavaDoc DEFAULT_PREFERENCE_EMBED_HOPS =
118         new Integer JavaDoc(1);
119
120     /** maximum per-host bandwidth usage */
121     public final static String JavaDoc ATTR_MAX_HOST_BANDWIDTH_USAGE =
122         "max-per-host-bandwidth-usage-KB-sec";
123
124     protected final static Integer JavaDoc DEFAULT_MAX_HOST_BANDWIDTH_USAGE =
125         new Integer JavaDoc(0);
126
127     /** maximum overall bandwidth usage */
128     public final static String JavaDoc ATTR_MAX_OVERALL_BANDWIDTH_USAGE =
129         "total-bandwidth-usage-KB-sec";
130
131     protected final static Integer JavaDoc DEFAULT_MAX_OVERALL_BANDWIDTH_USAGE =
132         new Integer JavaDoc(0);
133
134     /** for retryable problems, seconds to wait before a retry */
135     public final static String JavaDoc ATTR_RETRY_DELAY = "retry-delay-seconds";
136
137     // 15 mins
138
protected final static Long JavaDoc DEFAULT_RETRY_DELAY = new Long JavaDoc(900);
139
140     /** maximum times to emit a CrawlURI without final disposition */
141     public final static String JavaDoc ATTR_MAX_RETRIES = "max-retries";
142
143     protected final static Integer JavaDoc DEFAULT_MAX_RETRIES = new Integer JavaDoc(30);
144
145     public final static String JavaDoc ATTR_QUEUE_ASSIGNMENT_POLICY =
146         "queue-assignment-policy";
147
148     /** queue assignment to force onto CrawlURIs; intended to be overridden */
149     public final static String JavaDoc ATTR_FORCE_QUEUE = "force-queue-assignment";
150
151     protected final static String JavaDoc DEFAULT_FORCE_QUEUE = "";
152
153     // word chars, dash, period, comma, colon
154
protected final static String JavaDoc ACCEPTABLE_FORCE_QUEUE = "[-\\w\\.,:]*";
155         
156     /** whether pause, rather than finish, when crawl appears done */
157     public final static String JavaDoc ATTR_PAUSE_AT_FINISH = "pause-at-finish";
158     // TODO: change default to true once well-tested
159
protected final static Boolean JavaDoc DEFAULT_PAUSE_AT_FINISH = Boolean.FALSE;
160     
161     /** whether to pause at crawl start */
162     public final static String JavaDoc ATTR_PAUSE_AT_START = "pause-at-start";
163     protected final static Boolean JavaDoc DEFAULT_PAUSE_AT_START = Boolean.FALSE;
164     
165     /** whether to pause at crawl start */
166     public final static String JavaDoc ATTR_SOURCE_TAG_SEEDS = "source-tag-seeds";
167     protected final static Boolean JavaDoc DEFAULT_SOURCE_TAG_SEEDS = Boolean.FALSE;
168
169     /**
170      * Recover log on or off attribute.
171      */

172     protected final static String JavaDoc ATTR_RECOVERY_ENABLED =
173         "recovery-log-enabled";
174     protected final static Boolean JavaDoc DEFAULT_ATTR_RECOVERY_ENABLED =
175         Boolean.TRUE;
176
177     // top-level stats
178
protected long queuedUriCount = 0; // total URIs queued to be visited
179

180     protected long succeededFetchCount = 0;
181
182     protected long failedFetchCount = 0;
183
184     protected long disregardedUriCount = 0; //URIs that are disregarded (for
185
// example because of robot.txt rules)
186

187     /**
188      * Used when bandwidth constraint are used.
189      */

190     protected long totalProcessedBytes = 0;
191
192     private transient long nextURIEmitTime = 0;
193
194     protected long processedBytesAfterLastEmittedURI = 0;
195     
196     protected int lastMaxBandwidthKB = 0;
197
198     /** Policy for assigning CrawlURIs to named queues */
199     protected transient QueueAssignmentPolicy queueAssignmentPolicy = null;
200
201     /**
202      * Crawl replay logger.
203      *
204      * Currently captures Frontier/URI transitions.
205      * Can be null if user chose not to run a recovery.log.
206      */

207     private transient FrontierJournal recover = null;
208
209     /** file collecting report of ignored seed-file entries (if any) */
210     public static final String JavaDoc IGNORED_SEEDS_FILENAME = "seeds.ignored";
211
212     /**
213      * @param name Name of this frontier.
214      * @param description Description for this frontier.
215      */

216     public AbstractFrontier(String JavaDoc name, String JavaDoc description) {
217         super(name, description);
218         addElementToDefinition(new SimpleType(ATTR_DELAY_FACTOR,
219                 "How many multiples of last fetch elapsed time to wait before "
220                         + "recontacting same server", DEFAULT_DELAY_FACTOR));
221         addElementToDefinition(new SimpleType(ATTR_MAX_DELAY,
222                 "Never wait more than this long.", DEFAULT_MAX_DELAY));
223         addElementToDefinition(new SimpleType(ATTR_MIN_DELAY,
224                 "Always wait this long after one completion before recontacting "
225                         + "same server.", DEFAULT_MIN_DELAY));
226         addElementToDefinition(new SimpleType(ATTR_MAX_RETRIES,
227                 "How often to retry fetching a URI that failed to be retrieved. "
228                         + "If zero, the crawler will get the robots.txt only.",
229                 DEFAULT_MAX_RETRIES));
230         addElementToDefinition(new SimpleType(ATTR_RETRY_DELAY,
231                 "How long to wait by default until we retry fetching a"
232                         + " URI that failed to be retrieved (seconds). ",
233                 DEFAULT_RETRY_DELAY));
234         addElementToDefinition(new SimpleType(
235                 ATTR_PREFERENCE_EMBED_HOPS,
236                 "Number of embedded (or redirected) hops up to which "
237                 + "a URI has higher priority scheduling. For example, if set "
238                 + "to 1 (the default), items such as inline images (1-hop "
239                 + "embedded resources) will be scheduled ahead of all regular "
240                 + "links (or many-hop resources, like nested frames). If set to "
241                 + "zero, no preferencing will occur, and embeds/redirects are "
242                 + "scheduled the same as regular links.",
243                 DEFAULT_PREFERENCE_EMBED_HOPS));
244         Type t;
245         t = addElementToDefinition(new SimpleType(
246                 ATTR_MAX_OVERALL_BANDWIDTH_USAGE,
247                 "The maximum average bandwidth the crawler is allowed to use. "
248                 + "The actual read speed is not affected by this setting, it only "
249                 + "holds back new URIs from being processed when the bandwidth "
250                 + "usage has been to high. 0 means no bandwidth limitation.",
251                 DEFAULT_MAX_OVERALL_BANDWIDTH_USAGE));
252         t.setOverrideable(false);
253         t = addElementToDefinition(new SimpleType(
254                 ATTR_MAX_HOST_BANDWIDTH_USAGE,
255                 "The maximum average bandwidth the crawler is allowed to use per "
256                 + "host. The actual read speed is not affected by this setting, "
257                 + "it only holds back new URIs from being processed when the "
258                 + "bandwidth usage has been to high. 0 means no bandwidth "
259                 + "limitation.", DEFAULT_MAX_HOST_BANDWIDTH_USAGE));
260         t.setExpertSetting(true);
261
262         // Read the list of permissible choices from heritrix.properties.
263
// Its a list of space- or comma-separated values.
264
String JavaDoc queueStr = System.getProperty(AbstractFrontier.class.getName() +
265                 "." + ATTR_QUEUE_ASSIGNMENT_POLICY,
266                 HostnameQueueAssignmentPolicy.class.getName() + " " +
267                 IPQueueAssignmentPolicy.class.getName() + " " +
268                 BucketQueueAssignmentPolicy.class.getName() + " " +
269                 SurtAuthorityQueueAssignmentPolicy.class.getName());
270         Pattern JavaDoc p = Pattern.compile("\\s*,\\s*|\\s+");
271         String JavaDoc [] queues = p.split(queueStr);
272         if (queues.length <= 0) {
273             throw new RuntimeException JavaDoc("Failed parse of " +
274                     " assignment queue policy string: " + queueStr);
275         }
276         t = addElementToDefinition(new SimpleType(ATTR_QUEUE_ASSIGNMENT_POLICY,
277                 "Defines how to assign URIs to queues. Can assign by host, " +
278                 "by ip, and into one of a fixed set of buckets (1k).",
279                 queues[0], queues));
280         t.setExpertSetting(true);
281         t.setOverrideable(false);
282
283         t = addElementToDefinition(new SimpleType(
284                 ATTR_FORCE_QUEUE,
285                 "The queue name into which to force URIs. Should "
286                 + "be left blank at global level. Specify a "
287                 + "per-domain/per-host override to force URIs into "
288                 + "a particular named queue, regardless of the assignment "
289                 + "policy in effect (domain or ip-based politeness). "
290                 + "This could be used on domains known to all be from "
291                 + "the same small set of IPs (eg blogspot, dailykos, etc.) "
292                 + "to simulate IP-based politeness, or it could be used if "
293                 + "you wanted to enforce politeness over a whole domain, even "
294                 + "though the subdomains are split across many IPs.",
295                 DEFAULT_FORCE_QUEUE));
296         t.setOverrideable(true);
297         t.setExpertSetting(true);
298         t.addConstraint(new RegularExpressionConstraint(ACCEPTABLE_FORCE_QUEUE,
299                 Level.WARNING, "This field must contain only alphanumeric "
300                 + "characters plus period, dash, comma, colon, or underscore."));
301         t = addElementToDefinition(new SimpleType(
302                 ATTR_PAUSE_AT_START,
303                 "Whether to pause when the crawl begins, before any URIs " +
304                 "are tried. This gives the operator a chance to verify or " +
305                 "adjust the crawl before actual work begins. " +
306                 "Default is false.", DEFAULT_PAUSE_AT_START));
307         t = addElementToDefinition(new SimpleType(
308                 ATTR_PAUSE_AT_FINISH,
309                 "Whether to pause when the crawl appears finished, rather "
310                 + "than immediately end the crawl. This gives the operator an "
311                 + "opportunity to view crawl results, and possibly add URIs or "
312                 + "adjust settings, while the crawl state is still available. "
313                 + "Default is false.", DEFAULT_PAUSE_AT_FINISH));
314         t.setOverrideable(false);
315         
316         t = addElementToDefinition(new SimpleType(
317                 ATTR_SOURCE_TAG_SEEDS,
318                 "Whether to tag seeds with their own URI as a heritable " +
319                 "'source' String, which will be carried-forward to all URIs " +
320                 "discovered on paths originating from that seed. When " +
321                 "present, such source tags appear in the second-to-last " +
322                 "crawl.log field.", DEFAULT_SOURCE_TAG_SEEDS));
323         t.setOverrideable(false);
324         
325         t = addElementToDefinition(new SimpleType(ATTR_RECOVERY_ENABLED,
326                 "Set to false to disable recovery log writing. Do this if " +
327                 "you you are using the checkpoint feature for recovering " +
328                 "crashed crawls.", DEFAULT_ATTR_RECOVERY_ENABLED));
329         t.setExpertSetting(true);
330         // No sense in it being overrideable.
331
t.setOverrideable(false);
332     }
333
334     public void start() {
335         if (((Boolean JavaDoc)getUncheckedAttribute(null, ATTR_PAUSE_AT_START))
336                 .booleanValue()) {
337             // trigger crawl-wide pause
338
controller.requestCrawlPause();
339         } else {
340             // simply begin
341
unpause();
342         }
343     }
344     
345     synchronized public void pause() {
346         shouldPause = true;
347     }
348
349     synchronized public void unpause() {
350         shouldPause = false;
351         notifyAll();
352     }
353
354     public void initialize(CrawlController c)
355             throws FatalConfigurationException, IOException JavaDoc {
356         c.addCrawlStatusListener(this);
357         File JavaDoc logsDisk = null;
358         try {
359             logsDisk = c.getSettingsDir(CrawlOrder.ATTR_LOGS_PATH);
360         } catch (AttributeNotFoundException JavaDoc e) {
361             logger.log(Level.SEVERE, "Failed to get logs directory", e);
362         }
363         if (logsDisk != null) {
364             String JavaDoc logsPath = logsDisk.getAbsolutePath() + File.separatorChar;
365             if (((Boolean JavaDoc)getUncheckedAttribute(null, ATTR_RECOVERY_ENABLED))
366                     .booleanValue()) {
367                 this.recover = new RecoveryJournal(logsPath,
368                     FrontierJournal.LOGNAME_RECOVER);
369             }
370         }
371         try {
372             final Class JavaDoc qapClass = Class.forName((String JavaDoc)getUncheckedAttribute(
373                     null, ATTR_QUEUE_ASSIGNMENT_POLICY));
374
375             queueAssignmentPolicy =
376                 (QueueAssignmentPolicy)qapClass.newInstance();
377         } catch (Exception JavaDoc e) {
378             logger.log(Level.SEVERE, "Bad queue assignment policy class", e);
379             throw new FatalConfigurationException(e.getMessage());
380         }
381     }
382
383     synchronized public void terminate() {
384         shouldTerminate = true;
385         if (this.recover != null) {
386             this.recover.close();
387             this.recover = null;
388         }
389         unpause();
390     }
391
392     protected void doJournalFinishedSuccess(CrawlURI c) {
393         if (this.recover != null) {
394             this.recover.finishedSuccess(c);
395         }
396     }
397
398     protected void doJournalAdded(CrawlURI c) {
399         if (this.recover != null) {
400             this.recover.added(c);
401         }
402     }
403
404     protected void doJournalRescheduled(CrawlURI c) {
405         if (this.recover != null) {
406             this.recover.rescheduled(c);
407         }
408     }
409
410     protected void doJournalFinishedFailure(CrawlURI c) {
411         if (this.recover != null) {
412             this.recover.finishedFailure(c);
413         }
414     }
415
416     protected void doJournalEmitted(CrawlURI c) {
417         if (this.recover != null) {
418             this.recover.emitted(c);
419         }
420     }
421
422     /**
423      * Frontier is empty only if all queues are empty and no URIs are in-process
424      *
425      * @return True if queues are empty.
426      */

427     public synchronized boolean isEmpty() {
428         return queuedUriCount == 0;
429     }
430
431     /**
432      * Increment the running count of queued URIs. Synchronized because
433      * operations on longs are not atomic.
434      */

435     protected synchronized void incrementQueuedUriCount() {
436         queuedUriCount++;
437     }
438
439     /**
440      * Increment the running count of queued URIs. Synchronized because
441      * operations on longs are not atomic.
442      *
443      * @param increment
444      * amount to increment the queued count
445      */

446     protected synchronized void incrementQueuedUriCount(long increment) {
447         queuedUriCount += increment;
448     }
449
450     /**
451      * Note that a number of queued Uris have been deleted.
452      *
453      * @param numberOfDeletes
454      */

455     protected synchronized void decrementQueuedCount(long numberOfDeletes) {
456         queuedUriCount -= numberOfDeletes;
457     }
458
459     /**
460      * (non-Javadoc)
461      *
462      * @see org.archive.crawler.framework.Frontier#queuedUriCount()
463      */

464     public long queuedUriCount() {
465         return queuedUriCount;
466     }
467
468     /**
469      * (non-Javadoc)
470      *
471      * @see org.archive.crawler.framework.Frontier#finishedUriCount()
472      */

473     public long finishedUriCount() {
474         return succeededFetchCount + failedFetchCount + disregardedUriCount;
475     }
476
477     /**
478      * Increment the running count of successfully fetched URIs. Synchronized
479      * because operations on longs are not atomic.
480      */

481     protected synchronized void incrementSucceededFetchCount() {
482         succeededFetchCount++;
483     }
484
485     /**
486      * (non-Javadoc)
487      *
488      * @see org.archive.crawler.framework.Frontier#succeededFetchCount()
489      */

490     public long succeededFetchCount() {
491         return succeededFetchCount;
492     }
493
494     /**
495      * Increment the running count of failed URIs. Synchronized because
496      * operations on longs are not atomic.
497      */

498     protected synchronized void incrementFailedFetchCount() {
499         failedFetchCount++;
500     }
501
502     /**
503      * (non-Javadoc)
504      *
505      * @see org.archive.crawler.framework.Frontier#failedFetchCount()
506      */

507     public long failedFetchCount() {
508         return failedFetchCount;
509     }
510
511     /**
512      * Increment the running count of disregarded URIs. Synchronized because
513      * operations on longs are not atomic.
514      */

515     protected synchronized void incrementDisregardedUriCount() {
516         disregardedUriCount++;
517     }
518
519     public long disregardedUriCount() {
520         return disregardedUriCount;
521     }
522
523     public long totalBytesWritten() {
524         return totalProcessedBytes;
525     }
526
527     /**
528      * Load up the seeds.
529      *
530      * This method is called on initialize and inside in the crawlcontroller
531      * when it wants to force reloading of configuration.
532      *
533      * @see org.archive.crawler.framework.CrawlController#kickUpdate()
534      */

535     public void loadSeeds() {
536         Writer JavaDoc ignoredWriter = new StringWriter JavaDoc();
537         logger.info("beginning");
538         // Get the seeds to refresh.
539
Iterator JavaDoc iter = this.controller.getScope().seedsIterator(ignoredWriter);
540         int count = 0;
541         while (iter.hasNext()) {
542             UURI u = (UURI)iter.next();
543             CandidateURI caUri = CandidateURI.createSeedCandidateURI(u);
544             caUri.setSchedulingDirective(CandidateURI.MEDIUM);
545             if (((Boolean JavaDoc)getUncheckedAttribute(null, ATTR_SOURCE_TAG_SEEDS))
546                     .booleanValue()) {
547                 caUri.putString(CoreAttributeConstants.A_SOURCE_TAG,caUri.toString());
548                 caUri.makeHeritable(CoreAttributeConstants.A_SOURCE_TAG);
549             }
550             schedule(caUri);
551             count++;
552             if(count%1000==0) {
553                 logger.info(count+" seeds");
554             }
555         }
556         // save ignored items (if any) where they can be consulted later
557
saveIgnoredItems(ignoredWriter.toString(), controller.getDisk());
558         logger.info("finished");
559     }
560
561     /**
562      * Dump ignored seed items (if any) to disk; delete file otherwise.
563      * Static to allow non-derived sibling classes (frontiers not yet
564      * subclassed here) to reuse.
565      *
566      * @param ignoredItems
567      * @param dir
568      */

569     public static void saveIgnoredItems(String JavaDoc ignoredItems, File JavaDoc dir) {
570         File JavaDoc ignoredFile = new File JavaDoc(dir, IGNORED_SEEDS_FILENAME);
571         if(ignoredItems==null | ignoredItems.length()>0) {
572             try {
573                 BufferedWriter JavaDoc bw = new BufferedWriter JavaDoc(new FileWriter JavaDoc(ignoredFile));
574                 bw.write(ignoredItems);
575                 bw.close();
576             } catch (IOException JavaDoc e) {
577                 // TODO make an alert?
578
e.printStackTrace();
579             }
580         } else {
581             // delete any older file (if any)
582
ignoredFile.delete();
583         }
584     }
585
586     protected CrawlURI asCrawlUri(CandidateURI caUri) {
587         CrawlURI curi;
588         if (caUri instanceof CrawlURI) {
589             curi = (CrawlURI)caUri;
590         } else {
591             curi = CrawlURI.from(caUri, nextOrdinal++);
592         }
593         curi.setClassKey(getClassKey(curi));
594         return curi;
595     }
596
597     /**
598      * @param now
599      * @throws InterruptedException
600      * @throws EndedException
601      */

602     protected synchronized void preNext(long now) throws InterruptedException JavaDoc,
603             EndedException {
604         if (this.controller == null) {
605             return;
606         }
607         
608         // Check completion conditions
609
if (this.controller.atFinish()) {
610             if (((Boolean JavaDoc)getUncheckedAttribute(null, ATTR_PAUSE_AT_FINISH))
611                     .booleanValue()) {
612                 this.controller.requestCrawlPause();
613             } else {
614                 this.controller.beginCrawlStop();
615             }
616         }
617
618         // enforce operator pause
619
if (shouldPause) {
620             while (shouldPause) {
621                 this.controller.toePaused();
622                 wait();
623             }
624             // exitted pause; possibly finish regardless of pause-at-finish
625
if (controller != null && controller.atFinish()) {
626                 this.controller.beginCrawlStop();
627             }
628         }
629
630         // enforce operator terminate or thread retirement
631
if (shouldTerminate
632                 || ((ToeThread)Thread.currentThread()).shouldRetire()) {
633             throw new EndedException("terminated");
634         }
635
636         enforceBandwidthThrottle(now);
637     }
638
639     /**
640      * Perform any special handling of the CrawlURI, such as promoting its URI
641      * to seed-status, or preferencing it because it is an embed.
642      *
643      * @param curi
644      */

645     protected void applySpecialHandling(CrawlURI curi) {
646         if (curi.isSeed() && curi.getVia() != null
647                 && curi.flattenVia().length() > 0) {
648             // The only way a seed can have a non-empty via is if it is the
649
// result of a seed redirect. Add it to the seeds list.
650
//
651
// This is a feature. This is handling for case where a seed
652
// gets immediately redirected to another page. What we're doing is
653
// treating the immediate redirect target as a seed.
654
this.controller.getScope().addSeed(curi);
655             // And it needs rapid scheduling.
656
if (curi.getSchedulingDirective() == CandidateURI.NORMAL)
657                 curi.setSchedulingDirective(CandidateURI.MEDIUM);
658         }
659
660         // optionally preferencing embeds up to MEDIUM
661
int prefHops = ((Integer JavaDoc)getUncheckedAttribute(curi,
662                 ATTR_PREFERENCE_EMBED_HOPS)).intValue();
663         if (prefHops > 0) {
664             int embedHops = curi.getTransHops();
665             if (embedHops > 0 && embedHops <= prefHops
666                     && curi.getSchedulingDirective() == CandidateURI.NORMAL) {
667                 // number of embed hops falls within the preferenced range, and
668
// uri is not already MEDIUM -- so promote it
669
curi.setSchedulingDirective(CandidateURI.MEDIUM);
670             }
671         }
672     }
673
674     /**
675      * Perform fixups on a CrawlURI about to be returned via next().
676      *
677      * @param curi
678      * CrawlURI about to be returned by next()
679      * @param q
680      * the queue from which the CrawlURI came
681      */

682     protected void noteAboutToEmit(CrawlURI curi, WorkQueue q) {
683         curi.setHolder(q);
684         // if (curi.getServer() == null) {
685
// // TODO: perhaps short-circuit the emit here,
686
// // because URI will be rejected as unfetchable
687
// }
688
doJournalEmitted(curi);
689     }
690
691     /**
692      * @param curi
693      * @return the CrawlServer to be associated with this CrawlURI
694      */

695     protected CrawlServer getServer(CrawlURI curi) {
696         return this.controller.getServerCache().getServerFor(curi);
697     }
698
699     /**
700      * Return a suitable value to wait before retrying the given URI.
701      *
702      * @param curi
703      * CrawlURI to be retried
704      * @return millisecond delay before retry
705      */

706     protected long retryDelayFor(CrawlURI curi) {
707         int status = curi.getFetchStatus();
708         return (status == S_CONNECT_FAILED || status == S_CONNECT_LOST ||
709                 status == S_DOMAIN_UNRESOLVABLE)?
710             ((Long JavaDoc)getUncheckedAttribute(curi, ATTR_RETRY_DELAY)).longValue():
711             0; // no delay for most
712
}
713
714     /**
715      * Update any scheduling structures with the new information in this
716      * CrawlURI. Chiefly means make necessary arrangements for no other URIs at
717      * the same host to be visited within the appropriate politeness window.
718      *
719      * @param curi
720      * The CrawlURI
721      * @return millisecond politeness delay
722      */

723     protected long politenessDelayFor(CrawlURI curi) {
724         long durationToWait = 0;
725         if (curi.containsKey(A_FETCH_BEGAN_TIME)
726                 && curi.containsKey(A_FETCH_COMPLETED_TIME)) {
727
728             long completeTime = curi.getLong(A_FETCH_COMPLETED_TIME);
729             long durationTaken = (completeTime - curi
730                     .getLong(A_FETCH_BEGAN_TIME));
731             durationToWait = (long)(((Float JavaDoc)getUncheckedAttribute(curi,
732                     ATTR_DELAY_FACTOR)).floatValue() * durationTaken);
733
734             long minDelay = ((Integer JavaDoc)getUncheckedAttribute(curi,
735                     ATTR_MIN_DELAY)).longValue();
736             if (minDelay > durationToWait) {
737                 // wait at least the minimum
738
durationToWait = minDelay;
739             }
740
741             long maxDelay = ((Integer JavaDoc)getUncheckedAttribute(curi,
742                     ATTR_MAX_DELAY)).longValue();
743             if (durationToWait > maxDelay) {
744                 // wait no more than the maximum
745
durationToWait = maxDelay;
746             }
747
748             long now = System.currentTimeMillis();
749             int maxBandwidthKB = ((Integer JavaDoc)getUncheckedAttribute(curi,
750                     ATTR_MAX_HOST_BANDWIDTH_USAGE)).intValue();
751             if (maxBandwidthKB > 0) {
752                 // Enforce bandwidth limit
753
CrawlHost host = controller.getServerCache().getHostFor(curi);
754                 long minDurationToWait = host.getEarliestNextURIEmitTime()
755                         - now;
756                 float maxBandwidth = maxBandwidthKB * 1.024F; // kilo factor
757
long processedBytes = curi.getContentSize();
758                 host
759                         .setEarliestNextURIEmitTime((long)(processedBytes / maxBandwidth)
760                                 + now);
761
762                 if (minDurationToWait > durationToWait) {
763                     durationToWait = minDurationToWait;
764                 }
765             }
766         }
767         return durationToWait;
768     }
769
    /**
     * Ensure that any overall-bandwidth-usage limit is respected, by pausing as
     * long as necessary.
     *
     * <p>The throttle works by projecting, from the bytes processed since the
     * last emit and the configured KB/sec ceiling, the earliest time the next
     * URI may be emitted ({@code nextURIEmitTime}), and wait()ing until then.
     *
     * @param now current time in milliseconds (as of the caller's check)
     * @throws InterruptedException if interrupted while waiting
     */
    private void enforceBandwidthThrottle(long now) throws InterruptedException {
        int maxBandwidthKB = ((Integer)getUncheckedAttribute(null,
                ATTR_MAX_OVERALL_BANDWIDTH_USAGE)).intValue();
        if (maxBandwidthKB > 0) {
            // Make sure that new bandwidth setting doesn't affect total crawl:
            // on a settings change, reset the byte baseline so only bytes
            // processed under the new limit count against it.
            if (maxBandwidthKB != lastMaxBandwidthKB) {
                lastMaxBandwidthKB = maxBandwidthKB;
                processedBytesAfterLastEmittedURI = totalProcessedBytes;
            }

            // Enforce bandwidth limit: sleepTime is how far in the future the
            // previously-computed emit time still lies (<= 0 means no wait).
            long sleepTime = nextURIEmitTime - now;
            float maxBandwidth = maxBandwidthKB * 1.024F; // Kilo_factor (KB -> bytes/ms)
            long processedBytes = totalProcessedBytes
                    - processedBytesAfterLastEmittedURI;
            // Carry forward any not-yet-served wait (0 on the very first pass).
            long shouldHaveEmittedDiff = nextURIEmitTime == 0? 0
                    : nextURIEmitTime - now;
            nextURIEmitTime = (long)(processedBytes / maxBandwidth) + now
                    + shouldHaveEmittedDiff;
            processedBytesAfterLastEmittedURI = totalProcessedBytes;
            if (sleepTime > 0) {
                long targetTime = now + sleepTime;
                now = System.currentTimeMillis();
                // Loop because wait() may return early (spurious wakeup or
                // notify from another thread) before targetTime is reached.
                while (now < targetTime) {
                    synchronized (this) {
                        if (logger.isLoggable(Level.FINE)) {
                            logger.fine("Frontier waits for: " + sleepTime
                                    + "ms to respect bandwidth limit.");
                        }
                        // TODO: now that this is a wait(), frontier can
                        // still schedule and finish items while waiting,
                        // which is good, but multiple threads could all
                        // wait for the same wakeTime, which somewhat
                        // spoils the throttle... should be fixed.
                        wait(targetTime - now);
                    }
                    now = System.currentTimeMillis();
                }
            }
        }
    }
818
819     /**
820      * Take note of any processor-local errors that have been entered into the
821      * CrawlURI.
822      *
823      * @param curi
824      *
825      */

826     protected void logLocalizedErrors(CrawlURI curi) {
827         if (curi.containsKey(A_LOCALIZED_ERRORS)) {
828             List JavaDoc localErrors = (List JavaDoc)curi.getObject(A_LOCALIZED_ERRORS);
829             Iterator JavaDoc iter = localErrors.iterator();
830             while (iter.hasNext()) {
831                 Object JavaDoc array[] = {curi, iter.next()};
832                 controller.localErrors.log(Level.WARNING, curi.getUURI()
833                         .toString(), array);
834             }
835             // once logged, discard
836
curi.remove(A_LOCALIZED_ERRORS);
837         }
838     }
839
840     /**
841      * Utility method to return a scratch dir for the given key's temp files.
842      * Every key gets its own subdir. To avoid having any one directory with
843      * thousands of files, there are also two levels of enclosing directory
844      * named by the least-significant hex digits of the key string's java
845      * hashcode.
846      *
847      * @param key
848      * @return File representing scratch directory
849      */

850     protected File JavaDoc scratchDirFor(String JavaDoc key) {
851         String JavaDoc hex = Integer.toHexString(key.hashCode());
852         while (hex.length() < 4) {
853             hex = "0" + hex;
854         }
855         int len = hex.length();
856         return new File JavaDoc(this.controller.getStateDisk(), hex.substring(len - 2,
857                 len)
858                 + File.separator
859                 + hex.substring(len - 4, len - 2)
860                 + File.separator + key);
861     }
862
863     protected boolean overMaxRetries(CrawlURI curi) {
864         // never retry more than the max number of times
865
if (curi.getFetchAttempts() >= ((Integer JavaDoc)getUncheckedAttribute(curi,
866                 ATTR_MAX_RETRIES)).intValue()) {
867             return true;
868         }
869         return false;
870     }
871
872     public void importRecoverLog(String JavaDoc pathToLog, boolean retainFailures)
873             throws IOException JavaDoc {
874         File JavaDoc source = new File JavaDoc(pathToLog);
875         if (!source.isAbsolute()) {
876             source = new File JavaDoc(getSettingsHandler().getOrder().getController()
877                     .getDisk(), pathToLog);
878         }
879         RecoveryJournal.importRecoverLog(source, this, retainFailures);
880     }
881
    /*
     * (non-Javadoc)
     *
     * @see org.archive.crawler.framework.URIFrontier#kickUpdate()
     */
    public void kickUpdate() {
        // by default, do nothing
        // (scope will loadSeeds, if appropriate)
    }
891
892     /**
893      * Log to the main crawl.log
894      *
895      * @param curi
896      */

897     protected void log(CrawlURI curi) {
898         curi.aboutToLog();
899         Object JavaDoc array[] = {curi};
900         this.controller.uriProcessing.log(Level.INFO,
901                 curi.getUURI().toString(), array);
902     }
903
904     protected boolean isDisregarded(CrawlURI curi) {
905         switch (curi.getFetchStatus()) {
906         case S_ROBOTS_PRECLUDED: // they don't want us to have it
907
case S_BLOCKED_BY_CUSTOM_PROCESSOR:
908         case S_OUT_OF_SCOPE: // filtered out by scope
909
case S_BLOCKED_BY_USER: // filtered out by user
910
case S_TOO_MANY_EMBED_HOPS: // too far from last true link
911
case S_TOO_MANY_LINK_HOPS: // too far from seeds
912
case S_DELETED_BY_USER: // user deleted
913
return true;
914         default:
915             return false;
916         }
917     }
918
919     /**
920      * Checks if a recently completed CrawlURI that did not finish successfully
921      * needs to be retried (processed again after some time elapses)
922      *
923      * @param curi
924      * The CrawlURI to check
925      * @return True if we need to retry.
926      */

927     protected boolean needsRetrying(CrawlURI curi) {
928         if (overMaxRetries(curi)) {
929             return false;
930         }
931
932         switch (curi.getFetchStatus()) {
933         case HttpStatus.SC_UNAUTHORIZED:
934             // We can get here though usually a positive status code is
935
// a success. We get here if there is rfc2617 credential data
936
// loaded and we're supposed to go around again. See if any
937
// rfc2617 credential present and if there, assume it got
938
// loaded in FetchHTTP on expectation that we're to go around
939
// again. If no rfc2617 loaded, we should not be here.
940
boolean loaded = curi.hasRfc2617CredentialAvatar();
941             if (!loaded && logger.isLoggable(Level.INFO)) {
942                 logger.info("Have 401 but no creds loaded " + curi);
943             }
944             return loaded;
945         case S_DEFERRED:
946         case S_CONNECT_FAILED:
947         case S_CONNECT_LOST:
948         case S_DOMAIN_UNRESOLVABLE:
949             // these are all worth a retry
950
// TODO: consider if any others (S_TIMEOUT in some cases?) deserve
951
// retry
952
return true;
953         default:
954             return false;
955         }
956     }
957
958     /**
959      * Canonicalize passed uuri. Its would be sweeter if this canonicalize
960      * function was encapsulated by that which it canonicalizes but because
961      * settings change with context -- i.e. there may be overrides in operation
962      * for a particular URI -- its not so easy; Each CandidateURI would need a
963      * reference to the settings system. That's awkward to pass in.
964      *
965      * @param uuri Candidate URI to canonicalize.
966      * @return Canonicalized version of passed <code>uuri</code>.
967      */

968     protected String JavaDoc canonicalize(UURI uuri) {
969         return Canonicalizer.canonicalize(uuri, this.controller.getOrder());
970     }
971
972     /**
973      * Canonicalize passed CandidateURI. This method differs from
974      * {@link #canonicalize(UURI)} in that it takes a look at
975      * the CandidateURI context possibly overriding any canonicalization effect if
976      * it could make us miss content. If canonicalization produces an URL that
977      * was 'alreadyseen', but the entry in the 'alreadyseen' database did
978      * nothing but redirect to the current URL, we won't get the current URL;
979      * we'll think we've already see it. Examples would be archive.org
980      * redirecting to www.archive.org or the inverse, www.netarkivet.net
981      * redirecting to netarkivet.net (assuming stripWWW rule enabled).
982      * <p>Note, this method under circumstance sets the forceFetch flag.
983      *
984      * @param cauri CandidateURI to examine.
985      * @return Canonicalized <code>cacuri</code>.
986      */

987     protected String JavaDoc canonicalize(CandidateURI cauri) {
988         String JavaDoc canon = canonicalize(cauri.getUURI());
989         if (cauri.isLocation()) {
990             // If the via is not the same as where we're being redirected (i.e.
991
// we're not being redirected back to the same page, AND the
992
// canonicalization of the via is equal to the the current cauri,
993
// THEN forcefetch (Forcefetch so no chance of our not crawling
994
// content because alreadyseen check things its seen the url before.
995
// An example of an URL that redirects to itself is:
996
// http://bridalelegance.com/images/buttons3/tuxedos-off.gif.
997
// An example of an URL whose canonicalization equals its via's
998
// canonicalization, and we want to fetch content at the
999
// redirection (i.e. need to set forcefetch), is netarkivet.dk.
1000
if (!cauri.toString().equals(cauri.getVia().toString()) &&
1001                    canonicalize(cauri.getVia()).equals(canon)) {
1002                cauri.setForceFetch(true);
1003            }
1004        }
1005        return canon;
1006    }
1007
1008    /**
1009     * @param cauri CrawlURI we're to get a key for.
1010     * @return a String token representing a queue
1011     */

1012    public String JavaDoc getClassKey(CandidateURI cauri) {
1013        String JavaDoc queueKey = (String JavaDoc)getUncheckedAttribute(cauri,
1014            ATTR_FORCE_QUEUE);
1015        if ("".equals(queueKey)) {
1016            // Typical case, barring overrides
1017
queueKey =
1018                queueAssignmentPolicy.getClassKey(this.controller, cauri);
1019        }
1020        return queueKey;
1021    }
1022
    /**
     * @return RecoveryJournal instance. May be null.
     */
    public FrontierJournal getFrontierJournal() {
        return this.recover;
    }
1029
    // CrawlStatusListener callback: no frontier action needed at crawl-ending.
    public void crawlEnding(String sExitMessage) {
        // TODO Auto-generated method stub
    }
1033
1034    public void crawlEnded(String JavaDoc sExitMessage) {
1035        if (logger.isLoggable(Level.INFO)) {
1036            logger.info("Closing with " + Long.toString(queuedUriCount()) +
1037                " urls still in queue.");
1038        }
1039    }
1040
    // CrawlStatusListener callback: no frontier action needed at crawl start.
    public void crawlStarted(String message) {
        // TODO Auto-generated method stub
    }
1044
    // CrawlStatusListener callback: no frontier action needed while pausing.
    public void crawlPausing(String statusMessage) {
        // TODO Auto-generated method stub
    }
1048
    // CrawlStatusListener callback: no frontier action needed once paused.
    public void crawlPaused(String statusMessage) {
        // TODO Auto-generated method stub
    }
1052
    // CrawlStatusListener callback: no frontier action needed when resuming.
    public void crawlResuming(String statusMessage) {
        // TODO Auto-generated method stub
    }
1056    
1057    public void crawlCheckpoint(File JavaDoc checkpointDir)
1058    throws Exception JavaDoc {
1059        if (this.recover == null) {
1060            return;
1061        }
1062        this.recover.checkpoint(checkpointDir);
1063    }
1064    
    //
    // Reporter implementation
    //

    // Delegates one-line report formatting to the shared ArchiveUtils helper.
    public String singleLineReport() {
        return ArchiveUtils.singleLineReport(this);
    }
1071
    // Delegates to the named-report variant with no particular report name.
    public void reportTo(PrintWriter writer) {
        reportTo(null, writer);
    }
1075}
1076
Popular Tags