package org.archive.crawler.framework;

import java.io.PrintWriter;
import java.util.HashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.archive.crawler.datamodel.CoreAttributeConstants;
import org.archive.crawler.datamodel.CrawlOrder;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.FetchStatusCodes;
import org.archive.crawler.datamodel.InstancePerThread;
import org.archive.crawler.framework.exceptions.EndedException;
import org.archive.util.ArchiveUtils;
import org.archive.util.DevUtils;
import org.archive.util.HttpRecorder;
import org.archive.util.HttpRecorderMarker;
import org.archive.util.ProgressStatisticsReporter;
import org.archive.util.Reporter;

import com.sleepycat.util.RuntimeExceptionWrapper;

/**
 * A worker thread that repeatedly takes a {@link CrawlURI} from the
 * controller's frontier, runs it through the processor chains, and hands it
 * back to the frontier.
 *
 * <p>The thread records which "step" of that cycle it is in, and since when,
 * so that {@link #reportTo(PrintWriter)} and
 * {@link #singleLineReportTo(PrintWriter)} can describe what it is doing.
 */
public class ToeThread extends Thread
        implements CoreAttributeConstants, FetchStatusCodes, HttpRecorderMarker,
        Reporter, ProgressStatisticsReporter {

    // Names of the steps of the processing cycle, as shown in reports.
    private static final String STEP_NASCENT = "NASCENT";
    private static final String STEP_ABOUT_TO_GET_URI = "ABOUT_TO_GET_URI";
    private static final String STEP_FINISHED = "FINISHED";
    private static final String STEP_ABOUT_TO_BEGIN_CHAIN =
        "ABOUT_TO_BEGIN_CHAIN";
    private static final String STEP_ABOUT_TO_BEGIN_PROCESSOR =
        "ABOUT_TO_BEGIN_PROCESSOR";
    private static final String STEP_DONE_WITH_PROCESSORS =
        "DONE_WITH_PROCESSORS";
    private static final String STEP_HANDLING_RUNTIME_EXCEPTION =
        "HANDLING_RUNTIME_EXCEPTION";
    private static final String STEP_ABOUT_TO_RETURN_URI =
        "ABOUT_TO_RETURN_URI";
    private static final String STEP_FINISHING_PROCESS = "FINISHING_PROCESS";

    private static final Logger logger =
        Logger.getLogger("org.archive.crawler.framework.ToeThread");

    /** Controller this thread works for; set to null once run() has exited. */
    private CrawlController controller;

    /** Number given at construction; used in the thread name and reports. */
    private int serialNumber;

    /**
     * Recorder exposed through {@link #getHttpRecorder()}. Created in the
     * constructor, closed and nulled when {@link #run()} exits.
     */
    private HttpRecorder httpRecorder = null;

    /**
     * Thread-local processor instances, keyed by processor class name; see
     * {@link #getProcessor(Processor)}.
     */
    private HashMap<String,Processor> localProcessors
        = new HashMap<String,Processor>();

    /** Name of the processor most recently started; "" when none. */
    private String currentProcessorName = "";

    /** Fixed prefix of the thread name; the current URI is appended to it. */
    private String coreName;

    /** URI currently being worked on, or null. */
    private CrawlURI currentCuri;

    /** Time (ms) at which processing of the most recent URI began. */
    private long lastStartTime;

    /** Time (ms) at which the most recent URI was finished. */
    private long lastFinishTime;

    /** Current step of the processing cycle (one of the STEP_* names). */
    private String step = STEP_NASCENT;

    /** Time (ms) at which {@link #step} was last set. */
    private long atStepSince;

    /** Priority every ToeThread normally runs at. */
    private static final int DEFAULT_PRIORITY = Thread.NORM_PRIORITY - 2;

    /** Set by {@link #retire()}; checked after each URI is finished. */
    private volatile boolean shouldRetire = false;

    /**
     * Creates a ToeThread belonging to the given pool.
     *
     * @param g pool (thread group) this thread is a member of; also supplies
     *          the controller
     * @param sn serial number of this thread
     */
    public ToeThread(ToePool g, int sn) {
        super(g, "ToeThread #" + sn);
        coreName = "ToeThread #" + sn + ": ";
        controller = g.getController();
        serialNumber = sn;
        setPriority(DEFAULT_PRIORITY);
        // Recorder buffer sizes come from the crawl order's settings.
        int outBufferSize = ((Integer) controller
                .getOrder()
                .getUncheckedAttribute(null, CrawlOrder.ATTR_RECORDER_OUT_BUFFER))
                .intValue();
        int inBufferSize = ((Integer) controller
                .getOrder()
                .getUncheckedAttribute(null, CrawlOrder.ATTR_RECORDER_IN_BUFFER))
                .intValue();
        httpRecorder = new HttpRecorder(controller.getScratchDisk(),
            "tt" + sn + "http", outBufferSize, inBufferSize);
        lastFinishTime = System.currentTimeMillis();
    }

    /**
     * Main loop: take a URI from the frontier, process it, return it to the
     * frontier; repeat until retired, interrupted, or an exception ends the
     * loop. Whatever way the loop ends, the recorder is closed and the
     * controller is told that this thread has ended.
     */
    @Override
    public void run() {
        String name = controller.getOrder().getCrawlOrderName();
        logger.fine(getName() + " started for order '" + name + "'");

        try {
            try {
                while (true) {
                    continueCheck();

                    setStep(STEP_ABOUT_TO_GET_URI);

                    CrawlURI curi = controller.getFrontier().next();

                    synchronized (this) {
                        continueCheck();
                        setCurrentCuri(curi);
                    }

                    processCrawlUri();

                    setStep(STEP_ABOUT_TO_RETURN_URI);
                    continueCheck();

                    // kill() also touches currentCuri under this lock.
                    synchronized (this) {
                        controller.getFrontier().finished(currentCuri);
                        setCurrentCuri(null);
                    }

                    setStep(STEP_FINISHING_PROCESS);
                    lastFinishTime = System.currentTimeMillis();
                    controller.releaseContinuePermission();
                    if (shouldRetire) {
                        break;
                    }
                }
            } catch (EndedException e) {
                // Deliberately ignored (presumably signals that the crawl has
                // ended -- confirm); just fall through to the teardown.
            } catch (Exception e) {
                // Everything else, including the InterruptedException thrown
                // by continueCheck().
                logger.log(Level.SEVERE, "Fatal exception in " + getName(), e);
            } catch (OutOfMemoryError err) {
                seriousError(err);
            } finally {
                controller.releaseContinuePermission();
            }
        } finally {
            // Teardown lives in a finally block so that it also happens when
            // an Error that is not caught above (or a failure inside one of
            // the handlers) terminates the thread.
            setCurrentCuri(null);
            this.httpRecorder.closeRecorders();
            this.httpRecorder = null;
            localProcessors = null;

            logger.fine(getName() + " finished for order '" + name + "'");
            setStep(STEP_FINISHED);
            controller.toeEnded();
            controller = null;
        }
    }

    /**
     * Sets the URI being worked on and renames the thread to match: the core
     * name alone for null, otherwise the core name followed by the URI.
     *
     * @param curi the new current URI, or null for none
     */
    private void setCurrentCuri(CrawlURI curi) {
        if (curi == null) {
            setName(coreName);
        } else {
            setName(coreName + curi);
        }
        currentCuri = curi;
    }

    /**
     * Records the current step and the time at which it was entered.
     *
     * @param s one of the STEP_* names
     */
    private void setStep(String s) {
        step = s;
        atStepSince = System.currentTimeMillis();
    }

    /**
     * Handles an Error that is not treated as recoverable: puts the
     * controller into single-thread mode, frees its reserve memory and
     * requests a pause, dumps diagnostics to System.err, annotates the
     * current URI (if any) and logs the error at SEVERE.
     *
     * @param err the error being handled
     */
    private void seriousError(Error err) {
        // Run slightly above the other ToeThreads while handling the error.
        setPriority(DEFAULT_PRIORITY + 1);
        if (controller != null) {
            controller.singleThreadMode();
            controller.freeReserveMemory();
            controller.requestCrawlPause();
            if (controller.getFrontier().getFrontierJournal() != null) {
                controller.getFrontier().getFrontierJournal().seriousError(
                    getName() + err.getMessage());
            }
        }

        // Diagnostic dump, bracketed by "<<<" and ">>>".
        String extraInfo = DevUtils.extraInfo();
        System.err.println("<<<");
        System.err.println(ArchiveUtils.getLog17Date());
        System.err.println(err);
        System.err.println(extraInfo);
        err.printStackTrace(System.err);

        if (controller != null) {
            PrintWriter pw = new PrintWriter(System.err);
            controller.getToePool().compactReportTo(pw);
            pw.flush();
        }
        System.err.println(">>>");

        String context = "unknown";
        if (currentCuri != null) {
            // Annotate with the error class and the status being overwritten.
            currentCuri.addAnnotation("err=" + err.getClass().getName());
            currentCuri.addAnnotation("os" + currentCuri.getFetchStatus());
            currentCuri.setFetchStatus(S_SERIOUS_ERROR);
            context = currentCuri.singleLineReport() + " in " + currentProcessorName;
        }
        String message = "Serious error occured trying " +
            "to process '" + context + "'\n" + extraInfo;
        logger.log(Level.SEVERE, message, err);
        setPriority(DEFAULT_PRIORITY);
    }

    /**
     * Checkpoint called between steps: fails if this thread has been
     * interrupted, otherwise acquires the controller's continue permission.
     * Note that {@link Thread#interrupted()} clears the interrupt flag.
     *
     * @throws InterruptedException if the thread's interrupt flag was set
     */
    private void continueCheck() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException("die request detected");
        }
        controller.acquireContinuePermission();
    }

    /**
     * Passes the current URI through every processor of every processor
     * chain, starting with the controller's first chain. Runtime exceptions,
     * assertion failures and stack overflows are recorded on the URI through
     * {@link #recoverableProblem(Throwable)}; any other Error goes to
     * {@link #seriousError(Error)}.
     *
     * @throws InterruptedException if interrupted between processors
     */
    private void processCrawlUri() throws InterruptedException {
        currentCuri.setThreadNumber(this.serialNumber);
        currentCuri.setNextProcessorChain(controller.getFirstProcessorChain());
        lastStartTime = System.currentTimeMillis();
        try {
            while (currentCuri.nextProcessorChain() != null) {
                setStep(STEP_ABOUT_TO_BEGIN_CHAIN);
                // Point the URI at the chain's first processor and queue up
                // the following chain before running anything.
                currentCuri.setNextProcessor(
                    currentCuri.nextProcessorChain().getFirstProcessor());
                currentCuri.setNextProcessorChain(
                    currentCuri.nextProcessorChain().getNextProcessorChain());

                while (currentCuri.nextProcessor() != null) {
                    setStep(STEP_ABOUT_TO_BEGIN_PROCESSOR);
                    Processor currentProcessor =
                        getProcessor(currentCuri.nextProcessor());
                    currentProcessorName = currentProcessor.getName();
                    continueCheck();
                    currentProcessor.process(currentCuri);
                }
            }
            setStep(STEP_DONE_WITH_PROCESSORS);
            currentProcessorName = "";
        } catch (RuntimeExceptionWrapper e) {
            // com.sleepycat wrapper exceptions are recoverable, like any
            // other runtime exception.
            recoverableProblem(e);
        } catch (AssertionError ae) {
            recoverableProblem(ae);
        } catch (RuntimeException e) {
            recoverableProblem(e);
        } catch (StackOverflowError err) {
            recoverableProblem(err);
        } catch (Error err) {
            seriousError(err);
        }
    }

    /**
     * Records a problem that only affects the current URI: prints the stack
     * trace to System.err, marks the URI with S_RUNTIME_EXCEPTION, attaches
     * the throwable to it, and logs the problem at SEVERE.
     *
     * @param e the problem that occurred
     */
    private void recoverableProblem(Throwable e) {
        // Remember the step the problem occurred in before overwriting it.
        String previousStep = step;
        setStep(STEP_HANDLING_RUNTIME_EXCEPTION);
        e.printStackTrace(System.err);
        currentCuri.setFetchStatus(S_RUNTIME_EXCEPTION);
        currentCuri.addAnnotation("err=" + e.getClass().getName());
        currentCuri.putObject(A_RUNTIME_EXCEPTION, e);
        String message = "Problem " + e +
                " occured when trying to process '"
                + currentCuri.toString()
                + "' at step " + previousStep
                + " in " + currentProcessorName + "\n";
        logger.log(Level.SEVERE, message, e);
    }

    /**
     * Returns the processor instance this thread should use. Processors not
     * marked {@link InstancePerThread} are returned as-is; for the others, a
     * copy spawned for this thread is created on first use and cached under
     * the processor's class name.
     *
     * @param processor the processor named by the chain
     * @return the processor itself, or this thread's own copy of it
     */
    private Processor getProcessor(Processor processor) {
        if (!(processor instanceof InstancePerThread)) {
            return processor;
        }
        String key = processor.getClass().getName();
        Processor localProcessor = localProcessors.get(key);
        if (localProcessor == null) {
            localProcessor = processor.spawn(this.getSerialNumber());
            localProcessors.put(key, localProcessor);
        }
        return localProcessor;
    }

    /**
     * @return the serial number given to this thread at construction
     */
    public int getSerialNumber() {
        return this.serialNumber;
    }

    /**
     * @return this thread's recorder, or null once {@link #run()} has exited
     */
    public HttpRecorder getHttpRecorder() {
        return this.httpRecorder;
    }

    /**
     * @return the controller this thread works for, or null once
     *         {@link #run()} has exited
     */
    public CrawlController getController() {
        return controller;
    }

    /**
     * Interrupts this thread and, if it currently holds a URI, marks that URI
     * S_PROCESSING_THREAD_KILLED and reports it finished to the frontier.
     */
    protected void kill() {
        this.interrupt();
        // Same lock run() holds while it sets or returns the current URI.
        synchronized (this) {
            if (currentCuri != null) {
                currentCuri.setFetchStatus(S_PROCESSING_THREAD_KILLED);
                controller.getFrontier().finished(currentCuri);
            }
        }
    }

    /**
     * @return the current step of the processing cycle (a STEP_* name)
     */
    public Object getStep() {
        return step;
    }

    /**
     * @return true if this thread is alive and currently holds a URI
     */
    public boolean isActive() {
        return this.isAlive() && (currentCuri != null);
    }

    /**
     * Asks this thread to leave its main loop once the URI it is working on
     * has been finished.
     */
    public void retire() {
        shouldRetire = true;
    }

    /**
     * @return true if {@link #retire()} has been called
     */
    public boolean shouldRetire() {
        return shouldRetire;
    }

    /**
     * Writes a multi-line report on this thread: its name, current URI and
     * processor, whether it is waiting or active and for how long, its
     * current step, and its stack trace.
     *
     * @param name ignored; only one kind of report is produced
     * @param pw writer to report to; flushed before returning
     */
    public void reportTo(String name, PrintWriter pw) {
        pw.print("[");
        pw.println(getName());

        // Work on a local copy: the field may be cleared by the running
        // thread while this report is being written.
        CrawlURI c = currentCuri;
        if (c != null) {
            pw.print(" ");
            c.singleLineReportTo(pw);
            pw.print(" ");
            pw.print(c.getFetchAttempts());
            pw.print(" attempts");
            pw.println();
            pw.print(" ");
            pw.print("in processor: ");
            pw.print(currentProcessorName);
        } else {
            pw.print(" -no CrawlURI- ");
        }
        pw.println();

        long now = System.currentTimeMillis();
        long time = 0;

        pw.print(" ");
        if (lastFinishTime > lastStartTime) {
            // Finished more recently than started: between URIs.
            pw.print("WAITING for ");
            time = now - lastFinishTime;
        } else if (lastStartTime > 0) {
            pw.print("ACTIVE for ");
            time = now - lastStartTime;
        }
        pw.print(ArchiveUtils.formatMillisecondsToConventional(time));
        pw.println();

        pw.print(" ");
        pw.print("step: ");
        pw.print(step);
        pw.print(" for ");
        pw.print(ArchiveUtils.formatMillisecondsToConventional(now - atStepSince));
        pw.println();

        StackTraceElement[] ste = this.getStackTrace();
        for (int i = 0; i < ste.length; i++) {
            pw.print(" ");
            pw.print(ste[i].toString());
            pw.println();
        }
        pw.print("]");
        pw.println();

        pw.flush();
    }

    /**
     * Writes a one-line summary of this thread in the format described by
     * {@link #singleLineLegend()}.
     *
     * @param w writer to report to; flushed before returning
     */
    public void singleLineReportTo(PrintWriter w) {
        w.print("#");
        w.print(this.serialNumber);

        // Work on a local copy: the field may be cleared by the running
        // thread while this report is being written.
        CrawlURI c = currentCuri;
        if (c != null) {
            w.print(" ");
            w.print(currentProcessorName);
            w.print(" ");
            w.print(c.toString());
            w.print(" (");
            w.print(c.getFetchAttempts());
            w.print(") ");
        } else {
            w.print(" [no CrawlURI] ");
        }

        long now = System.currentTimeMillis();
        long time = 0;

        if (lastFinishTime > lastStartTime) {
            // Finished more recently than started: between URIs.
            w.print("WAITING for ");
            time = now - lastFinishTime;
        } else if (lastStartTime > 0) {
            w.print("ACTIVE for ");
            time = now - lastStartTime;
        }
        w.print(ArchiveUtils.formatMillisecondsToConventional(time));
        w.print(" at ");
        w.print(step);
        w.print(" for ");
        w.print(ArchiveUtils.formatMillisecondsToConventional(now - atStepSince));
        w.print("\n");
        w.flush();
    }

    /**
     * @return legend naming the fields written by
     *         {@link #singleLineReportTo(PrintWriter)}
     */
    public String singleLineLegend() {
        return "#serialNumber processorName currentUri (fetchAttempts) threadState threadStep";
    }

    /**
     * @return an empty array; this reporter offers no named reports
     */
    public String[] getReports() {
        return new String[] {};
    }

    /**
     * Writes the default multi-line report.
     *
     * @param writer writer to report to
     */
    public void reportTo(PrintWriter writer) {
        reportTo(null, writer);
    }

    /**
     * @return the one-line summary of this thread as a String
     */
    public String singleLineReport() {
        return ArchiveUtils.singleLineReport(this);
    }

    /**
     * Writes the progress-statistics line of the controller's statistics,
     * followed by a newline.
     *
     * @param writer writer to print to
     */
    public void progressStatisticsLine(PrintWriter writer) {
        writer.print(getController().getStatistics()
            .getProgressStatisticsLine());
        writer.print("\n");
    }

    /**
     * Writes the progress-statistics legend of the controller's statistics,
     * followed by a newline.
     *
     * @param writer writer to print to
     */
    public void progressStatisticsLegend(PrintWriter writer) {
        writer.print(getController().getStatistics()
            .progressStatisticsLegend());
        writer.print("\n");
    }

    /**
     * @return name of the processor most recently started, or "" when none
     */
    public String getCurrentProcessorName() {
        return currentProcessorName;
    }
}