KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > archive > crawler > frontier > BdbMultipleWorkQueues


1 /* BdbMultipleWorkQueues
2  *
3  * Created on Dec 24, 2004
4  *
5  * Copyright (C) 2004 Internet Archive.
6  *
7  * This file is part of the Heritrix web crawler (crawler.archive.org).
8  *
9  * Heritrix is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU Lesser Public License as published by
11  * the Free Software Foundation; either version 2.1 of the License, or
12  * any later version.
13  *
14  * Heritrix is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Lesser Public License for more details.
18  *
19  * You should have received a copy of the GNU Lesser Public License
20  * along with Heritrix; if not, write to the Free Software
21  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22  */

23 package org.archive.crawler.frontier;
24
25 import java.io.UnsupportedEncodingException JavaDoc;
26 import java.math.BigInteger JavaDoc;
27 import java.util.ArrayList JavaDoc;
28 import java.util.List JavaDoc;
29 import java.util.logging.Level JavaDoc;
30 import java.util.logging.Logger JavaDoc;
31 import java.util.regex.Pattern JavaDoc;
32
33 import org.archive.crawler.datamodel.CrawlURI;
34 import org.archive.crawler.framework.FrontierMarker;
35 import org.archive.util.ArchiveUtils;
36
37 import com.sleepycat.bind.serial.StoredClassCatalog;
38 import com.sleepycat.je.Cursor;
39 import com.sleepycat.je.Database;
40 import com.sleepycat.je.DatabaseConfig;
41 import com.sleepycat.je.DatabaseEntry;
42 import com.sleepycat.je.DatabaseException;
43 import com.sleepycat.je.DatabaseNotFoundException;
44 import com.sleepycat.je.Environment;
45 import com.sleepycat.je.OperationStatus;
46 import com.sleepycat.util.RuntimeExceptionWrapper;
47
48
49 /**
50  * A BerkeleyDB-database-backed structure for holding ordered
51  * groupings of CrawlURIs. Reading the groupings from specific
52  * per-grouping (per-classKey/per-Host) starting points allows
53  * this to act as a collection of independent queues.
54  *
55  * <p>For how the bdb keys are made, see {@link #calculateInsertKey(CrawlURI)}.
56  *
57  * <p>TODO: refactor, improve naming.
58  *
59  * @author gojomo
60  */

