package org.archive.crawler.frontier;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.management.AttributeNotFoundException;

import org.apache.commons.httpclient.HttpStatus;
import org.archive.crawler.datamodel.CandidateURI;
import org.archive.crawler.datamodel.CoreAttributeConstants;
import org.archive.crawler.datamodel.CrawlServer;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.FetchStatusCodes;
import org.archive.crawler.datamodel.UriUniqFilter;
import org.archive.crawler.datamodel.UriUniqFilter.HasUriReceiver;
import org.archive.crawler.event.CrawlStatusListener;
import org.archive.crawler.framework.CrawlController;
import org.archive.crawler.framework.Frontier;
import org.archive.crawler.framework.FrontierMarker;
import org.archive.crawler.framework.exceptions.EndedException;
import org.archive.crawler.framework.exceptions.FatalConfigurationException;
import org.archive.crawler.framework.exceptions.InvalidFrontierMarkerException;
import org.archive.crawler.settings.ModuleType;
import org.archive.crawler.settings.RegularExpressionConstraint;
import org.archive.crawler.settings.SimpleType;
import org.archive.crawler.settings.Type;
import org.archive.crawler.url.Canonicalizer;
import org.archive.crawler.util.BdbUriUniqFilter;
import org.archive.net.UURI;
import org.archive.queue.MemQueue;
import org.archive.queue.Queue;
import org.archive.util.ArchiveUtils;

/**
 * An experimental Frontier that repeatedly revisits every URI it encounters.
 * <p>
 * URIs are kept in per-host queues ({@link AdaptiveRevisitHostQueue}); the
 * wait time between visits of a URI is determined by separate WaitEvaluator
 * processors, which store it on the CrawlURI under
 * {@code A_WAIT_INTERVAL}. Politeness (snooze) between fetches to the same
 * host is governed by the delay-factor/min-delay/max-delay settings, mirroring
 * the classic frontier.
 * <p>
 * Thread-safety: the mutating entry points ({@code next()},
 * {@code finished()}, {@code initialize()}, pause/unpause/terminate) are
 * synchronized on this instance, and {@code next()} blocks via
 * {@code wait()} until a queue becomes ready or the frontier is unpaused.
 */
