KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > archive > crawler > frontier > WorkQueueFrontier


1 /* $Id: WorkQueueFrontier.java,v 1.48.2.1 2007/01/13 01:31:23 stack-sf Exp $
2  * Created on Sep 24, 2004
3  *
4  * Copyright (C) 2004 Internet Archive.
5  *
6  * This file is part of the Heritrix web crawler (crawler.archive.org).
7  *
8  * Heritrix is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU Lesser Public License as published by
10  * the Free Software Foundation; either version 2.1 of the License, or
11  * any later version.
12  *
13  * Heritrix is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU Lesser Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser Public License
19  * along with Heritrix; if not, write to the Free Software
20  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21  *
22   */

23 package org.archive.crawler.frontier;
24
25 import java.io.IOException JavaDoc;
26 import java.io.PrintWriter JavaDoc;
27 import java.io.Serializable JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.Collection JavaDoc;
30 import java.util.Collections JavaDoc;
31 import java.util.Date JavaDoc;
32 import java.util.HashMap JavaDoc;
33 import java.util.Iterator JavaDoc;
34 import java.util.Map JavaDoc;
35 import java.util.SortedSet JavaDoc;
36 import java.util.Timer JavaDoc;
37 import java.util.TimerTask JavaDoc;
38 import java.util.TreeSet JavaDoc;
39 import java.util.logging.Level JavaDoc;
40 import java.util.logging.Logger JavaDoc;
41
42 import org.apache.commons.collections.Bag;
43 import org.apache.commons.collections.BagUtils;
44 import org.apache.commons.collections.bag.HashBag;
45 import org.archive.crawler.datamodel.CandidateURI;
46 import org.archive.crawler.datamodel.CoreAttributeConstants;
47 import org.archive.crawler.datamodel.CrawlURI;
48 import org.archive.crawler.datamodel.FetchStatusCodes;
49 import org.archive.crawler.datamodel.UriUniqFilter;
50 import org.archive.crawler.datamodel.UriUniqFilter.HasUriReceiver;
51 import org.archive.crawler.framework.CrawlController;
52 import org.archive.crawler.framework.Frontier;
53 import org.archive.crawler.framework.exceptions.EndedException;
54 import org.archive.crawler.framework.exceptions.FatalConfigurationException;
55 import org.archive.crawler.settings.SimpleType;
56 import org.archive.crawler.settings.Type;
57 import org.archive.net.UURI;
58 import org.archive.util.ArchiveUtils;
59
60 import com.sleepycat.collections.StoredIterator;
61
62 import java.util.concurrent.BlockingQueue JavaDoc;
63 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
64 import java.util.concurrent.TimeUnit JavaDoc;
65
66 /**
67  * A common Frontier base using several queues to hold pending URIs.
68  *
69  * Uses in-memory map of all known 'queues' inside a single database.
70  * Round-robins between all queues.
71  *
72  * @author Gordon Mohr
73  * @author Christian Kohlschuetter
74  */

