KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > archive > crawler > util > FPMergeUriUniqFilter


1 /* UriUniqFilterImpl
2 *
3 * $Id: FPMergeUriUniqFilter.java,v 1.2.4.1 2007/01/13 01:31:29 stack-sf Exp $
4 *
5 * Created on Sep 29, 2005
6 *
7 * Copyright (C) 2005 Internet Archive.
8 *
9 * This file is part of the Heritrix web crawler (crawler.archive.org).
10 *
11 * Heritrix is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser Public License as published by
13 * the Free Software Foundation; either version 2.1 of the License, or
14 * any later version.
15 *
16 * Heritrix is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU Lesser Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser Public License
22 * along with Heritrix; if not, write to the Free Software
23 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24 */

25 package org.archive.crawler.util;
26
27 import it.unimi.dsi.fastutil.longs.LongIterator;
28
29 import java.io.BufferedOutputStream JavaDoc;
30 import java.io.File JavaDoc;
31 import java.io.FileNotFoundException JavaDoc;
32 import java.io.FileOutputStream JavaDoc;
33 import java.io.PrintWriter JavaDoc;
34 import java.util.Iterator JavaDoc;
35 import java.util.TreeSet JavaDoc;
36 import java.util.logging.Level JavaDoc;
37 import java.util.logging.Logger JavaDoc;
38
39 import org.archive.crawler.datamodel.CandidateURI;
40 import org.archive.crawler.datamodel.UriUniqFilter;
41 import org.archive.util.fingerprint.ArrayLongFPCache;
42
43 import st.ata.util.FPGenerator;
44
45 /**
46  * UriUniqFilter based on merging FP arrays (in memory or from disk).
47  *
48  * Inspired by the approach in Najork and Heydon, "High-Performance
49  * Web Crawling" (2001), section 3.2, "Efficient Duplicate URL
50  * Eliminators".
51  *
52  * @author gojomo
53  */

