package org.archive.crawler.frontier;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Logger;

import javax.management.AttributeNotFoundException;

import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.UriUniqFilter;
import org.archive.crawler.framework.CrawlController;
import org.archive.crawler.framework.FrontierMarker;
import org.archive.crawler.framework.exceptions.FatalConfigurationException;
import org.archive.crawler.settings.SimpleType;
import org.archive.crawler.settings.Type;
import org.archive.crawler.util.BdbUriUniqFilter;
import org.archive.crawler.util.BloomUriUniqFilter;
import org.archive.crawler.util.CheckpointUtils;
import org.archive.crawler.util.DiskFPMergeUriUniqFilter;
import org.archive.crawler.util.MemFPMergeUriUniqFilter;
import org.archive.util.ArchiveUtils;

import com.sleepycat.je.DatabaseException;

/**
 * A Frontier using BerkeleyDB Java Edition databases for persistence to disk.
 *
 * <p>Pending URIs are held in a {@link BdbMultipleWorkQueues} instance and the
 * structure used for tracking already-seen URIs is selected through the
 * {@link #ATTR_INCLUDED} setting.
 */
public class BdbFrontier extends WorkQueueFrontier implements Serializable {
    private static final long serialVersionUID = ArchiveUtils
        .classnameBasedUID(BdbFrontier.class, 1);

    private static final Logger logger =
        Logger.getLogger(BdbFrontier.class.getName());

    /** All 'inner' queues of pending URIs; transient, rebuilt by {@link #initQueue()}. */
    protected transient BdbMultipleWorkQueues pendingUris;

    /**
     * Class names offered as legal values for {@link #ATTR_INCLUDED}.
     *
     * NOTE(review): this is a non-static, non-transient field of a
     * Serializable class, so it is presumably part of the checkpointed form.
     * It is intentionally left as an instance field to avoid altering that
     * form.
     */
    private String[] AVAILABLE_INCLUDED_OPTIONS = new String[] {
        BdbUriUniqFilter.class.getName(),
        BloomUriUniqFilter.class.getName(),
        MemFPMergeUriUniqFilter.class.getName(),
        DiskFPMergeUriUniqFilter.class.getName()};

    /** Name of the setting selecting the already-seen URI structure. */
    public final static String ATTR_INCLUDED = "uri-included-structure";

    /** Default value of {@link #ATTR_INCLUDED}. */
    private final static String DEFAULT_INCLUDED =
        BdbUriUniqFilter.class.getName();

    /**
     * Creates a frontier with the standard description and registers the
     * {@link #ATTR_INCLUDED} expert setting.
     *
     * @param name name of this frontier
     */
    public BdbFrontier(String name) {
        this(name, "BdbFrontier. "
            + "A Frontier using BerkeleyDB Java Edition databases for "
            + "persistence to disk.");
        Type t = addElementToDefinition(new SimpleType(ATTR_INCLUDED,
            "Structure to use for tracking already-seen URIs. Non-default " +
            "options may require additional configuration via system " +
            "properties.", DEFAULT_INCLUDED, AVAILABLE_INCLUDED_OPTIONS));
        t.setExpertSetting(true);
    }

    /**
     * Creates a frontier with a caller-supplied description. Unlike
     * {@link #BdbFrontier(String)}, this constructor does not itself register
     * the {@link #ATTR_INCLUDED} setting.
     *
     * @param name name of this frontier
     * @param description description of this frontier
     */
    public BdbFrontier(String name, String description) {
        super(name, description);
    }

    /**
     * Creates the collection of work queues from the controller's BDB
     * environment and class catalog, passing along whether the crawl is
     * recovering from a checkpoint.
     *
     * @return a new {@link BdbMultipleWorkQueues}
     * @throws DatabaseException if the underlying constructor fails
     */
    private BdbMultipleWorkQueues createMultipleWorkQueues()
    throws DatabaseException {
        return new BdbMultipleWorkQueues(this.controller.getBdbEnvironment(),
            this.controller.getClassCatalog(),
            this.controller.isCheckpointRecover());
    }