61 public class BdbMultipleWorkQueues {
62     private static final long serialVersionUID = ArchiveUtils
63         .classnameBasedUID(BdbMultipleWorkQueues.class, 1);
64     
65     private static final Logger JavaDoc LOGGER =
66         Logger.getLogger(BdbMultipleWorkQueues.class.getName());
67     
68     /** Database holding all pending URIs, grouped in virtual queues */
69     private Database pendingUrisDB = null;
70     
71     /** Supporting bdb serialization of CrawlURIs */
72     private RecyclingSerialBinding crawlUriBinding;
73
74     /**
75      * Create the multi queue in the given environment.
76      *
77      * @param env bdb environment to use
78      * @param classCatalog Class catalog to use.
79      * @param recycle True if we are to reuse db content if any.
80      * @throws DatabaseException
81      */

82     public BdbMultipleWorkQueues(Environment env,
83         StoredClassCatalog classCatalog, final boolean recycle)
84     throws DatabaseException {
85         // Open the database. Create it if it does not already exist.
86
DatabaseConfig dbConfig = new DatabaseConfig();
87         dbConfig.setAllowCreate(true);
88         if (!recycle) {
89             try {
90                 env.truncateDatabase(null, "pending", false);
91             } catch (DatabaseNotFoundException e) {
92                 // Ignored
93
}
94         }
95         // Make database deferred write: URLs that are added then removed
96
// before a page-out is required need never cause disk IO.
97
dbConfig.setDeferredWrite(true);
98
99         this.pendingUrisDB = env.openDatabase(null, "pending", dbConfig);
100         crawlUriBinding =
101             new RecyclingSerialBinding(classCatalog, CrawlURI.class);
102     }
103
104     /**
105      * Delete all CrawlURIs matching the given expression.
106      *
107      * @param match
108      * @param queue
109      * @param headKey
110      * @return count of deleted items
111      * @throws DatabaseException
112      * @throws DatabaseException
113      */

114     public long deleteMatchingFromQueue(String JavaDoc match, String JavaDoc queue,
115             DatabaseEntry headKey) throws DatabaseException {
116         long deletedCount = 0;
117         Pattern JavaDoc pattern = Pattern.compile(match);
118         DatabaseEntry key = headKey;
119         DatabaseEntry value = new DatabaseEntry();
120         Cursor cursor = null;
121         try {
122             cursor = pendingUrisDB.openCursor(null, null);
123             OperationStatus result = cursor.getSearchKeyRange(headKey,
124                     value, null);
125
126             while (result == OperationStatus.SUCCESS) {
127                 if(value.getData().length>0) {
128                     CrawlURI curi = (CrawlURI) crawlUriBinding
129                             .entryToObject(value);
130                     if (!curi.getClassKey().equals(queue)) {
131                         // rolled into next queue; finished with this queue
132
break;
133                     }
134                     if (pattern.matcher(curi.toString()).matches()) {
135                         cursor.delete();
136                         deletedCount++;
137                     }
138                 }
139                 result = cursor.getNext(key, value, null);
140             }
141         } finally {
142             if (cursor != null) {
143                 cursor.close();
144             }
145         }
146
147         return deletedCount;
148     }
149     
150     /**
151      * @param m marker
152      * @param maxMatches
153      * @return list of matches starting from marker position
154      * @throws DatabaseException
155      */

156     public List JavaDoc getFrom(FrontierMarker m, int maxMatches) throws DatabaseException {
157         int matches = 0;
158         int tries = 0;
159         ArrayList JavaDoc<CrawlURI> results = new ArrayList JavaDoc<CrawlURI>(maxMatches);
160         BdbFrontierMarker marker = (BdbFrontierMarker) m;
161         
162         DatabaseEntry key = marker.getStartKey();
163         DatabaseEntry value = new DatabaseEntry();
164         
165         if (key != null) {
166             Cursor cursor = null;
167             OperationStatus result = null;
168             try {
169                 cursor = pendingUrisDB.openCursor(null,null);
170                 result = cursor.getSearchKey(key, value, null);
171                 
172                 while(matches<maxMatches && result == OperationStatus.SUCCESS) {
173                     if(value.getData().length>0) {
174                         CrawlURI curi = (CrawlURI) crawlUriBinding.entryToObject(value);
175                         if(marker.accepts(curi)) {
176                             results.add(curi);
177                             matches++;
178                         }
179                         tries++;
180                     }
181                     result = cursor.getNext(key,value,null);
182                 }
183             } finally {
184                 if (cursor !=null) {
185                     cursor.close();
186                 }
187             }
188             
189             if(result != OperationStatus.SUCCESS) {
190                 // end of scan
191
marker.setStartKey(null);
192             }
193         }
194         return results;
195     }
196     
197     /**
198      * Get a marker for beginning a scan over all contents
199      *
200      * @param regexpr
201      * @return a marker pointing to the first item
202      */

203     public FrontierMarker getInitialMarker(String JavaDoc regexpr) {
204         try {
205             return new BdbFrontierMarker(getFirstKey(), regexpr);
206         } catch (DatabaseException e) {
207             e.printStackTrace();
208             return null;
209         }
210     }
211     
212     /**
213      * @return the key to the first item in the database
214      * @throws DatabaseException
215      */

216     protected DatabaseEntry getFirstKey() throws DatabaseException {
217         DatabaseEntry key = new DatabaseEntry();
218         DatabaseEntry value = new DatabaseEntry();
219         Cursor cursor = pendingUrisDB.openCursor(null,null);
220         OperationStatus status = cursor.getNext(key,value,null);
221         cursor.close();
222         if(status == OperationStatus.SUCCESS) {
223             return key;
224         }
225         return null;
226     }
227     
228     /**
229      * Get the next nearest item after the given key. Relies on
230      * external discipline -- we'll look at the queues count of how many
231      * items it has -- to avoid asking for something from a
232      * range where there are no associated items --
233      * otherwise could get first item of next 'queue' by mistake.
234      *
235      * <p>TODO: hold within a queue's range
236      *
237      * @param headKey Key prefix that demarks the beginning of the range
238      * in <code>pendingUrisDB</code> we're interested in.
239      * @return CrawlURI.
240      * @throws DatabaseException
241      */

242     public CrawlURI get(DatabaseEntry headKey)
243     throws DatabaseException {
244         DatabaseEntry result = new DatabaseEntry();
245         
246         // From Linda Lee of sleepycat:
247
// "You want to check the status returned from Cursor.getSearchKeyRange
248
// to make sure that you have OperationStatus.SUCCESS. In that case,
249
// you have found a valid data record, and result.getData()
250
// (called by internally by the binding code, in this case) will be
251
// non-null. The other possible status return is
252
// OperationStatus.NOTFOUND, in which case no data record matched
253
// the criteria. "
254
OperationStatus status = getNextNearestItem(headKey, result);
255         CrawlURI retVal = null;
256         if (status != OperationStatus.SUCCESS) {
257             LOGGER.severe("See '1219854 NPE je-2.0 "
258                     + "entryToObject...'. OperationStatus "
259                     + " was not SUCCESS: "
260                     + status
261                     + ", headKey "
262                     + BdbWorkQueue.getPrefixClassKey(headKey.getData()));
263             return null;
264         }
265         try {
266             retVal = (CrawlURI)crawlUriBinding.entryToObject(result);
267         } catch (RuntimeExceptionWrapper rw) {
268             LOGGER.log(
269                 Level.SEVERE,
270                 "expected object missing in queue " +
271                 BdbWorkQueue.getPrefixClassKey(headKey.getData()),
272                 rw);
273             return null;
274         }
275         retVal.setHolderKey(headKey);
276         return retVal;
277     }
278     
279     protected OperationStatus getNextNearestItem(DatabaseEntry headKey,
280             DatabaseEntry result) throws DatabaseException {
281         Cursor cursor = null;
282         OperationStatus status;
283         try {
284             cursor = this.pendingUrisDB.openCursor(null, null);
285             // get cap; headKey at this point should always point to
286
// a queue-beginning cap entry (zero-length value)
287
status = cursor.getSearchKey(headKey, result, null);
288             if(status!=OperationStatus.SUCCESS || result.getData().length > 0) {
289                 // cap missing
290
throw new DatabaseException("bdb queue cap missing");
291             }
292             // get next item (real first item of queue)
293
status = cursor.getNext(headKey,result,null);
294         } finally {
295             if(cursor!=null) {
296                 cursor.close();
297             }
298         }
299         return status;
300     }
301     
302     /**
303      * Put the given CrawlURI in at the appropriate place.
304      *
305      * @param curi
306      * @throws DatabaseException
307      */

308     public void put(CrawlURI curi) throws DatabaseException {
309         DatabaseEntry insertKey = (DatabaseEntry)curi.getHolderKey();
310         if (insertKey == null) {
311             insertKey = calculateInsertKey(curi);
312             curi.setHolderKey(insertKey);
313         }
314         DatabaseEntry value = new DatabaseEntry();
315         crawlUriBinding.objectToEntry(curi, value);
316         // Output tally on avg. size if level is FINE or greater.
317
if (LOGGER.isLoggable(Level.FINE)) {
318             tallyAverageEntrySize(curi, value);
319         }
320         pendingUrisDB.put(null, insertKey, value);
321     }
322     
323     private long entryCount = 0;
324     private long entrySizeSum = 0;
325     private int largestEntry = 0;
326     
327     /**
328      * Log average size of database entry.
329      * @param curi CrawlURI this entry is for.
330      * @param value Database entry value.
331      */

332     private synchronized void tallyAverageEntrySize(CrawlURI curi,
333             DatabaseEntry value) {
334         entryCount++;
335         int length = value.getData().length;
336         entrySizeSum += length;
337         int avg = (int) (entrySizeSum/entryCount);
338         if(entryCount % 1000 == 0) {
339             LOGGER.fine("Average entry size at "+entryCount+": "+avg);
340         }
341         if (length>largestEntry) {
342             largestEntry = length;
343             LOGGER.fine("Largest entry: "+length+" "+curi);
344             if(length>(2*avg)) {
345                 LOGGER.fine("excessive?");
346             }
347         }
348     }
349
350     /**
351      * Calculate the 'origin' key for a virtual queue of items
352      * with the given classKey. This origin key will be a
353      * prefix of the keys for all items in the queue.
354      *
355      * @param classKey String key to derive origin byte key from
356      * @return a byte array key
357      */

358     static byte[] calculateOriginKey(String JavaDoc classKey) {
359         byte[] classKeyBytes = null;
360         int len = 0;
361         try {
362             classKeyBytes = classKey.getBytes("UTF-8");
363             len = classKeyBytes.length;
364         } catch (UnsupportedEncodingException JavaDoc e) {
365             // should be impossible; all JVMs must support UTF-8
366
e.printStackTrace();
367         }
368         byte[] keyData = new byte[len+1];
369         System.arraycopy(classKeyBytes,0,keyData,0,len);
370         keyData[len]=0;
371         return keyData;
372     }
373     
374     /**
375      * Calculate the insertKey that places a CrawlURI in the
376      * desired spot. First bytes are always classKey (usu. host)
377      * based -- ensuring grouping by host -- terminated by a zero
378      * byte. Then 8 bytes of data ensuring desired ordering
379      * within that 'queue' are used. The first byte of these 8 is
380      * priority -- allowing 'immediate' and 'soon' items to
381      * sort above regular. Next 1 byte is 'cost'. Last 6 bytes
382      * are ordinal serial number, ensuring earlier-discovered
383      * URIs sort before later.
384      *
385      * NOTE: Dangers here are:
386      * (1) priorities or costs over 2^7 (signed byte comparison)
387      * (2) ordinals over 2^48
388      *
389      * Package access & static for testing purposes.
390      *
391      * @param curi
392      * @return a DatabaseEntry key for the CrawlURI
393      */

394     static DatabaseEntry calculateInsertKey(CrawlURI curi) {
395         byte[] classKeyBytes = null;
396         int len = 0;
397         try {
398             classKeyBytes = curi.getClassKey().getBytes("UTF-8");
399             len = classKeyBytes.length;
400         } catch (UnsupportedEncodingException JavaDoc e) {
401             // should be impossible; all JVMs must support UTF-8
402
e.printStackTrace();
403         }
404         byte[] keyData = new byte[len+9];
405         System.arraycopy(classKeyBytes,0,keyData,0,len);
406         keyData[len]=0;
407         long ordinalPlus = curi.getOrdinal() & 0x0000FFFFFFFFFFFFL;
408         ordinalPlus =
409             ((long)curi.getSchedulingDirective() << 56) | ordinalPlus;
410         ordinalPlus =
411             ((((long)curi.getHolderCost()) & 0xFFL) << 48) | ordinalPlus;
412         ArchiveUtils.longIntoByteArray(ordinalPlus, keyData, len+1);
413         return new DatabaseEntry(keyData);
414     }
415     
416     /**
417      * Delete the given CrawlURI from persistent store. Requires
418      * the key under which it was stored be available.
419      *
420      * @param item
421      * @throws DatabaseException
422      */

423     public void delete(CrawlURI item) throws DatabaseException {
424         OperationStatus status;
425         status = pendingUrisDB.delete(null, (DatabaseEntry) item.getHolderKey());
426         if (status != OperationStatus.SUCCESS) {
427             LOGGER.severe("expected item not present: "
428                     + item
429                     + "("
430                     + (new BigInteger JavaDoc(((DatabaseEntry) item.getHolderKey())
431                             .getData())).toString(16) + ")");
432         }
433
434     }
435     
436     /**
437      * Method used by BdbFrontier during checkpointing.
438      * <p>The backing bdbje database has been marked deferred write so we save
439      * on writes to disk. Means no guarantees disk will have whats in memory
440      * unless a sync is called (Calling sync on the bdbje Environment is not
441      * sufficent).
442      * <p>Package access only because only Frontiers of this package would ever
443      * need access.
444      * @see <a HREF="http://www.sleepycat.com/jedocs/GettingStartedGuide/DB.html">Deferred Write Databases</a>
445      */

446     void sync() {
447         if (this.pendingUrisDB == null) {
448             return;
449         }
450         try {
451             this.pendingUrisDB.sync();
452         } catch (DatabaseException e) {
453             e.printStackTrace();
454         }
455     }
456     
457     /**
458      * clean up
459      *
460      */

461     public void close() {
462         try {
463             this.pendingUrisDB.close();
464         } catch (DatabaseException e) {
465             e.printStackTrace();
466         }
467     }
468     
469     /**
470      * Marker for remembering a position within the BdbMultipleWorkQueues.
471      *
472      * @author gojomo
473      */

474     public class BdbFrontierMarker implements FrontierMarker {
475         DatabaseEntry startKey;
476         Pattern JavaDoc pattern;
477         int nextItemNumber;
478         
479         /**
480          * Create a marker pointed at the given start location.
481          *
482          * @param startKey
483          * @param regexpr
484          */

485         public BdbFrontierMarker(DatabaseEntry startKey, String JavaDoc regexpr) {
486             this.startKey = startKey;
487             pattern = Pattern.compile(regexpr);
488             nextItemNumber = 1;
489         }
490         
491         /**
492          * @param curi
493          * @return whether the marker accepts the given CrawlURI
494          */

495         public boolean accepts(CrawlURI curi) {
496             boolean retVal = pattern.matcher(curi.toString()).matches();
497             if(retVal==true) {
498                 nextItemNumber++;
499             }
500             return retVal;
501         }
502         
503         /**
504          * @param key position for marker
505          */

506         public void setStartKey(DatabaseEntry key) {
507             startKey = key;
508         }
509         
510         /**
511          * @return startKey
512          */

513         public DatabaseEntry getStartKey() {
514             return startKey;
515         }
516         
517         /* (non-Javadoc)
518          * @see org.archive.crawler.framework.FrontierMarker#getMatchExpression()
519          */

520         public String JavaDoc getMatchExpression() {
521             return pattern.pattern();
522         }
523         
524         /* (non-Javadoc)
525          * @see org.archive.crawler.framework.FrontierMarker#getNextItemNumber()
526          */

527         public long getNextItemNumber() {
528             return nextItemNumber;
529         }
530         
531         /* (non-Javadoc)
532          * @see org.archive.crawler.framework.FrontierMarker#hasNext()
533          */

534         public boolean hasNext() {
535             // as long as any startKey is stated, consider as having next
536
return startKey != null;
537         }
538     }
539
540     /**
541      * Add a dummy 'cap' entry at the given insertion key. Prevents
542      * 'seeks' to queue heads from holding lock on last item of
543      * 'preceding' queue. See:
544      * http://sourceforge.net/tracker/index.php?func=detail&aid=1262665&group_id=73833&atid=539102
545      *
546      * @param origin key at which to insert the cap
547      */

548     public void addCap(byte[] origin) {
549         try {
550             pendingUrisDB.put(null, new DatabaseEntry(origin),
551                     new DatabaseEntry(new byte[0]));
552         } catch (DatabaseException e) {
553             throw new RuntimeException JavaDoc(e);
554         }
555     }
556 }
557
Popular Tags