54 public abstract class FPMergeUriUniqFilter implements UriUniqFilter {
55     /**
56      * Represents a long fingerprint and (possibly) its corresponding
57      * CandidateURI, awaiting the next merge in a 'pending' state.
58      */

59     public class PendingItem implements Comparable JavaDoc {
60         long fp;
61         CandidateURI caUri;
62         public PendingItem(long fp, CandidateURI value) {
63             this.fp = fp;
64             this.caUri = value;
65         }
66         public int compareTo(Object JavaDoc arg0) {
67             PendingItem vs = (PendingItem) arg0;
68             return (fp < vs.fp) ? -1 : ( (fp == vs.fp) ? 0 : 1);
69         }
70     }
71     
72     private static Logger JavaDoc LOGGER =
73         Logger.getLogger(FPMergeUriUniqFilter.class.getName());
74
75     protected HasUriReceiver receiver;
76     protected PrintWriter JavaDoc profileLog;
77     
78     // statistics
79
protected long quickDuplicateCount = 0;
80     protected long quickDupAtLast = 0;
81     protected long pendDuplicateCount = 0;
82     protected long pendDupAtLast = 0;
83     protected long mergeDuplicateCount = 0;
84     protected long mergeDupAtLast = 0;
85     
86     /** items awaiting merge
87      * TODO: consider only sorting just pre-merge
88      * TODO: consider using a fastutil long->Object class
89      * TODO: consider actually writing items to disk file,
90      * as in Najork/Heydon
91      */

92     protected TreeSet JavaDoc<PendingItem> pendingSet = new TreeSet JavaDoc<PendingItem>();
93     
94     /** size at which to force flush of pending items */
95     protected int maxPending = DEFAULT_MAX_PENDING;
96     public static final int DEFAULT_MAX_PENDING = 10000;
97     // TODO: increase
98

99     /**
100      * time-based throttle on flush-merge operations
101      */

102     protected long nextFlushAllowableAfter = 0;
103     public static final long FLUSH_DELAY_FACTOR = 100;
104
105     /** cache of most recently seen FPs */
106     protected ArrayLongFPCache quickCache = new ArrayLongFPCache();
107     // TODO: make cache most-often seen, not just most-recent
108

109     public FPMergeUriUniqFilter() {
110         super();
111         String JavaDoc profileLogFile =
112             System.getProperty(FPMergeUriUniqFilter.class.getName()
113                 + ".profileLogFile");
114         if (profileLogFile != null) {
115             setProfileLog(new File JavaDoc(profileLogFile));
116         }
117     }
118
119     public void setMaxPending(int max) {
120         maxPending = max;
121     }
122     
123     public long pending() {
124         return pendingSet.size();
125     }
126
127     public void setDestination(HasUriReceiver receiver) {
128         this.receiver = receiver;
129     }
130
131     protected void profileLog(String JavaDoc key) {
132         if (profileLog != null) {
133             profileLog.println(key);
134         }
135     }
136     
137     /* (non-Javadoc)
138      * @see org.archive.crawler.datamodel.UriUniqFilter#add(java.lang.String, org.archive.crawler.datamodel.CandidateURI)
139      */

140     public synchronized void add(String JavaDoc key, CandidateURI value) {
141         profileLog(key);
142         long fp = createFp(key);
143         if(! quickCheck(fp)) {
144             quickDuplicateCount++;
145             return;
146         }
147         pend(fp,value);
148         if (pendingSet.size()>=maxPending) {
149             flush();
150         }
151     }
152
153     /**
154      * Place the given FP/CandidateURI pair into the pending set, awaiting
155      * a merge to determine if it's actually accepted.
156      *
157      * @param fp long fingerprint
158      * @param value CandidateURI or null, if fp only needs merging (as when
159      * CandidateURI was already forced in
160      */

161     protected void pend(long fp, CandidateURI value) {
162         // special case for first batch of adds
163
if(count()==0) {
164             if(pendingSet.add(new PendingItem(fp,null))==false) {
165                 pendDuplicateCount++; // was already present
166
} else {
167                 // since there's no prior list to merge, push uri along right now
168
if(value!=null) {
169                     this.receiver.receive(value);
170                 }
171             }
172             return;
173         }
174         if(pendingSet.add(new PendingItem(fp,value))==false) {
175             pendDuplicateCount++; // was already present
176
}
177     }
178
179     /**
180      * Evaluate if quick-check cache considers fingerprint novel enough
181      * for further consideration.
182      *
183      * @param fp long fingerprint to check
184      * @return true if fp deserves consideration; false if it appears in cache
185      */

186     private boolean quickCheck(long fp) {
187         return quickCache.add(fp);
188     }
189
190     /**
191      * Create a fingerprint from the given key
192      *
193      * @param key CharSequence (URI) to fingerprint
194      * @return long fingerprint
195      */

196     public static long createFp(CharSequence JavaDoc key) {
197         return FPGenerator.std64.fp(key);
198     }
199
200
201     /* (non-Javadoc)
202      * @see org.archive.crawler.datamodel.UriUniqFilter#addNow(java.lang.String, org.archive.crawler.datamodel.CandidateURI)
203      */

204     public void addNow(String JavaDoc key, CandidateURI value) {
205         add(key, value);
206         flush();
207     }
208     
209     /* (non-Javadoc)
210      * @see org.archive.crawler.datamodel.UriUniqFilter#addForce(java.lang.String, org.archive.crawler.datamodel.CandidateURI)
211      */

212     public void addForce(String JavaDoc key, CandidateURI value) {
213         add(key,null); // dummy pend
214
this.receiver.receive(value);
215     }
216
217     /* (non-Javadoc)
218      * @see org.archive.crawler.datamodel.UriUniqFilter#note(java.lang.String)
219      */

220     public void note(String JavaDoc key) {
221         add(key,null);
222     }
223
224     /* (non-Javadoc)
225      * @see org.archive.crawler.datamodel.UriUniqFilter#forget(java.lang.String, org.archive.crawler.datamodel.CandidateURI)
226      */

227     public void forget(String JavaDoc key, CandidateURI value) {
228         throw new UnsupportedOperationException JavaDoc();
229     }
230
231     /* (non-Javadoc)
232      * @see org.archive.crawler.datamodel.UriUniqFilter#requestFlush()
233      */

234     public synchronized long requestFlush() {
235         if(System.currentTimeMillis()>nextFlushAllowableAfter) {
236             return flush();
237         } else {
238 // LOGGER.info("declining to flush: too soon after last flush");
239
return -1;
240         }
241     }
242
243     /**
244      * Perform a merge of all 'pending' items to the overall fingerprint list.
245      * If the pending item is new, and has an associated CandidateURI, pass that
246      * URI along to the 'receiver' (frontier) for queueing.
247      *
248      * @return number of pending items actually added
249      */

250     public synchronized long flush() {
251         if(pending()==0) {
252             return 0;
253         }
254         long flushStartTime = System.currentTimeMillis();
255         long adds = 0;
256         long fpOnlyAdds = 0;
257         Long JavaDoc currFp = null;
258         PendingItem currPend = null;
259         
260         Iterator pendIter = pendingSet.iterator();
261         LongIterator fpIter = beginFpMerge();
262
263         currPend = (PendingItem) (pendIter.hasNext() ? pendIter.next() : null);
264         currFp = (Long JavaDoc) (fpIter.hasNext() ? fpIter.next() : null);
265
266         while(true) {
267             while(currFp!=null && (currPend==null||(currFp.longValue() <= currPend.fp))) {
268                 addNewFp(currFp.longValue());
269                 if(currPend!=null && currFp.longValue() == currPend.fp) {
270                     mergeDuplicateCount++;
271                 }
272                 if(fpIter.hasNext()) {
273                     currFp = (Long JavaDoc) fpIter.next();
274                 } else {
275                     currFp = null;
276                     break;
277                 }
278             }
279             while(currPend!=null && (currFp==null||(currFp.longValue() > currPend.fp))) {
280                 addNewFp(currPend.fp);
281                 if(currPend.caUri!=null) {
282                     adds++;
283                     this.receiver.receive(currPend.caUri);
284                 } else {
285                     fpOnlyAdds++;
286                 }
287                 if(pendIter.hasNext()) {
288                     currPend = (PendingItem)pendIter.next();
289                 } else {
290                     currPend = null;
291                     break;
292                 }
293             }
294             if(currFp==null) {
295                 // currPend must be null too, or while wouldn't have exitted
296
// done
297
break;
298             }
299         }
300         // maintain throttle timing
301
long flushDuration = System.currentTimeMillis() - flushStartTime;
302         nextFlushAllowableAfter = flushStartTime + (FLUSH_DELAY_FACTOR*flushDuration);
303         
304         // add/duplicate statistics
305
if(LOGGER.isLoggable(Level.INFO)) {
306             long mergeDups = (mergeDuplicateCount-mergeDupAtLast);
307             long pendDups = (pendDuplicateCount-pendDupAtLast);
308             long quickDups = (quickDuplicateCount-quickDupAtLast);
309             LOGGER.info("flush took "+flushDuration+"ms: "
310                     +adds+" adds, "
311                     +fpOnlyAdds+" fpOnlydds, "
312                     +mergeDups+" mergeDups, "
313                     +pendDups+" pendDups, "
314                     +quickDups+" quickDups ");
315             if(adds==0 && fpOnlyAdds==0 && mergeDups == 0 && pendDups == 0 && quickDups == 0) {
316                 LOGGER.info("that's odd");
317             }
318         }
319         mergeDupAtLast = mergeDuplicateCount;
320         pendDupAtLast = pendDuplicateCount;
321         quickDupAtLast = quickDuplicateCount;
322         pendingSet.clear();
323         finishFpMerge();
324         return adds;
325     }
326     
327     /**
328      * Begin merging pending candidates with complete list. Return an
329      * Iterator which will return all previously-known FPs in turn.
330      *
331      * @return Iterator over all previously-known FPs
332      */

333     abstract protected LongIterator beginFpMerge();
334
335     
336     /**
337      * Add an FP (which may be an old or new FP) to the new complete
338      * list. Should only be called after beginFpMerge() and before
339      * finishFpMerge().
340      *
341      * @param fp the FP to add
342      */

343     abstract protected void addNewFp(long fp);
344
345     /**
346      * Complete the merge of candidate and previously-known FPs (closing
347      * files/iterators as appropriate).
348      */

349     abstract protected void finishFpMerge();
350
351     public void close() {
352         if (profileLog != null) {
353             profileLog.close();
354         }
355     }
356
357     public void setProfileLog(File JavaDoc logfile) {
358         try {
359             profileLog = new PrintWriter JavaDoc(new BufferedOutputStream JavaDoc(
360                     new FileOutputStream JavaDoc(logfile)));
361         } catch (FileNotFoundException JavaDoc e) {
362             throw new RuntimeException JavaDoc(e);
363         }
364     }
365 }
366
Popular Tags