1 package org.archive.crawler.frontier; 2 3 import java.io.IOException ; 4 import java.io.PrintWriter ; 5 import java.io.Serializable ; 6 import java.util.logging.Level ; 7 import java.util.logging.Logger ; 8 9 import org.archive.crawler.datamodel.CrawlSubstats; 10 import org.archive.crawler.datamodel.CrawlURI; 11 import org.archive.crawler.framework.Frontier; 12 import org.archive.util.ArchiveUtils; 13 import org.archive.util.Reporter; 14 15 22 public abstract class WorkQueue implements Frontier.FrontierGroup, Comparable , 23 Serializable , Reporter { 24 private static final Logger logger = 25 Logger.getLogger(WorkQueue.class.getName()); 26 27 28 protected final String classKey; 29 30 private boolean active = true; 31 32 33 private long count = 0; 34 35 36 private long enqueueCount = 0; 37 38 39 private boolean isHeld = false; 40 41 42 private long wakeTime = 0; 43 44 45 private int sessionBalance = 0; 46 47 48 private int lastCost = 0; 49 50 52 private long costCount = 0; 53 54 55 private long totalExpenditure = 0; 56 57 58 private long totalBudget = 0; 59 60 61 private CrawlURI peekItem = null; 62 63 64 private String lastQueued; 65 66 67 private String lastPeeked; 68 69 70 private long lastDequeueTime; 71 72 73 private long errorCount = 0; 74 75 76 protected CrawlSubstats substats = new CrawlSubstats(); 77 78 private boolean retired; 79 80 public WorkQueue(final String pClassKey) { 81 this.classKey = pClassKey; 82 } 83 84 90 public long deleteMatching(final WorkQueueFrontier frontier, String match) { 91 try { 92 final long deleteCount = deleteMatchingFromQueue(frontier, match); 93 this.count -= deleteCount; 94 return deleteCount; 95 } catch (IOException e) { 96 e.printStackTrace(); 98 throw new RuntimeException (e); 99 } 100 } 101 102 109 public synchronized void enqueue(final WorkQueueFrontier frontier, 110 CrawlURI curi) { 111 try { 112 insert(frontier, curi); 113 } catch (IOException e) { 114 e.printStackTrace(); 116 throw new RuntimeException (e); 117 } 118 count++; 119 enqueueCount++; 120 } 121 122 132 public CrawlURI peek(final WorkQueueFrontier frontier) { 133 if(peekItem == null && count > 0) { 134 try { 135 peekItem = peekItem(frontier); 136 } catch (IOException e) { 137 logger.log(Level.SEVERE,"peek failure",e); 139 e.printStackTrace(); 140 } 142 if(peekItem != null) { 143 lastPeeked = peekItem.toString(); 144 } 145 } 146 return peekItem; 147 } 148 149 154 public synchronized void dequeue(final WorkQueueFrontier frontier) { 155 try { 156 deleteItem(frontier, peekItem); 157 } catch (IOException e) { 158 e.printStackTrace(); 160 throw new RuntimeException (e); 161 } 162 unpeek(); 163 count--; 164 lastDequeueTime = System.currentTimeMillis(); 165 } 166 167 172 public void setSessionBalance(int balance) { 173 this.sessionBalance = balance; 174 } 175 176 181 public int getSessionBalance() { 182 return this.sessionBalance; 183 } 184 185 191 public void setTotalBudget(long budget) { 192 this.totalBudget = budget; 193 } 194 195 201 public boolean isOverBudget() { 202 return this.sessionBalance <= 0 205 || (this.totalBudget >= 0 && this.totalExpenditure > this.totalBudget); 206 } 207 208 213 public long getTotalExpenditure() { 214 return totalExpenditure; 215 } 216 217 224 public int incrementSessionBalance(int amount) { 225 this.sessionBalance = this.sessionBalance + amount; 226 return this.sessionBalance; 227 } 228 229 234 public int expend(int amount) { 235 this.sessionBalance = this.sessionBalance - amount; 236 this.totalExpenditure = this.totalExpenditure + amount; 237 this.lastCost = amount; 238 this.costCount++; 239 return this.sessionBalance; 240 } 241 242 248 public int refund(int amount) { 249 this.sessionBalance = this.sessionBalance + amount; 250 this.totalExpenditure = this.totalExpenditure - amount; 251 this.costCount--; 252 return this.sessionBalance; 253 } 254 255 259 public void noteError(int penalty) { 260 this.sessionBalance = this.sessionBalance - penalty; 261 this.totalExpenditure = this.totalExpenditure + penalty; 262 errorCount++; 263 } 264 265 268 public void setWakeTime(long l) { 269 wakeTime = l; 270 } 271 272 275 public long getWakeTime() { 276 return wakeTime; 277 } 278 279 282 public String getClassKey() { 283 return this.classKey; 284 } 285 286 289 public void clearHeld() { 290 isHeld = false; 291 } 292 293 300 public boolean isHeld() { 301 return isHeld; 302 } 303 304 307 public void setHeld() { 308 isHeld = true; 309 } 310 311 316 public void unpeek() { 317 peekItem = null; 318 } 319 320 public final int compareTo(Object obj) { 321 if(this == obj) { 322 return 0; } 324 WorkQueue other = (WorkQueue) obj; 325 if(getWakeTime() > other.getWakeTime()) { 326 return 1; 327 } 328 if(getWakeTime() < other.getWakeTime()) { 329 return -1; 330 } 331 return this.classKey.compareTo(other.getClassKey()); 334 } 335 336 343 public void update(final WorkQueueFrontier frontier, CrawlURI curi) { 344 try { 345 insert(frontier, curi); 346 } catch (IOException e) { 347 e.printStackTrace(); 349 throw new RuntimeException (e); 350 } 351 } 352 353 356 public synchronized long getCount() { 357 return this.count; 358 } 359 360 366 private void insert(final WorkQueueFrontier frontier, CrawlURI curi) 367 throws IOException { 368 insertItem(frontier, curi); 369 lastQueued = curi.toString(); 370 } 371 372 380 protected abstract void insertItem(final WorkQueueFrontier frontier, 381 CrawlURI curi) throws IOException ; 382 383 390 protected abstract long deleteMatchingFromQueue( 391 final WorkQueueFrontier frontier, final String match) 392 throws IOException ; 393 394 403 protected abstract void deleteItem(final WorkQueueFrontier frontier, 404 final CrawlURI item) throws IOException ; 405 406 412 protected abstract CrawlURI peekItem(final WorkQueueFrontier frontier) 413 throws IOException ; 414 415 421 protected void suspend(final WorkQueueFrontier frontier) throws IOException { 422 } 423 424 430 protected void resume(final WorkQueueFrontier frontier) throws IOException { 431 } 432 433 public void setActive(final WorkQueueFrontier frontier, final boolean b) { 434 if(active != b) { 435 active = b; 436 try { 437 if(active) { 438 resume(frontier); 439 } else { 440 suspend(frontier); 441 } 442 } catch (IOException e) { 443 e.printStackTrace(); 445 throw new RuntimeException (e); 446 } 447 } 448 } 449 450 454 457 public String [] getReports() { 458 return new String [] {}; 459 } 460 461 464 public void reportTo(PrintWriter writer) { 465 reportTo(null,writer); 466 } 467 468 471 public void singleLineReportTo(PrintWriter writer) { 472 writer.print(classKey); 474 writer.print(" "); 475 writer.print(Long.toString(count)); 477 writer.print(" "); 478 writer.print(Long.toString(enqueueCount)); 480 writer.print(" "); 481 writer.print(sessionBalance); 482 writer.print(" "); 483 writer.print(lastCost); 484 writer.print("("); 485 writer.print(ArchiveUtils.doubleToString( 486 ((double) totalExpenditure / costCount), 1)); 487 writer.print(")"); 488 writer.print(" "); 489 if (lastDequeueTime != 0) { 491 writer.print(ArchiveUtils.getLog17Date(lastDequeueTime)); 492 } else { 493 writer.print("-"); 494 } 495 writer.print(" "); 496 if (wakeTime != 0) { 498 writer.print(ArchiveUtils.formatMillisecondsToConventional(wakeTime - System.currentTimeMillis())); 499 } else { 500 writer.print("-"); 501 } 502 writer.print(" "); 503 writer.print(Long.toString(totalExpenditure)); 504 writer.print("/"); 505 writer.print(Long.toString(totalBudget)); 506 writer.print(" "); 507 writer.print(Long.toString(errorCount)); 508 writer.print(" "); 509 writer.print(lastPeeked); 510 writer.print(" "); 511 writer.print(lastQueued); 512 writer.print("\n"); 513 } 514 515 518 public String singleLineLegend() { 519 return "queue currentSize totalEnqueues sessionBalance lastCost " + 520 "(averageCost) lastDequeueTime wakeTime " + 521 "totalSpend/totalBudget errorCount lastPeekUri lastQueuedUri"; 522 } 523 524 527 public String singleLineReport() { 528 return ArchiveUtils.singleLineReport(this); 529 } 530 531 535 public void reportTo(String name, PrintWriter writer) { 536 writer.print("Queue "); 538 writer.print(classKey); 539 writer.print("\n"); 540 writer.print(" "); 541 writer.print(Long.toString(count)); 542 writer.print(" items"); 543 if (wakeTime != 0) { 544 writer.print("\n wakes in: "+ArchiveUtils.formatMillisecondsToConventional(wakeTime - System.currentTimeMillis())); 545 } 546 writer.print("\n last enqueued: "); 547 writer.print(lastQueued); 548 writer.print("\n last peeked: "); 549 writer.print(lastPeeked); 550 writer.print("\n"); 551 writer.print(" total expended: "); 552 writer.print(Long.toString(totalExpenditure)); 553 writer.print(" (total budget: "); 554 writer.print(Long.toString(totalBudget)); 555 writer.print(")\n"); 556 writer.print(" active balance: "); 557 writer.print(sessionBalance); 558 writer.print("\n last(avg) cost: "); 559 writer.print(lastCost); 560 writer.print("("); 561 writer.print(ArchiveUtils.doubleToString( 562 ((double) totalExpenditure / costCount), 1)); 563 writer.print(")\n\n"); 564 } 565 566 public CrawlSubstats getSubstats() { 567 return substats; 568 } 569 570 575 public void setRetired(boolean b) { 576 this.retired = b; 577 } 578 579 public boolean isRetired() { 580 return retired; 581 } 582 } 583 | Popular Tags |