KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > archive > crawler > frontier > AdaptiveRevisitHostQueue


1 /* AdaptiveRevisitHostQueue
2 *
3 * Created on Sep 13, 2004
4 *
5 * Copyright (C) 2004 Kristinn Sigurðsson.
6 *
7 * This file is part of the Heritrix web crawler (crawler.archive.org).
8 *
9 * Heritrix is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU Lesser Public License as published by
11 * the Free Software Foundation; either version 2.1 of the License, or
12 * any later version.
13 *
14 * Heritrix is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Lesser Public License for more details.
18 *
19 * You should have received a copy of the GNU Lesser Public License
20 * along with Heritrix; if not, write to the Free Software
21 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 */

23 package org.archive.crawler.frontier;
24
25 import java.io.IOException JavaDoc;
26 import java.util.logging.Level JavaDoc;
27 import java.util.logging.Logger JavaDoc;
28
29 import org.archive.crawler.datamodel.CandidateURI;
30 import org.archive.crawler.datamodel.CrawlSubstats;
31 import org.archive.crawler.datamodel.CrawlURI;
32 import org.archive.crawler.framework.Frontier.FrontierGroup;
33 import org.archive.util.ArchiveUtils;
34
35 import com.sleepycat.bind.EntryBinding;
36 import com.sleepycat.bind.serial.ClassCatalog;
37 import com.sleepycat.bind.serial.SerialBinding;
38 import com.sleepycat.bind.serial.StoredClassCatalog;
39 import com.sleepycat.bind.serial.TupleSerialKeyCreator;
40 import com.sleepycat.bind.tuple.StringBinding;
41 import com.sleepycat.bind.tuple.TupleBinding;
42 import com.sleepycat.bind.tuple.TupleInput;
43 import com.sleepycat.bind.tuple.TupleOutput;
44 import com.sleepycat.je.Cursor;
45 import com.sleepycat.je.Database;
46 import com.sleepycat.je.DatabaseConfig;
47 import com.sleepycat.je.DatabaseEntry;
48 import com.sleepycat.je.DatabaseException;
49 import com.sleepycat.je.Environment;
50 import com.sleepycat.je.LockMode;
51 import com.sleepycat.je.OperationStatus;
52 import com.sleepycat.je.SecondaryConfig;
53 import com.sleepycat.je.SecondaryDatabase;
54
55 /**
56  * A priority based queue of CrawlURIs. Each queue should represent
57  * one host (although this is not enforced in this class). Items are ordered
58  * by the scheduling directive and time of next processing (in that order)
59  * and also indexed by the URI.
60  * <p>
61  * The HQ does no calculations on the 'time of next processing.' It always
62  * relies on values already set on the CrawlURI.
63  * <p>
64  * Note: Class is not 'thread safe.' In a multi-threaded environment the caller
65  * must ensure that two threads do not make overlapping calls.
66  * <p>
67  * Any BDB DatabaseException will be converted to an IOException by public
68  * methods. This includes preserving the original stacktrace, in favor of the
69  * one created for the IOException, so that the true source of the exception
70  * is not lost.
71  *
72  * @author Kristinn Sigurdsson
73  */

