1 25 package org.archive.crawler.util; 26 27 import it.unimi.dsi.fastutil.longs.LongIterator; 28 29 import java.io.BufferedOutputStream ; 30 import java.io.File ; 31 import java.io.FileNotFoundException ; 32 import java.io.FileOutputStream ; 33 import java.io.PrintWriter ; 34 import java.util.Iterator ; 35 import java.util.TreeSet ; 36 import java.util.logging.Level ; 37 import java.util.logging.Logger ; 38 39 import org.archive.crawler.datamodel.CandidateURI; 40 import org.archive.crawler.datamodel.UriUniqFilter; 41 import org.archive.util.fingerprint.ArrayLongFPCache; 42 43 import st.ata.util.FPGenerator; 44 45 54 public abstract class FPMergeUriUniqFilter implements UriUniqFilter { 55 59 public class PendingItem implements Comparable { 60 long fp; 61 CandidateURI caUri; 62 public PendingItem(long fp, CandidateURI value) { 63 this.fp = fp; 64 this.caUri = value; 65 } 66 public int compareTo(Object arg0) { 67 PendingItem vs = (PendingItem) arg0; 68 return (fp < vs.fp) ? -1 : ( (fp == vs.fp) ? 0 : 1); 69 } 70 } 71 72 private static Logger LOGGER = 73 Logger.getLogger(FPMergeUriUniqFilter.class.getName()); 74 75 protected HasUriReceiver receiver; 76 protected PrintWriter profileLog; 77 78 protected long quickDuplicateCount = 0; 80 protected long quickDupAtLast = 0; 81 protected long pendDuplicateCount = 0; 82 protected long pendDupAtLast = 0; 83 protected long mergeDuplicateCount = 0; 84 protected long mergeDupAtLast = 0; 85 86 92 protected TreeSet <PendingItem> pendingSet = new TreeSet <PendingItem>(); 93 94 95 protected int maxPending = DEFAULT_MAX_PENDING; 96 public static final int DEFAULT_MAX_PENDING = 10000; 97 99 102 protected long nextFlushAllowableAfter = 0; 103 public static final long FLUSH_DELAY_FACTOR = 100; 104 105 106 protected ArrayLongFPCache quickCache = new ArrayLongFPCache(); 107 109 public FPMergeUriUniqFilter() { 110 super(); 111 String profileLogFile = 112 System.getProperty(FPMergeUriUniqFilter.class.getName() 113 + ".profileLogFile"); 114 if (profileLogFile != null) { 115 setProfileLog(new File (profileLogFile)); 116 } 117 } 118 119 public void setMaxPending(int max) { 120 maxPending = max; 121 } 122 123 public long pending() { 124 return pendingSet.size(); 125 } 126 127 public void setDestination(HasUriReceiver receiver) { 128 this.receiver = receiver; 129 } 130 131 protected void profileLog(String key) { 132 if (profileLog != null) { 133 profileLog.println(key); 134 } 135 } 136 137 140 public synchronized void add(String key, CandidateURI value) { 141 profileLog(key); 142 long fp = createFp(key); 143 if(! quickCheck(fp)) { 144 quickDuplicateCount++; 145 return; 146 } 147 pend(fp,value); 148 if (pendingSet.size()>=maxPending) { 149 flush(); 150 } 151 } 152 153 161 protected void pend(long fp, CandidateURI value) { 162 if(count()==0) { 164 if(pendingSet.add(new PendingItem(fp,null))==false) { 165 pendDuplicateCount++; } else { 167 if(value!=null) { 169 this.receiver.receive(value); 170 } 171 } 172 return; 173 } 174 if(pendingSet.add(new PendingItem(fp,value))==false) { 175 pendDuplicateCount++; } 177 } 178 179 186 private boolean quickCheck(long fp) { 187 return quickCache.add(fp); 188 } 189 190 196 public static long createFp(CharSequence key) { 197 return FPGenerator.std64.fp(key); 198 } 199 200 201 204 public void addNow(String key, CandidateURI value) { 205 add(key, value); 206 flush(); 207 } 208 209 212 public void addForce(String key, CandidateURI value) { 213 add(key,null); this.receiver.receive(value); 215 } 216 217 220 public void note(String key) { 221 add(key,null); 222 } 223 224 227 public void forget(String key, CandidateURI value) { 228 throw new UnsupportedOperationException (); 229 } 230 231 234 public synchronized long requestFlush() { 235 if(System.currentTimeMillis()>nextFlushAllowableAfter) { 236 return flush(); 237 } else { 238 return -1; 240 } 241 } 242 243 250 public synchronized long flush() { 251 if(pending()==0) { 252 return 0; 253 } 254 long flushStartTime = System.currentTimeMillis(); 255 long adds = 0; 256 long fpOnlyAdds = 0; 257 Long currFp = null; 258 PendingItem currPend = null; 259 260 Iterator pendIter = pendingSet.iterator(); 261 LongIterator fpIter = beginFpMerge(); 262 263 currPend = (PendingItem) (pendIter.hasNext() ? pendIter.next() : null); 264 currFp = (Long ) (fpIter.hasNext() ? fpIter.next() : null); 265 266 while(true) { 267 while(currFp!=null && (currPend==null||(currFp.longValue() <= currPend.fp))) { 268 addNewFp(currFp.longValue()); 269 if(currPend!=null && currFp.longValue() == currPend.fp) { 270 mergeDuplicateCount++; 271 } 272 if(fpIter.hasNext()) { 273 currFp = (Long ) fpIter.next(); 274 } else { 275 currFp = null; 276 break; 277 } 278 } 279 while(currPend!=null && (currFp==null||(currFp.longValue() > currPend.fp))) { 280 addNewFp(currPend.fp); 281 if(currPend.caUri!=null) { 282 adds++; 283 this.receiver.receive(currPend.caUri); 284 } else { 285 fpOnlyAdds++; 286 } 287 if(pendIter.hasNext()) { 288 currPend = (PendingItem)pendIter.next(); 289 } else { 290 currPend = null; 291 break; 292 } 293 } 294 if(currFp==null) { 295 break; 298 } 299 } 300 long flushDuration = System.currentTimeMillis() - flushStartTime; 302 nextFlushAllowableAfter = flushStartTime + (FLUSH_DELAY_FACTOR*flushDuration); 303 304 if(LOGGER.isLoggable(Level.INFO)) { 306 long mergeDups = (mergeDuplicateCount-mergeDupAtLast); 307 long pendDups = (pendDuplicateCount-pendDupAtLast); 308 long quickDups = (quickDuplicateCount-quickDupAtLast); 309 LOGGER.info("flush took "+flushDuration+"ms: " 310 +adds+" adds, " 311 +fpOnlyAdds+" fpOnlydds, " 312 +mergeDups+" mergeDups, " 313 +pendDups+" pendDups, " 314 +quickDups+" quickDups "); 315 if(adds==0 && fpOnlyAdds==0 && mergeDups == 0 && pendDups == 0 && quickDups == 0) { 316 LOGGER.info("that's odd"); 317 } 318 } 319 mergeDupAtLast = mergeDuplicateCount; 320 pendDupAtLast = pendDuplicateCount; 321 quickDupAtLast = quickDuplicateCount; 322 pendingSet.clear(); 323 finishFpMerge(); 324 return adds; 325 } 326 327 333 abstract protected LongIterator beginFpMerge(); 334 335 336 343 abstract protected void addNewFp(long fp); 344 345 349 abstract protected void finishFpMerge(); 350 351 public void close() { 352 if (profileLog != null) { 353 profileLog.close(); 354 } 355 } 356 357 public void setProfileLog(File logfile) { 358 try { 359 profileLog = new PrintWriter (new BufferedOutputStream ( 360 new FileOutputStream (logfile))); 361 } catch (FileNotFoundException e) { 362 throw new RuntimeException (e); 363 } 364 } 365 } 366 | Popular Tags |