75 public abstract class WorkQueueFrontier extends AbstractFrontier
76 implements FetchStatusCodes, CoreAttributeConstants, HasUriReceiver,
77         Serializable JavaDoc {
78     private static final long serialVersionUID = 570384305871965843L;
79     
80     public class WakeTask extends TimerTask JavaDoc {
81         @Override JavaDoc
82         public void run() {
83             synchronized(snoozedClassQueues) {
84                 if(this!=nextWake) {
85                     // an intervening waketask was made
86
return;
87                 }
88                 wakeQueues();
89             }
90         }
91     }
92
93     /** truncate reporting of queues at some large but not unbounded number */
94     private static final int REPORT_MAX_QUEUES = 2000;
95     
96     /**
97      * If we know that only a small amount of queues is held in memory,
98      * we can avoid using a disk-based BigMap.
99      * This only works efficiently if the WorkQueue does not hold its
100      * entries in memory as well.
101      */

102     private static final int MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY = 3000;
103
104     /**
105      * When a snooze target for a queue is longer than this amount, and
106      * there are already ready queues, deactivate rather than snooze
107      * the current queue -- so other more responsive sites get a chance
108      * in active rotation. (As a result, queue's next try may be much
109      * further in the future than the snooze target delay.)
110      */

111     public final static String JavaDoc ATTR_SNOOZE_DEACTIVATE_MS =
112         "snooze-deactivate-ms";
113     public static Long JavaDoc DEFAULT_SNOOZE_DEACTIVATE_MS = new Long JavaDoc(5*60*1000); // 5 minutes
114

115     private static final Logger JavaDoc logger =
116         Logger.getLogger(WorkQueueFrontier.class.getName());
117     
118     /** whether to hold queues INACTIVE until needed for throughput */
119     public final static String JavaDoc ATTR_HOLD_QUEUES = "hold-queues";
120     protected final static Boolean JavaDoc DEFAULT_HOLD_QUEUES = new Boolean JavaDoc(true);
121
122     /** amount to replenish budget on each activation (duty cycle) */
123     public final static String JavaDoc ATTR_BALANCE_REPLENISH_AMOUNT =
124         "balance-replenish-amount";
125     protected final static Integer JavaDoc DEFAULT_BALANCE_REPLENISH_AMOUNT =
126         new Integer JavaDoc(3000);
127     
128     /** whether to hold queues INACTIVE until needed for throughput */
129     public final static String JavaDoc ATTR_ERROR_PENALTY_AMOUNT =
130         "error-penalty-amount";
131     protected final static Integer JavaDoc DEFAULT_ERROR_PENALTY_AMOUNT =
132         new Integer JavaDoc(100);
133
134
135     /** total expenditure to allow a queue before 'retiring' it */
136     public final static String JavaDoc ATTR_QUEUE_TOTAL_BUDGET = "queue-total-budget";
137     protected final static Long JavaDoc DEFAULT_QUEUE_TOTAL_BUDGET = new Long JavaDoc(-1);
138
139     /** cost assignment policy to use (by class name) */
140     public final static String JavaDoc ATTR_COST_POLICY = "cost-policy";
141     protected final static String JavaDoc DEFAULT_COST_POLICY =
142         UnitCostAssignmentPolicy.class.getName();
143
144     /** target size of ready queues backlog */
145     public final static String JavaDoc ATTR_TARGET_READY_QUEUES_BACKLOG =
146         "target-ready-backlog";
147     protected final static Integer JavaDoc DEFAULT_TARGET_READY_QUEUES_BACKLOG =
148         new Integer JavaDoc(50);
149     
150     /** those UURIs which are already in-process (or processed), and
151      thus should not be rescheduled */

152     protected transient UriUniqFilter alreadyIncluded;
153
154     /** All known queues.
155      */

156     protected transient Map JavaDoc<String JavaDoc,WorkQueue> allQueues = null;
157     // of classKey -> ClassKeyQueue
158

159     /**
160      * All per-class queues whose first item may be handed out.
161      * Linked-list of keys for the queues.
162      */

163     protected BlockingQueue JavaDoc<String JavaDoc> readyClassQueues =
164         new LinkedBlockingQueue JavaDoc<String JavaDoc>();
165     
166     /** Target (minimum) size to keep readyClassQueues */
167     protected int targetSizeForReadyQueues;
168     
169     /**
170      * All 'inactive' queues, not yet in active rotation.
171      * Linked-list of keys for the queues.
172      */

173     protected BlockingQueue JavaDoc<String JavaDoc> inactiveQueues =
174         new LinkedBlockingQueue JavaDoc<String JavaDoc>();
175
176     /**
177      * 'retired' queues, no longer considered for activation.
178      * Linked-list of keys for queues.
179      */

180     protected BlockingQueue JavaDoc<String JavaDoc> retiredQueues =
181         new LinkedBlockingQueue JavaDoc<String JavaDoc>();
182     
183     /** all per-class queues from whom a URI is outstanding */
184     protected Bag inProcessQueues =
185         BagUtils.synchronizedBag(new HashBag()); // of ClassKeyQueue
186

187     /**
188      * All per-class queues held in snoozed state, sorted by wake time.
189      */

190     protected SortedSet JavaDoc<WorkQueue> snoozedClassQueues =
191         Collections.synchronizedSortedSet(new TreeSet JavaDoc<WorkQueue>());
192     
193     /** Timer for tasks which wake head item of snoozedClassQueues */
194     protected transient Timer JavaDoc wakeTimer;
195     
196     /** Task for next wake */
197     protected transient WakeTask nextWake;
198     
199     protected WorkQueue longestActiveQueue = null;
200     
201     /** how long to wait for a ready queue when there's nothing snoozed */
202     private static final long DEFAULT_WAIT = 1000; // 1 second
203

204     /** a policy for assigning 'cost' values to CrawlURIs */
205     private transient CostAssignmentPolicy costAssignmentPolicy;
206     
207     /** all policies available to be chosen */
208     String JavaDoc[] AVAILABLE_COST_POLICIES = new String JavaDoc[] {
209             ZeroCostAssignmentPolicy.class.getName(),
210             UnitCostAssignmentPolicy.class.getName(),
211             WagCostAssignmentPolicy.class.getName(),
212             AntiCalendarCostAssignmentPolicy.class.getName()};
213
214     /**
215      * Create the WorkQueueFrontier.
216      *
217      * @param name
218      * @param description
219      */

220     public WorkQueueFrontier(String JavaDoc name, String JavaDoc description) {
221         // The 'name' of all frontiers should be the same (URIFrontier.ATTR_NAME)
222
// therefore we'll ignore the supplied parameter.
223
super(Frontier.ATTR_NAME, description);
224         Type t = addElementToDefinition(new SimpleType(ATTR_HOLD_QUEUES,
225             "Whether to hold newly-created per-host URI work" +
226             " queues until needed to stay busy. If false (default)," +
227             " all queues may contribute URIs for crawling at all" +
228             " times. If true, queues begin (and collect URIs) in" +
229             " an 'inactive' state, and only when the Frontier needs" +
230             " another queue to keep all ToeThreads busy will new" +
231             " queues be activated.", DEFAULT_HOLD_QUEUES));
232         t.setExpertSetting(true);
233         t.setOverrideable(false);
234         t = addElementToDefinition(new SimpleType(ATTR_BALANCE_REPLENISH_AMOUNT,
235             "Amount to replenish a queue's activity balance when it becomes " +
236             "active. Larger amounts mean more URIs will be tried from the " +
237             "queue before it is deactivated in favor of waiting queues. " +
238             "Default is 3000", DEFAULT_BALANCE_REPLENISH_AMOUNT));
239         t.setExpertSetting(true);
240         t.setOverrideable(true);
241         t = addElementToDefinition(new SimpleType(ATTR_ERROR_PENALTY_AMOUNT,
242                 "Amount to additionally penalize a queue when one of" +
243                 "its URIs fails completely. Accelerates deactivation or " +
244                 "full retirement of problem queues and unresponsive sites. " +
245                 "Default is 100", DEFAULT_ERROR_PENALTY_AMOUNT));
246         t.setExpertSetting(true);
247         t.setOverrideable(true);
248         t = addElementToDefinition(new SimpleType(ATTR_QUEUE_TOTAL_BUDGET,
249             "Total activity expenditure allowable to a single queue; queues " +
250             "over this expenditure will be 'retired' and crawled no more. " +
251             "Default of -1 means no ceiling on activity expenditures is " +
252             "enforced.", DEFAULT_QUEUE_TOTAL_BUDGET));
253         t.setExpertSetting(true);
254         t.setOverrideable(true);
255
256         t = addElementToDefinition(new SimpleType(ATTR_COST_POLICY,
257                 "Policy for calculating the cost of each URI attempted. " +
258                 "The default UnitCostAssignmentPolicy considers the cost of " +
259                 "each URI to be '1'.", DEFAULT_COST_POLICY, AVAILABLE_COST_POLICIES));
260         t.setExpertSetting(true);
261         
262         t = addElementToDefinition(new SimpleType(ATTR_SNOOZE_DEACTIVATE_MS,
263                 "Threshold above which any 'snooze' delay will cause the " +
264                 "affected queue to go inactive, allowing other queues a " +
265                 "chance to rotate into active state. Typically set to be " +
266                 "longer than the politeness pauses between successful " +
267                 "fetches, but shorter than the connection-failed " +
268                 "'retry-delay-seconds'. (Default is 5 minutes.)",
269                 DEFAULT_SNOOZE_DEACTIVATE_MS));
270         t.setExpertSetting(true);
271         t.setOverrideable(false);
272         t = addElementToDefinition(new SimpleType(ATTR_TARGET_READY_QUEUES_BACKLOG,
273                 "Target size for backlog of ready queues. This many queues " +
274                 "will be brought into 'ready' state even if a thread is " +
275                 "not waiting. Only has effect if 'hold-queues' is true. " +
276                 "Default is 50.", DEFAULT_TARGET_READY_QUEUES_BACKLOG));
277         t.setExpertSetting(true);
278         t.setOverrideable(false);
279     }
280
281     /**
282      * Initializes the Frontier, given the supplied CrawlController.
283      *
284      * @see org.archive.crawler.framework.Frontier#initialize(org.archive.crawler.framework.CrawlController)
285      */

286     public void initialize(CrawlController c)
287             throws FatalConfigurationException, IOException JavaDoc {
288         // Call the super method. It sets up frontier journalling.
289
super.initialize(c);
290         this.controller = c;
291         
292         this.targetSizeForReadyQueues = (Integer JavaDoc)getUncheckedAttribute(null,
293             ATTR_TARGET_READY_QUEUES_BACKLOG);
294         if (this.targetSizeForReadyQueues < 1) {
295             this.targetSizeForReadyQueues = 1;
296         }
297         this.wakeTimer = new Timer JavaDoc("waker for " + c.toString());
298         
299         try {
300             if (workQueueDataOnDisk()
301                     && queueAssignmentPolicy.maximumNumberOfKeys() >= 0
302                     && queueAssignmentPolicy.maximumNumberOfKeys() <=
303                         MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY) {
304                 this.allQueues = Collections.synchronizedMap(
305                         new HashMap JavaDoc<String JavaDoc,WorkQueue>());
306             } else {
307                 this.allQueues = c.getBigMap("allqueues",
308                         String JavaDoc.class, WorkQueue.class);
309                 if (logger.isLoggable(Level.FINE)) {
310                     Iterator JavaDoc i = this.allQueues.keySet().iterator();
311                     try {
312                         for (; i.hasNext();) {
313                             logger.fine((String JavaDoc) i.next());
314                         }
315                     } finally {
316                         StoredIterator.close(i);
317                     }
318                 }
319             }
320             this.alreadyIncluded = createAlreadyIncluded();
321             initQueue();
322         } catch (IOException JavaDoc e) {
323             e.printStackTrace();
324             throw (FatalConfigurationException)
325                 new FatalConfigurationException(e.getMessage()).initCause(e);
326         } catch (Exception JavaDoc e) {
327             e.printStackTrace();
328             throw (FatalConfigurationException)
329                 new FatalConfigurationException(e.getMessage()).initCause(e);
330         }
331         
332         initCostPolicy();
333         
334         loadSeeds();
335     }
336     
337     /**
338      * Set (or reset after configuration change) the cost policy in effect.
339      *
340      * @throws FatalConfigurationException
341      */

342     private void initCostPolicy() throws FatalConfigurationException {
343         try {
344             costAssignmentPolicy = (CostAssignmentPolicy) Class.forName(
345                     (String JavaDoc) getUncheckedAttribute(null, ATTR_COST_POLICY))
346                     .newInstance();
347         } catch (Exception JavaDoc e) {
348             e.printStackTrace();
349             throw new FatalConfigurationException(e.getMessage());
350         }
351     }
352
353     /* (non-Javadoc)
354      * @see org.archive.crawler.frontier.AbstractFrontier#crawlEnded(java.lang.String)
355      */

356     public void crawlEnded(String JavaDoc sExitMessage) {
357         // Cleanup. CrawlJobs persist after crawl has finished so undo any
358
// references.
359
if (this.alreadyIncluded != null) {
360             this.alreadyIncluded.close();
361             this.alreadyIncluded = null;
362         }
363
364         this.queueAssignmentPolicy = null;
365         
366         try {
367             closeQueue();
368         } catch (IOException JavaDoc e) {
369             // FIXME exception handling
370
e.printStackTrace();
371         }
372         this.wakeTimer.cancel();
373         
374         this.allQueues.clear();
375         this.allQueues = null;
376         this.inProcessQueues = null;
377         this.readyClassQueues = null;
378         this.snoozedClassQueues = null;
379         this.inactiveQueues = null;
380         this.retiredQueues = null;
381         
382         this.costAssignmentPolicy = null;
383         
384         // Clearing controller is a problem. We get NPEs in #preNext.
385
super.crawlEnded(sExitMessage);
386         this.controller = null;
387     }
388
389     /**
390      * Create a UriUniqFilter that will serve as record
391      * of already seen URIs.
392      *
393      * @return A UriUniqFilter that will serve as a record of already seen URIs
394      * @throws IOException
395      */

396     protected abstract UriUniqFilter createAlreadyIncluded() throws IOException JavaDoc;
397
398     /**
399      * Arrange for the given CandidateURI to be visited, if it is not
400      * already scheduled/completed.
401      *
402      * @see org.archive.crawler.framework.Frontier#schedule(org.archive.crawler.datamodel.CandidateURI)
403      */

404     public void schedule(CandidateURI caUri) {
405         // Canonicalization may set forceFetch flag. See
406
// #canonicalization(CandidateURI) javadoc for circumstance.
407
String JavaDoc canon = canonicalize(caUri);
408         if (caUri.forceFetch()) {
409             alreadyIncluded.addForce(canon, caUri);
410         } else {
411             alreadyIncluded.add(canon, caUri);
412         }
413     }
414
415     /**
416      * Accept the given CandidateURI for scheduling, as it has
417      * passed the alreadyIncluded filter.
418      *
419      * Choose a per-classKey queue and enqueue it. If this
420      * item has made an unready queue ready, place that
421      * queue on the readyClassQueues queue.
422      * @param caUri CandidateURI.
423      */

424     public void receive(CandidateURI caUri) {
425         CrawlURI curi = asCrawlUri(caUri);
426         applySpecialHandling(curi);
427         sendToQueue(curi);
428         // Update recovery log.
429
doJournalAdded(curi);
430     }
431
432     /* (non-Javadoc)
433      * @see org.archive.crawler.frontier.AbstractFrontier#asCrawlUri(org.archive.crawler.datamodel.CandidateURI)
434      */

435     protected CrawlURI asCrawlUri(CandidateURI caUri) {
436         CrawlURI curi = super.asCrawlUri(caUri);
437         // force cost to be calculated, pre-insert
438
getCost(curi);
439         return curi;
440     }
441     
442     /**
443      * Send a CrawlURI to the appropriate subqueue.
444      *
445      * @param curi
446      */

447     protected void sendToQueue(CrawlURI curi) {
448         WorkQueue wq = getQueueFor(curi);
449         synchronized (wq) {
450             wq.enqueue(this, curi);
451             if(!wq.isRetired()) {
452                 incrementQueuedUriCount();
453             }
454             if(!wq.isHeld()) {
455                 wq.setHeld();
456                 if(holdQueues() && readyClassQueues.size()>=targetSizeForReadyQueues()) {
457                     deactivateQueue(wq);
458                 } else {
459                     replenishSessionBalance(wq);
460                     readyQueue(wq);
461                 }
462             }
463             WorkQueue laq = longestActiveQueue;
464             if(!wq.isRetired()&&((laq==null) || wq.getCount() > laq.getCount())) {
465                 longestActiveQueue = wq;
466             }
467         }
468     }
469
470     /**
471      * Whether queues should start inactive (only becoming active when needed
472      * to keep the crawler busy), or if queues should start out ready.
473      *
474      * @return true if new queues should be held inactive
475      */

476     private boolean holdQueues() {
477         return ((Boolean JavaDoc) getUncheckedAttribute(null, ATTR_HOLD_QUEUES))
478                 .booleanValue();
479     }
480
481     /**
482      * Put the given queue on the readyClassQueues queue
483      * @param wq
484      */

485     private void readyQueue(WorkQueue wq) {
486         try {
487             wq.setActive(this, true);
488             readyClassQueues.put(wq.getClassKey());
489         } catch (InterruptedException JavaDoc e) {
490             e.printStackTrace();
491             System.err.println("unable to ready queue "+wq);
492             // propagate interrupt up
493
throw new RuntimeException JavaDoc(e);
494         }
495     }
496
497     /**
498      * Put the given queue on the inactiveQueues queue
499      * @param wq
500      */

501     private void deactivateQueue(WorkQueue wq) {
502         try {
503             wq.setSessionBalance(0); // zero out session balance
504
inactiveQueues.put(wq.getClassKey());
505             wq.setActive(this, false);
506         } catch (InterruptedException JavaDoc e) {
507             e.printStackTrace();
508             System.err.println("unable to deactivate queue "+wq);
509             // propagate interrupt up
510
throw new RuntimeException JavaDoc(e);
511         }
512     }
513     
514     /**
515      * Put the given queue on the retiredQueues queue
516      * @param wq
517      */

518     private void retireQueue(WorkQueue wq) {
519         try {
520             retiredQueues.put(wq.getClassKey());
521             decrementQueuedCount(wq.getCount());
522             wq.setRetired(true);
523             wq.setActive(this, false);
524         } catch (InterruptedException JavaDoc e) {
525             e.printStackTrace();
526             System.err.println("unable to retire queue "+wq);
527             // propagate interrupt up
528
throw new RuntimeException JavaDoc(e);
529         }
530     }
531     
532     /**
533      * Accommodate any changes in settings.
534      *
535      * @see org.archive.crawler.framework.Frontier#kickUpdate()
536      */

537     public void kickUpdate() {
538         super.kickUpdate();
539         int target = (Integer JavaDoc)getUncheckedAttribute(null,
540                 ATTR_TARGET_READY_QUEUES_BACKLOG);
541         if (target < 1) {
542             target = 1;
543         }
544         this.targetSizeForReadyQueues = target;
545         try {
546             initCostPolicy();
547         } catch (FatalConfigurationException fce) {
548             throw new RuntimeException JavaDoc(fce);
549         }
550         // The rules for a 'retired' queue may have changed; so,
551
// unretire all queues to 'inactive'. If they still qualify
552
// as retired/overbudget next time they come up, they'll
553
// be re-retired; if not, they'll get a chance to become
554
// active under the new rules.
555
Object JavaDoc key = this.retiredQueues.poll();
556         while (key != null) {
557             WorkQueue q = (WorkQueue)this.allQueues.get(key);
558             if(q != null) {
559                 unretireQueue(q);
560             }
561             key = this.retiredQueues.poll();
562         }
563     }
564     /**
565      * Restore a retired queue to the 'inactive' state.
566      *
567      * @param q
568      */

569     private void unretireQueue(WorkQueue q) {
570         deactivateQueue(q);
571         q.setRetired(false);
572         incrementQueuedUriCount(q.getCount());
573     }
574
575     /**
576      * Return the work queue for the given CrawlURI's classKey. URIs
577      * are ordered and politeness-delayed within their 'class'.
578      * If the requested queue is not found, a new instance is created.
579      *
580      * @param curi CrawlURI to base queue on
581      * @return the found or created ClassKeyQueue
582      */

583     protected abstract WorkQueue getQueueFor(CrawlURI curi);
584
585     /**
586      * Return the work queue for the given classKey, or null
587      * if no such queue exists.
588      *
589      * @param classKey key to look for
590      * @return the found WorkQueue
591      */

592     protected abstract WorkQueue getQueueFor(String JavaDoc classKey);
593     
594     /**
595      * Return the next CrawlURI to be processed (and presumably
596      * visited/fetched) by a worker thread.
597      *
598      * Relies on the readyClassQueues having been loaded with
599      * any work queues that are eligible to provide a URI.
600      *
601      * @return next CrawlURI to be processed. Or null if none is available.
602      *
603      * @see org.archive.crawler.framework.Frontier#next()
604      */

605     public CrawlURI next()
606     throws InterruptedException JavaDoc, EndedException {
607         while (true) { // loop left only by explicit return or exception
608
long now = System.currentTimeMillis();
609
610             // Do common checks for pause, terminate, bandwidth-hold
611
preNext(now);
612             
613             synchronized(readyClassQueues) {
614                 int activationsNeeded = targetSizeForReadyQueues() - readyClassQueues.size();
615                 while(activationsNeeded > 0 && !inactiveQueues.isEmpty()) {
616                     activateInactiveQueue();
617                     activationsNeeded--;
618                 }
619             }
620                    
621             WorkQueue readyQ = null;
622             Object JavaDoc key = readyClassQueues.poll(DEFAULT_WAIT,TimeUnit.MILLISECONDS);
623             if (key != null) {
624                 readyQ = (WorkQueue)this.allQueues.get(key);
625             }
626             if (readyQ != null) {
627                 while(true) { // loop left by explicit return or break on empty
628
CrawlURI curi = null;
629                     synchronized(readyQ) {
630                         curi = readyQ.peek(this);
631                         if (curi != null) {
632                             // check if curi belongs in different queue
633
String JavaDoc currentQueueKey = getClassKey(curi);
634                             if (currentQueueKey.equals(curi.getClassKey())) {
635                                 // curi was in right queue, emit
636
noteAboutToEmit(curi, readyQ);
637                                 inProcessQueues.add(readyQ);
638                                 return curi;
639                             }
640                             // URI's assigned queue has changed since it
641
// was queued (eg because its IP has become
642
// known). Requeue to new queue.
643
curi.setClassKey(currentQueueKey);
644                             readyQ.dequeue(this);
645                             decrementQueuedCount(1);
646                             curi.setHolderKey(null);
647                             // curi will be requeued to true queue after lock
648
// on readyQ is released, to prevent deadlock
649
} else {
650                             // readyQ is empty and ready: it's exhausted
651
// release held status, allowing any subsequent
652
// enqueues to again put queue in ready
653
readyQ.clearHeld();
654                             break;
655                         }
656                     }
657                     if(curi!=null) {
658                         // complete the requeuing begun earlier
659
sendToQueue(curi);
660                     }
661                 }
662             } else {
663                 // ReadyQ key wasn't in all queues: unexpected
664
if (key != null) {
665                     logger.severe("Key "+ key +
666                         " in readyClassQueues but not allQueues");
667                 }
668             }
669
670             if(shouldTerminate) {
671                 // skip subsequent steps if already on last legs
672
throw new EndedException("shouldTerminate is true");
673             }
674                 
675             if(inProcessQueues.size()==0) {
676                 // Nothing was ready or in progress or imminent to wake; ensure
677
// any piled-up pending-scheduled URIs are considered
678
this.alreadyIncluded.requestFlush();
679             }
680         }
681     }
682
683     private int targetSizeForReadyQueues() {
684         return targetSizeForReadyQueues;
685     }
686
687     /**
688      * Return the 'cost' of a CrawlURI (how much of its associated
689      * queue's budget it depletes upon attempted processing)
690      *
691      * @param curi
692      * @return the associated cost
693      */

694     private int getCost(CrawlURI curi) {
695         int cost = curi.getHolderCost();
696         if (cost == CrawlURI.UNCALCULATED) {
697             cost = costAssignmentPolicy.costOf(curi);
698             curi.setHolderCost(cost);
699         }
700         return cost;
701     }
702     
703     /**
704      * Activate an inactive queue, if any are available.
705      */

706     private void activateInactiveQueue() {
707         Object JavaDoc key = this.inactiveQueues.poll();
708         if (key == null) {
709             return;
710         }
711         WorkQueue candidateQ = (WorkQueue)this.allQueues.get(key);
712         if(candidateQ != null) {
713             synchronized(candidateQ) {
714                 replenishSessionBalance(candidateQ);
715                 if(candidateQ.isOverBudget()){
716                     // if still over-budget after an activation & replenishing,
717
// retire
718
retireQueue(candidateQ);
719                     return;
720                 }
721                 long now = System.currentTimeMillis();
722                 long delay_ms = candidateQ.getWakeTime() - now;
723                 if(delay_ms>0) {
724                     // queue still due for snoozing
725
snoozeQueue(candidateQ,now,delay_ms);
726                     return;
727                 }
728                 candidateQ.setWakeTime(0); // clear obsolete wake time, if any
729
readyQueue(candidateQ);
730                 if (logger.isLoggable(Level.FINE)) {
731                     logger.fine("ACTIVATED queue: " +
732                         candidateQ.getClassKey());
733                    
734                 }
735             }
736         }
737     }
738
739     /**
740      * Replenish the budget of the given queue by the appropriate amount.
741      *
742      * @param queue queue to replenish
743      */

744     private void replenishSessionBalance(WorkQueue queue) {
745         // get a CrawlURI for override context purposes
746
CrawlURI contextUri = queue.peek(this);
747         // TODO: consider confusing cross-effects of this and IP-based politeness
748
queue.setSessionBalance(((Integer JavaDoc) getUncheckedAttribute(contextUri,
749                 ATTR_BALANCE_REPLENISH_AMOUNT)).intValue());
750         // reset total budget (it may have changed)
751
// TODO: is this the best way to be sensitive to potential mid-crawl changes
752
long totalBudget = ((Long JavaDoc)getUncheckedAttribute(contextUri,ATTR_QUEUE_TOTAL_BUDGET)).longValue();
753         queue.setTotalBudget(totalBudget);
754         queue.unpeek(); // don't insist on that URI being next released
755
}
756
757     /**
758      * Enqueue the given queue to either readyClassQueues or inactiveQueues,
759      * as appropriate.
760      *
761      * @param wq
762      */

763     private void reenqueueQueue(WorkQueue wq) {
764         if(wq.isOverBudget()) {
765             // if still over budget, deactivate
766
if (logger.isLoggable(Level.FINE)) {
767                 logger.fine("DEACTIVATED queue: " +
768                     wq.getClassKey());
769             }
770             deactivateQueue(wq);
771         } else {
772             readyQueue(wq);
773         }
774     }
775     
776     /**
777      * Wake any queues sitting in the snoozed queue whose time has come.
778      */

779     void wakeQueues() {
780         synchronized (snoozedClassQueues) {
781             long now = System.currentTimeMillis();
782             long nextWakeDelay = 0;
783             int wokenQueuesCount = 0;
784             while (true) {
785                 if (snoozedClassQueues.isEmpty()) {
786                     return;
787                 }
788                 WorkQueue peek = (WorkQueue) snoozedClassQueues.first();
789                 nextWakeDelay = peek.getWakeTime() - now;
790                 if (nextWakeDelay <= 0) {
791                     snoozedClassQueues.remove(peek);
792                     peek.setWakeTime(0);
793                     reenqueueQueue(peek);
794                     wokenQueuesCount++;
795                 } else {
796                     break;
797                 }
798             }
799             this.nextWake = new WakeTask();
800             this.wakeTimer.schedule(nextWake,nextWakeDelay);
801         }
802     }
803
804     /**
805      * Note that the previously emitted CrawlURI has completed
806      * its processing (for now).
807      *
808      * The CrawlURI may be scheduled to retry, if appropriate,
809      * and other related URIs may become eligible for release
810      * via the next next() call, as a result of finished().
811      *
812      * (non-Javadoc)
813      * @see org.archive.crawler.framework.Frontier#finished(org.archive.crawler.datamodel.CrawlURI)
814      */

815     public void finished(CrawlURI curi) {
816         long now = System.currentTimeMillis();
817
818         curi.incrementFetchAttempts();
819         logLocalizedErrors(curi);
820         WorkQueue wq = (WorkQueue) curi.getHolder();
821         assert (wq.peek(this) == curi) : "unexpected peek " + wq;
822         inProcessQueues.remove(wq, 1);
823
824         if(includesRetireDirective(curi)) {
825             // CrawlURI is marked to trigger retirement of its queue
826
curi.processingCleanup();
827             wq.unpeek();
828             wq.update(this, curi); // rewrite any changes
829
retireQueue(wq);
830             return;
831         }
832         
833         if (needsRetrying(curi)) {
834             // Consider errors which can be retried, leaving uri atop queue
835
if(curi.getFetchStatus()!=S_DEFERRED) {
836                 wq.expend(getCost(curi)); // all retries but DEFERRED cost
837
}
838             long delay_sec = retryDelayFor(curi);
839             curi.processingCleanup(); // lose state that shouldn't burden retry
840
synchronized(wq) {
841                 wq.unpeek();
842                 // TODO: consider if this should happen automatically inside unpeek()
843
wq.update(this, curi); // rewrite any changes
844
if (delay_sec > 0) {
845                     long delay_ms = delay_sec * 1000;
846                     snoozeQueue(wq, now, delay_ms);
847                 } else {
848                     reenqueueQueue(wq);
849                 }
850             }
851             // Let everyone interested know that it will be retried.
852
controller.fireCrawledURINeedRetryEvent(curi);
853             doJournalRescheduled(curi);
854             return;
855         }
856
857         // Curi will definitely be disposed of without retry, so remove from queue
858
wq.dequeue(this);
859         decrementQueuedCount(1);
860         log(curi);
861
862         if (curi.isSuccess()) {
863             totalProcessedBytes += curi.getContentSize();
864             incrementSucceededFetchCount();
865             // Let everyone know in case they want to do something before we strip the curi.
866
controller.fireCrawledURISuccessfulEvent(curi);
867             doJournalFinishedSuccess(curi);
868             wq.expend(getCost(curi)); // successes cost
869
} else if (isDisregarded(curi)) {
870             // Check for codes that mean that while we the crawler did
871
// manage to schedule it, it must be disregarded for some reason.
872
incrementDisregardedUriCount();
873             // Let interested listeners know of disregard disposition.
874
controller.fireCrawledURIDisregardEvent(curi);
875             // if exception, also send to crawlErrors
876
if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {
877                 Object JavaDoc[] array = { curi };
878                 controller.runtimeErrors.log(Level.WARNING, curi.getUURI()
879                         .toString(), array);
880             }
881             // TODO: consider reinstating forget-uri
882
} else {
883             // In that case FAILURE, note & log
884
//Let interested listeners know of failed disposition.
885
this.controller.fireCrawledURIFailureEvent(curi);
886             // if exception, also send to crawlErrors
887
if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {
888                 Object JavaDoc[] array = { curi };
889                 this.controller.runtimeErrors.log(Level.WARNING, curi.getUURI()
890                         .toString(), array);
891             }
892             incrementFailedFetchCount();
893             // let queue note error
894
wq.noteError(((Integer JavaDoc) getUncheckedAttribute(curi,
895                     ATTR_ERROR_PENALTY_AMOUNT)).intValue());
896             doJournalFinishedFailure(curi);
897             wq.expend(getCost(curi)); // failures cost
898
}
899
900         long delay_ms = politenessDelayFor(curi);
901         synchronized(wq) {
902             if (delay_ms > 0) {
903                 snoozeQueue(wq,now,delay_ms);
904             } else {
905                 reenqueueQueue(wq);
906             }
907         }
908
909         curi.stripToMinimal();
910         curi.processingCleanup();
911
912     }
913
914     private boolean includesRetireDirective(CrawlURI curi) {
915         return curi.containsKey(A_FORCE_RETIRE) && (Boolean JavaDoc)curi.getObject(A_FORCE_RETIRE);
916     }
917
918     /**
919      * Place the given queue into 'snoozed' state, ineligible to
920      * supply any URIs for crawling, for the given amount of time.
921      *
922      * @param wq queue to snooze
923      * @param now time now in ms
924      * @param delay_ms time to snooze in ms
925      */

926     private void snoozeQueue(WorkQueue wq, long now, long delay_ms) {
927         long nextTime = now + delay_ms;
928         wq.setWakeTime(nextTime);
929         long snoozeToInactiveDelayMs = ((Long JavaDoc)getUncheckedAttribute(null,
930                 ATTR_SNOOZE_DEACTIVATE_MS)).longValue();
931         if (delay_ms > snoozeToInactiveDelayMs && !inactiveQueues.isEmpty()) {
932             deactivateQueue(wq);
933         } else {
934             synchronized(snoozedClassQueues) {
935                 snoozedClassQueues.add(wq);
936                 if(wq == snoozedClassQueues.first()) {
937                     this.nextWake = new WakeTask();
938                     this.wakeTimer.schedule(nextWake, delay_ms);
939                 }
940             }
941         }
942     }
943
944     /**
945      * Forget the given CrawlURI. This allows a new instance
946      * to be created in the future, if it is reencountered under
947      * different circumstances.
948      *
949      * @param curi The CrawlURI to forget
950      */

951     protected void forget(CrawlURI curi) {
952         logger.finer("Forgetting " + curi);
953         alreadyIncluded.forget(canonicalize(curi.getUURI()), curi);
954     }
955
956     /** (non-Javadoc)
957      * @see org.archive.crawler.framework.Frontier#discoveredUriCount()
958      */

959     public long discoveredUriCount() {
960         return (this.alreadyIncluded != null)? this.alreadyIncluded.count(): 0;
961     }
962
963     /**
964      * @param match String to match.
965      * @return Number of items deleted.
966      */

967     public long deleteURIs(String JavaDoc match) {
968         long count = 0;
969         // TODO: DANGER/ values() may not work right from CachedBdbMap
970
Iterator JavaDoc iter = allQueues.keySet().iterator();
971         while(iter.hasNext()) {
972             WorkQueue wq = getQueueFor(((String JavaDoc)iter.next()));
973             wq.unpeek();
974             count += wq.deleteMatching(this, match);
975         }
976         decrementQueuedCount(count);
977         return count;
978     }
979
980     //
981
// Reporter implementation
982
//
983

984     public static String JavaDoc STANDARD_REPORT = "standard";
985     public static String JavaDoc ALL_NONEMPTY = "nonempty";
986     public static String JavaDoc ALL_QUEUES = "all";
987     protected static String JavaDoc[] REPORTS = {STANDARD_REPORT,ALL_NONEMPTY,ALL_QUEUES};
988     
989     public String JavaDoc[] getReports() {
990         return REPORTS;
991     }
992     
993     /**
994      * @param w Where to write to.
995      */

996     public void singleLineReportTo(PrintWriter JavaDoc w) {
997         if (this.allQueues == null) {
998             return;
999         }
1000        int allCount = allQueues.size();
1001        int inProcessCount = inProcessQueues.uniqueSet().size();
1002        int readyCount = readyClassQueues.size();
1003        int snoozedCount = snoozedClassQueues.size();
1004        int activeCount = inProcessCount + readyCount + snoozedCount;
1005        int inactiveCount = inactiveQueues.size();
1006        int retiredCount = retiredQueues.size();
1007        int exhaustedCount =
1008            allCount - activeCount - inactiveCount - retiredCount;
1009        w.print(allCount);
1010        w.print(" queues: ");
1011        w.print(activeCount);
1012        w.print(" active (");
1013        w.print(inProcessCount);
1014        w.print(" in-process; ");
1015        w.print(readyCount);
1016        w.print(" ready; ");
1017        w.print(snoozedCount);
1018        w.print(" snoozed); ");
1019        w.print(inactiveCount);
1020        w.print(" inactive; ");
1021        w.print(retiredCount);
1022        w.print(" retired; ");
1023        w.print(exhaustedCount);
1024        w.print(" exhausted");
1025        w.flush();
1026    }
1027    
1028    /* (non-Javadoc)
1029     * @see org.archive.util.Reporter#singleLineLegend()
1030     */

1031    public String JavaDoc singleLineLegend() {
1032        return "total active in-process ready snoozed inactive retired exhausted";
1033    }
1034
1035    /**
1036     * This method compiles a human readable report on the status of the frontier
1037     * at the time of the call.
1038     * @param name Name of report.
1039     * @param writer Where to write to.
1040     */

1041    public synchronized void reportTo(String JavaDoc name, PrintWriter JavaDoc writer) {
1042        if(ALL_NONEMPTY.equals(name)) {
1043            allNonemptyReportTo(writer);
1044            return;
1045        }
1046        if(ALL_QUEUES.equals(name)) {
1047            allQueuesReportTo(writer);
1048            return;
1049        }
1050        if(name!=null && !STANDARD_REPORT.equals(name)) {
1051            writer.print(name);
1052            writer.print(" unavailable; standard report:\n");
1053        }
1054        standardReportTo(writer);
1055    }
1056    
1057    /** Compact report of all nonempty queues (one queue per line)
1058     *
1059     * @param writer
1060     */

1061    private void allNonemptyReportTo(PrintWriter JavaDoc writer) {
1062        ArrayList JavaDoc<WorkQueue> inProcessQueuesCopy;
1063        synchronized(this.inProcessQueues) {
1064            // grab a copy that will be stable against mods for report duration
1065
@SuppressWarnings JavaDoc("unchecked")
1066            Collection JavaDoc<WorkQueue> inProcess = this.inProcessQueues;
1067            inProcessQueuesCopy = new ArrayList JavaDoc<WorkQueue>(inProcess);
1068        }
1069        writer.print("\n -----===== IN-PROCESS QUEUES =====-----\n");
1070        queueSingleLinesTo(writer, inProcessQueuesCopy.iterator());
1071
1072        writer.print("\n -----===== READY QUEUES =====-----\n");
1073        queueSingleLinesTo(writer, this.readyClassQueues.iterator());
1074
1075        writer.print("\n -----===== SNOOZED QUEUES =====-----\n");
1076        queueSingleLinesTo(writer, this.snoozedClassQueues.iterator());
1077        
1078        writer.print("\n -----===== INACTIVE QUEUES =====-----\n");
1079        queueSingleLinesTo(writer, this.inactiveQueues.iterator());
1080        
1081        writer.print("\n -----===== RETIRED QUEUES =====-----\n");
1082        queueSingleLinesTo(writer, this.retiredQueues.iterator());
1083    }
1084
1085    /** Compact report of all nonempty queues (one queue per line)
1086     *
1087     * @param writer
1088     */

1089    private void allQueuesReportTo(PrintWriter JavaDoc writer) {
1090        queueSingleLinesTo(writer, allQueues.keySet().iterator());
1091    }
1092    
1093    /**
1094     * Writer the single-line reports of all queues in the
1095     * iterator to the writer
1096     *
1097     * @param writer to receive report
1098     * @param iterator over queues of interest.
1099     */

1100    private void queueSingleLinesTo(PrintWriter JavaDoc writer, Iterator JavaDoc iterator) {
1101        Object JavaDoc obj;
1102        WorkQueue q;
1103        boolean legendWritten = false;
1104        while( iterator.hasNext()) {
1105            obj = iterator.next();
1106            if (obj == null) {
1107                continue;
1108            }
1109            q = (obj instanceof WorkQueue)?
1110                (WorkQueue)obj:
1111                (WorkQueue)this.allQueues.get(obj);
1112            if(q == null) {
1113                writer.print(" ERROR: "+obj);
1114            }
1115            if(!legendWritten) {
1116                writer.println(q.singleLineLegend());
1117                legendWritten = true;
1118            }
1119            q.singleLineReportTo(writer);
1120        }
1121    }
1122
1123    /**
1124     * @param w Writer to print to.
1125     */

1126    private void standardReportTo(PrintWriter JavaDoc w) {
1127        int allCount = allQueues.size();
1128        int inProcessCount = inProcessQueues.uniqueSet().size();
1129        int readyCount = readyClassQueues.size();
1130        int snoozedCount = snoozedClassQueues.size();
1131        int activeCount = inProcessCount + readyCount + snoozedCount;
1132        int inactiveCount = inactiveQueues.size();
1133        int retiredCount = retiredQueues.size();
1134        int exhaustedCount =
1135            allCount - activeCount - inactiveCount - retiredCount;
1136
1137        w.print("Frontier report - ");
1138        w.print(ArchiveUtils.TIMESTAMP12.format(new Date JavaDoc()));
1139        w.print("\n");
1140        w.print(" Job being crawled: ");
1141        w.print(controller.getOrder().getCrawlOrderName());
1142        w.print("\n");
1143        w.print("\n -----===== STATS =====-----\n");
1144        w.print(" Discovered: ");
1145        w.print(Long.toString(discoveredUriCount()));
1146        w.print("\n");
1147        w.print(" Queued: ");
1148        w.print(Long.toString(queuedUriCount()));
1149        w.print("\n");
1150        w.print(" Finished: ");
1151        w.print(Long.toString(finishedUriCount()));
1152        w.print("\n");
1153        w.print(" Successfully: ");
1154        w.print(Long.toString(succeededFetchCount()));
1155        w.print("\n");
1156        w.print(" Failed: ");
1157        w.print(Long.toString(failedFetchCount()));
1158        w.print("\n");
1159        w.print(" Disregarded: ");
1160        w.print(Long.toString(disregardedUriCount()));
1161        w.print("\n");
1162        w.print("\n -----===== QUEUES =====-----\n");
1163        w.print(" Already included size: ");
1164        w.print(Long.toString(alreadyIncluded.count()));
1165        w.print("\n");
1166        w.print(" pending: ");
1167        w.print(Long.toString(alreadyIncluded.pending()));
1168        w.print("\n");
1169        w.print("\n All class queues map size: ");
1170        w.print(Long.toString(allCount));
1171        w.print("\n");
1172        w.print( " Active queues: ");
1173        w.print(activeCount);
1174        w.print("\n");
1175        w.print(" In-process: ");
1176        w.print(inProcessCount);
1177        w.print("\n");
1178        w.print(" Ready: ");
1179        w.print(readyCount);
1180        w.print("\n");
1181        w.print(" Snoozed: ");
1182        w.print(snoozedCount);
1183        w.print("\n");
1184        w.print(" Inactive queues: ");
1185        w.print(inactiveCount);
1186        w.print("\n");
1187        w.print(" Retired queues: ");
1188        w.print(retiredCount);
1189        w.print("\n");
1190        w.print(" Exhausted queues: ");
1191        w.print(exhaustedCount);
1192        w.print("\n");
1193        
1194        w.print("\n -----===== IN-PROCESS QUEUES =====-----\n");
1195        @SuppressWarnings JavaDoc("unchecked")
1196        Collection JavaDoc<WorkQueue> inProcess = inProcessQueues;
1197        ArrayList JavaDoc<WorkQueue> copy = extractSome(inProcess, REPORT_MAX_QUEUES);
1198        appendQueueReports(w, copy.iterator(), copy.size(), REPORT_MAX_QUEUES);
1199        
1200        w.print("\n -----===== READY QUEUES =====-----\n");
1201        appendQueueReports(w, this.readyClassQueues.iterator(),
1202            this.readyClassQueues.size(), REPORT_MAX_QUEUES);
1203
1204        w.print("\n -----===== SNOOZED QUEUES =====-----\n");
1205        copy = extractSome(snoozedClassQueues, REPORT_MAX_QUEUES);
1206        appendQueueReports(w, copy.iterator(), copy.size(), REPORT_MAX_QUEUES);
1207        
1208        WorkQueue longest = longestActiveQueue;
1209        if (longest != null) {
1210            w.print("\n -----===== LONGEST QUEUE =====-----\n");
1211            longest.reportTo(w);
1212        }
1213
1214        w.print("\n -----===== INACTIVE QUEUES =====-----\n");
1215        appendQueueReports(w, this.inactiveQueues.iterator(),
1216            this.inactiveQueues.size(), REPORT_MAX_QUEUES);
1217        
1218        w.print("\n -----===== RETIRED QUEUES =====-----\n");
1219        appendQueueReports(w, this.retiredQueues.iterator(),
1220            this.retiredQueues.size(), REPORT_MAX_QUEUES);
1221
1222        w.flush();
1223    }
1224    
1225    
1226    /**
1227     * Extract some of the elements in the given collection to an
1228     * ArrayList. This method synchronizes on the given collection's
1229     * monitor. The returned list will never contain more than the
1230     * specified maximum number of elements.
1231     *
1232     * @param c the collection whose elements to extract
1233     * @param max the maximum number of elements to extract
1234     * @return the extraction
1235     */

1236    private static <T> ArrayList JavaDoc<T> extractSome(Collection JavaDoc<T> c, int max) {
1237        // Try to guess a sane initial capacity for ArrayList
1238
// Hopefully given collection won't grow more than 10 items
1239
// between now and the synchronized block...
1240
int initial = Math.min(c.size() + 10, max);
1241        int count = 0;
1242        ArrayList JavaDoc<T> list = new ArrayList JavaDoc<T>(initial);
1243        synchronized (c) {
1244            Iterator JavaDoc<T> iter = c.iterator();
1245            while (iter.hasNext() && (count < max)) {
1246                list.add(iter.next());
1247                count++;
1248            }
1249        }
1250        return list;
1251    }
1252
1253    /**
1254     * Append queue report to general Frontier report.
1255     * @param w StringBuffer to append to.
1256     * @param iterator An iterator over
1257     * @param total
1258     * @param max
1259     */

1260    protected void appendQueueReports(PrintWriter JavaDoc w, Iterator JavaDoc iterator,
1261            int total, int max) {
1262        Object JavaDoc obj;
1263        WorkQueue q;
1264        for(int count = 0; iterator.hasNext() && (count < max); count++) {
1265            obj = iterator.next();
1266            if (obj == null) {
1267                continue;
1268            }
1269            q = (obj instanceof WorkQueue)?
1270                (WorkQueue)obj:
1271                (WorkQueue)this.allQueues.get(obj);
1272            if(q == null) {
1273                w.print("WARNING: No report for queue "+obj);
1274            }
1275            q.reportTo(w);
1276        }
1277        if(total > max) {
1278            w.print("...and " + (total - max) + " more.\n");
1279        }
1280    }
1281
1282    /**
1283     * Force logging, etc. of operator- deleted CrawlURIs
1284     *
1285     * @see org.archive.crawler.framework.Frontier#deleted(org.archive.crawler.datamodel.CrawlURI)
1286     */

1287    public synchronized void deleted(CrawlURI curi) {
1288        //treat as disregarded
1289
controller.fireCrawledURIDisregardEvent(curi);
1290        log(curi);
1291        incrementDisregardedUriCount();
1292        curi.stripToMinimal();
1293        curi.processingCleanup();
1294    }
1295
1296    public void considerIncluded(UURI u) {
1297        this.alreadyIncluded.note(canonicalize(u));
1298        CrawlURI temp = new CrawlURI(u);
1299        temp.setClassKey(getClassKey(temp));
1300        getQueueFor(temp).expend(getCost(temp));
1301    }
1302    
1303    protected abstract void initQueue() throws IOException JavaDoc;
1304    protected abstract void closeQueue() throws IOException JavaDoc;
1305    
1306    /**
1307     * Returns <code>true</code> if the WorkQueue implementation of this
1308     * Frontier stores its workload on disk instead of relying
1309     * on serialization mechanisms.
1310     *
1311     * @return a constant boolean value for this class/instance
1312     */

1313    protected abstract boolean workQueueDataOnDisk();
1314    
1315    
1316    public FrontierGroup getGroup(CrawlURI curi) {
1317        return getQueueFor(curi);
1318    }
1319    
1320    
1321    public long averageDepth() {
1322        int inProcessCount = inProcessQueues.uniqueSet().size();
1323        int readyCount = readyClassQueues.size();
1324        int snoozedCount = snoozedClassQueues.size();
1325        int activeCount = inProcessCount + readyCount + snoozedCount;
1326        int inactiveCount = inactiveQueues.size();
1327        int totalQueueCount = (activeCount+inactiveCount);
1328        return (totalQueueCount == 0) ? 0 : queuedUriCount / totalQueueCount;
1329    }
1330    public float congestionRatio() {
1331        int inProcessCount = inProcessQueues.uniqueSet().size();
1332        int readyCount = readyClassQueues.size();
1333        int snoozedCount = snoozedClassQueues.size();
1334        int activeCount = inProcessCount + readyCount + snoozedCount;
1335        int inactiveCount = inactiveQueues.size();
1336        return (float)(activeCount + inactiveCount) / (inProcessCount + snoozedCount);
1337    }
1338    public long deepestUri() {
1339        return longestActiveQueue==null ? -1 : longestActiveQueue.getCount();
1340    }
1341    
1342    
1343    /* (non-Javadoc)
1344     * @see org.archive.crawler.framework.Frontier#isEmpty()
1345     */

1346    public synchronized boolean isEmpty() {
1347        return queuedUriCount == 0 && alreadyIncluded.pending() == 0;
1348    }
1349}
1350
1351
Popular Tags