KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > archive > crawler > frontier > WorkQueue


1 package org.archive.crawler.frontier;
2
3 import java.io.IOException JavaDoc;
4 import java.io.PrintWriter JavaDoc;
5 import java.io.Serializable JavaDoc;
6 import java.util.logging.Level JavaDoc;
7 import java.util.logging.Logger JavaDoc;
8
9 import org.archive.crawler.datamodel.CrawlSubstats;
10 import org.archive.crawler.datamodel.CrawlURI;
11 import org.archive.crawler.framework.Frontier;
12 import org.archive.util.ArchiveUtils;
13 import org.archive.util.Reporter;
14
15 /**
16  * A single queue of related URIs to visit, grouped by a classKey
17  * (typically "hostname:port" or similar)
18  *
19  * @author gojomo
20  * @author Christian Kohlschuetter
21  */

22 public abstract class WorkQueue implements Frontier.FrontierGroup, Comparable JavaDoc,
23         Serializable JavaDoc, Reporter {
24     private static final Logger JavaDoc logger =
25         Logger.getLogger(WorkQueue.class.getName());
26     
27     /** The classKey */
28     protected final String JavaDoc classKey;
29
30     private boolean active = true;
31
32     /** Total number of stored items */
33     private long count = 0;
34
35     /** Total number of items ever enqueued */
36     private long enqueueCount = 0;
37     
38     /** Whether queue is already in lifecycle stage */
39     private boolean isHeld = false;
40
41     /** Time to wake, if snoozed */
42     private long wakeTime = 0;
43
44     /** Running 'budget' indicating whether queue should stay active */
45     private int sessionBalance = 0;
46
47     /** Cost of the last item to be charged against queue */
48     private int lastCost = 0;
49
50     /** Total number of items charged against queue; with totalExpenditure
51      * can be used to calculate 'average cost'. */

52     private long costCount = 0;
53
54     /** Running tally of total expenditures on this queue */
55     private long totalExpenditure = 0;
56
57     /** Total to spend on this queue over its lifetime */
58     private long totalBudget = 0;
59
60     /** The next item to be returned */
61     private CrawlURI peekItem = null;
62
63     /** Last URI enqueued */
64     private String JavaDoc lastQueued;
65
66     /** Last URI peeked */
67     private String JavaDoc lastPeeked;
68
69     /** time of last dequeue (disposition of some URI) **/
70     private long lastDequeueTime;
71     
72     /** count of errors encountered */
73     private long errorCount = 0;
74     
75     /** Substats for all CrawlURIs in this group */
76     protected CrawlSubstats substats = new CrawlSubstats();
77
78     private boolean retired;
79     
80     public WorkQueue(final String JavaDoc pClassKey) {
81         this.classKey = pClassKey;
82     }
83
84     /**
85      * Delete URIs matching the given pattern from this queue.
86      * @param frontier
87      * @param match
88      * @return count of deleted URIs
89      */

90     public long deleteMatching(final WorkQueueFrontier frontier, String JavaDoc match) {
91         try {
92             final long deleteCount = deleteMatchingFromQueue(frontier, match);
93             this.count -= deleteCount;
94             return deleteCount;
95         } catch (IOException JavaDoc e) {
96             //FIXME better exception handling
97
e.printStackTrace();
98             throw new RuntimeException JavaDoc(e);
99         }
100     }
101
102     /**
103      * Add the given CrawlURI, noting its addition in running count. (It
104      * should not already be present.)
105      *
106      * @param frontier Work queues manager.
107      * @param curi CrawlURI to insert.
108      */

109     public synchronized void enqueue(final WorkQueueFrontier frontier,
110         CrawlURI curi) {
111         try {
112             insert(frontier, curi);
113         } catch (IOException JavaDoc e) {
114             //FIXME better exception handling
115
e.printStackTrace();
116             throw new RuntimeException JavaDoc(e);
117         }
118         count++;
119         enqueueCount++;
120     }
121
122     /**
123      * Return the topmost queue item -- and remember it,
124      * such that even later higher-priority inserts don't
125      * change it.
126      *
127      * TODO: evaluate if this is really necessary
128      * @param frontier Work queues manager
129      *
130      * @return topmost queue item, or null
131      */

132     public CrawlURI peek(final WorkQueueFrontier frontier) {
133         if(peekItem == null && count > 0) {
134             try {
135                 peekItem = peekItem(frontier);
136             } catch (IOException JavaDoc e) {
137                 //FIXME better exception handling
138
logger.log(Level.SEVERE,"peek failure",e);
139                 e.printStackTrace();
140                 // throw new RuntimeException(e);
141
}
142             if(peekItem != null) {
143                 lastPeeked = peekItem.toString();
144             }
145         }
146         return peekItem;
147     }
148
149     /**
150      * Remove the peekItem from the queue and adjusts the count.
151      *
152      * @param frontier Work queues manager.
153      */

154     public synchronized void dequeue(final WorkQueueFrontier frontier) {
155         try {
156             deleteItem(frontier, peekItem);
157         } catch (IOException JavaDoc e) {
158             //FIXME better exception handling
159
e.printStackTrace();
160             throw new RuntimeException JavaDoc(e);
161         }
162         unpeek();
163         count--;
164         lastDequeueTime = System.currentTimeMillis();
165     }
166
167     /**
168      * Set the session 'activity budget balance' to the given value
169      *
170      * @param balance to use
171      */

172     public void setSessionBalance(int balance) {
173         this.sessionBalance = balance;
174     }
175
176     /**
177      * Return current session 'activity budget balance'
178      *
179      * @return session balance
180      */

181     public int getSessionBalance() {
182         return this.sessionBalance;
183     }
184
185     /**
186      * Set the total expenditure level allowable before queue is
187      * considered inherently 'over-budget'.
188      *
189      * @param budget
190      */

191     public void setTotalBudget(long budget) {
192         this.totalBudget = budget;
193     }
194
195     /**
196      * Check whether queue has temporarily or permanently exceeded
197      * its budget.
198      *
199      * @return true if queue is over its set budget(s)
200      */

201     public boolean isOverBudget() {
202         // check whether running balance is depleted
203
// or totalExpenditure exceeds totalBudget
204
return this.sessionBalance <= 0
205             || (this.totalBudget >= 0 && this.totalExpenditure > this.totalBudget);
206     }
207
208     /**
209      * Return the tally of all expenditures on this queue
210      *
211      * @return total amount expended on this queue
212      */

213     public long getTotalExpenditure() {
214         return totalExpenditure;
215     }
216
217     /**
218      * Increase the internal running budget to be used before
219      * deactivating the queue
220      *
221      * @param amount amount to increment
222      * @return updated budget value
223      */

224     public int incrementSessionBalance(int amount) {
225         this.sessionBalance = this.sessionBalance + amount;
226         return this.sessionBalance;
227     }
228
229     /**
230      * Decrease the internal running budget by the given amount.
231      * @param amount tp decrement
232      * @return updated budget value
233      */

234     public int expend(int amount) {
235         this.sessionBalance = this.sessionBalance - amount;
236         this.totalExpenditure = this.totalExpenditure + amount;
237         this.lastCost = amount;
238         this.costCount++;
239         return this.sessionBalance;
240     }
241
242     /**
243      * A URI should not have been charged against queue (eg
244      * it was disregarded); return the amount expended
245      * @param amount to return
246      * @return updated budget value
247      */

248     public int refund(int amount) {
249         this.sessionBalance = this.sessionBalance + amount;
250         this.totalExpenditure = this.totalExpenditure - amount;
251         this.costCount--;
252         return this.sessionBalance;
253     }
254     
255     /**
256      * Note an error and assess an extra penalty.
257      * @param penalty additional amount to deduct
258      */

259     public void noteError(int penalty) {
260         this.sessionBalance = this.sessionBalance - penalty;
261         this.totalExpenditure = this.totalExpenditure + penalty;
262         errorCount++;
263     }
264     
265     /**
266      * @param l
267      */

268     public void setWakeTime(long l) {
269         wakeTime = l;
270     }
271
272     /**
273      * @return wakeTime
274      */

275     public long getWakeTime() {
276         return wakeTime;
277     }
278
279     /**
280      * @return classKey, the 'identifier', for this queue.
281      */

282     public String JavaDoc getClassKey() {
283         return this.classKey;
284     }
285
286     /**
287      * Clear isHeld to false
288      */

289     public void clearHeld() {
290         isHeld = false;
291     }
292
293     /**
294      * Whether the queue is already in a lifecycle stage --
295      * such as ready, in-progress, snoozed -- and thus should
296      * not be redundantly inserted to readyClassQueues
297      *
298      * @return isHeld
299      */

300     public boolean isHeld() {
301         return isHeld;
302     }
303
304     /**
305      * Set isHeld to true
306      */

307     public void setHeld() {
308         isHeld = true;
309     }
310
311     /**
312      * Forgive the peek, allowing a subsequent peek to
313      * return a different item.
314      *
315      */

316     public void unpeek() {
317         peekItem = null;
318     }
319
320     public final int compareTo(Object JavaDoc obj) {
321         if(this == obj) {
322             return 0; // for exact identity only
323
}
324         WorkQueue other = (WorkQueue) obj;
325         if(getWakeTime() > other.getWakeTime()) {
326             return 1;
327         }
328         if(getWakeTime() < other.getWakeTime()) {
329             return -1;
330         }
331         // at this point, the ordering is arbitrary, but still
332
// must be consistent/stable over time
333
return this.classKey.compareTo(other.getClassKey());
334     }
335
336     /**
337      * Update the given CrawlURI, which should already be present. (This
338      * is not checked.) Equivalent to an enqueue without affecting the count.
339      *
340      * @param frontier Work queues manager.
341      * @param curi CrawlURI to update.
342      */

343     public void update(final WorkQueueFrontier frontier, CrawlURI curi) {
344         try {
345             insert(frontier, curi);
346         } catch (IOException JavaDoc e) {
347             //FIXME better exception handling
348
e.printStackTrace();
349             throw new RuntimeException JavaDoc(e);
350         }
351     }
352
353     /**
354      * @return Returns the count.
355      */

356     public synchronized long getCount() {
357         return this.count;
358     }
359
360     /**
361      * Insert the given curi, whether it is already present or not.
362      * @param frontier WorkQueueFrontier.
363      * @param curi CrawlURI to insert.
364      * @throws IOException
365      */

366     private void insert(final WorkQueueFrontier frontier, CrawlURI curi)
367         throws IOException JavaDoc {
368         insertItem(frontier, curi);
369         lastQueued = curi.toString();
370     }
371
372     /**
373      * Insert the given curi, whether it is already present or not.
374      * Hook for subclasses.
375      *
376      * @param frontier WorkQueueFrontier.
377      * @param curi CrawlURI to insert.
378      * @throws IOException if there was a problem while inserting the item
379      */

380     protected abstract void insertItem(final WorkQueueFrontier frontier,
381         CrawlURI curi) throws IOException JavaDoc;
382
383     /**
384      * Delete URIs matching the given pattern from this queue.
385      * @param frontier WorkQueues manager.
386      * @param match the pattern to match
387      * @return count of deleted URIs
388      * @throws IOException if there was a problem while deleting
389      */

390     protected abstract long deleteMatchingFromQueue(
391         final WorkQueueFrontier frontier, final String JavaDoc match)
392         throws IOException JavaDoc;
393
394     /**
395      * Removes the given item from the queue.
396      *
397      * This is only used to remove the first item in the queue,
398      * so it is not necessary to implement a random-access queue.
399      *
400      * @param frontier Work queues manager.
401      * @throws IOException if there was a problem while deleting the item
402      */

403     protected abstract void deleteItem(final WorkQueueFrontier frontier,
404         final CrawlURI item) throws IOException JavaDoc;
405
406     /**
407      * Returns first item from queue (does not delete)
408      *
409      * @return The peeked item, or null
410      * @throws IOException if there was a problem while peeking
411      */

412     protected abstract CrawlURI peekItem(final WorkQueueFrontier frontier)
413         throws IOException JavaDoc;
414
415     /**
416      * Suspends this WorkQueue. Closes all connections to resources etc.
417      *
418      * @param frontier
419      * @throws IOException
420      */

421     protected void suspend(final WorkQueueFrontier frontier) throws IOException JavaDoc {
422     }
423
424     /**
425      * Resumes this WorkQueue. Eventually opens connections to resources etc.
426      *
427      * @param frontier
428      * @throws IOException
429      */

430     protected void resume(final WorkQueueFrontier frontier) throws IOException JavaDoc {
431     }
432
433     public void setActive(final WorkQueueFrontier frontier, final boolean b) {
434         if(active != b) {
435             active = b;
436             try {
437                 if(active) {
438                     resume(frontier);
439                 } else {
440                     suspend(frontier);
441                 }
442             } catch (IOException JavaDoc e) {
443                 //FIXME better exception handling
444
e.printStackTrace();
445                 throw new RuntimeException JavaDoc(e);
446             }
447         }
448     }
449     
450     //
451
// Reporter
452
//
453

454     /* (non-Javadoc)
455      * @see org.archive.util.Reporter#getReports()
456      */

457     public String JavaDoc[] getReports() {
458         return new String JavaDoc[] {};
459     }
460
461     /* (non-Javadoc)
462      * @see org.archive.util.Reporter#reportTo(java.io.Writer)
463      */

464     public void reportTo(PrintWriter JavaDoc writer) {
465         reportTo(null,writer);
466     }
467
468     /* (non-Javadoc)
469      * @see org.archive.util.Reporter#singleLineReportTo(java.io.Writer)
470      */

471     public void singleLineReportTo(PrintWriter JavaDoc writer) {
472         // queue name
473
writer.print(classKey);
474         writer.print(" ");
475         // count of items
476
writer.print(Long.toString(count));
477         writer.print(" ");
478         // enqueue count
479
writer.print(Long.toString(enqueueCount));
480         writer.print(" ");
481         writer.print(sessionBalance);
482         writer.print(" ");
483         writer.print(lastCost);
484         writer.print("(");
485         writer.print(ArchiveUtils.doubleToString(
486                     ((double) totalExpenditure / costCount), 1));
487         writer.print(")");
488         writer.print(" ");
489         // last dequeue time, if any, or '-'
490
if (lastDequeueTime != 0) {
491             writer.print(ArchiveUtils.getLog17Date(lastDequeueTime));
492         } else {
493             writer.print("-");
494         }
495         writer.print(" ");
496         // wake time if snoozed, or '-'
497
if (wakeTime != 0) {
498             writer.print(ArchiveUtils.formatMillisecondsToConventional(wakeTime - System.currentTimeMillis()));
499         } else {
500             writer.print("-");
501         }
502         writer.print(" ");
503         writer.print(Long.toString(totalExpenditure));
504         writer.print("/");
505         writer.print(Long.toString(totalBudget));
506         writer.print(" ");
507         writer.print(Long.toString(errorCount));
508         writer.print(" ");
509         writer.print(lastPeeked);
510         writer.print(" ");
511         writer.print(lastQueued);
512         writer.print("\n");
513     }
514
515     /* (non-Javadoc)
516      * @see org.archive.util.Reporter#singleLineLegend()
517      */

518     public String JavaDoc singleLineLegend() {
519         return "queue currentSize totalEnqueues sessionBalance lastCost " +
520                 "(averageCost) lastDequeueTime wakeTime " +
521                 "totalSpend/totalBudget errorCount lastPeekUri lastQueuedUri";
522     }
523     
524     /* (non-Javadoc)
525      * @see org.archive.util.Reporter#singleLineReport()
526      */

527     public String JavaDoc singleLineReport() {
528         return ArchiveUtils.singleLineReport(this);
529     }
530     
531     /**
532      * @param writer
533      * @throws IOException
534      */

535     public void reportTo(String JavaDoc name, PrintWriter JavaDoc writer) {
536         // name is ignored: only one kind of report for now
537
writer.print("Queue ");
538         writer.print(classKey);
539         writer.print("\n");
540         writer.print(" ");
541         writer.print(Long.toString(count));
542         writer.print(" items");
543         if (wakeTime != 0) {
544             writer.print("\n wakes in: "+ArchiveUtils.formatMillisecondsToConventional(wakeTime - System.currentTimeMillis()));
545         }
546         writer.print("\n last enqueued: ");
547         writer.print(lastQueued);
548         writer.print("\n last peeked: ");
549         writer.print(lastPeeked);
550         writer.print("\n");
551         writer.print(" total expended: ");
552         writer.print(Long.toString(totalExpenditure));
553         writer.print(" (total budget: ");
554         writer.print(Long.toString(totalBudget));
555         writer.print(")\n");
556         writer.print(" active balance: ");
557         writer.print(sessionBalance);
558         writer.print("\n last(avg) cost: ");
559         writer.print(lastCost);
560         writer.print("(");
561         writer.print(ArchiveUtils.doubleToString(
562                     ((double) totalExpenditure / costCount), 1));
563         writer.print(")\n\n");
564     }
565     
566     public CrawlSubstats getSubstats() {
567         return substats;
568     }
569
570     /**
571      * Set the retired status of this queue.
572      *
573      * @param b new value for retired status
574      */

575     public void setRetired(boolean b) {
576         this.retired = b;
577     }
578     
579     public boolean isRetired() {
580         return retired;
581     }
582 }
583
Popular Tags