    /**
     * Creates the already-seen URI structure named by {@link #ATTR_INCLUDED}.
     *
     * <p>The Bloom and BDB variants are deserialized from the checkpoint
     * directory when the controller reports a checkpoint recover; the two
     * FP-merge variants are always created fresh. Any unrecognized or missing
     * setting value selects the BDB variant. The returned filter has this
     * frontier set as its destination.
     *
     * @return the configured filter
     * @throws IOException if the filter cannot be created, deserialized or
     *         reopened
     */
    protected UriUniqFilter createAlreadyIncluded() throws IOException {
        UriUniqFilter uuf;
        String c = null;
        try {
            c = (String)getAttribute(null, ATTR_INCLUDED);
        } catch (AttributeNotFoundException e) {
            // Deliberate best-effort: within this class the attribute is only
            // registered by the single-argument constructor, so fall through
            // to the default (BDB) structure below.
            logger.fine("Attribute " + ATTR_INCLUDED + " not found; using "
                + DEFAULT_INCLUDED + ": " + e.getMessage());
        }
        if (BloomUriUniqFilter.class.getName().equals(c)) {
            if (this.controller.isCheckpointRecover()) {
                uuf = deserializeAlreadySeen(BloomUriUniqFilter.class,
                    this.controller.getCheckpointRecover().getDirectory());
            } else {
                uuf = new BloomUriUniqFilter();
            }
        } else if (MemFPMergeUriUniqFilter.class.getName().equals(c)) {
            uuf = new MemFPMergeUriUniqFilter();
        } else if (DiskFPMergeUriUniqFilter.class.getName().equals(c)) {
            uuf = new DiskFPMergeUriUniqFilter(controller.getScratchDisk());
        } else {
            if (this.controller.isCheckpointRecover()) {
                uuf = deserializeAlreadySeen(BdbUriUniqFilter.class,
                    this.controller.getCheckpointRecover().getDirectory());
                try {
                    // A deserialized BDB filter must be reattached to the
                    // live environment before use.
                    ((BdbUriUniqFilter)uuf).
                        reopen(this.controller.getBdbEnvironment());
                } catch (DatabaseException e) {
                    throw toIOException(e.getMessage(), e);
                }
            } else {
                uuf = new BdbUriUniqFilter(this.controller.getBdbEnvironment());
            }
        }
        uuf.setDestination(this);
        return uuf;
    }

    /**
     * Reads a previously checkpointed already-seen structure from disk.
     *
     * @param cls class of the filter to read
     * @param dir checkpoint directory to read from
     * @return the deserialized filter
     * @throws FileNotFoundException as reported by the checkpoint reader
     * @throws IOException on read failure, or wrapping a
     *         {@link ClassNotFoundException} (attached as the cause)
     */
    protected UriUniqFilter deserializeAlreadySeen(
            final Class<? extends UriUniqFilter> cls,
            final File dir)
    throws FileNotFoundException, IOException {
        UriUniqFilter uuf = null;
        try {
            logger.fine("Started deserializing " + cls.getName() +
                " of checkpoint recover.");
            uuf = CheckpointUtils.readObjectFromFile(cls, dir);
            // Name the class actually read rather than a fixed label.
            logger.fine("Finished deserializing " + cls.getName() +
                " as part of checkpoint recover.");
        } catch (ClassNotFoundException e) {
            throw toIOException("Failed to deserialize " +
                cls.getName() + ": " + e.getMessage(), e);
        }
        return uuf;
    }

    /**
     * Returns the work queue for the given URI's class key, creating and
     * registering a new {@link BdbWorkQueue} (with its total budget taken
     * from {@code ATTR_QUEUE_TOTAL_BUDGET}) when none exists yet. Lookup and
     * creation happen while holding the lock on {@code allQueues}.
     *
     * @param curi URI whose class key selects the queue
     * @return the existing or newly created queue
     */
    protected WorkQueue getQueueFor(CrawlURI curi) {
        WorkQueue wq;
        String classKey = curi.getClassKey();
        synchronized (allQueues) {
            wq = (WorkQueue)allQueues.get(classKey);
            if (wq == null) {
                wq = new BdbWorkQueue(classKey, this);
                wq.setTotalBudget(((Long)getUncheckedAttribute(
                    curi, ATTR_QUEUE_TOTAL_BUDGET)).longValue());
                allQueues.put(classKey, wq);
            }
        }
        return wq;
    }

    /**
     * Returns the work queue registered under the given class key, without
     * creating one.
     *
     * @param classKey key of the queue to look up
     * @return whatever {@code allQueues} holds for the key
     */
    protected WorkQueue getQueueFor(String classKey) {
        WorkQueue wq;
        synchronized (allQueues) {
            wq = (WorkQueue)allQueues.get(classKey);
        }
        return wq;
    }

    /**
     * Returns a marker for iterating pending URIs matching a regular
     * expression.
     *
     * @param regexpr expression handed to the pending-URI store
     * @param inCacheOnly not used by this implementation
     * @return the initial marker
     */
    public FrontierMarker getInitialMarker(String regexpr,
            boolean inCacheOnly) {
        return pendingUris.getInitialMarker(regexpr);
    }

