1 23 package org.archive.crawler.frontier; 24 25 import java.io.IOException ; 26 import java.io.PrintWriter ; 27 import java.io.Serializable ; 28 import java.util.ArrayList ; 29 import java.util.Collection ; 30 import java.util.Collections ; 31 import java.util.Date ; 32 import java.util.HashMap ; 33 import java.util.Iterator ; 34 import java.util.Map ; 35 import java.util.SortedSet ; 36 import java.util.Timer ; 37 import java.util.TimerTask ; 38 import java.util.TreeSet ; 39 import java.util.logging.Level ; 40 import java.util.logging.Logger ; 41 42 import org.apache.commons.collections.Bag; 43 import org.apache.commons.collections.BagUtils; 44 import org.apache.commons.collections.bag.HashBag; 45 import org.archive.crawler.datamodel.CandidateURI; 46 import org.archive.crawler.datamodel.CoreAttributeConstants; 47 import org.archive.crawler.datamodel.CrawlURI; 48 import org.archive.crawler.datamodel.FetchStatusCodes; 49 import org.archive.crawler.datamodel.UriUniqFilter; 50 import org.archive.crawler.datamodel.UriUniqFilter.HasUriReceiver; 51 import org.archive.crawler.framework.CrawlController; 52 import org.archive.crawler.framework.Frontier; 53 import org.archive.crawler.framework.exceptions.EndedException; 54 import org.archive.crawler.framework.exceptions.FatalConfigurationException; 55 import org.archive.crawler.settings.SimpleType; 56 import org.archive.crawler.settings.Type; 57 import org.archive.net.UURI; 58 import org.archive.util.ArchiveUtils; 59 60 import com.sleepycat.collections.StoredIterator; 61 62 import java.util.concurrent.BlockingQueue ; 63 import java.util.concurrent.LinkedBlockingQueue ; 64 import java.util.concurrent.TimeUnit ; 65 66 75 public abstract class WorkQueueFrontier extends AbstractFrontier 76 implements FetchStatusCodes, CoreAttributeConstants, HasUriReceiver, 77 Serializable { 78 private static final long serialVersionUID = 570384305871965843L; 79 80 public class WakeTask extends TimerTask { 81 @Override 82 public void run() { 83 synchronized(snoozedClassQueues) { 84 if(this!=nextWake) { 85 return; 87 } 88 wakeQueues(); 89 } 90 } 91 } 92 93 94 private static final int REPORT_MAX_QUEUES = 2000; 95 96 102 private static final int MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY = 3000; 103 104 111 public final static String ATTR_SNOOZE_DEACTIVATE_MS = 112 "snooze-deactivate-ms"; 113 public static Long DEFAULT_SNOOZE_DEACTIVATE_MS = new Long (5*60*1000); 115 private static final Logger logger = 116 Logger.getLogger(WorkQueueFrontier.class.getName()); 117 118 119 public final static String ATTR_HOLD_QUEUES = "hold-queues"; 120 protected final static Boolean DEFAULT_HOLD_QUEUES = new Boolean (true); 121 122 123 public final static String ATTR_BALANCE_REPLENISH_AMOUNT = 124 "balance-replenish-amount"; 125 protected final static Integer DEFAULT_BALANCE_REPLENISH_AMOUNT = 126 new Integer (3000); 127 128 129 public final static String ATTR_ERROR_PENALTY_AMOUNT = 130 "error-penalty-amount"; 131 protected final static Integer DEFAULT_ERROR_PENALTY_AMOUNT = 132 new Integer (100); 133 134 135 136 public final static String ATTR_QUEUE_TOTAL_BUDGET = "queue-total-budget"; 137 protected final static Long DEFAULT_QUEUE_TOTAL_BUDGET = new Long (-1); 138 139 140 public final static String ATTR_COST_POLICY = "cost-policy"; 141 protected final static String DEFAULT_COST_POLICY = 142 UnitCostAssignmentPolicy.class.getName(); 143 144 145 public final static String ATTR_TARGET_READY_QUEUES_BACKLOG = 146 "target-ready-backlog"; 147 protected final static Integer DEFAULT_TARGET_READY_QUEUES_BACKLOG = 148 new Integer (50); 149 150 152 protected transient UriUniqFilter alreadyIncluded; 153 154 156 protected transient Map <String ,WorkQueue> allQueues = null; 157 159 163 protected BlockingQueue <String > readyClassQueues = 164 new LinkedBlockingQueue <String >(); 165 166 167 protected int targetSizeForReadyQueues; 168 169 173 protected BlockingQueue <String > inactiveQueues = 174 new LinkedBlockingQueue <String >(); 175 176 180 protected BlockingQueue <String > retiredQueues = 181 new LinkedBlockingQueue <String >(); 182 183 184 protected Bag inProcessQueues = 185 BagUtils.synchronizedBag(new HashBag()); 187 190 protected SortedSet <WorkQueue> snoozedClassQueues = 191 Collections.synchronizedSortedSet(new TreeSet <WorkQueue>()); 192 193 194 protected transient Timer wakeTimer; 195 196 197 protected transient WakeTask nextWake; 198 199 protected WorkQueue longestActiveQueue = null; 200 201 202 private static final long DEFAULT_WAIT = 1000; 204 205 private transient CostAssignmentPolicy costAssignmentPolicy; 206 207 208 String [] AVAILABLE_COST_POLICIES = new String [] { 209 ZeroCostAssignmentPolicy.class.getName(), 210 UnitCostAssignmentPolicy.class.getName(), 211 WagCostAssignmentPolicy.class.getName(), 212 AntiCalendarCostAssignmentPolicy.class.getName()}; 213 214 220 public WorkQueueFrontier(String name, String description) { 221 super(Frontier.ATTR_NAME, description); 224 Type t = addElementToDefinition(new SimpleType(ATTR_HOLD_QUEUES, 225 "Whether to hold newly-created per-host URI work" + 226 " queues until needed to stay busy. If false (default)," + 227 " all queues may contribute URIs for crawling at all" + 228 " times. If true, queues begin (and collect URIs) in" + 229 " an 'inactive' state, and only when the Frontier needs" + 230 " another queue to keep all ToeThreads busy will new" + 231 " queues be activated.", DEFAULT_HOLD_QUEUES)); 232 t.setExpertSetting(true); 233 t.setOverrideable(false); 234 t = addElementToDefinition(new SimpleType(ATTR_BALANCE_REPLENISH_AMOUNT, 235 "Amount to replenish a queue's activity balance when it becomes " + 236 "active. Larger amounts mean more URIs will be tried from the " + 237 "queue before it is deactivated in favor of waiting queues. " + 238 "Default is 3000", DEFAULT_BALANCE_REPLENISH_AMOUNT)); 239 t.setExpertSetting(true); 240 t.setOverrideable(true); 241 t = addElementToDefinition(new SimpleType(ATTR_ERROR_PENALTY_AMOUNT, 242 "Amount to additionally penalize a queue when one of" + 243 "its URIs fails completely. Accelerates deactivation or " + 244 "full retirement of problem queues and unresponsive sites. " + 245 "Default is 100", DEFAULT_ERROR_PENALTY_AMOUNT)); 246 t.setExpertSetting(true); 247 t.setOverrideable(true); 248 t = addElementToDefinition(new SimpleType(ATTR_QUEUE_TOTAL_BUDGET, 249 "Total activity expenditure allowable to a single queue; queues " + 250 "over this expenditure will be 'retired' and crawled no more. " + 251 "Default of -1 means no ceiling on activity expenditures is " + 252 "enforced.", DEFAULT_QUEUE_TOTAL_BUDGET)); 253 t.setExpertSetting(true); 254 t.setOverrideable(true); 255 256 t = addElementToDefinition(new SimpleType(ATTR_COST_POLICY, 257 "Policy for calculating the cost of each URI attempted. " + 258 "The default UnitCostAssignmentPolicy considers the cost of " + 259 "each URI to be '1'.", DEFAULT_COST_POLICY, AVAILABLE_COST_POLICIES)); 260 t.setExpertSetting(true); 261 262 t = addElementToDefinition(new SimpleType(ATTR_SNOOZE_DEACTIVATE_MS, 263 "Threshold above which any 'snooze' delay will cause the " + 264 "affected queue to go inactive, allowing other queues a " + 265 "chance to rotate into active state. Typically set to be " + 266 "longer than the politeness pauses between successful " + 267 "fetches, but shorter than the connection-failed " + 268 "'retry-delay-seconds'. (Default is 5 minutes.)", 269 DEFAULT_SNOOZE_DEACTIVATE_MS)); 270 t.setExpertSetting(true); 271 t.setOverrideable(false); 272 t = addElementToDefinition(new SimpleType(ATTR_TARGET_READY_QUEUES_BACKLOG, 273 "Target size for backlog of ready queues. This many queues " + 274 "will be brought into 'ready' state even if a thread is " + 275 "not waiting. Only has effect if 'hold-queues' is true. " + 276 "Default is 50.", DEFAULT_TARGET_READY_QUEUES_BACKLOG)); 277 t.setExpertSetting(true); 278 t.setOverrideable(false); 279 } 280 281 286 public void initialize(CrawlController c) 287 throws FatalConfigurationException, IOException { 288 super.initialize(c); 290 this.controller = c; 291 292 this.targetSizeForReadyQueues = (Integer )getUncheckedAttribute(null, 293 ATTR_TARGET_READY_QUEUES_BACKLOG); 294 if (this.targetSizeForReadyQueues < 1) { 295 this.targetSizeForReadyQueues = 1; 296 } 297 this.wakeTimer = new Timer ("waker for " + c.toString()); 298 299 try { 300 if (workQueueDataOnDisk() 301 && queueAssignmentPolicy.maximumNumberOfKeys() >= 0 302 && queueAssignmentPolicy.maximumNumberOfKeys() <= 303 MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY) { 304 this.allQueues = Collections.synchronizedMap( 305 new HashMap <String ,WorkQueue>()); 306 } else { 307 this.allQueues = c.getBigMap("allqueues", 308 String .class, WorkQueue.class); 309 if (logger.isLoggable(Level.FINE)) { 310 Iterator i = this.allQueues.keySet().iterator(); 311 try { 312 for (; i.hasNext();) { 313 logger.fine((String ) i.next()); 314 } 315 } finally { 316 StoredIterator.close(i); 317 } 318 } 319 } 320 this.alreadyIncluded = createAlreadyIncluded(); 321 initQueue(); 322 } catch (IOException e) { 323 e.printStackTrace(); 324 throw (FatalConfigurationException) 325 new FatalConfigurationException(e.getMessage()).initCause(e); 326 } catch (Exception e) { 327 e.printStackTrace(); 328 throw (FatalConfigurationException) 329 new FatalConfigurationException(e.getMessage()).initCause(e); 330 } 331 332 initCostPolicy(); 333 334 loadSeeds(); 335 } 336 337 342 private void initCostPolicy() throws FatalConfigurationException { 343 try { 344 costAssignmentPolicy = (CostAssignmentPolicy) Class.forName( 345 (String ) getUncheckedAttribute(null, ATTR_COST_POLICY)) 346 .newInstance(); 347 } catch (Exception e) { 348 e.printStackTrace(); 349 throw new FatalConfigurationException(e.getMessage()); 350 } 351 } 352 353 356 public void crawlEnded(String sExitMessage) { 357 if (this.alreadyIncluded != null) { 360 this.alreadyIncluded.close(); 361 this.alreadyIncluded = null; 362 } 363 364 this.queueAssignmentPolicy = null; 365 366 try { 367 closeQueue(); 368 } catch (IOException e) { 369 e.printStackTrace(); 371 } 372 this.wakeTimer.cancel(); 373 374 this.allQueues.clear(); 375 this.allQueues = null; 376 this.inProcessQueues = null; 377 this.readyClassQueues = null; 378 this.snoozedClassQueues = null; 379 this.inactiveQueues = null; 380 this.retiredQueues = null; 381 382 this.costAssignmentPolicy = null; 383 384 super.crawlEnded(sExitMessage); 386 this.controller = null; 387 } 388 389 396 protected abstract UriUniqFilter createAlreadyIncluded() throws IOException ; 397 398 404 public void schedule(CandidateURI caUri) { 405 String canon = canonicalize(caUri); 408 if (caUri.forceFetch()) { 409 alreadyIncluded.addForce(canon, caUri); 410 } else { 411 alreadyIncluded.add(canon, caUri); 412 } 413 } 414 415 424 public void receive(CandidateURI caUri) { 425 CrawlURI curi = asCrawlUri(caUri); 426 applySpecialHandling(curi); 427 sendToQueue(curi); 428 doJournalAdded(curi); 430 } 431 432 435 protected CrawlURI asCrawlUri(CandidateURI caUri) { 436 CrawlURI curi = super.asCrawlUri(caUri); 437 getCost(curi); 439 return curi; 440 } 441 442 447 protected void sendToQueue(CrawlURI curi) { 448 WorkQueue wq = getQueueFor(curi); 449 synchronized (wq) { 450 wq.enqueue(this, curi); 451 if(!wq.isRetired()) { 452 incrementQueuedUriCount(); 453 } 454 if(!wq.isHeld()) { 455 wq.setHeld(); 456 if(holdQueues() && readyClassQueues.size()>=targetSizeForReadyQueues()) { 457 deactivateQueue(wq); 458 } else { 459 replenishSessionBalance(wq); 460 readyQueue(wq); 461 } 462 } 463 WorkQueue laq = longestActiveQueue; 464 if(!wq.isRetired()&&((laq==null) || wq.getCount() > laq.getCount())) { 465 longestActiveQueue = wq; 466 } 467 } 468 } 469 470 476 private boolean holdQueues() { 477 return ((Boolean ) getUncheckedAttribute(null, ATTR_HOLD_QUEUES)) 478 .booleanValue(); 479 } 480 481 485 private void readyQueue(WorkQueue wq) { 486 try { 487 wq.setActive(this, true); 488 readyClassQueues.put(wq.getClassKey()); 489 } catch (InterruptedException e) { 490 e.printStackTrace(); 491 System.err.println("unable to ready queue "+wq); 492 throw new RuntimeException (e); 494 } 495 } 496 497 501 private void deactivateQueue(WorkQueue wq) { 502 try { 503 wq.setSessionBalance(0); inactiveQueues.put(wq.getClassKey()); 505 wq.setActive(this, false); 506 } catch (InterruptedException e) { 507 e.printStackTrace(); 508 System.err.println("unable to deactivate queue "+wq); 509 throw new RuntimeException (e); 511 } 512 } 513 514 518 private void retireQueue(WorkQueue wq) { 519 try { 520 retiredQueues.put(wq.getClassKey()); 521 decrementQueuedCount(wq.getCount()); 522 wq.setRetired(true); 523 wq.setActive(this, false); 524 } catch (InterruptedException e) { 525 e.printStackTrace(); 526 System.err.println("unable to retire queue "+wq); 527 throw new RuntimeException (e); 529 } 530 } 531 532 537 public void kickUpdate() { 538 super.kickUpdate(); 539 int target = (Integer )getUncheckedAttribute(null, 540 ATTR_TARGET_READY_QUEUES_BACKLOG); 541 if (target < 1) { 542 target = 1; 543 } 544 this.targetSizeForReadyQueues = target; 545 try { 546 initCostPolicy(); 547 } catch (FatalConfigurationException fce) { 548 throw new RuntimeException (fce); 549 } 550 Object key = this.retiredQueues.poll(); 556 while (key != null) { 557 WorkQueue q = (WorkQueue)this.allQueues.get(key); 558 if(q != null) { 559 unretireQueue(q); 560 } 561 key = this.retiredQueues.poll(); 562 } 563 } 564 569 private void unretireQueue(WorkQueue q) { 570 deactivateQueue(q); 571 q.setRetired(false); 572 incrementQueuedUriCount(q.getCount()); 573 } 574 575 583 protected abstract WorkQueue getQueueFor(CrawlURI curi); 584 585 592 protected abstract WorkQueue getQueueFor(String classKey); 593 594 605 public CrawlURI next() 606 throws InterruptedException , EndedException { 607 while (true) { long now = System.currentTimeMillis(); 609 610 preNext(now); 612 613 synchronized(readyClassQueues) { 614 int activationsNeeded = targetSizeForReadyQueues() - readyClassQueues.size(); 615 while(activationsNeeded > 0 && !inactiveQueues.isEmpty()) { 616 activateInactiveQueue(); 617 activationsNeeded--; 618 } 619 } 620 621 WorkQueue readyQ = null; 622 Object key = readyClassQueues.poll(DEFAULT_WAIT,TimeUnit.MILLISECONDS); 623 if (key != null) { 624 readyQ = (WorkQueue)this.allQueues.get(key); 625 } 626 if (readyQ != null) { 627 while(true) { CrawlURI curi = null; 629 synchronized(readyQ) { 630 curi = readyQ.peek(this); 631 if (curi != null) { 632 String currentQueueKey = getClassKey(curi); 634 if (currentQueueKey.equals(curi.getClassKey())) { 635 noteAboutToEmit(curi, readyQ); 637 inProcessQueues.add(readyQ); 638 return curi; 639 } 640 curi.setClassKey(currentQueueKey); 644 readyQ.dequeue(this); 645 decrementQueuedCount(1); 646 curi.setHolderKey(null); 647 } else { 650 readyQ.clearHeld(); 654 break; 655 } 656 } 657 if(curi!=null) { 658 sendToQueue(curi); 660 } 661 } 662 } else { 663 if (key != null) { 665 logger.severe("Key "+ key + 666 " in readyClassQueues but not allQueues"); 667 } 668 } 669 670 if(shouldTerminate) { 671 throw new EndedException("shouldTerminate is true"); 673 } 674 675 if(inProcessQueues.size()==0) { 676 this.alreadyIncluded.requestFlush(); 679 } 680 } 681 } 682 683 private int targetSizeForReadyQueues() { 684 return targetSizeForReadyQueues; 685 } 686 687 694 private int getCost(CrawlURI curi) { 695 int cost = curi.getHolderCost(); 696 if (cost == CrawlURI.UNCALCULATED) { 697 cost = costAssignmentPolicy.costOf(curi); 698 curi.setHolderCost(cost); 699 } 700 return cost; 701 } 702 703 706 private void activateInactiveQueue() { 707 Object key = this.inactiveQueues.poll(); 708 if (key == null) { 709 return; 710 } 711 WorkQueue candidateQ = (WorkQueue)this.allQueues.get(key); 712 if(candidateQ != null) { 713 synchronized(candidateQ) { 714 replenishSessionBalance(candidateQ); 715 if(candidateQ.isOverBudget()){ 716 retireQueue(candidateQ); 719 return; 720 } 721 long now = System.currentTimeMillis(); 722 long delay_ms = candidateQ.getWakeTime() - now; 723 if(delay_ms>0) { 724 snoozeQueue(candidateQ,now,delay_ms); 726 return; 727 } 728 candidateQ.setWakeTime(0); readyQueue(candidateQ); 730 if (logger.isLoggable(Level.FINE)) { 731 logger.fine("ACTIVATED queue: " + 732 candidateQ.getClassKey()); 733 734 } 735 } 736 } 737 } 738 739 744 private void replenishSessionBalance(WorkQueue queue) { 745 CrawlURI contextUri = queue.peek(this); 747 queue.setSessionBalance(((Integer ) getUncheckedAttribute(contextUri, 749 ATTR_BALANCE_REPLENISH_AMOUNT)).intValue()); 750 long totalBudget = ((Long )getUncheckedAttribute(contextUri,ATTR_QUEUE_TOTAL_BUDGET)).longValue(); 753 queue.setTotalBudget(totalBudget); 754 queue.unpeek(); } 756 757 763 private void reenqueueQueue(WorkQueue wq) { 764 if(wq.isOverBudget()) { 765 if (logger.isLoggable(Level.FINE)) { 767 logger.fine("DEACTIVATED queue: " + 768 wq.getClassKey()); 769 } 770 deactivateQueue(wq); 771 } else { 772 readyQueue(wq); 773 } 774 } 775 776 779 void wakeQueues() { 780 synchronized (snoozedClassQueues) { 781 long now = System.currentTimeMillis(); 782 long nextWakeDelay = 0; 783 int wokenQueuesCount = 0; 784 while (true) { 785 if (snoozedClassQueues.isEmpty()) { 786 return; 787 } 788 WorkQueue peek = (WorkQueue) snoozedClassQueues.first(); 789 nextWakeDelay = peek.getWakeTime() - now; 790 if (nextWakeDelay <= 0) { 791 snoozedClassQueues.remove(peek); 792 peek.setWakeTime(0); 793 reenqueueQueue(peek); 794 wokenQueuesCount++; 795 } else { 796 break; 797 } 798 } 799 this.nextWake = new WakeTask(); 800 this.wakeTimer.schedule(nextWake,nextWakeDelay); 801 } 802 } 803 804 815 public void finished(CrawlURI curi) { 816 long now = System.currentTimeMillis(); 817 818 curi.incrementFetchAttempts(); 819 logLocalizedErrors(curi); 820 WorkQueue wq = (WorkQueue) curi.getHolder(); 821 assert (wq.peek(this) == curi) : "unexpected peek " + wq; 822 inProcessQueues.remove(wq, 1); 823 824 if(includesRetireDirective(curi)) { 825 curi.processingCleanup(); 827 wq.unpeek(); 828 wq.update(this, curi); retireQueue(wq); 830 return; 831 } 832 833 if (needsRetrying(curi)) { 834 if(curi.getFetchStatus()!=S_DEFERRED) { 836 wq.expend(getCost(curi)); } 838 long delay_sec = retryDelayFor(curi); 839 curi.processingCleanup(); synchronized(wq) { 841 wq.unpeek(); 842 wq.update(this, curi); if (delay_sec > 0) { 845 long delay_ms = delay_sec * 1000; 846 snoozeQueue(wq, now, delay_ms); 847 } else { 848 reenqueueQueue(wq); 849 } 850 } 851 controller.fireCrawledURINeedRetryEvent(curi); 853 doJournalRescheduled(curi); 854 return; 855 } 856 857 wq.dequeue(this); 859 decrementQueuedCount(1); 860 log(curi); 861 862 if (curi.isSuccess()) { 863 totalProcessedBytes += curi.getContentSize(); 864 incrementSucceededFetchCount(); 865 controller.fireCrawledURISuccessfulEvent(curi); 867 doJournalFinishedSuccess(curi); 868 wq.expend(getCost(curi)); } else if (isDisregarded(curi)) { 870 incrementDisregardedUriCount(); 873 controller.fireCrawledURIDisregardEvent(curi); 875 if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) { 877 Object [] array = { curi }; 878 controller.runtimeErrors.log(Level.WARNING, curi.getUURI() 879 .toString(), array); 880 } 881 } else { 883 this.controller.fireCrawledURIFailureEvent(curi); 886 if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) { 888 Object [] array = { curi }; 889 this.controller.runtimeErrors.log(Level.WARNING, curi.getUURI() 890 .toString(), array); 891 } 892 incrementFailedFetchCount(); 893 wq.noteError(((Integer ) getUncheckedAttribute(curi, 895 ATTR_ERROR_PENALTY_AMOUNT)).intValue()); 896 doJournalFinishedFailure(curi); 897 wq.expend(getCost(curi)); } 899 900 long delay_ms = politenessDelayFor(curi); 901 synchronized(wq) { 902 if (delay_ms > 0) { 903 snoozeQueue(wq,now,delay_ms); 904 } else { 905 reenqueueQueue(wq); 906 } 907 } 908 909 curi.stripToMinimal(); 910 curi.processingCleanup(); 911 912 } 913 914 private boolean includesRetireDirective(CrawlURI curi) { 915 return curi.containsKey(A_FORCE_RETIRE) && (Boolean )curi.getObject(A_FORCE_RETIRE); 916 } 917 918 926 private void snoozeQueue(WorkQueue wq, long now, long delay_ms) { 927 long nextTime = now + delay_ms; 928 wq.setWakeTime(nextTime); 929 long snoozeToInactiveDelayMs = ((Long )getUncheckedAttribute(null, 930 ATTR_SNOOZE_DEACTIVATE_MS)).longValue(); 931 if (delay_ms > snoozeToInactiveDelayMs && !inactiveQueues.isEmpty()) { 932 deactivateQueue(wq); 933 } else { 934 synchronized(snoozedClassQueues) { 935 snoozedClassQueues.add(wq); 936 if(wq == snoozedClassQueues.first()) { 937 this.nextWake = new WakeTask(); 938 this.wakeTimer.schedule(nextWake, delay_ms); 939 } 940 } 941 } 942 } 943 944 951 protected void forget(CrawlURI curi) { 952 logger.finer("Forgetting " + curi); 953 alreadyIncluded.forget(canonicalize(curi.getUURI()), curi); 954 } 955 956 959 public long discoveredUriCount() { 960 return (this.alreadyIncluded != null)? this.alreadyIncluded.count(): 0; 961 } 962 963 967 public long deleteURIs(String match) { 968 long count = 0; 969 Iterator iter = allQueues.keySet().iterator(); 971 while(iter.hasNext()) { 972 WorkQueue wq = getQueueFor(((String )iter.next())); 973 wq.unpeek(); 974 count += wq.deleteMatching(this, match); 975 } 976 decrementQueuedCount(count); 977 return count; 978 } 979 980 984 public static String STANDARD_REPORT = "standard"; 985 public static String ALL_NONEMPTY = "nonempty"; 986 public static String ALL_QUEUES = "all"; 987 protected static String [] REPORTS = {STANDARD_REPORT,ALL_NONEMPTY,ALL_QUEUES}; 988 989 public String [] getReports() { 990 return REPORTS; 991 } 992 993 996 public void singleLineReportTo(PrintWriter w) { 997 if (this.allQueues == null) { 998 return; 999 } 1000 int allCount = allQueues.size(); 1001 int inProcessCount = inProcessQueues.uniqueSet().size(); 1002 int readyCount = readyClassQueues.size(); 1003 int snoozedCount = snoozedClassQueues.size(); 1004 int activeCount = inProcessCount + readyCount + snoozedCount; 1005 int inactiveCount = inactiveQueues.size(); 1006 int retiredCount = retiredQueues.size(); 1007 int exhaustedCount = 1008 allCount - activeCount - inactiveCount - retiredCount; 1009 w.print(allCount); 1010 w.print(" queues: "); 1011 w.print(activeCount); 1012 w.print(" active ("); 1013 w.print(inProcessCount); 1014 w.print(" in-process; "); 1015 w.print(readyCount); 1016 w.print(" ready; "); 1017 w.print(snoozedCount); 1018 w.print(" snoozed); "); 1019 w.print(inactiveCount); 1020 w.print(" inactive; "); 1021 w.print(retiredCount); 1022 w.print(" retired; "); 1023 w.print(exhaustedCount); 1024 w.print(" exhausted"); 1025 w.flush(); 1026 } 1027 1028 1031 public String singleLineLegend() { 1032 return "total active in-process ready snoozed inactive retired exhausted"; 1033 } 1034 1035 1041 public synchronized void reportTo(String name, PrintWriter writer) { 1042 if(ALL_NONEMPTY.equals(name)) { 1043 allNonemptyReportTo(writer); 1044 return; 1045 } 1046 if(ALL_QUEUES.equals(name)) { 1047 allQueuesReportTo(writer); 1048 return; 1049 } 1050 if(name!=null && !STANDARD_REPORT.equals(name)) { 1051 writer.print(name); 1052 writer.print(" unavailable; standard report:\n"); 1053 } 1054 standardReportTo(writer); 1055 } 1056 1057 1061 private void allNonemptyReportTo(PrintWriter writer) { 1062 ArrayList <WorkQueue> inProcessQueuesCopy; 1063 synchronized(this.inProcessQueues) { 1064 @SuppressWarnings ("unchecked") 1066 Collection <WorkQueue> inProcess = this.inProcessQueues; 1067 inProcessQueuesCopy = new ArrayList <WorkQueue>(inProcess); 1068 } 1069 writer.print("\n -----===== IN-PROCESS QUEUES =====-----\n"); 1070 queueSingleLinesTo(writer, inProcessQueuesCopy.iterator()); 1071 1072 writer.print("\n -----===== READY QUEUES =====-----\n"); 1073 queueSingleLinesTo(writer, this.readyClassQueues.iterator()); 1074 1075 writer.print("\n -----===== SNOOZED QUEUES =====-----\n"); 1076 queueSingleLinesTo(writer, this.snoozedClassQueues.iterator()); 1077 1078 writer.print("\n -----===== INACTIVE QUEUES =====-----\n"); 1079 queueSingleLinesTo(writer, this.inactiveQueues.iterator()); 1080 1081 writer.print("\n -----===== RETIRED QUEUES =====-----\n"); 1082 queueSingleLinesTo(writer, this.retiredQueues.iterator()); 1083 } 1084 1085 1089 private void allQueuesReportTo(PrintWriter writer) { 1090 queueSingleLinesTo(writer, allQueues.keySet().iterator()); 1091 } 1092 1093 1100 private void queueSingleLinesTo(PrintWriter writer, Iterator iterator) { 1101 Object obj; 1102 WorkQueue q; 1103 boolean legendWritten = false; 1104 while( iterator.hasNext()) { 1105 obj = iterator.next(); 1106 if (obj == null) { 1107 continue; 1108 } 1109 q = (obj instanceof WorkQueue)? 1110 (WorkQueue)obj: 1111 (WorkQueue)this.allQueues.get(obj); 1112 if(q == null) { 1113 writer.print(" ERROR: "+obj); 1114 } 1115 if(!legendWritten) { 1116 writer.println(q.singleLineLegend()); 1117 legendWritten = true; 1118 } 1119 q.singleLineReportTo(writer); 1120 } 1121 } 1122 1123 1126 private void standardReportTo(PrintWriter w) { 1127 int allCount = allQueues.size(); 1128 int inProcessCount = inProcessQueues.uniqueSet().size(); 1129 int readyCount = readyClassQueues.size(); 1130 int snoozedCount = snoozedClassQueues.size(); 1131 int activeCount = inProcessCount + readyCount + snoozedCount; 1132 int inactiveCount = inactiveQueues.size(); 1133 int retiredCount = retiredQueues.size(); 1134 int exhaustedCount = 1135 allCount - activeCount - inactiveCount - retiredCount; 1136 1137 w.print("Frontier report - "); 1138 w.print(ArchiveUtils.TIMESTAMP12.format(new Date ())); 1139 w.print("\n"); 1140 w.print(" Job being crawled: "); 1141 w.print(controller.getOrder().getCrawlOrderName()); 1142 w.print("\n"); 1143 w.print("\n -----===== STATS =====-----\n"); 1144 w.print(" Discovered: "); 1145 w.print(Long.toString(discoveredUriCount())); 1146 w.print("\n"); 1147 w.print(" Queued: "); 1148 w.print(Long.toString(queuedUriCount())); 1149 w.print("\n"); 1150 w.print(" Finished: "); 1151 w.print(Long.toString(finishedUriCount())); 1152 w.print("\n"); 1153 w.print(" Successfully: "); 1154 w.print(Long.toString(succeededFetchCount())); 1155 w.print("\n"); 1156 w.print(" Failed: "); 1157 w.print(Long.toString(failedFetchCount())); 1158 w.print("\n"); 1159 w.print(" Disregarded: "); 1160 w.print(Long.toString(disregardedUriCount())); 1161 w.print("\n"); 1162 w.print("\n -----===== QUEUES =====-----\n"); 1163 w.print(" Already included size: "); 1164 w.print(Long.toString(alreadyIncluded.count())); 1165 w.print("\n"); 1166 w.print(" pending: "); 1167 w.print(Long.toString(alreadyIncluded.pending())); 1168 w.print("\n"); 1169 w.print("\n All class queues map size: "); 1170 w.print(Long.toString(allCount)); 1171 w.print("\n"); 1172 w.print( " Active queues: "); 1173 w.print(activeCount); 1174 w.print("\n"); 1175 w.print(" In-process: "); 1176 w.print(inProcessCount); 1177 w.print("\n"); 1178 w.print(" Ready: "); 1179 w.print(readyCount); 1180 w.print("\n"); 1181 w.print(" Snoozed: "); 1182 w.print(snoozedCount); 1183 w.print("\n"); 1184 w.print(" Inactive queues: "); 1185 w.print(inactiveCount); 1186 w.print("\n"); 1187 w.print(" Retired queues: "); 1188 w.print(retiredCount); 1189 w.print("\n"); 1190 w.print(" Exhausted queues: "); 1191 w.print(exhaustedCount); 1192 w.print("\n"); 1193 1194 w.print("\n -----===== IN-PROCESS QUEUES =====-----\n"); 1195 @SuppressWarnings ("unchecked") 1196 Collection <WorkQueue> inProcess = inProcessQueues; 1197 ArrayList <WorkQueue> copy = extractSome(inProcess, REPORT_MAX_QUEUES); 1198 appendQueueReports(w, copy.iterator(), copy.size(), REPORT_MAX_QUEUES); 1199 1200 w.print("\n -----===== READY QUEUES =====-----\n"); 1201 appendQueueReports(w, this.readyClassQueues.iterator(), 1202 this.readyClassQueues.size(), REPORT_MAX_QUEUES); 1203 1204 w.print("\n -----===== SNOOZED QUEUES =====-----\n"); 1205 copy = extractSome(snoozedClassQueues, REPORT_MAX_QUEUES); 1206 appendQueueReports(w, copy.iterator(), copy.size(), REPORT_MAX_QUEUES); 1207 1208 WorkQueue longest = longestActiveQueue; 1209 if (longest != null) { 1210 w.print("\n -----===== LONGEST QUEUE =====-----\n"); 1211 longest.reportTo(w); 1212 } 1213 1214 w.print("\n -----===== INACTIVE QUEUES =====-----\n"); 1215 appendQueueReports(w, this.inactiveQueues.iterator(), 1216 this.inactiveQueues.size(), REPORT_MAX_QUEUES); 1217 1218 w.print("\n -----===== RETIRED QUEUES =====-----\n"); 1219 appendQueueReports(w, this.retiredQueues.iterator(), 1220 this.retiredQueues.size(), REPORT_MAX_QUEUES); 1221 1222 w.flush(); 1223 } 1224 1225 1226 1236 private static <T> ArrayList <T> extractSome(Collection <T> c, int max) { 1237 int initial = Math.min(c.size() + 10, max); 1241 int count = 0; 1242 ArrayList <T> list = new ArrayList <T>(initial); 1243 synchronized (c) { 1244 Iterator <T> iter = c.iterator(); 1245 while (iter.hasNext() && (count < max)) { 1246 list.add(iter.next()); 1247 count++; 1248 } 1249 } 1250 return list; 1251 } 1252 1253 1260 protected void appendQueueReports(PrintWriter w, Iterator iterator, 1261 int total, int max) { 1262 Object obj; 1263 WorkQueue q; 1264 for(int count = 0; iterator.hasNext() && (count < max); count++) { 1265 obj = iterator.next(); 1266 if (obj == null) { 1267 continue; 1268 } 1269 q = (obj instanceof WorkQueue)? 1270 (WorkQueue)obj: 1271 (WorkQueue)this.allQueues.get(obj); 1272 if(q == null) { 1273 w.print("WARNING: No report for queue "+obj); 1274 } 1275 q.reportTo(w); 1276 } 1277 if(total > max) { 1278 w.print("...and " + (total - max) + " more.\n"); 1279 } 1280 } 1281 1282 1287 public synchronized void deleted(CrawlURI curi) { 1288 controller.fireCrawledURIDisregardEvent(curi); 1290 log(curi); 1291 incrementDisregardedUriCount(); 1292 curi.stripToMinimal(); 1293 curi.processingCleanup(); 1294 } 1295 1296 public void considerIncluded(UURI u) { 1297 this.alreadyIncluded.note(canonicalize(u)); 1298 CrawlURI temp = new CrawlURI(u); 1299 temp.setClassKey(getClassKey(temp)); 1300 getQueueFor(temp).expend(getCost(temp)); 1301 } 1302 1303 protected abstract void initQueue() throws IOException ; 1304 protected abstract void closeQueue() throws IOException ; 1305 1306 1313 protected abstract boolean workQueueDataOnDisk(); 1314 1315 1316 public FrontierGroup getGroup(CrawlURI curi) { 1317 return getQueueFor(curi); 1318 } 1319 1320 1321 public long averageDepth() { 1322 int inProcessCount = inProcessQueues.uniqueSet().size(); 1323 int readyCount = readyClassQueues.size(); 1324 int snoozedCount = snoozedClassQueues.size(); 1325 int activeCount = inProcessCount + readyCount + snoozedCount; 1326 int inactiveCount = inactiveQueues.size(); 1327 int totalQueueCount = (activeCount+inactiveCount); 1328 return (totalQueueCount == 0) ? 0 : queuedUriCount / totalQueueCount; 1329 } 1330 public float congestionRatio() { 1331 int inProcessCount = inProcessQueues.uniqueSet().size(); 1332 int readyCount = readyClassQueues.size(); 1333 int snoozedCount = snoozedClassQueues.size(); 1334 int activeCount = inProcessCount + readyCount + snoozedCount; 1335 int inactiveCount = inactiveQueues.size(); 1336 return (float)(activeCount + inactiveCount) / (inProcessCount + snoozedCount); 1337 } 1338 public long deepestUri() { 1339 return longestActiveQueue==null ? -1 : longestActiveQueue.getCount(); 1340 } 1341 1342 1343 1346 public synchronized boolean isEmpty() { 1347 return queuedUriCount == 0 && alreadyIncluded.pending() == 0; 1348 } 1349} 1350 1351 | Popular Tags |