public class AdaptiveRevisitFrontier extends ModuleType
implements Frontier, FetchStatusCodes, CoreAttributeConstants,
        AdaptiveRevisitAttributeConstants, CrawlStatusListener,
        HasUriReceiver {

    private static final long serialVersionUID = -8666872690438543671L;

    private static final Logger logger =
        Logger.getLogger(AdaptiveRevisitFrontier.class.getName());

    /** How many multiples of last fetch elapsed time to wait before
     * recontacting the same server. */
    public final static String ATTR_DELAY_FACTOR = "delay-factor";
    private final static Float DEFAULT_DELAY_FACTOR = new Float(5);

    /** Lower bound (ms) on the politeness snooze between fetches. */
    public final static String ATTR_MIN_DELAY = "min-delay-ms";
    private final static Integer DEFAULT_MIN_DELAY = new Integer(2000);

    /** Upper bound (ms) on the politeness snooze between fetches. */
    public final static String ATTR_MAX_DELAY = "max-delay-ms";
    private final static Integer DEFAULT_MAX_DELAY = new Integer(30000);

    /** Maximum fetch attempts before a URI is given up on. */
    public final static String ATTR_MAX_RETRIES = "max-retries";
    private final static Integer DEFAULT_MAX_RETRIES = new Integer(30);

    /** Default wait (seconds) before retrying a failed fetch. */
    public final static String ATTR_RETRY_DELAY = "retry-delay-seconds";
    private final static Long DEFAULT_RETRY_DELAY = new Long(900);

    /** Maximum simultaneous requests to a single host. */
    public final static String ATTR_HOST_VALENCE = "host-valence";
    private final static Integer DEFAULT_HOST_VALENCE = new Integer(1);

    /** Number of embedded/redirect hops that get preferred scheduling. */
    public final static String ATTR_PREFERENCE_EMBED_HOPS =
        "preference-embed-hops";
    private final static Integer DEFAULT_PREFERENCE_EMBED_HOPS =
        new Integer(0);

    /** Per-domain/host override forcing URIs into a named queue. */
    public final static String ATTR_FORCE_QUEUE = "force-queue-assignment";
    protected final static String DEFAULT_FORCE_QUEUE = "";
    /** Pattern a forced queue name must match (word chars, period, dash,
     * comma, colon). */
    protected final static String ACCEPTABLE_FORCE_QUEUE = "[-\\w\\.,:]*";

    /** If true, x.com, www.x.com and www[0-9]+.x.com share one queue. */
    public final static String ATTR_QUEUE_IGNORE_WWW = "queue-ignore-www";
    protected final static Boolean DEFAULT_QUEUE_IGNORE_WWW =
        new Boolean(false);

    /** If true, a UriUniqFilter is used for duplicate elimination (required
     * for canonicalization rules to take effect). */
    public final static String ATTR_USE_URI_UNIQ_FILTER =
        "use-uri-uniq-filter";
    protected final static Boolean DEFAULT_USE_URI_UNIQ_FILTER =
        new Boolean(false);

    private CrawlController controller;

    /** All per-host queues, ordered by next-ready time. */
    private AdaptiveRevisitQueueList hostQueues;

    /** Duplicate filter; null when use-uri-uniq-filter is false. */
    private UriUniqFilter alreadyIncluded;

    /** Per-thread staging queue for batchSchedule/batchFlush. */
    private ThreadLocalQueue threadWaiting = new ThreadLocalQueue();

    /** Maps a CandidateURI to its queue (class) key. */
    private QueueAssignmentPolicy queueAssignmentPolicy = null;

    // Crawl statistics. Guarded by the synchronized disposition path.
    private long succeededFetchCount = 0;
    private long failedFetchCount = 0;
    private long disregardedUriCount = 0;
    private long totalProcessedBytes = 0;

    // Pause/terminate flags, checked inside next().
    private boolean shouldPause = false;
    private boolean shouldTerminate = false;

    public AdaptiveRevisitFrontier(String name) {
        this(name, "AdaptiveRevisitFrontier. EXPERIMENTAL Frontier that " +
                "will repeatedly visit all " +
                "encountered URIs. Wait time between visits is configurable" +
                " and is determined by seperate Processor(s). See " +
                "WaitEvaluators " +
                "See documentation for ARFrontier limitations.");
    }

    public AdaptiveRevisitFrontier(String name, String description) {
        super(Frontier.ATTR_NAME, description);
        addElementToDefinition(new SimpleType(ATTR_DELAY_FACTOR,
            "How many multiples of last fetch elapsed time to wait before " +
            "recontacting same server", DEFAULT_DELAY_FACTOR));
        addElementToDefinition(new SimpleType(ATTR_MAX_DELAY,
            "Never wait more than this long, regardless of multiple",
            DEFAULT_MAX_DELAY));
        addElementToDefinition(new SimpleType(ATTR_MIN_DELAY,
            "Always wait this long after one completion before recontacting " +
            "same server, regardless of multiple", DEFAULT_MIN_DELAY));
        addElementToDefinition(new SimpleType(ATTR_MAX_RETRIES,
            "How often to retry fetching a URI that failed to be retrieved.\n" +
            "If zero, the crawler will get the robots.txt only.",
            DEFAULT_MAX_RETRIES));
        addElementToDefinition(new SimpleType(ATTR_RETRY_DELAY,
            "How long to wait by default until we retry fetching a" +
            " URI that failed to be retrieved (seconds). ",
            DEFAULT_RETRY_DELAY));
        addElementToDefinition(new SimpleType(ATTR_PREFERENCE_EMBED_HOPS,
            "Number of embedded (or redirected) hops up to which " +
            "a URI has higher priority scheduling. For example, if set " +
            "to 1 (the default), items such as inline images (1-hop " +
            "embedded resources) will be scheduled ahead of all regular " +
            "links (or many-hop resources, like nested frames). If set to " +
            "zero, no preferencing will occur, and embeds/redirects are " +
            "scheduled the same as regular links.",
            DEFAULT_PREFERENCE_EMBED_HOPS));
        Type t;
        t = addElementToDefinition(new SimpleType(ATTR_HOST_VALENCE,
            "Maximum number of simultaneous requests to a single" +
            " host.",
            DEFAULT_HOST_VALENCE));
        t.setExpertSetting(true);
        t = addElementToDefinition(new SimpleType(ATTR_QUEUE_IGNORE_WWW,
            "If true then documents from x.com, www.x.com and any " +
            "www[0-9]+.x.com will be assigned to the same queue.",
            DEFAULT_QUEUE_IGNORE_WWW));
        t.setExpertSetting(true);
        t = addElementToDefinition(new SimpleType(
            ATTR_FORCE_QUEUE,
            "The queue name into which to force URIs. Should "
            + "be left blank at global level. Specify a "
            + "per-domain/per-host override to force URIs into "
            + "a particular named queue, regardless of the assignment "
            + "policy in effect (domain or ip-based politeness). "
            + "This could be used on domains known to all be from "
            + "the same small set of IPs (eg blogspot, dailykos, etc.) "
            + "to simulate IP-based politeness, or it could be used if "
            + "you wanted to enforce politeness over a whole domain, even "
            + "though the subdomains are split across many IPs.",
            DEFAULT_FORCE_QUEUE));
        t.setOverrideable(true);
        t.setExpertSetting(true);
        t.addConstraint(new RegularExpressionConstraint(
            ACCEPTABLE_FORCE_QUEUE,
            Level.WARNING, "This field must contain only alphanumeric "
            + "characters plus period, dash, comma, colon, or underscore."));
        t = addElementToDefinition(new SimpleType(ATTR_USE_URI_UNIQ_FILTER,
            "If true then the Frontier will use a seperate " +
            "datastructure to detect and eliminate duplicates.\n" +
            "This is required for Canonicalization rules to work.",
            DEFAULT_USE_URI_UNIQ_FILTER));
        t.setExpertSetting(true);
        t.setOverrideable(false);

        // These alist keys must survive CrawlURI persistence between visits.
        CrawlURI.addAlistPersistentMember(A_CONTENT_STATE_KEY);
        CrawlURI.addAlistPersistentMember(A_TIME_OF_NEXT_PROCESSING);
    }

    /**
     * Wires this frontier to the controller, creates the host-queue list and
     * (optionally) the duplicate filter, then schedules all seeds.
     *
     * @param c the crawl controller
     * @throws FatalConfigurationException on configuration failure
     * @throws IOException on errors creating the BDB-backed structures
     */
    public synchronized void initialize(CrawlController c)
            throws FatalConfigurationException, IOException {
        controller = c;
        controller.addCrawlStatusListener(this);

        queueAssignmentPolicy = new HostnameQueueAssignmentPolicy();

        hostQueues = new AdaptiveRevisitQueueList(c.getBdbEnvironment(),
            c.getClassCatalog());

        if (((Boolean)getUncheckedAttribute(
                null, ATTR_USE_URI_UNIQ_FILTER)).booleanValue()) {
            alreadyIncluded = createAlreadyIncluded();
        } else {
            alreadyIncluded = null;
        }

        loadSeeds();
    }

    /**
     * Creates the BDB-backed duplicate filter, with this frontier as the
     * destination for URIs that pass it.
     *
     * @return the configured UriUniqFilter
     * @throws IOException on errors opening the BDB environment
     */
    protected UriUniqFilter createAlreadyIncluded() throws IOException {
        UriUniqFilter uuf = new BdbUriUniqFilter(
            this.controller.getBdbEnvironment());
        uuf.setDestination(this);
        return uuf;
    }

    /**
     * Schedules every seed from the scope at MEDIUM priority and flushes the
     * batch. Items the scope ignored are persisted via
     * {@link AbstractFrontier#saveIgnoredItems(String, File)}.
     */
    public void loadSeeds() {
        Writer ignoredWriter = new StringWriter();
        Iterator iter =
            this.controller.getScope().seedsIterator(ignoredWriter);
        while (iter.hasNext()) {
            CandidateURI caUri =
                CandidateURI.createSeedCandidateURI((UURI)iter.next());
            caUri.setSchedulingDirective(CandidateURI.MEDIUM);
            schedule(caUri);
        }
        batchFlush();
        AbstractFrontier.saveIgnoredItems(
            ignoredWriter.toString(),
            controller.getDisk());
    }

    /**
     * Resolves the queue (class) key for a URI: the forced queue name if one
     * is configured, otherwise the assignment policy's key, optionally with a
     * leading "www[0-9]*." stripped.
     *
     * @param cauri the URI being classified
     * @return the queue key, never null
     */
    public String getClassKey(CandidateURI cauri) {
        String queueKey = (String)getUncheckedAttribute(cauri,
            ATTR_FORCE_QUEUE);
        if ("".equals(queueKey)) {
            // No forced queue; derive from policy (typically hostname).
            queueKey =
                queueAssignmentPolicy.getClassKey(controller, cauri);
            if (((Boolean)getUncheckedAttribute(
                    cauri, ATTR_QUEUE_IGNORE_WWW)).booleanValue()) {
                queueKey = queueKey.replaceAll("^www[0-9]{0,}\\.", "");
            }
        }
        return queueKey;
    }

    /**
     * Canonicalizes a UURI per the crawl order's rules.
     *
     * @param uuri the URI to canonicalize
     * @return the canonical string form
     */
    protected String canonicalize(UURI uuri) {
        return Canonicalizer.canonicalize(uuri, this.controller.getOrder());
    }

    /**
     * Canonicalizes a CandidateURI. If the URI is a redirect whose canonical
     * form collapses onto its via (but whose raw form differs), forceFetch is
     * set so the alreadyIncluded filter does not suppress it.
     *
     * @param cauri the candidate to canonicalize
     * @return the canonical string form
     */
    protected String canonicalize(CandidateURI cauri) {
        String canon = canonicalize(cauri.getUURI());
        if (cauri.isLocation()) {
            if (!cauri.toString().equals(cauri.getVia().toString()) &&
                    canonicalize(cauri.getVia()).equals(canon)) {
                cauri.setForceFetch(true);
            }
        }
        return canon;
    }

    /**
     * Places a URI on its host queue: wraps it as a CrawlURI if needed,
     * assigns a class key, re-adds rediscovered seeds to the scope, applies
     * embed-hop preferencing, and stamps the time of next processing as now.
     *
     * @param caUri the URI to schedule
     */
    protected void innerSchedule(CandidateURI caUri) {
        CrawlURI curi;
        if (caUri instanceof CrawlURI) {
            curi = (CrawlURI)caUri;
        } else {
            curi = CrawlURI.from(caUri, System.currentTimeMillis());
            curi.putLong(A_TIME_OF_NEXT_PROCESSING,
                System.currentTimeMillis());
        }

        if (curi.getClassKey() == null) {
            curi.setClassKey(getClassKey(curi));
        }

        // A seed rediscovered via a link/redirect: register it with the
        // scope and give it MEDIUM priority.
        if (curi.isSeed() && curi.getVia() != null
                && curi.flattenVia().length() > 0) {
            this.controller.getScope().addSeed(curi);
            curi.setSchedulingDirective(CandidateURI.MEDIUM);
        }

        int prefHops = ((Integer)getUncheckedAttribute(curi,
            ATTR_PREFERENCE_EMBED_HOPS)).intValue();
        boolean prefEmbed = false;
        if (prefHops > 0) {
            int embedHops = curi.getTransHops();
            if (embedHops > 0 && embedHops <= prefHops
                    && curi.getSchedulingDirective() == CandidateURI.NORMAL) {
                curi.setSchedulingDirective(CandidateURI.MEDIUM);
                prefEmbed = true;
            }
        }

        // Newly scheduled URIs are due for processing immediately.
        curi.putLong(A_TIME_OF_NEXT_PROCESSING,
            System.currentTimeMillis());

        try {
            logger.finest("scheduling " + curi.toString());
            AdaptiveRevisitHostQueue hq = getHQ(curi);
            hq.add(curi, prefEmbed);
        } catch (IOException e) {
            logger.log(Level.SEVERE,
                "IOException scheduling " + curi.toString(), e);
        }
    }

    /**
     * Returns the host queue for a URI's class key, creating it (with the
     * configured valence) if it does not yet exist.
     *
     * @param curi the URI whose queue is wanted
     * @return the (possibly new) host queue
     * @throws IOException on errors creating the queue
     */
    protected AdaptiveRevisitHostQueue getHQ(CrawlURI curi)
            throws IOException {
        AdaptiveRevisitHostQueue hq = hostQueues.getHQ(curi.getClassKey());
        if (hq == null) {
            int valence = DEFAULT_HOST_VALENCE.intValue();
            try {
                valence = ((Integer)getAttribute(
                    curi, ATTR_HOST_VALENCE)).intValue();
            } catch (AttributeNotFoundException e2) {
                logger.severe("Unable to load valence.");
            }
            hq = hostQueues.createHQ(curi.getClassKey(), valence);
        }
        return hq;
    }

    /** Stages a URI on the calling thread's local batch queue. */
    protected void batchSchedule(CandidateURI caUri) {
        threadWaiting.getQueue().enqueue(caUri);
    }

    /** Flushes the calling thread's staged URIs into the frontier. */
    protected void batchFlush() {
        innerBatchFlush();
    }

    /**
     * Drains the thread-local queue. With a duplicate filter active, URIs
     * pass through it (forceFetch bypasses the dup check); otherwise they go
     * straight to the host queues.
     */
    private void innerBatchFlush() {
        Queue q = threadWaiting.getQueue();
        while (!q.isEmpty()) {
            CandidateURI caUri = (CandidateURI)q.dequeue();
            if (alreadyIncluded != null) {
                String cannon = canonicalize(caUri);
                logger.finest("Cannon of " + caUri + " is " + cannon);
                if (caUri.forceFetch()) {
                    alreadyIncluded.addForce(cannon, caUri);
                } else {
                    alreadyIncluded.add(cannon, caUri);
                }
            } else {
                innerSchedule(caUri);
            }
        }
    }

    /**
     * @param curi the URI whose server is wanted
     * @return the CrawlServer for the URI, from the server cache
     */
    protected CrawlServer getServer(CrawlURI curi) {
        return this.controller.getServerCache().getServerFor(curi);
    }

    /**
     * Hands the next ready URI to a worker thread, blocking while paused or
     * until the top host queue becomes ready.
     *
     * @return the next CrawlURI, or null if an IOException prevented issuing
     * @throws InterruptedException if the wait is interrupted
     * @throws EndedException if the frontier has been terminated
     */
    public synchronized CrawlURI next()
            throws InterruptedException, EndedException {
        controller.checkFinish();

        while (shouldPause) {
            controller.toePaused();
            wait();
        }

        if (shouldTerminate) {
            throw new EndedException("terminated");
        }

        AdaptiveRevisitHostQueue hq = hostQueues.getTopHQ();

        while (hq.getState() != AdaptiveRevisitHostQueue.HQSTATE_READY) {
            // Snooze until the earliest queue is due, then re-check: another
            // thread's activity may have changed which queue is on top.
            long waitTime = hq.getNextReadyTime() - System.currentTimeMillis();
            if (waitTime > 0) {
                wait(waitTime);
            }
            hq = hostQueues.getTopHQ();
        }

        if (shouldTerminate) {
            throw new EndedException("terminated");
        }

        try {
            CrawlURI curi = hq.next();
            logger.fine("Issuing " + curi.toString());
            long temp = curi.getLong(A_TIME_OF_NEXT_PROCESSING);
            long currT = System.currentTimeMillis();
            long overdue = (currT - temp);
            if (logger.isLoggable(Level.FINER)) {
                String waitI = "not set";
                if (curi.containsKey(A_WAIT_INTERVAL)) {
                    waitI = ArchiveUtils.formatMillisecondsToConventional(
                        curi.getLong(A_WAIT_INTERVAL));
                }
                logger.finer("Wait interval: " + waitI +
                    ", Time of next proc: " + temp +
                    ", Current time: " + currT +
                    ", Overdue by: " + overdue + "ms");
            }
            if (overdue < 0) {
                // Should not happen: queue released a URI before its time.
                logger.severe("Time overdue for " + curi.toString() +
                    " is negative (" + overdue + ")!");
            }
            curi.putLong(A_FETCH_OVERDUE, overdue);
            return curi;
        } catch (IOException e) {
            logger.log(Level.SEVERE,
                "IOException getting next URI from host queue", e);
        }

        return null;
    }

    /**
     * @return true if no URIs are queued (this frontier never truly empties
     * while URIs exist, since finished URIs are re-queued)
     */
    public boolean isEmpty() {
        return hostQueues.getSize() == 0;
    }

    /** Schedules a URI via the calling thread's batch queue. */
    public void schedule(CandidateURI caURI) {
        batchSchedule(caURI);
    }

    /**
     * Callback from a worker when processing of a URI completes; routes the
     * URI to the appropriate disposition.
     *
     * @param curi the finished URI
     */
    public synchronized void finished(CrawlURI curi) {
        logger.fine(curi.toString() + " " +
            CrawlURI.fetchStatusCodesToString(curi.getFetchStatus()));
        curi.incrementFetchAttempts();
        logLocalizedErrors(curi);

        innerFinished(curi);
    }

    /**
     * Disposition dispatch: success, prompt retry, delayed retry, disregard
     * or failure. Wakes any threads blocked in {@code next()} afterwards.
     * Runtime exceptions are converted into a failure disposition.
     */
    protected synchronized void innerFinished(CrawlURI curi) {
        try {
            innerBatchFlush();

            if (curi.isSuccess()) {
                successDisposition(curi);
            } else if (needsPromptRetry(curi)) {
                // Put near top of queue, no extra politeness wait.
                reschedule(curi, false);
            } else if (needsRetrying(curi)) {
                // Requeue with the configured retry delay.
                reschedule(curi, true);
                controller.fireCrawledURINeedRetryEvent(curi);
            } else if (isDisregarded(curi)) {
                disregardDisposition(curi);
            } else {
                failureDisposition(curi);
            }

            // Queue state changed; let waiting worker threads re-evaluate.
            notifyAll();
        } catch (RuntimeException e) {
            curi.setFetchStatus(S_RUNTIME_EXCEPTION);
            logger.log(Level.WARNING,
                "RTE in innerFinished() " + e.getMessage(), e);
            curi.putObject(A_RUNTIME_EXCEPTION, e);
            failureDisposition(curi);
        } catch (AttributeNotFoundException e) {
            logger.severe(e.getMessage());
        }
    }

    /**
     * Writes any localized errors accumulated on the URI to the controller's
     * local-errors log, then clears them from the URI.
     */
    private void logLocalizedErrors(CrawlURI curi) {
        if (curi.containsKey(A_LOCALIZED_ERRORS)) {
            List localErrors = (List)curi.getObject(A_LOCALIZED_ERRORS);
            Iterator iter = localErrors.iterator();
            while (iter.hasNext()) {
                Object array[] = {curi, iter.next()};
                controller.localErrors.log(Level.WARNING,
                    curi.getUURI().toString(), array);
            }
            // Errors are logged; drop them so they are not persisted.
            curi.remove(A_LOCALIZED_ERRORS);
        }
    }

    /**
     * Handles a successful fetch: logs and annotates the URI, updates
     * statistics, schedules the next visit after the evaluated wait interval,
     * and snoozes the host queue for politeness.
     *
     * @param curi the successfully fetched URI
     */
    protected void successDisposition(CrawlURI curi) {
        curi.aboutToLog();

        long waitInterval = 0;

        if (curi.containsKey(A_WAIT_INTERVAL)) {
            waitInterval = curi.getLong(A_WAIT_INTERVAL);
            curi.addAnnotation("wt:" +
                ArchiveUtils.formatMillisecondsToConventional(
                    waitInterval));
        } else {
            logger.severe("Missing wait interval for " + curi.toString() +
                " WaitEvaluator may be missing.");
        }
        if (curi.containsKey(A_NUMBER_OF_VISITS)) {
            curi.addAnnotation(curi.getInt(A_NUMBER_OF_VISITS) + "vis");
        }
        if (curi.containsKey(A_NUMBER_OF_VERSIONS)) {
            curi.addAnnotation(curi.getInt(A_NUMBER_OF_VERSIONS) + "ver");
        }
        if (curi.containsKey(A_FETCH_OVERDUE)) {
            curi.addAnnotation("ov:" +
                ArchiveUtils.formatMillisecondsToConventional(
                    (curi.getLong(A_FETCH_OVERDUE))));
        }

        Object array[] = { curi };
        controller.uriProcessing.log(
            Level.INFO,
            curi.getUURI().toString(),
            array);

        succeededFetchCount++;
        totalProcessedBytes += curi.getContentSize();

        controller.fireCrawledURISuccessfulEvent(curi);

        // Reset any elevated priority; the next visit is a regular one.
        curi.setSchedulingDirective(CandidateURI.NORMAL);

        curi.putLong(A_TIME_OF_NEXT_PROCESSING,
            System.currentTimeMillis() + waitInterval);

        AdaptiveRevisitHostQueue hq = hostQueues.getHQ(curi.getClassKey());

        // Politeness: snooze the host from the fetch-completed time (or now
        // if that is unavailable).
        long wakeupTime = (curi.containsKey(A_FETCH_COMPLETED_TIME) ?
                curi.getLong(A_FETCH_COMPLETED_TIME) :
                (new Date()).getTime()) + calculateSnoozeTime(curi);

        curi.processingCleanup();
        curi.resetDeferrals();
        curi.resetFetchAttempts();
        try {
            hq.update(curi, true, wakeupTime);
        } catch (IOException e) {
            logger.log(Level.SEVERE, "An IOException occured when updating " +
                curi.toString() + "\n" + e.getMessage(), e);
        }
    }

    /**
     * Requeues a URI for another attempt.
     *
     * @param curi the URI to requeue
     * @param errorWait true to apply the retry delay (and snooze the host);
     *   false for a prompt retry at the head of the queue
     * @throws AttributeNotFoundException if the retry-delay setting is absent
     */
    protected void reschedule(CrawlURI curi, boolean errorWait)
            throws AttributeNotFoundException {
        long delay = 0;
        if (errorWait) {
            if (curi.containsKey(A_RETRY_DELAY)) {
                delay = curi.getLong(A_RETRY_DELAY);
            } else {
                delay = ((Long)getAttribute(
                    ATTR_RETRY_DELAY, curi)).longValue();
            }
        }

        long retryTime = (curi.containsKey(A_FETCH_COMPLETED_TIME) ?
                curi.getLong(A_FETCH_COMPLETED_TIME) :
                (new Date()).getTime()) + delay;

        AdaptiveRevisitHostQueue hq = hostQueues.getHQ(curi.getClassKey());
        curi.processingCleanup();
        if (errorWait) {
            curi.resetDeferrals();
        }
        try {
            hq.update(curi, errorWait, retryTime);
        } catch (IOException e) {
            logger.log(Level.SEVERE,
                "IOException rescheduling " + curi.toString(), e);
        }
    }

    /**
     * Handles a URI that failed and will not be retried: logs it, updates
     * statistics, pushes its next-processing time to the far future and
     * possibly forgets it entirely.
     *
     * @param curi the failed URI
     */
    protected void failureDisposition(CrawlURI curi) {
        this.controller.fireCrawledURIFailureEvent(curi);

        curi.aboutToLog();
        Object array[] = { curi };
        this.controller.uriProcessing.log(
            Level.INFO,
            curi.getUURI().toString(),
            array);

        if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {
            this.controller.runtimeErrors.log(
                Level.WARNING,
                curi.getUURI().toString(),
                array);
        }
        failedFetchCount++;

        curi.setSchedulingDirective(CandidateURI.NORMAL);
        // Effectively never revisit.
        curi.putLong(A_TIME_OF_NEXT_PROCESSING, Long.MAX_VALUE);

        AdaptiveRevisitHostQueue hq = hostQueues.getHQ(curi.getClassKey());
        curi.processingCleanup();
        curi.resetDeferrals();
        curi.resetFetchAttempts();
        try {
            boolean shouldForget = shouldBeForgotten(curi);
            if (shouldForget && alreadyIncluded != null) {
                // Allow the URI to be rediscovered later.
                alreadyIncluded.forget(canonicalize(curi.getUURI()), curi);
            }
            hq.update(curi, false, 0, shouldForget);
        } catch (IOException e) {
            logger.log(Level.SEVERE,
                "IOException on failure disposition of " + curi.toString(),
                e);
        }
    }

    /**
     * Handles a disregarded URI (out of scope, precluded by robots, etc.):
     * logs it, updates statistics and parks or forgets it.
     */
    protected void disregardDisposition(CrawlURI curi) {
        controller.fireCrawledURIDisregardEvent(curi);

        curi.aboutToLog();
        Object array[] = { curi };
        controller.uriProcessing.log(
            Level.INFO,
            curi.getUURI().toString(),
            array);

        disregardedUriCount++;

        // Effectively never revisit.
        curi.putLong(A_TIME_OF_NEXT_PROCESSING, Long.MAX_VALUE);
        curi.setSchedulingDirective(CandidateURI.NORMAL);

        AdaptiveRevisitHostQueue hq = hostQueues.getHQ(curi.getClassKey());
        curi.processingCleanup();
        curi.resetDeferrals();
        curi.resetFetchAttempts();
        try {
            hq.update(curi, false, 0, shouldBeForgotten(curi));
        } catch (IOException e) {
            logger.log(Level.SEVERE,
                "IOException on disregard disposition of " + curi.toString(),
                e);
        }
    }

    /**
     * @param curi the URI to check
     * @return true if the URI should be dropped entirely (so it may be
     *   rediscovered and re-evaluated later), based on its fetch status
     */
    protected boolean shouldBeForgotten(CrawlURI curi) {
        switch (curi.getFetchStatus()) {
            case S_OUT_OF_SCOPE:
            case S_TOO_MANY_EMBED_HOPS:
            case S_TOO_MANY_LINK_HOPS:
                return true;
            default:
                return false;
        }
    }

    /**
     * @param curi the URI to check
     * @return true if the URI should be retried immediately (deferred
     *   prerequisites, or a 401 with credentials now loaded), provided the
     *   retry limit is not exhausted
     * @throws AttributeNotFoundException if max-retries is unresolvable
     */
    protected boolean needsPromptRetry(CrawlURI curi)
            throws AttributeNotFoundException {
        if (curi.getFetchAttempts() >=
                ((Integer)getAttribute(ATTR_MAX_RETRIES, curi)).intValue()) {
            return false;
        }

        switch (curi.getFetchStatus()) {
            case S_DEFERRED:
                return true;

            case HttpStatus.SC_UNAUTHORIZED:
                // Only retry a 401 if credentials have since been loaded.
                boolean loaded = curi.hasRfc2617CredentialAvatar();
                if (!loaded) {
                    logger.severe("Have 401 but no creds loaded " + curi);
                }
                return loaded;

            default:
                return false;
        }
    }

    /**
     * @param curi the URI to check
     * @return true if the URI should be retried after the retry delay
     *   (transient connect/DNS failures), provided the retry limit is not
     *   exhausted
     * @throws AttributeNotFoundException if max-retries is unresolvable
     */
    protected boolean needsRetrying(CrawlURI curi)
            throws AttributeNotFoundException {
        if (curi.getFetchAttempts() >=
                ((Integer)getAttribute(ATTR_MAX_RETRIES, curi)).intValue()) {
            return false;
        } else {
            switch (curi.getFetchStatus()) {
                case S_CONNECT_FAILED:
                case S_CONNECT_LOST:
                case S_DOMAIN_UNRESOLVABLE:
                    return true;
                default:
                    return false;
            }
        }
    }

    /**
     * @param curi the URI to check
     * @return true if the URI's fetch status marks it as disregarded rather
     *   than failed
     */
    protected boolean isDisregarded(CrawlURI curi) {
        switch (curi.getFetchStatus()) {
            case S_ROBOTS_PRECLUDED:
            case S_OUT_OF_SCOPE:
            case S_BLOCKED_BY_CUSTOM_PROCESSOR:
            case S_BLOCKED_BY_USER:
            case S_TOO_MANY_EMBED_HOPS:
            case S_TOO_MANY_LINK_HOPS:
            case S_DELETED_BY_USER:
                return true;
            default:
                return false;
        }
    }

    /**
     * Computes the politeness snooze for a host after a fetch: the fetch
     * duration times the delay factor, clamped to [min-delay, max-delay],
     * and never less than the built-in default minimum.
     *
     * @param curi the just-fetched URI
     * @return snooze time in milliseconds
     */
    protected long calculateSnoozeTime(CrawlURI curi) {
        long durationToWait = 0;
        if (curi.containsKey(A_FETCH_BEGAN_TIME)
                && curi.containsKey(A_FETCH_COMPLETED_TIME)) {

            try {
                long completeTime = curi.getLong(A_FETCH_COMPLETED_TIME);
                long durationTaken =
                    (completeTime - curi.getLong(A_FETCH_BEGAN_TIME));

                durationToWait = (long)(
                    ((Float)getAttribute(ATTR_DELAY_FACTOR, curi))
                        .floatValue() * durationTaken);

                long minDelay =
                    ((Integer)getAttribute(ATTR_MIN_DELAY, curi)).longValue();
                if (minDelay > durationToWait) {
                    durationToWait = minDelay;
                }

                long maxDelay =
                    ((Integer)getAttribute(ATTR_MAX_DELAY, curi)).longValue();
                if (durationToWait > maxDelay) {
                    durationToWait = maxDelay;
                }
            } catch (AttributeNotFoundException e) {
                logger.severe("Unable to find attribute. " +
                    curi.toString());
                // Fall back to the most conservative wait.
                durationToWait = DEFAULT_MAX_DELAY.longValue();
            }
        }
        long ret = durationToWait > DEFAULT_MIN_DELAY.longValue() ?
            durationToWait : DEFAULT_MIN_DELAY.longValue();
        logger.finest("Snooze time for " + curi.toString() + " = " + ret);
        return ret;
    }

    /**
     * @return number of distinct URIs discovered (the dup filter's count if
     *   one is in use, otherwise the queued count)
     */
    public synchronized long discoveredUriCount() {
        return (this.alreadyIncluded != null) ?
            this.alreadyIncluded.count() : hostQueues.getSize();
    }

    /** @return number of URIs currently queued */
    public synchronized long queuedUriCount() {
        return hostQueues.getSize();
    }

    /** @return total URIs whose processing has finished at least once */
    public long finishedUriCount() {
        return succeededFetchCount + failedFetchCount + disregardedUriCount;
    }

    /** @return number of successful fetches */
    public long succeededFetchCount() {
        return succeededFetchCount;
    }

    /** @return number of failed fetches */
    public long failedFetchCount() {
        return failedFetchCount;
    }

    /** @return number of disregarded URIs */
    public long disregardedUriCount() {
        // NOTE: previously returned disregardedUriCount++, silently
        // incrementing the statistic on every read. Getters must not mutate.
        return disregardedUriCount;
    }

    /** @return total content bytes processed by successful fetches */
    public long totalBytesWritten() {
        return totalProcessedBytes;
    }

    /**
     * Recovery logs are not supported by this frontier.
     *
     * @throws IOException always
     */
    public void importRecoverLog(String pathToLog) throws IOException {
        throw new IOException("Unsupported by this frontier.");
    }

    /** Frontier markers are not supported; always returns null. */
    public synchronized FrontierMarker getInitialMarker(String regexpr,
            boolean inCacheOnly) {
        return null;
    }

    /** URI listing is not supported; always returns null. */
    public synchronized ArrayList getURIsList(FrontierMarker marker,
            int numberOfMatches, boolean verbose)
            throws InvalidFrontierMarkerException {
        return null;
    }

    /** URI deletion is not supported; always returns 0. */
    public synchronized long deleteURIs(String match) {
        return 0;
    }

    /** No-op: external deletion notifications are ignored. */
    public synchronized void deleted(CrawlURI curi) {
    }

    /** Schedules a URI that is considered already included. */
    public void considerIncluded(UURI u) {
        CrawlURI curi = new CrawlURI(u);
        innerSchedule(curi);
    }

    /** Re-reads the seeds (e.g. after the operator edits the seed list). */
    public void kickUpdate() {
        loadSeeds();
    }

    /** Starts (unpauses) the frontier. */
    public void start() {
        unpause();
    }

    /** Requests a pause; threads in next() will park on their next pass. */
    synchronized public void pause() {
        shouldPause = true;
        notifyAll();
    }

    /** Clears the pause flag and wakes any parked worker threads. */
    synchronized public void unpause() {
        shouldPause = false;
        notifyAll();
    }

    /** Marks the frontier terminated; next() will throw EndedException. */
    synchronized public void terminate() {
        shouldTerminate = true;
    }

    /** @return null; this frontier keeps no journal */
    public FrontierJournal getFrontierJournal() {
        return null;
    }

    /**
     * Per-thread in-memory queue used to stage scheduled URIs until
     * batchFlush() moves them into the frontier proper.
     */
    private static class ThreadLocalQueue
    extends ThreadLocal<Queue<CandidateURI>> implements Serializable {

        private static final long serialVersionUID = 8268977225156462059L;

        protected Queue<CandidateURI> initialValue() {
            return new MemQueue<CandidateURI>();
        }

        /** @return the calling thread's staging queue */
        public Queue<CandidateURI> getQueue() {
            return get();
        }
    }

    /**
     * Recovery logs are not supported by this frontier.
     *
     * @throws IOException always
     */
    public void importRecoverLog(String pathToLog, boolean retainFailures)
            throws IOException {
        throw new IOException("Unsupported");
    }

    /** @return no extra named reports beyond the default */
    public String[] getReports() {
        return new String[] {};
    }

    /** @return a one-line summary of this frontier */
    public String singleLineReport() {
        return ArchiveUtils.singleLineReport(this);
    }

    /** Writes the default (unnamed) report. */
    public void reportTo(PrintWriter writer) throws IOException {
        reportTo(null, writer);
    }

    /** Writes a one-line report of the host-queue state. */
    public synchronized void singleLineReportTo(PrintWriter w)
            throws IOException {
        hostQueues.singleLineReportTo(w);
    }

    /** @return legend for the single-line report */
    public String singleLineLegend() {
        return hostQueues.singleLineLegend();
    }

    /** Writes the named report of the host-queue state. */
    public synchronized void reportTo(String name, PrintWriter writer) {
        hostQueues.reportTo(name, writer);
    }

    /** No-op CrawlStatusListener callback. */
    public void crawlStarted(String message) {
    }

    /** No-op CrawlStatusListener callback. */
    public void crawlEnding(String sExitMessage) {
    }

    /** Releases BDB-backed resources when the crawl ends. */
    public void crawlEnded(String sExitMessage) {
        if (this.alreadyIncluded != null) {
            this.alreadyIncluded.close();
            this.alreadyIncluded = null;
        }
        hostQueues.close();
    }

    /** No-op CrawlStatusListener callback. */
    public void crawlPausing(String statusMessage) {
    }

    /** No-op CrawlStatusListener callback. */
    public void crawlPaused(String statusMessage) {
    }

    /** No-op CrawlStatusListener callback. */
    public void crawlResuming(String statusMessage) {
    }

    /** No-op: this frontier does not participate in checkpointing. */
    public void crawlCheckpoint(File checkpointDir) throws Exception {
    }

    /**
     * Destination callback from the UriUniqFilter: a URI passed the
     * duplicate check and is scheduled onto its host queue.
     */
    public void receive(CandidateURI item) {
        logger.finest("Received " + item);
        innerSchedule(item);
    }

    /** @return the URI's host queue, as its frontier group */
    public FrontierGroup getGroup(CrawlURI curi) {
        try {
            return getHQ(curi);
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
    }

    /** @return average depth across all host queues */
    public long averageDepth() {
        return hostQueues.getAverageDepth();
    }

    /** @return congestion ratio reported by the host-queue list */
    public float congestionRatio() {
        return hostQueues.getCongestionRatio();
    }

    /** @return size of the deepest host queue */
    public long deepestUri() {
        return hostQueues.getDeepestQueueSize();
    }
}