1 23 package org.archive.crawler.frontier; 24 25 import java.io.UnsupportedEncodingException ; 26 import java.math.BigInteger ; 27 import java.util.ArrayList ; 28 import java.util.List ; 29 import java.util.logging.Level ; 30 import java.util.logging.Logger ; 31 import java.util.regex.Pattern ; 32 33 import org.archive.crawler.datamodel.CrawlURI; 34 import org.archive.crawler.framework.FrontierMarker; 35 import org.archive.util.ArchiveUtils; 36 37 import com.sleepycat.bind.serial.StoredClassCatalog; 38 import com.sleepycat.je.Cursor; 39 import com.sleepycat.je.Database; 40 import com.sleepycat.je.DatabaseConfig; 41 import com.sleepycat.je.DatabaseEntry; 42 import com.sleepycat.je.DatabaseException; 43 import com.sleepycat.je.DatabaseNotFoundException; 44 import com.sleepycat.je.Environment; 45 import com.sleepycat.je.OperationStatus; 46 import com.sleepycat.util.RuntimeExceptionWrapper; 47 48 49 61 public class BdbMultipleWorkQueues { 62 private static final long serialVersionUID = ArchiveUtils 63 .classnameBasedUID(BdbMultipleWorkQueues.class, 1); 64 65 private static final Logger LOGGER = 66 Logger.getLogger(BdbMultipleWorkQueues.class.getName()); 67 68 69 private Database pendingUrisDB = null; 70 71 72 private RecyclingSerialBinding crawlUriBinding; 73 74 82 public BdbMultipleWorkQueues(Environment env, 83 StoredClassCatalog classCatalog, final boolean recycle) 84 throws DatabaseException { 85 DatabaseConfig dbConfig = new DatabaseConfig(); 87 dbConfig.setAllowCreate(true); 88 if (!recycle) { 89 try { 90 env.truncateDatabase(null, "pending", false); 91 } catch (DatabaseNotFoundException e) { 92 } 94 } 95 dbConfig.setDeferredWrite(true); 98 99 this.pendingUrisDB = env.openDatabase(null, "pending", dbConfig); 100 crawlUriBinding = 101 new RecyclingSerialBinding(classCatalog, CrawlURI.class); 102 } 103 104 114 public long deleteMatchingFromQueue(String match, String queue, 115 DatabaseEntry headKey) throws DatabaseException { 116 long deletedCount = 0; 117 Pattern pattern = Pattern.compile(match); 118 DatabaseEntry key = headKey; 119 DatabaseEntry value = new DatabaseEntry(); 120 Cursor cursor = null; 121 try { 122 cursor = pendingUrisDB.openCursor(null, null); 123 OperationStatus result = cursor.getSearchKeyRange(headKey, 124 value, null); 125 126 while (result == OperationStatus.SUCCESS) { 127 if(value.getData().length>0) { 128 CrawlURI curi = (CrawlURI) crawlUriBinding 129 .entryToObject(value); 130 if (!curi.getClassKey().equals(queue)) { 131 break; 133 } 134 if (pattern.matcher(curi.toString()).matches()) { 135 cursor.delete(); 136 deletedCount++; 137 } 138 } 139 result = cursor.getNext(key, value, null); 140 } 141 } finally { 142 if (cursor != null) { 143 cursor.close(); 144 } 145 } 146 147 return deletedCount; 148 } 149 150 156 public List getFrom(FrontierMarker m, int maxMatches) throws DatabaseException { 157 int matches = 0; 158 int tries = 0; 159 ArrayList <CrawlURI> results = new ArrayList <CrawlURI>(maxMatches); 160 BdbFrontierMarker marker = (BdbFrontierMarker) m; 161 162 DatabaseEntry key = marker.getStartKey(); 163 DatabaseEntry value = new DatabaseEntry(); 164 165 if (key != null) { 166 Cursor cursor = null; 167 OperationStatus result = null; 168 try { 169 cursor = pendingUrisDB.openCursor(null,null); 170 result = cursor.getSearchKey(key, value, null); 171 172 while(matches<maxMatches && result == OperationStatus.SUCCESS) { 173 if(value.getData().length>0) { 174 CrawlURI curi = (CrawlURI) crawlUriBinding.entryToObject(value); 175 if(marker.accepts(curi)) { 176 results.add(curi); 177 matches++; 178 } 179 tries++; 180 } 181 result = cursor.getNext(key,value,null); 182 } 183 } finally { 184 if (cursor !=null) { 185 cursor.close(); 186 } 187 } 188 189 if(result != OperationStatus.SUCCESS) { 190 marker.setStartKey(null); 192 } 193 } 194 return results; 195 } 196 197 203 public FrontierMarker getInitialMarker(String regexpr) { 204 try { 205 return new BdbFrontierMarker(getFirstKey(), regexpr); 206 } catch (DatabaseException e) { 207 e.printStackTrace(); 208 return null; 209 } 210 } 211 212 216 protected DatabaseEntry getFirstKey() throws DatabaseException { 217 DatabaseEntry key = new DatabaseEntry(); 218 DatabaseEntry value = new DatabaseEntry(); 219 Cursor cursor = pendingUrisDB.openCursor(null,null); 220 OperationStatus status = cursor.getNext(key,value,null); 221 cursor.close(); 222 if(status == OperationStatus.SUCCESS) { 223 return key; 224 } 225 return null; 226 } 227 228 242 public CrawlURI get(DatabaseEntry headKey) 243 throws DatabaseException { 244 DatabaseEntry result = new DatabaseEntry(); 245 246 OperationStatus status = getNextNearestItem(headKey, result); 255 CrawlURI retVal = null; 256 if (status != OperationStatus.SUCCESS) { 257 LOGGER.severe("See '1219854 NPE je-2.0 " 258 + "entryToObject...'. OperationStatus " 259 + " was not SUCCESS: " 260 + status 261 + ", headKey " 262 + BdbWorkQueue.getPrefixClassKey(headKey.getData())); 263 return null; 264 } 265 try { 266 retVal = (CrawlURI)crawlUriBinding.entryToObject(result); 267 } catch (RuntimeExceptionWrapper rw) { 268 LOGGER.log( 269 Level.SEVERE, 270 "expected object missing in queue " + 271 BdbWorkQueue.getPrefixClassKey(headKey.getData()), 272 rw); 273 return null; 274 } 275 retVal.setHolderKey(headKey); 276 return retVal; 277 } 278 279 protected OperationStatus getNextNearestItem(DatabaseEntry headKey, 280 DatabaseEntry result) throws DatabaseException { 281 Cursor cursor = null; 282 OperationStatus status; 283 try { 284 cursor = this.pendingUrisDB.openCursor(null, null); 285 status = cursor.getSearchKey(headKey, result, null); 288 if(status!=OperationStatus.SUCCESS || result.getData().length > 0) { 289 throw new DatabaseException("bdb queue cap missing"); 291 } 292 status = cursor.getNext(headKey,result,null); 294 } finally { 295 if(cursor!=null) { 296 cursor.close(); 297 } 298 } 299 return status; 300 } 301 302 308 public void put(CrawlURI curi) throws DatabaseException { 309 DatabaseEntry insertKey = (DatabaseEntry)curi.getHolderKey(); 310 if (insertKey == null) { 311 insertKey = calculateInsertKey(curi); 312 curi.setHolderKey(insertKey); 313 } 314 DatabaseEntry value = new DatabaseEntry(); 315 crawlUriBinding.objectToEntry(curi, value); 316 if (LOGGER.isLoggable(Level.FINE)) { 318 tallyAverageEntrySize(curi, value); 319 } 320 pendingUrisDB.put(null, insertKey, value); 321 } 322 323 private long entryCount = 0; 324 private long entrySizeSum = 0; 325 private int largestEntry = 0; 326 327 332 private synchronized void tallyAverageEntrySize(CrawlURI curi, 333 DatabaseEntry value) { 334 entryCount++; 335 int length = value.getData().length; 336 entrySizeSum += length; 337 int avg = (int) (entrySizeSum/entryCount); 338 if(entryCount % 1000 == 0) { 339 LOGGER.fine("Average entry size at "+entryCount+": "+avg); 340 } 341 if (length>largestEntry) { 342 largestEntry = length; 343 LOGGER.fine("Largest entry: "+length+" "+curi); 344 if(length>(2*avg)) { 345 LOGGER.fine("excessive?"); 346 } 347 } 348 } 349 350 358 static byte[] calculateOriginKey(String classKey) { 359 byte[] classKeyBytes = null; 360 int len = 0; 361 try { 362 classKeyBytes = classKey.getBytes("UTF-8"); 363 len = classKeyBytes.length; 364 } catch (UnsupportedEncodingException e) { 365 e.printStackTrace(); 367 } 368 byte[] keyData = new byte[len+1]; 369 System.arraycopy(classKeyBytes,0,keyData,0,len); 370 keyData[len]=0; 371 return keyData; 372 } 373 374 394 static DatabaseEntry calculateInsertKey(CrawlURI curi) { 395 byte[] classKeyBytes = null; 396 int len = 0; 397 try { 398 classKeyBytes = curi.getClassKey().getBytes("UTF-8"); 399 len = classKeyBytes.length; 400 } catch (UnsupportedEncodingException e) { 401 e.printStackTrace(); 403 } 404 byte[] keyData = new byte[len+9]; 405 System.arraycopy(classKeyBytes,0,keyData,0,len); 406 keyData[len]=0; 407 long ordinalPlus = curi.getOrdinal() & 0x0000FFFFFFFFFFFFL; 408 ordinalPlus = 409 ((long)curi.getSchedulingDirective() << 56) | ordinalPlus; 410 ordinalPlus = 411 ((((long)curi.getHolderCost()) & 0xFFL) << 48) | ordinalPlus; 412 ArchiveUtils.longIntoByteArray(ordinalPlus, keyData, len+1); 413 return new DatabaseEntry(keyData); 414 } 415 416 423 public void delete(CrawlURI item) throws DatabaseException { 424 OperationStatus status; 425 status = pendingUrisDB.delete(null, (DatabaseEntry) item.getHolderKey()); 426 if (status != OperationStatus.SUCCESS) { 427 LOGGER.severe("expected item not present: " 428 + item 429 + "(" 430 + (new BigInteger (((DatabaseEntry) item.getHolderKey()) 431 .getData())).toString(16) + ")"); 432 } 433 434 } 435 436 446 void sync() { 447 if (this.pendingUrisDB == null) { 448 return; 449 } 450 try { 451 this.pendingUrisDB.sync(); 452 } catch (DatabaseException e) { 453 e.printStackTrace(); 454 } 455 } 456 457 461 public void close() { 462 try { 463 this.pendingUrisDB.close(); 464 } catch (DatabaseException e) { 465 e.printStackTrace(); 466 } 467 } 468 469 474 public class BdbFrontierMarker implements FrontierMarker { 475 DatabaseEntry startKey; 476 Pattern pattern; 477 int nextItemNumber; 478 479 485 public BdbFrontierMarker(DatabaseEntry startKey, String regexpr) { 486 this.startKey = startKey; 487 pattern = Pattern.compile(regexpr); 488 nextItemNumber = 1; 489 } 490 491 495 public boolean accepts(CrawlURI curi) { 496 boolean retVal = pattern.matcher(curi.toString()).matches(); 497 if(retVal==true) { 498 nextItemNumber++; 499 } 500 return retVal; 501 } 502 503 506 public void setStartKey(DatabaseEntry key) { 507 startKey = key; 508 } 509 510 513 public DatabaseEntry getStartKey() { 514 return startKey; 515 } 516 517 520 public String getMatchExpression() { 521 return pattern.pattern(); 522 } 523 524 527 public long getNextItemNumber() { 528 return nextItemNumber; 529 } 530 531 534 public boolean hasNext() { 535 return startKey != null; 537 } 538 } 539 540 548 public void addCap(byte[] origin) { 549 try { 550 pendingUrisDB.put(null, new DatabaseEntry(origin), 551 new DatabaseEntry(new byte[0])); 552 } catch (DatabaseException e) { 553 throw new RuntimeException (e); 554 } 555 } 556 } 557 | Popular Tags |