74 public class AdaptiveRevisitHostQueue
75 implements AdaptiveRevisitAttributeConstants, FrontierGroup {
76     
77     // TODO: Need to be able to remove URIs, both by name and reg.expr.
78

79     // Constants declerations
80
/** HQ contains no queued CrawlURIs elements. This state only occurs after
81      * queue creation before the first add. After the first item is added the
82      * state can never become empty again. */

83     public static final int HQSTATE_EMPTY = 0;
84     /** HQ has a CrawlURI ready for processing */
85     public static final int HQSTATE_READY = 1;
86     /** HQ has maximum number of CrawlURI currently being processed. This number
87      * is either equal to the 'valence' (maximum number of simultanious
88      * connections to a host) or (if smaller) the total number of CrawlURIs
89      * in the HQ. */

90     public static final int HQSTATE_BUSY = 2;
91     /** HQ is in a suspended state until it can be woken back up */
92     public static final int HQSTATE_SNOOZED = 3;
93     
94     // Internal class variables
95
/** Name of the host that this AdaptiveRevisitHostQueue represents */
96     final String JavaDoc hostName;
97     /** Last known state of HQ -- ALL methods should use getState() to read
98      * this value, never read it directly. */

99     int state;
100     /** Time (in milliseconds) when the HQ will next be ready to issue a URI
101      * for processing. When setting this value, methods should use the
102      * setter method {@link #setNextReadyTime(long) setNextReadyTime()}
103      */

104     long nextReadyTime;
105     /** Time (in milliseconds) when each URI 'slot' becomes available again.<p>
106      * Any positive value larger then the current time signifies a taken slot
107      * where the URI has completed processing but the politness wait has not
108      * ended. <p>
109      * A zero or positive value smaller then the current time in milliseconds
110      * signifies an empty slot.<p>
111      * Any negative value signifies a slot for a URI that is being processed.
112      * <p>
113      * Methods should never write directly to this, rather use the
114      * {@link #updateWakeUpTimeSlot(long) updateWakeUpTimeSlot()} and
115      * {@link #useWakeUpTimeSlot() useWakeUpTimeSlot()} methods as needed.
116      */

117     long[] wakeUpTime;
118     /** Number of simultanious connections permitted to this host. I.e. this
119      * many URIs can be issued before state of HQ becomes busy until one of
120      * them is returned via the update method. */

121     int valence;
122     /**
123      * Size of queue. That is, the number of CrawlURIs that have been added to
124      * it, including any that are currently being processed.
125      */

126     long size;
127     /** Number of URIs belonging to this queue that are being processed at the
128      * moment. This number will always be in the range of 0 - valence
129      */

130     long inProcessing;
131     /** The AdaptiveRevisitHostQueueList that contains this class. This
132      * reference is
133      * maintained to inform the owning class of changes to the sort order
134      * value. Value may be null, in which case no notices are made.*/

135     private AdaptiveRevisitQueueList owner;
136     /** Logger */
137     private static final Logger JavaDoc logger =
138         Logger.getLogger(AdaptiveRevisitHostQueue.class.getName());
139     
140     protected CrawlSubstats substats = new CrawlSubstats();
141     
142     // Berkeley DB - All class member variables related to BDB JE
143
// Databases
144
/** Database containing the URI priority queue, indexed by the the
145       * URI string. */

146     protected Database primaryUriDB;
147     /** Secondary index into {@link #primaryUriDB the primary DB}, URIs indexed
148      * by the time when they can next be processed again. */

149     protected SecondaryDatabase secondaryUriDB;
150     /** A database containing those URIs that are currently being processed. */
151     protected Database processingUriDB;
152     // Serialization support
153
/** For BDB serialization of objects */
154     protected StoredClassCatalog classCatalog;
155     /** A binding for the serialization of the primary key (URI string) */
156     protected EntryBinding primaryKeyBinding;
157     /** A binding for the CrawlURIARWrapper object */
158     protected EntryBinding crawlURIBinding;
159     // Cursors into databases
160

161     
162     
163     /**
164      * Constructor
165      *
166      * @param hostName Name of the host this queue represents. This name must
167      * be unique for all HQs in the same Environment.
168      * @param env Berkeley DB Environment. All BDB databases created will use
169      * it.
170      * @param catalog Db for bdb class serialization.
171      * @param valence The total number of simultanous URIs that the HQ can issue
172      * for processing. Once this many URIs have been issued for
173      * processing, the HQ will go into {@link #HQSTATE_BUSY busy}
174      * state until at least one of the URI is
175      * {@link #update(CrawlURI, boolean, long) updated}.
176      * Value should be larger then zero. Zero and negative values
177      * will be treated same as 1.
178      *
179      * @throws IOException if an error occurs opening/creating the
180      * database
181      */

182     public AdaptiveRevisitHostQueue(String JavaDoc hostName, Environment env,
183             StoredClassCatalog catalog, int valence)
184     throws IOException JavaDoc {
185         try{
186             if(valence < 1) {
187                 this.valence = 1;
188             } else {
189                 this.valence = valence;
190             }
191             wakeUpTime = new long[valence];
192             for(int i = 0 ; i < valence ; i++){
193                 wakeUpTime[i]=0; // 0 means open slot.
194
}
195             
196             inProcessing = 0;
197             
198             this.hostName = hostName;
199             
200             state = HQSTATE_EMPTY; //HQ is initially empty.
201
nextReadyTime = Long.MAX_VALUE; //Empty and busy HQ get this value.
202

203             // Set up the primary URI database, it is indexed by URI names
204
DatabaseConfig dbConfig = new DatabaseConfig();
205             dbConfig.setTransactional(false);
206             dbConfig.setAllowCreate(true);
207             primaryUriDB = env.openDatabase(null, hostName, dbConfig);
208     
209             this.classCatalog = catalog;
210             
211             // Set up a DB for storing URIs being processed
212
DatabaseConfig dbConfig2 = new DatabaseConfig();
213             dbConfig2.setTransactional(false);
214             dbConfig2.setAllowCreate(true);
215             processingUriDB = env.openDatabase(null,
216                     hostName + "/processing", dbConfig2);
217             
218             // Create a primitive binding for the primary key (URI string)
219
primaryKeyBinding = TupleBinding.getPrimitiveBinding(String JavaDoc.class);
220             // Create a serial binding for the CrawlURI object
221
crawlURIBinding = new SerialBinding(classCatalog, CrawlURI.class);
222     
223             // Open a secondary database to allow accessing the primary
224
// database by the secondary key value.
225
SecondaryConfig secConfig = new SecondaryConfig();
226             secConfig.setAllowCreate(true);
227             secConfig.setSortedDuplicates(true);
228             secConfig.setKeyCreator(
229                     new OrderOfProcessingKeyCreator(classCatalog,CrawlURI.class));
230             secondaryUriDB = env.openSecondaryDatabase(null,
231                 hostName+"/timeOfProcessing", primaryUriDB, secConfig);
232             
233             // Check if we are opening an existing DB...
234
size = countCrawlURIs();
235             if (size > 0) {
236                 // If size > 0 then we just opened an existing DB.
237
// Set nextReadyTime;
238
nextReadyTime = peek().getLong(
239                         A_TIME_OF_NEXT_PROCESSING);
240                 // Move any items in processingUriDB into the primariUriDB, ensure
241
// that they wind up on top!
242
flushProcessingURIs();
243                 state = HQSTATE_READY;
244             }
245         } catch (DatabaseException e) {
246             // Blanket catch all DBExceptions and convert to IOExceptions.
247
IOException JavaDoc e2 = new IOException JavaDoc(e.getMessage());
248             e2.setStackTrace(e.getStackTrace());
249             throw e2;
250         }
251     }
252     
253     /**
254      * Returns the HQ's name
255      * @return the HQ's name
256      */

257     public String JavaDoc getHostName() {
258         return hostName;
259     }
260     
261     /**
262      * Add a CrawlURI to this host queue.
263      * <p>
264      * Calls can optionally chose to have the time of next processing value
265      * override existing values for the URI if the existing values are 'later'
266      * then the new ones.
267      *
268      * @param curi The CrawlURI to add.
269      * @param overrideSetTimeOnDups If true then the time of next processing for
270      * the supplied URI will override the any
271      * existing time for it already stored in the HQ.
272      * If false, then no changes will be made to any
273      * existing values of the URI. Note: Will never
274      * override with a later time.
275      * @throws IOException When an error occurs accessing the database
276      */

277     public void add(CrawlURI curi, boolean overrideSetTimeOnDups)
278             throws IOException JavaDoc{
279         if(logger.isLoggable(Level.FINER)){
280             logger.finer("Adding " + curi.toString());
281         }
282         try{
283             if(inProcessing(curi.toString())){
284                 // If it is currently being processed, then it is already been
285
// added and we sure as heck can't fetch it any sooner!
286
return;
287             }
288             
289             OperationStatus opStatus = strictAdd(curi,false);
290             
291             long curiProcessingTime = curi.getLong(
292                     A_TIME_OF_NEXT_PROCESSING);
293     
294             if (opStatus == OperationStatus.KEYEXIST){
295                 // Override an existing URI
296
// We need to extract the old CrawlURI (it contains vital
297
// info on past crawls), check its scheduling directive
298
// and (possibly) its time of next fetch and update if it
299
// will promote the URI to an earlier processing time.
300
boolean update = false;
301                 CrawlURI curiExisting = getCrawlURI(curi.toString());
302                 long oldCuriProcessingTime = curiExisting.getLong(
303                         A_TIME_OF_NEXT_PROCESSING);
304                 if(curi.getSchedulingDirective() <
305                         curiExisting.getSchedulingDirective()){
306                     // New scheduling directive is of higher importance,
307
// update to promote URI.
308
curiExisting.setSchedulingDirective(
309                             curi.getSchedulingDirective());
310                     update = true;
311                 }
312                 if( (curiProcessingTime < oldCuriProcessingTime)
313                         && (overrideSetTimeOnDups || update)){
314                     // We update the processing time if it is earlier then
315
// the original and either overrideSetTimeOnDups was set
316
// or update is true, meaning a higher priority scheduling
317
// directive for this URI.
318
curiExisting.putLong(
319                             A_TIME_OF_NEXT_PROCESSING,
320                             curiProcessingTime);
321                     update = true;
322                 }
323                 if(update){
324                     opStatus = strictAdd(curiExisting,true); //Override
325
} else {
326                     return;
327                 }
328             } else if(opStatus == OperationStatus.SUCCESS) {
329                 // Just inserted a brand new CrawlURI into the queue.
330
size++;
331             }
332     
333             // Finally, check if insert (fresh add or override) into DB was
334
// successful and if so check if we need to update nextReadyTime.
335
if(opStatus == OperationStatus.SUCCESS){
336                 if (curiProcessingTime < nextReadyTime){
337                     // Update nextReadyTime to reflect new value.
338
setNextReadyTime(curiProcessingTime);
339                 }
340                 if(state == HQSTATE_EMPTY){
341                     // Definately no longer empty.
342
state = HQSTATE_READY;
343                 }
344             } else {
345                 // Something went wrong. Throw an exception.
346
throw new DatabaseException("Error on add into database for " +
347                         "CrawlURI " + curi.toString() + ". " +
348                         opStatus.toString());
349             }
350         } catch (DatabaseException e) {
351             // Blanket catch all DBExceptions and convert to IOExceptions.
352
IOException JavaDoc e2 = new IOException JavaDoc(e.getMessage());
353             e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
354
throw e2;
355         }
356         reorder(); // May need a reorder.
357
}
358     
359     /**
360      * An internal method for adding URIs to the queue.
361      *
362      * @param curi The CrawlURI to add
363      * @param overrideDuplicates If true then any existing CrawlURI in the DB
364      * will be overwritten. If false insert into the
365      * queue is only performed if the key doesn't
366      * already exist.
367      * @return The OperationStatus object returned by the put method.
368      *
369      * @throws DatabaseException
370      */

371     protected OperationStatus strictAdd(CrawlURI curi,
372         boolean overrideDuplicates)
373     throws DatabaseException{
374         DatabaseEntry keyEntry = new DatabaseEntry();
375         DatabaseEntry dataEntry = new DatabaseEntry();
376         primaryKeyBinding.objectToEntry(curi.toString(), keyEntry);
377         crawlURIBinding.objectToEntry(curi, dataEntry);
378         OperationStatus opStatus = null;
379         if(overrideDuplicates){
380             opStatus = primaryUriDB.put(null,keyEntry,dataEntry);
381         } else {
382             opStatus = primaryUriDB.putNoOverwrite(null,keyEntry,dataEntry);
383         }
384         
385         return opStatus;
386     }
387     
388     /**
389      * Flush any CrawlURIs in the processingUriDB into the primaryUriDB. URIs
390      * flushed will have their 'time of next fetch' maintained and the
391      * nextReadyTime will be updated if needed.
392      * <p>
393      * No change is made to the list of available slots.
394      *
395      * @throws DatabaseException if one occurs while flushing
396      */

397     protected void flushProcessingURIs() throws DatabaseException {
398         Cursor processingCursor = processingUriDB.openCursor(null,null);
399         DatabaseEntry keyEntry = new DatabaseEntry();
400         DatabaseEntry dataEntry = new DatabaseEntry();
401         
402         while(true){
403             OperationStatus opStatus = processingCursor.getFirst(
404                     keyEntry, dataEntry, LockMode.DEFAULT);
405             
406             if(opStatus == OperationStatus.SUCCESS){
407                 // Got one!
408
CrawlURI curi =
409                     (CrawlURI) crawlURIBinding.entryToObject(dataEntry);
410                 // Delete it from processingUriDB
411
deleteInProcessing(curi.toString());
412                 // Add to processingUriDB;
413
strictAdd(curi,false); // Ignore any duplicates. Go with the
414
// ones already in the queue.
415
// Update nextReadyTime if needed.
416
long curiNextReadyTime = curi.getLong(
417                         A_TIME_OF_NEXT_PROCESSING);
418                 if(curiNextReadyTime<nextReadyTime){
419                     setNextReadyTime(curiNextReadyTime);
420                 }
421             } else {
422                 // No more entries in processingUriDB
423
processingCursor.close();
424                 return;
425             }
426         }
427     }
428     
429     /**
430      * Count all entries in both primaryUriDB and processingUriDB.
431      * <p>
432      * This method is needed since BDB does not provide a simple way of counting
433      * entries.
434      * <p>
435      * Note: This is an expensive operation, requires a loop through the entire
436      * queue!
437      * @return the number of distinct CrawlURIs in the HQ.
438      * @throws DatabaseException
439      */

440     protected long countCrawlURIs() throws DatabaseException{
441         // TODO: Instead of all this, the value should be simply read from the
442
// database.
443
long count = 0;
444         
445         DatabaseEntry keyEntry = new DatabaseEntry();
446         DatabaseEntry dataEntry = new DatabaseEntry();
447         
448         // Count URIs in the queue
449
Cursor primaryCursor = primaryUriDB.openCursor(null,null);
450         OperationStatus opStatus = primaryCursor.getFirst(keyEntry,
451                                                             dataEntry,
452                                                             LockMode.DEFAULT);
453         while(opStatus == OperationStatus.SUCCESS){
454             count++;
455             opStatus = primaryCursor.getNext(keyEntry,
456                                              dataEntry,
457                                              LockMode.DEFAULT);
458         }
459         
460         primaryCursor.close();
461
462         // Now count URIs in the processingUriDB
463
Cursor processingCursor = processingUriDB.openCursor(null,null);
464         opStatus = processingCursor.getFirst(keyEntry,
465                                              dataEntry,
466                                              LockMode.DEFAULT);
467         while(opStatus == OperationStatus.SUCCESS){
468             count++;
469             opStatus = processingCursor.getNext(keyEntry,
470                                                 dataEntry,
471                                                 LockMode.DEFAULT);
472         }
473         
474         processingCursor.close();
475         return count;
476     }
477     
478     /**
479      * Returns true if this HQ has a CrawlURI matching the uri string currently
480      * being processed. False otherwise.
481      *
482      * @param uri Uri to check
483      * @return true if this HQ has a CrawlURI matching the uri string currently
484      * being processed. False otherwise.
485      *
486      * @throws DatabaseException
487      */

488     protected boolean inProcessing(String JavaDoc uri) throws DatabaseException{
489         DatabaseEntry keyEntry = new DatabaseEntry();
490         DatabaseEntry dataEntry = new DatabaseEntry();
491                 
492         StringBinding.stringToEntry(uri,keyEntry);
493         
494         OperationStatus opStatus = processingUriDB.get(null,
495                                                        keyEntry,
496                                                        dataEntry,
497                                                        LockMode.DEFAULT);
498         
499         if (opStatus == OperationStatus.SUCCESS){
500             return true;
501         }
502         
503         return false; //Not found
504
}
505     
506     /**
507      * Removes a URI from the list of URIs belonging to this HQ and are
508      * currently being processed.
509      * <p>
510      * Returns true if successful, false if the URI was not found.
511      *
512      * @param uri The URI string of the CrawlURI to delete.
513      *
514      * @throws DatabaseException
515      * @throws IllegalStateException if the URI was not on the list
516      */

517     protected void deleteInProcessing(String JavaDoc uri) throws DatabaseException {
518         DatabaseEntry keyEntry = new DatabaseEntry();
519
520         StringBinding.stringToEntry(uri, keyEntry);
521
522         OperationStatus opStatus = processingUriDB.delete(null, keyEntry);
523
524         if (opStatus != OperationStatus.SUCCESS) {
525             if (opStatus == OperationStatus.NOTFOUND) {
526                 throw new IllegalStateException JavaDoc("Trying to deleta a "
527                         + "non-existant URI from the list of URIs being "
528                         + "processed. HQ: " + hostName + ", CrawlURI: " + uri);
529             }
530             throw new DatabaseException("Error occured deleting URI: " + uri
531                     + " from HQ " + hostName + " list "
532                     + "of URIs currently being processed. "
533                     + opStatus.toString());
534         }
535     }
536
537     /**
538      * Adds a CrawlURI to the list of CrawlURIs belonging to this HQ and are
539      * being processed at the moment.
540      *
541      * @param curi
542      * The CrawlURI to add to the list
543      * @throws DatabaseException
544      * @throws IllegalStateException
545      * if the CrawlURI is already in the list of URIs being
546      * processed.
547      */

548     protected void addInProcessing(CrawlURI curi) throws DatabaseException,
549             IllegalStateException JavaDoc {
550         DatabaseEntry keyEntry = new DatabaseEntry();
551         DatabaseEntry dataEntry = new DatabaseEntry();
552
553         StringBinding.stringToEntry(curi.toString(), keyEntry);
554         crawlURIBinding.objectToEntry(curi, dataEntry);
555
556         OperationStatus opStatus = processingUriDB.putNoOverwrite(null,
557                 keyEntry, dataEntry);
558
559         if (opStatus != OperationStatus.SUCCESS) {
560             if (opStatus == OperationStatus.KEYEXIST) {
561                 throw new IllegalStateException JavaDoc("Can not insert duplicate "
562                         + "URI into list of URIs being processed. " + "HQ: "
563                         + hostName + ", CrawlURI: " + curi.toString());
564             }
565             throw new DatabaseException("Error occured adding CrawlURI: "
566                     + curi.toString() + " to HQ " + hostName + " list "
567                     + "of URIs currently being processed. "
568                     + opStatus.toString());
569         }
570     }
571     
572     /**
573      * Returns the CrawlURI associated with the specified URI (string) or null
574      * if no such CrawlURI is queued in this HQ. If CrawlURI is being processed
575      * it is not considered to be <i>queued </i> and this method will return
576      * null for any such URIs.
577      *
578      * @param uri
579      * A string representing the URI
580      * @return the CrawlURI associated with the specified URI (string) or null
581      * if no such CrawlURI is queued in this HQ.
582      *
583      * @throws DatabaseException
584      * if a errors occurs reading the database
585      */

586     protected CrawlURI getCrawlURI(String JavaDoc uri) throws DatabaseException{
587         DatabaseEntry keyEntry = new DatabaseEntry();
588         DatabaseEntry dataEntry = new DatabaseEntry();
589         
590         primaryKeyBinding.objectToEntry(uri,keyEntry);
591         primaryUriDB.get(null,keyEntry,dataEntry,LockMode.DEFAULT);
592         
593         CrawlURI curi = (CrawlURI)crawlURIBinding.entryToObject(dataEntry);
594         
595         return curi;
596     }
597
598     /**
599      * Update CrawlURI that has completed processing.
600      *
601      * @param curi The CrawlURI. This must be a CrawlURI issued by this HQ's
602      * {@link #next() next()} method.
603      * @param needWait If true then the URI was processed successfully,
604      * requiring a period of suspended action on that host. If
605      * valence is > 1 then seperate times are maintained for
606      * each slot.
607      * @param wakeupTime If new state is
608      * {@link AdaptiveRevisitHostQueue#HQSTATE_SNOOZED snoozed}
609      * then this parameter should contain the time (in
610      * milliseconds) when it will be safe to wake the HQ up
611      * again. Otherwise this parameter will be ignored.
612      *
613      * @throws IllegalStateException if the CrawlURI
614      * does not match a CrawlURI issued for crawling by this HQ's
615      * {@link AdaptiveRevisitHostQueue#next() next()}.
616      * @throws IOException if an error occurs accessing the database
617      */

618     public void update(CrawlURI curi,
619                        boolean needWait,
620                        long wakeupTime)
621             throws IllegalStateException JavaDoc, IOException JavaDoc{
622         update(curi,needWait,wakeupTime,false);
623     }
624     
625     
626     /**
627      * Update CrawlURI that has completed processing.
628      *
629      * @param curi The CrawlURI. This must be a CrawlURI issued by this HQ's
630      * {@link #next() next()} method.
631      * @param needWait If true then the URI was processed successfully,
632      * requiring a period of suspended action on that host. If
633      * valence is > 1 then seperate times are maintained for
634      * each slot.
635      * @param wakeupTime If new state is
636      * {@link AdaptiveRevisitHostQueue#HQSTATE_SNOOZED snoozed}
637      * then this parameter should contain the time (in
638      * milliseconds) when it will be safe to wake the HQ up
639      * again. Otherwise this parameter will be ignored.
640      * @param forgetURI If true, the URI will be deleted from the queue.
641      *
642      * @throws IllegalStateException if the CrawlURI
643      * does not match a CrawlURI issued for crawling by this HQ's
644      * {@link AdaptiveRevisitHostQueue#next() next()}.
645      * @throws IOException if an error occurs accessing the database
646      */

647     public void update(CrawlURI curi,
648                        boolean needWait,
649                        long wakeupTime,
650                        boolean forgetURI)
651             throws IllegalStateException JavaDoc, IOException JavaDoc{
652         if (logger.isLoggable(Level.FINE)) {
653             logger.fine("Updating " + curi.toString());
654         }
655         try{
656             // First add it to the regular queue (if not forgetting it).
657
if (forgetURI == false){
658                 OperationStatus opStatus = strictAdd(curi,false);
659                 if(opStatus != OperationStatus.SUCCESS){
660                     if(opStatus == OperationStatus.KEYEXIST){
661                         throw new IllegalStateException JavaDoc("Trying to update a" +
662                             " CrawlURI failed because it was in the queue" +
663                             " of URIs waiting for processing. URIs currently" +
664                             " being processsed can never be in that queue." +
665                             " HQ: " + hostName + ", CrawlURI: " +
666                             curi.toString());
667                     }
668                 }
669
670                 // Check if we need to update nextReadyTime
671
long curiTimeOfNextProcessing = curi.getLong(
672                         A_TIME_OF_NEXT_PROCESSING);
673                 if(nextReadyTime > curiTimeOfNextProcessing){
674                     setNextReadyTime(curiTimeOfNextProcessing);
675                 }
676                 
677             } else {
678                 size--;
679             }
680             
681             // Then remove from list of in processing URIs
682
deleteInProcessing(curi.toString());
683             
684             inProcessing--;
685             
686             // Update the wakeUpTime slot.
687
if(needWait==false){
688                 // Ok, no wait then. Set wake up time to 0.
689
wakeupTime = 0;
690             }
691
692             updateWakeUpTimeSlot(wakeupTime);
693         } catch (DatabaseException e) {
694             // Blanket catch all DBExceptions and convert to IOExceptions.
695
IOException JavaDoc e2 = new IOException JavaDoc(e.getMessage());
696             e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
697
throw e2;
698         }
699     }
700
701     /**
702      * Returns the 'top' URI in the AdaptiveRevisitHostQueue.
703      * <p>
704      * HQ state will be set to {@link AdaptiveRevisitHostQueue#HQSTATE_BUSY busy} if this
705      * method returns normally.
706  
707      *
708      * @return a CrawlURI ready for processing
709      *
710      * @throws IllegalStateException if the HostQueues current state is not
711      * ready {@link AdaptiveRevisitHostQueue#HQSTATE_READY ready}
712      * @throws IOException if an error occurs reading from the database
713      */

714     public CrawlURI next() throws IllegalStateException JavaDoc, IOException JavaDoc{
715         try{
716             // Ok, lets issue a URI, first check state and reserve slot.
717
if(getState()!=HQSTATE_READY || useWakeUpTimeSlot()==false){
718                 throw new IllegalStateException JavaDoc("Can not issue next URI when " +
719                         "HQ " + hostName + " state is " + getStateByName());
720             }
721     
722             DatabaseEntry keyEntry = new DatabaseEntry();
723             
724             // Get the top URI
725
CrawlURI curi = peek();
726             
727             // Add it to processingUriDB
728
addInProcessing(curi);
729             
730             // Delete it from the primaryUriDB
731
primaryKeyBinding.objectToEntry(curi.toString(),keyEntry);
732             OperationStatus opStatus = primaryUriDB.delete(null,keyEntry);
733             
734             if(opStatus != OperationStatus.SUCCESS){
735                 throw new DatabaseException("Error occured removing URI: " +
736                         curi.toString() + " from HQ " + hostName +
737                         " priority queue for processing. " + opStatus.toString());
738             }
739             
740             // Finally update nextReadyTime with new top if one exists.
741
CrawlURI top = peek();
742             long nextReady = Long.MAX_VALUE;
743             if(top != null){
744                 nextReady = top.getLong(
745                         A_TIME_OF_NEXT_PROCESSING);
746             }
747             inProcessing++;
748             setNextReadyTime(nextReady);
749             logger.fine("Issuing " + curi.toString());
750             return curi;
751         } catch (DatabaseException e) {
752             // Blanket catch all DBExceptions and convert to IOExceptions.
753
IOException JavaDoc e2 = new IOException JavaDoc(e.getMessage());
754             e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
755
throw e2;
756         }
757     }
758     
759     /**
760      * Returns the URI with the earliest time of next processing. I.e. the URI
761      * at the head of this host based priority queue.
762      * <p>
763      * Note: This method will return the head CrawlURI regardless of wether it
764      * is safe to start processing it or not. CrawlURI will remain in the queue.
765      * The returned CrawlURI should only be used for queue inspection, it can
766      * <i>not</i> be updated and returned to the queue. To get URIs ready for
767      * processing use {@link #next() next()}.
768      *
769      * @return the URI with the earliest time of next processing or null if
770      * the queue is empty or all URIs are currently being processed.
771      * @throws IllegalStateException
772      *
773      * @throws IOException if an error occurs reading from the database
774      */

775     public CrawlURI peek() throws IllegalStateException JavaDoc, IOException JavaDoc{
776         try{
777             
778             DatabaseEntry keyEntry = new DatabaseEntry();
779             DatabaseEntry dataEntry = new DatabaseEntry();
780             
781             CrawlURI curi = null;
782             Cursor secondaryCursor = secondaryUriDB.openCursor(null,null);
783             
784             OperationStatus opStatus =
785                 secondaryCursor.getFirst(keyEntry, dataEntry, LockMode.DEFAULT);
786             
787             if( opStatus == OperationStatus.SUCCESS){
788                 curi = (CrawlURI)crawlURIBinding.entryToObject(dataEntry);
789             } else {
790                 if( opStatus == OperationStatus.NOTFOUND ){
791                    curi = null;
792                 } else {
793                     throw new IOException JavaDoc("Error occured in " +
794                         "AdaptiveRevisitHostQueue.peek()." + opStatus.toString());
795                 }
796             }
797             secondaryCursor.close();
798             return curi;
799         } catch (DatabaseException e) {
800             // Blanket catch all DBExceptions and convert to IOExceptions.
801
IOException JavaDoc e2 = new IOException JavaDoc(e.getMessage());
802             e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
803
throw e2;
804         }
805     }
806     
807     
808     
809     /**
810      * Returns the current state of the HQ.
811      *
812      * @return the current state of the HQ.
813      *
814      * @see AdaptiveRevisitHostQueue#HQSTATE_BUSY
815      * @see AdaptiveRevisitHostQueue#HQSTATE_EMPTY
816      * @see AdaptiveRevisitHostQueue#HQSTATE_READY
817      * @see AdaptiveRevisitHostQueue#HQSTATE_SNOOZED
818      */

819     public int getState(){
820         if(state != HQSTATE_EMPTY){
821             // Need to confirm state
822
if(isBusy()){
823                 state = HQSTATE_BUSY;
824             } else {
825                 long currentTime = System.currentTimeMillis();
826                 long wakeTime = getEarliestWakeUpTimeSlot();
827                 
828                 if(wakeTime > currentTime || nextReadyTime > currentTime){
829                     state = HQSTATE_SNOOZED;
830                 } else {
831                     state = HQSTATE_READY;
832                 }
833             }
834         }
835         return state;
836     }
837     
838     /**
839      * Returns the time when the HQ will next be ready to issue a URI.
840      * <p>
841      * If the queue is in a {@link #HQSTATE_SNOOZED snoozed} state then this
842      * time will be in the future and reflects either the time when the HQ will
843      * again be able to issue URIs for processing because politness constraints
844      * have ended, or when a URI next becomes available for visit, whichever is
845      * larger.
846      * <p>
847      * If the queue is in a {@link #HQSTATE_READY ready} state this time will
848      * be in the past and reflect the earliest time when the HQ had a URI ready
849      * for processing, taking time spent snoozed for politness concerns into
850      * account.
851      * <p>
852      * If the HQ is in any other state then the return value of this method is
853      * equal to Long.MAX_VALUE.
854      * <p>
855      * This value may change each time a URI is added, issued or updated.
856      *
857      * @return the time when the HQ will next be ready to issue a URI
858      */

859     public long getNextReadyTime(){
860         if(getState()==HQSTATE_BUSY || getState()==HQSTATE_EMPTY){
861             // Have no idea when HQ next be able issue a URI
862
return Long.MAX_VALUE;
863         }
864         long wakeTime = getEarliestWakeUpTimeSlot();
865         return nextReadyTime > wakeTime ? nextReadyTime : wakeTime;
866     }
867     
868     /**
869      * Updates nextReadyTime (if smaller) with the supplied value
870      * @param newTime the new value of nextReady Time;
871      */

872     protected void setNextReadyTime(long newTime){
873         if(logger.isLoggable(Level.FINEST)){
874             logger.finest("Setting next ready to new value " + newTime +
875                     " from " + getNextReadyTime());
876         }
877         nextReadyTime=newTime;
878         reorder();
879     }
880     
881     /**
882      * Method is called whenever something has been done that might have
883      * changed the value of the 'published' time of next ready. If an owner
884      * has been specified it will be notified that the value may have changed..
885      */

886     protected void reorder(){
887         if(owner != null){
888             owner.reorder(this);
889         }
890     }
891
892
893     /**
894      * Same as {@link #getState() getState()} except this method returns a
895      * human readable name for the state instead of its constant integer value.
896      * <p>
897      * Should only be used for reports, error messages and other strings
898      * intended for human eyes.
899      *
900      * @return the human readable name of the current state
901      */

902     public String JavaDoc getStateByName() {
903         switch(getState()){
904             case HQSTATE_BUSY : return "busy";
905             case HQSTATE_EMPTY : return "empty";
906             case HQSTATE_READY : return "ready";
907             case HQSTATE_SNOOZED : return "snoozed";
908         }
909         // This should be impossible unless new states are added without
910
// updating this method.
911
return "undefined";
912     }
913     
914     /**
915      * Returns the size of the HQ. That is, the number of URIs queued,
916      * including any that are currently being processed.
917      *
918      * @return the size of the HQ.
919      */

920     public long getSize(){
921         return size;
922     }
923     
924     /**
925      * Set the AdaptiveRevisitQueueList object that contains this HQ. Will cause
926      * that
927      * object to be notified (via
928      * {@link AdaptiveRevisitQueueList#reorder(AdaptiveRevisitHostQueue)
929      * reorder()} when the
930      * value used for sorting the list of HQs changes.
931      * @param owner the ARHostQueueList object that contains this HQ.
932      */

933     public void setOwner(AdaptiveRevisitQueueList owner) {
934         this.owner = owner;
935     }
936
937     /**
938      * Cleanup all open Berkeley Database objects.
939      * <p>
940      * Does <I>not</I> close the Environment.
941      *
942      * @throws IOException if an error occurs closing a database object
943      */

944     public void close() throws IOException JavaDoc{
945         try{
946             secondaryUriDB.close();
947             processingUriDB.close();
948             primaryUriDB.close();
949         } catch (DatabaseException e) {
950             // Blanket catch all DBExceptions and convert to IOExceptions.
951
IOException JavaDoc e2 = new IOException JavaDoc(e.getMessage());
952             e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
953
throw e2;
954         }
955     }
956     
957     
958     /**
959      * If true then the HQ has no available slot for issuing URIs.
960      * <p>
961      * I.e. number of in processing URIs = valence.
962      *
963      * @return true if number of in processing URIs = valence
964      */

965     private boolean isBusy(){
966         return inProcessing == valence;
967     }
968     
969     /**
970      * Overwrites a used (-1) value in wakeUpTime[] with the supplied value.
971      * @param newVal
972      */

973     private void updateWakeUpTimeSlot(long newVal){
974         for(int i=0 ; i < valence ; i++){
975             if(wakeUpTime[i]==-1){
976                 wakeUpTime[i]=newVal;
977             }
978         }
979         reorder();
980     }
981     
982     /**
983      * A new URI is being issued. Set the wakeup time on an unused slot to -1.
984      *
985      * @return true if a slot was successfully reserved. False otherwise.
986      */

987     private boolean useWakeUpTimeSlot(){
988         for(int i=0 ; i < valence ; i++){
989             if(wakeUpTime[i]>-1 && wakeUpTime[i]<=System.currentTimeMillis()){
990                 wakeUpTime[i]=-1;
991                 return true;
992             }
993         }
994         reorder();
995         return false;
996     }
997     
998     /**
999      * Returns the earliest time when a wake up slot will become available. If
1000     * one is already available then this time will be in the past.
1001     * <p>
1002     * If all slots are taken with URIs currently being processed (i.e. HQ state
1003     * is {@link #HQSTATE_BUSY busy} then this will return Long.MAX_VALUE;
1004     * @return the earliest time when a wake up slot will become available
1005     */

1006    private long getEarliestWakeUpTimeSlot(){
1007        long earliest = Long.MAX_VALUE;
1008        for(int i=0 ; i < valence ; i++){
1009            if(wakeUpTime[i]>-1 && wakeUpTime[i]<earliest){
1010                earliest = wakeUpTime[i];
1011            }
1012        }
1013        return earliest;
1014    }
1015    
1016    /**
1017     * Returns a report detailing the status of this HQ.
1018     * @param max Maximum number of URIs to show. 0 equals no limit.
1019     * @return a report detailing the status of this HQ.
1020     */

1021    public String JavaDoc report(int max){
1022        try{
1023            StringBuffer JavaDoc ret = new StringBuffer JavaDoc(256);
1024            ret.append("AdaptiveRevisitHostQueue: " + hostName + "\n");
1025            ret.append("Size: " + size + "\n");
1026            ret.append("State: " + getStateByName() + "\n");
1027            if(getState()==HQSTATE_BUSY){
1028                ret.append("Processing URIs: \n");
1029                Cursor processingCursor = processingUriDB.openCursor(null,null);
1030                reportURIs(ret, processingCursor, valence);
1031                processingCursor.close();
1032            } else {
1033                ret.append("Next ready: " +
1034                        ArchiveUtils.formatMillisecondsToConventional(
1035                            getNextReadyTime() - System.currentTimeMillis()) +
1036                            "\n");
1037            }
1038            ret.append("Top URIs: \n");
1039            
1040            Cursor secondaryCursor = secondaryUriDB.openCursor(null,null);
1041            reportURIs(ret,secondaryCursor,max);
1042            secondaryCursor.close();
1043            return ret.toString();
1044        } catch( DatabaseException e ){
1045            return "Exception occured compiling report:\n" + e.getMessage();
1046        }
1047    }
1048    
1049    /**
1050     * Adds a report of the first <code>max</code> URIs that the cursor points
1051     * to to the stringbuffer object.
1052     *
1053     * @param ret The stringbuffer to append to
1054     * @param cursor The cursor pointing at a URI database
1055     * @param max Maximum number of URIs to report on. If fewer URIs are in the
1056     * database, all URIs are shown
1057     * @throws DatabaseException if an error occurs
1058     */

1059    private void reportURIs(StringBuffer JavaDoc ret, Cursor cursor, int max)
1060            throws DatabaseException{
1061        DatabaseEntry keyEntry = new DatabaseEntry();
1062        DatabaseEntry dataEntry = new DatabaseEntry();
1063        OperationStatus opStatus =
1064            cursor.getFirst(keyEntry,dataEntry,LockMode.DEFAULT);
1065        if(max == 0){
1066            // No limit on the number of values returned.
1067
max = Integer.MAX_VALUE;
1068        }
1069        int i = 0;
1070        while(i<max && opStatus == OperationStatus.SUCCESS){
1071            CrawlURI tmp = (CrawlURI)crawlURIBinding.entryToObject(dataEntry);
1072            ret.append(" URI: " + tmp.toString() + "\n");
1073            switch(tmp.getSchedulingDirective()){
1074                case CandidateURI.HIGHEST :
1075                    ret.append(" Sched. directive: HIGHEST\n"); break;
1076                case CandidateURI.HIGH :
1077                    ret.append(" Sched. directive: HIGH\n"); break;
1078                case CandidateURI.MEDIUM :
1079                    ret.append(" Sched. directive: MEDIUM\n"); break;
1080                case CandidateURI.NORMAL :
1081                    ret.append(" Sched. directive: NORMAL\n"); break;
1082            }
1083            ret.append(" Next processing: ");
1084            long nextProcessing =
1085                tmp.getLong(A_TIME_OF_NEXT_PROCESSING) -
1086                System.currentTimeMillis();
1087            if(nextProcessing < 0){
1088                ret.append("Overdue ");
1089                nextProcessing = nextProcessing*-1;
1090            }
1091            ret.append(ArchiveUtils.formatMillisecondsToConventional(
1092                    nextProcessing) + "\n");
1093            if(tmp.getFetchStatus()!=0){
1094                ret.append(" Last fetch status: " +
1095                        tmp.getFetchStatus() + "\n");
1096            }
1097            if(tmp.containsKey(A_WAIT_INTERVAL)){
1098                ret.append(" Wait interval: " +
1099                        ArchiveUtils.formatMillisecondsToConventional(
1100                                tmp.getLong(A_WAIT_INTERVAL)) + "\n");
1101            }
1102            if(tmp.containsKey(A_NUMBER_OF_VISITS)){
1103                ret.append(" Visits: " + tmp.getInt(
1104                        A_NUMBER_OF_VISITS) + "\n");
1105            }
1106            if(tmp.containsKey(A_NUMBER_OF_VERSIONS)){
1107                ret.append(" Versions: " + tmp.getInt(
1108                        A_NUMBER_OF_VERSIONS) + "\n");
1109            }
1110            
1111            opStatus = cursor.getNext(keyEntry,dataEntry,LockMode.DEFAULT);
1112            i++;
1113        }
1114    }
1115    
1116    /**
1117     * Creates the secondary key for the secondary index.
1118     * <p>
1119     * The secondary index is the scheduling directive (first sorting) and
1120     * the time of next processing (sorted from earlies to latest within each
1121     * scheduling directive). If the scheduling directive is missing or
1122     * unknown NORMAL will be assumed.
1123     */

1124    private static class OrderOfProcessingKeyCreator
1125            extends TupleSerialKeyCreator {
1126
1127        /**
1128         * Constructor. Invokes parent constructor.
1129         *
1130         * @param classCatalog is the catalog to hold shared class information
1131         * and for a database should be a
1132         * StoredClassCatalog.
1133         * @param dataClass is the CrawlURI class.
1134         */

1135        public OrderOfProcessingKeyCreator(ClassCatalog classCatalog,
1136                Class JavaDoc dataClass) {
1137            super(classCatalog, dataClass);
1138        }
1139
1140        /* (non-Javadoc)
1141         * @see com.sleepycat.bind.serial.TupleSerialKeyCreator#createSecondaryKey(com.sleepycat.bind.tuple.TupleInput, java.lang.Object, com.sleepycat.bind.tuple.TupleOutput)
1142         */

1143        public boolean createSecondaryKey(TupleInput primaryKeyInput,
1144                                          Object JavaDoc dataInput,
1145                                          TupleOutput indexKeyOutput) {
1146            CrawlURI curi = (CrawlURI)dataInput;
1147            int directive = curi.getSchedulingDirective();
1148            // Can not rely on the default directive constants having a good
1149
// sort order
1150
switch (directive) {
1151            case CandidateURI.HIGHEST:
1152                directive = 0;
1153                break;
1154            case CandidateURI.HIGH:
1155                directive = 1;
1156                break;
1157            case CandidateURI.MEDIUM:
1158                directive = 2;
1159                break;
1160            case CandidateURI.NORMAL:
1161                directive = 3;
1162                break;
1163            default:
1164                directive = 3; // If directive missing or unknown
1165
}
1166            
1167            indexKeyOutput.writeInt(directive);
1168            long timeOfNextProcessing =
1169                curi.getLong(A_TIME_OF_NEXT_PROCESSING);
1170            
1171            indexKeyOutput.writeLong(timeOfNextProcessing);
1172            return true;
1173        }
1174    }
1175
1176    /* (non-Javadoc)
1177     * @see org.archive.crawler.datamodel.CrawlSubstats.HasCrawlSubstats#getSubstats()
1178     */

1179    public CrawlSubstats getSubstats() {
1180        return substats;
1181    }
1182
1183}
1184
Popular Tags