    /**
     * Lists pending URIs starting at the given marker, one report line per
     * URI in the form {@code [classKey] singleLineReport}.
     *
     * @param marker position to read from
     * @param numberOfMatches maximum number of matches requested from the
     *        pending-URI store
     * @param verbose not used by this implementation
     * @return the report lines
     * @throws RuntimeException wrapping any {@link DatabaseException}
     */
    public ArrayList<String> getURIsList(FrontierMarker marker,
            int numberOfMatches, final boolean verbose) {
        List<?> curis;
        try {
            curis = pendingUris.getFrom(marker, numberOfMatches);
        } catch (DatabaseException e) {
            // The cause travels with the exception; no separate stack dump.
            throw new RuntimeException(e);
        }
        ArrayList<String> results = new ArrayList<String>(curis.size());
        for (Object item : curis) {
            CrawlURI curi = (CrawlURI) item;
            results.add("[" + curi.getClassKey() + "] "
                + curi.singleLineReport());
        }
        return results;
    }

    /**
     * Creates the pending-URI work queues.
     *
     * @throws IOException wrapping any {@link DatabaseException} (attached as
     *         the cause)
     */
    protected void initQueue() throws IOException {
        try {
            this.pendingUris = createMultipleWorkQueues();
        } catch (DatabaseException e) {
            throw toIOException(e.getMessage(), e);
        }
    }

    /** Closes and discards the pending-URI work queues, if present. */
    protected void closeQueue() {
        if (this.pendingUris != null) {
            this.pendingUris.close();
            this.pendingUris = null;
        }
    }

    /**
     * @return the pending-URI work queues; {@code null} before
     *         {@link #initQueue()} or after {@link #closeQueue()}
     */
    protected BdbMultipleWorkQueues getWorkQueues() {
        return pendingUris;
    }

    /** @return always {@code true} for this frontier */
    protected boolean workQueueDataOnDisk() {
        return true;
    }

    /**
     * Initializes the frontier and, when the controller reports a checkpoint
     * recover, copies counters and queue collections from the frontier
     * instance read back from the checkpoint directory, then wakes queues.
     *
     * @param c the crawl controller
     * @throws FatalConfigurationException if the checkpointed frontier cannot
     *         be read (the original failure is attached as the cause)
     * @throws IOException as thrown by the superclass initialization
     */
    public void initialize(CrawlController c)
    throws FatalConfigurationException, IOException {
        super.initialize(c);
        if (c.isCheckpointRecover()) {
            BdbFrontier f;
            try {
                f = (BdbFrontier)CheckpointUtils.
                    readObjectFromFile(this.getClass(),
                        this.controller.getCheckpointRecover().getDirectory());
            } catch (IOException e) {
                // Also covers FileNotFoundException, an IOException subclass.
                throw checkpointRecoverFailure(e);
            } catch (ClassNotFoundException e) {
                throw checkpointRecoverFailure(e);
            }

            this.nextOrdinal = f.nextOrdinal;
            this.totalProcessedBytes = f.totalProcessedBytes;
            this.disregardedUriCount = f.disregardedUriCount;
            this.failedFetchCount = f.failedFetchCount;
            this.processedBytesAfterLastEmittedURI =
                f.processedBytesAfterLastEmittedURI;
            this.queuedUriCount = f.queuedUriCount;
            this.succeededFetchCount = f.succeededFetchCount;
            this.lastMaxBandwidthKB = f.lastMaxBandwidthKB;
            this.readyClassQueues = f.readyClassQueues;
            this.inactiveQueues = f.inactiveQueues;
            this.retiredQueues = f.retiredQueues;
            this.snoozedClassQueues = f.snoozedClassQueues;
            this.inProcessQueues = f.inProcessQueues;
            wakeQueues();
        }
    }

    /**
     * Checkpoints the frontier: runs the superclass checkpoint, syncs the
     * pending-URI queues if present, then writes the already-seen structure
     * and this frontier to the checkpoint directory.
     *
     * @param checkpointDir directory to write into
     * @throws Exception on any failure while checkpointing
     */
    public void crawlCheckpoint(File checkpointDir) throws Exception {
        super.crawlCheckpoint(checkpointDir);
        logger.fine("Started serializing already seen as part "
            + "of checkpoint. Can take some time.");
        if (this.pendingUris != null) {
            this.pendingUris.sync();
        }
        CheckpointUtils.writeObjectToFile(this.alreadyIncluded, checkpointDir);
        logger.fine("Finished serializing already seen as part "
            + "of checkpoint.");
        CheckpointUtils.writeObjectToFile(this, checkpointDir);
    }

    /** Builds an IOException carrying the given message and cause. */
    private static IOException toIOException(String message, Throwable cause) {
        IOException ioe = new IOException(message);
        ioe.initCause(cause);
        return ioe;
    }

    /** Builds the checkpoint-recover failure, keeping the original cause. */
    private static FatalConfigurationException checkpointRecoverFailure(
            Exception cause) {
        FatalConfigurationException fce = new FatalConfigurationException(
            "Failed checkpoint " + "recover: " + cause.getMessage());
        fce.initCause(cause);
        return fce;
    }
}