KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > archive > crawler > framework > ToeThread


1 /* Copyright (C) 2003 Internet Archive.
2  *
3  * This file is part of the Heritrix web crawler (crawler.archive.org).
4  *
5  * Heritrix is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU Lesser Public License as published by
7  * the Free Software Foundation; either version 2.1 of the License, or
8  * any later version.
9  *
10  * Heritrix is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU Lesser Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser Public License
16  * along with Heritrix; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * ToeThread.java
20  * Created on May 14, 2003
21  *
22  * $Header: /cvsroot/archive-crawler/ArchiveOpenCrawler/src/java/org/archive/crawler/framework/ToeThread.java,v 1.65.6.1 2007/01/13 01:31:22 stack-sf Exp $
23  */

24 package org.archive.crawler.framework;
25
26 import java.io.PrintWriter JavaDoc;
27 import java.util.HashMap JavaDoc;
28 import java.util.logging.Level JavaDoc;
29 import java.util.logging.Logger JavaDoc;
30
31 import org.archive.crawler.datamodel.CoreAttributeConstants;
32 import org.archive.crawler.datamodel.CrawlOrder;
33 import org.archive.crawler.datamodel.CrawlURI;
34 import org.archive.crawler.datamodel.FetchStatusCodes;
35 import org.archive.crawler.datamodel.InstancePerThread;
36 import org.archive.crawler.framework.exceptions.EndedException;
37 import org.archive.util.ArchiveUtils;
38 import org.archive.util.DevUtils;
39 import org.archive.util.HttpRecorder;
40 import org.archive.util.HttpRecorderMarker;
41 import org.archive.util.ProgressStatisticsReporter;
42 import org.archive.util.Reporter;
43
44 import com.sleepycat.util.RuntimeExceptionWrapper;
45
46 /**
47  * One "worker thread"; asks for CrawlURIs, processes them,
48  * repeats unless told otherwise.
49  *
50  * @author Gordon Mohr
51  */

52 public class ToeThread extends Thread JavaDoc
53 implements CoreAttributeConstants, FetchStatusCodes, HttpRecorderMarker,
54 Reporter, ProgressStatisticsReporter {
55     private static final String JavaDoc STEP_NASCENT = "NASCENT";
56     private static final String JavaDoc STEP_ABOUT_TO_GET_URI = "ABOUT_TO_GET_URI";
57     private static final String JavaDoc STEP_FINISHED = "FINISHED";
58     private static final String JavaDoc STEP_ABOUT_TO_BEGIN_CHAIN =
59         "ABOUT_TO_BEGIN_CHAIN";
60     private static final String JavaDoc STEP_ABOUT_TO_BEGIN_PROCESSOR =
61         "ABOUT_TO_BEGIN_PROCESSOR";
62     private static final String JavaDoc STEP_DONE_WITH_PROCESSORS =
63         "DONE_WITH_PROCESSORS";
64     private static final String JavaDoc STEP_HANDLING_RUNTIME_EXCEPTION =
65         "HANDLING_RUNTIME_EXCEPTION";
66     private static final String JavaDoc STEP_ABOUT_TO_RETURN_URI =
67         "ABOUT_TO_RETURN_URI";
68     private static final String JavaDoc STEP_FINISHING_PROCESS = "FINISHING_PROCESS";
69
70     private static Logger JavaDoc logger =
71         Logger.getLogger("org.archive.crawler.framework.ToeThread");
72
73     private CrawlController controller;
74     private int serialNumber;
75     
76     /**
77      * Each ToeThead has an instance of HttpRecord that gets used
78      * over and over by each request.
79      *
80      * @see org.archive.util.HttpRecorderMarker
81      */

82     private HttpRecorder httpRecorder = null;
83     
84     private HashMap JavaDoc<String JavaDoc,Processor> localProcessors
85      = new HashMap JavaDoc<String JavaDoc,Processor>();
86     private String JavaDoc currentProcessorName = "";
87
88     private String JavaDoc coreName;
89     private CrawlURI currentCuri;
90     private long lastStartTime;
91     private long lastFinishTime;
92
93     // activity monitoring, debugging, and problem detection
94
private String JavaDoc step = STEP_NASCENT;
95     private long atStepSince;
96     
97     // default priority; may not be meaningful in recent JVMs
98
private static final int DEFAULT_PRIORITY = Thread.NORM_PRIORITY-2;
99     
100     // indicator that a thread is now surplus based on current desired
101
// count; it should wrap up cleanly
102
private volatile boolean shouldRetire = false;
103     
104     /**
105      * Create a ToeThread
106      *
107      * @param g ToeThreadGroup
108      * @param sn serial number
109      */

110     public ToeThread(ToePool g, int sn) {
111         // TODO: add crawl name?
112
super(g,"ToeThread #" + sn);
113         coreName="ToeThread #" + sn + ": ";
114         controller = g.getController();
115         serialNumber = sn;
116         setPriority(DEFAULT_PRIORITY);
117         int outBufferSize = ((Integer JavaDoc) controller
118                 .getOrder()
119                 .getUncheckedAttribute(null,CrawlOrder.ATTR_RECORDER_OUT_BUFFER))
120                         .intValue();
121         int inBufferSize = ((Integer JavaDoc) controller
122                 .getOrder()
123                 .getUncheckedAttribute(null, CrawlOrder.ATTR_RECORDER_IN_BUFFER))
124                 .intValue();
125         httpRecorder = new HttpRecorder(controller.getScratchDisk(),
126             "tt" + sn + "http", outBufferSize, inBufferSize);
127         lastFinishTime = System.currentTimeMillis();
128     }
129
130     /** (non-Javadoc)
131      * @see java.lang.Thread#run()
132      */

133     public void run() {
134         String JavaDoc name = controller.getOrder().getCrawlOrderName();
135         logger.fine(getName()+" started for order '"+name+"'");
136
137         try {
138             while ( true ) {
139                 // TODO check for thread-abort? or is waiting for interrupt enough?
140
continueCheck();
141                 
142                 setStep(STEP_ABOUT_TO_GET_URI);
143                 
144                 CrawlURI curi = controller.getFrontier().next();
145                 
146                 synchronized(this) {
147                     continueCheck();
148                     setCurrentCuri(curi);
149                 }
150                 
151                 processCrawlUri();
152                 
153                 setStep(STEP_ABOUT_TO_RETURN_URI);
154                 continueCheck();
155
156                 synchronized(this) {
157                     controller.getFrontier().finished(currentCuri);
158                     setCurrentCuri(null);
159                 }
160                 
161                 setStep(STEP_FINISHING_PROCESS);
162                 lastFinishTime = System.currentTimeMillis();
163                 controller.releaseContinuePermission();
164                 if(shouldRetire) {
165                     break; // from while(true)
166
}
167             }
168         } catch (EndedException e) {
169             // crawl ended (or thread was retired), so allow thread to end
170
} catch (Exception JavaDoc e) {
171             // everything else (including interruption)
172
logger.log(Level.SEVERE,"Fatal exception in "+getName(),e);
173         } catch (OutOfMemoryError JavaDoc err) {
174             seriousError(err);
175         } finally {
176             controller.releaseContinuePermission();
177         }
178         setCurrentCuri(null);
179         // Do cleanup so that objects can be GC.
180
this.httpRecorder.closeRecorders();
181         this.httpRecorder = null;
182         localProcessors = null;
183
184         logger.fine(getName()+" finished for order '"+name+"'");
185         setStep(STEP_FINISHED);
186         controller.toeEnded();
187         controller = null;
188     }
189
190     /**
191      * Set currentCuri, updating thread name as appropriate
192      * @param curi
193      */

194     private void setCurrentCuri(CrawlURI curi) {
195         if(curi==null) {
196             setName(coreName);
197         } else {
198             setName(coreName+curi);
199         }
200         currentCuri = curi;
201     }
202
203     /**
204      * @param s
205      */

206     private void setStep(String JavaDoc s) {
207         step=s;
208         atStepSince = System.currentTimeMillis();
209     }
210
211     private void seriousError(Error JavaDoc err) {
212         // try to prevent timeslicing until we have a chance to deal with OOM
213
// TODO: recognize that new JVM priority indifference may make this
214
// priority-jumbling pointless
215
setPriority(DEFAULT_PRIORITY+1);
216         if (controller!=null) {
217             // hold all ToeThreads from proceeding to next processor
218
controller.singleThreadMode();
219             // TODO: consider if SoftReferences would be a better way to
220
// engineer a soft-landing for low-memory conditions
221
controller.freeReserveMemory();
222             controller.requestCrawlPause();
223             if (controller.getFrontier().getFrontierJournal() != null) {
224                 controller.getFrontier().getFrontierJournal().seriousError(
225                     getName() + err.getMessage());
226             }
227         }
228         
229         // OutOfMemory etc.
230
String JavaDoc extraInfo = DevUtils.extraInfo();
231         System.err.println("<<<");
232         System.err.println(ArchiveUtils.getLog17Date());
233         System.err.println(err);
234         System.err.println(extraInfo);
235         err.printStackTrace(System.err);
236         
237         if (controller!=null) {
238             PrintWriter JavaDoc pw = new PrintWriter JavaDoc(System.err);
239             controller.getToePool().compactReportTo(pw);
240             pw.flush();
241         }
242         System.err.println(">>>");
243 // DevUtils.sigquitSelf();
244

245         String JavaDoc context = "unknown";
246         if(currentCuri!=null) {
247             // update fetch-status, saving original as annotation
248
currentCuri.addAnnotation("err="+err.getClass().getName());
249             currentCuri.addAnnotation("os"+currentCuri.getFetchStatus());
250             currentCuri.setFetchStatus(S_SERIOUS_ERROR);
251             context = currentCuri.singleLineReport() + " in " + currentProcessorName;
252         }
253         String JavaDoc message = "Serious error occured trying " +
254             "to process '" + context + "'\n" + extraInfo;
255         logger.log(Level.SEVERE, message.toString(), err);
256         setPriority(DEFAULT_PRIORITY);
257     }
258
259     /**
260      * Perform checks as to whether normal execution should proceed.
261      *
262      * If an external interrupt is detected, throw an interrupted exception.
263      * Used before anything that should not be attempted by a 'zombie' thread
264      * that the Frontier/Crawl has given up on.
265      *
266      * Otherwise, if the controller's memoryGate has been closed,
267      * hold until it is opened. (Provides a better chance of
268      * being able to complete some tasks after an OutOfMemoryError.)
269      *
270      * @throws InterruptedException
271      */

272     private void continueCheck() throws InterruptedException JavaDoc {
273         if(Thread.interrupted()) {
274             throw new InterruptedException JavaDoc("die request detected");
275         }
276         controller.acquireContinuePermission();
277     }
278
279     /**
280      * Pass the CrawlURI to all appropriate processors
281      *
282      * @throws InterruptedException
283      */

284     private void processCrawlUri() throws InterruptedException JavaDoc {
285         currentCuri.setThreadNumber(this.serialNumber);
286         currentCuri.setNextProcessorChain(controller.getFirstProcessorChain());
287         lastStartTime = System.currentTimeMillis();
288 // System.out.println(currentCuri);
289
try {
290             while (currentCuri.nextProcessorChain() != null) {
291                 setStep(STEP_ABOUT_TO_BEGIN_CHAIN);
292                 // Starting on a new processor chain.
293
currentCuri.setNextProcessor(currentCuri.nextProcessorChain().getFirstProcessor());
294                 currentCuri.setNextProcessorChain(currentCuri.nextProcessorChain().getNextProcessorChain());
295
296                 while (currentCuri.nextProcessor() != null) {
297                     setStep(STEP_ABOUT_TO_BEGIN_PROCESSOR);
298                     Processor currentProcessor = getProcessor(currentCuri.nextProcessor());
299                     currentProcessorName = currentProcessor.getName();
300                     continueCheck();
301 // long memBefore = (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory())/1024;
302
currentProcessor.process(currentCuri);
303 // long memAfter = (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory())/1024;
304
// System.out.println((memAfter-memBefore)+"K in "+currentProcessorName);
305
}
306             }
307             setStep(STEP_DONE_WITH_PROCESSORS);
308             currentProcessorName = "";
309         } catch (RuntimeExceptionWrapper e) {
310             // Workaround to get cause from BDB
311
if(e.getCause() == null) {
312                 e.initCause(e.getCause());
313             }
314             recoverableProblem(e);
315         } catch (AssertionError JavaDoc ae) {
316             // This risks leaving crawl in fatally inconsistent state,
317
// but is often reasonable for per-Processor assertion problems
318
recoverableProblem(ae);
319         } catch (RuntimeException JavaDoc e) {
320             recoverableProblem(e);
321         } catch (StackOverflowError JavaDoc err) {
322             recoverableProblem(err);
323         } catch (Error JavaDoc err) {
324             // OutOfMemory and any others
325
seriousError(err);
326         }
327     }
328
329
330     /**
331      * Handling for exceptions and errors that are possibly recoverable.
332      *
333      * @param e
334      */

335     private void recoverableProblem(Throwable JavaDoc e) {
336         Object JavaDoc previousStep = step;
337         setStep(STEP_HANDLING_RUNTIME_EXCEPTION);
338         e.printStackTrace(System.err);
339         currentCuri.setFetchStatus(S_RUNTIME_EXCEPTION);
340         // store exception temporarily for logging
341
currentCuri.addAnnotation("err="+e.getClass().getName());
342         currentCuri.putObject(A_RUNTIME_EXCEPTION, e);
343         String JavaDoc message = "Problem " + e +
344                 " occured when trying to process '"
345                 + currentCuri.toString()
346                 + "' at step " + previousStep
347                 + " in " + currentProcessorName +"\n";
348         logger.log(Level.SEVERE, message.toString(), e);
349     }
350
351     private Processor getProcessor(Processor processor) {
352         if(!(processor instanceof InstancePerThread)) {
353             // just use the shared Processor
354
return processor;
355         }
356         // must use local copy of processor
357
Processor localProcessor = (Processor) localProcessors.get(
358                     processor.getClass().getName());
359         if (localProcessor == null) {
360             localProcessor = processor.spawn(this.getSerialNumber());
361             localProcessors.put(processor.getClass().getName(),localProcessor);
362         }
363         return localProcessor;
364     }
365
366     /**
367      * @return Return toe thread serial number.
368      */

369     public int getSerialNumber() {
370         return this.serialNumber;
371     }
372
373     /**
374      * Used to get current threads HttpRecorder instance.
375      * Implementation of the HttpRecorderMarker interface.
376      * @return Returns instance of HttpRecorder carried by this thread.
377      * @see org.archive.util.HttpRecorderMarker#getHttpRecorder()
378      */

379     public HttpRecorder getHttpRecorder() {
380         return this.httpRecorder;
381     }
382     
383     /** Get the CrawlController acossiated with this thread.
384      *
385      * @return Returns the CrawlController.
386      */

387     public CrawlController getController() {
388         return controller;
389     }
390
391     /**
392      * Terminates a thread.
393      *
394      * <p> Calling this method will ensure that the current thread will stop
395      * processing as soon as possible (note: this may be never). Meant to
396      * 'short circuit' hung threads.
397      *
398      * <p> Current crawl uri will have its fetch status set accordingly and
399      * will be immediately returned to the frontier.
400      *
401      * <p> As noted before, this does not ensure that the thread will stop
402      * running (ever). But once evoked it will not try and communicate with
403      * other parts of crawler and will terminate as soon as control is
404      * established.
405      */

406     protected void kill(){
407         this.interrupt();
408         synchronized(this) {
409             if (currentCuri!=null) {
410                 currentCuri.setFetchStatus(S_PROCESSING_THREAD_KILLED);
411                 controller.getFrontier().finished(currentCuri);
412              }
413         }
414     }
415
416     /**
417      * @return Current step (For debugging/reporting, give abstract step
418      * where this thread is).
419      */

420     public Object JavaDoc getStep() {
421         return step;
422     }
423
424     /**
425      * Is this thread processing a URI, not paused or waiting for a URI?
426      * @return whether thread is actively processing a URI
427      */

428     public boolean isActive() {
429         // if alive and not waiting in/for frontier.next(), we're 'active'
430
return this.isAlive() && (currentCuri != null);
431     }
432     
433     /**
434      * Request that this thread retire (exit cleanly) at the earliest
435      * opportunity.
436      */

437     public void retire() {
438         shouldRetire = true;
439     }
440
441     /**
442      * Whether this thread should cleanly retire at the earliest
443      * opportunity.
444      *
445      * @return True if should retire.
446      */

447     public boolean shouldRetire() {
448         return shouldRetire;
449     }
450
451     //
452
// Reporter implementation
453
//
454

455     /**
456      * Compiles and returns a report on its status.
457      * @param name Report name.
458      * @param pw Where to print.
459      */

460     public void reportTo(String JavaDoc name, PrintWriter JavaDoc pw) {
461         // name is ignored for now: only one kind of report
462

463         pw.print("[");
464         pw.println(getName());
465
466         // Make a local copy of the currentCuri reference in case it gets
467
// nulled while we're using it. We're doing this because
468
// alternative is synchronizing and we don't want to do this --
469
// it causes hang ups as controller waits on a lock for this thread,
470
// something it gets easily enough on old threading model but something
471
// it can wait interminably for on NPTL threading model.
472
// See [ 994946 ] Pause/Terminate ignored on 2.6 kernel 1.5 JVM.
473
CrawlURI c = currentCuri;
474         if(c != null) {
475             pw.print(" ");
476             c.singleLineReportTo(pw);
477             pw.print(" ");
478             pw.print(c.getFetchAttempts());
479             pw.print(" attempts");
480             pw.println();
481             pw.print(" ");
482             pw.print("in processor: ");
483             pw.print(currentProcessorName);
484         } else {
485             pw.print(" -no CrawlURI- ");
486         }
487         pw.println();
488
489         long now = System.currentTimeMillis();
490         long time = 0;
491
492         pw.print(" ");
493         if(lastFinishTime > lastStartTime) {
494             // That means we finished something after we last started something
495
// or in other words we are not working on anything.
496
pw.print("WAITING for ");
497             time = now - lastFinishTime;
498         } else if(lastStartTime > 0) {
499             // We are working on something
500
pw.print("ACTIVE for ");
501             time = now-lastStartTime;
502         }
503         pw.print(ArchiveUtils.formatMillisecondsToConventional(time));
504         pw.println();
505
506         pw.print(" ");
507         pw.print("step: ");
508         pw.print(step);
509         pw.print(" for ");
510         pw.print(ArchiveUtils.formatMillisecondsToConventional(System.currentTimeMillis()-atStepSince));
511         pw.println();
512
513         StackTraceElement JavaDoc[] ste = this.getStackTrace();
514         for(int i=0;i<ste.length;i++) {
515             pw.print(" ");
516             pw.print(ste[i].toString());
517             pw.println();
518         }
519         pw.print("]");
520         pw.println();
521         
522         pw.flush();
523     }
524
525     /**
526      * @param w PrintWriter to write to.
527      */

528     public void singleLineReportTo(PrintWriter JavaDoc w)
529     {
530         w.print("#");
531         w.print(this.serialNumber);
532
533         // Make a local copy of the currentCuri reference in case it gets
534
// nulled while we're using it. We're doing this because
535
// alternative is synchronizing and we don't want to do this --
536
// it causes hang ups as controller waits on a lock for this thread,
537
// something it gets easily enough on old threading model but something
538
// it can wait interminably for on NPTL threading model.
539
// See [ 994946 ] Pause/Terminate ignored on 2.6 kernel 1.5 JVM.
540
CrawlURI c = currentCuri;
541         if(c != null) {
542             w.print(" ");
543             w.print(currentProcessorName);
544             w.print(" ");
545             w.print(c.toString());
546             w.print(" (");
547             w.print(c.getFetchAttempts());
548             w.print(") ");
549         } else {
550             w.print(" [no CrawlURI] ");
551         }
552         
553         long now = System.currentTimeMillis();
554         long time = 0;
555
556         if(lastFinishTime > lastStartTime) {
557             // That means we finished something after we last started something
558
// or in other words we are not working on anything.
559
w.print("WAITING for ");
560             time = now - lastFinishTime;
561         } else if(lastStartTime > 0) {
562             // We are working on something
563
w.print("ACTIVE for ");
564             time = now-lastStartTime;
565         }
566         w.print(ArchiveUtils.formatMillisecondsToConventional(time));
567         w.print(" at ");
568         w.print(step);
569         w.print(" for ");
570         w.print(ArchiveUtils.formatMillisecondsToConventional(now-atStepSince));
571         w.print("\n");
572         w.flush();
573     }
574
575     /* (non-Javadoc)
576      * @see org.archive.util.Reporter#singleLineLegend()
577      */

578     public String JavaDoc singleLineLegend() {
579         return "#serialNumber processorName currentUri (fetchAttempts) threadState threadStep";
580     }
581     
582     /* (non-Javadoc)
583      * @see org.archive.util.Reporter#getReports()
584      */

585     public String JavaDoc[] getReports() {
586         // for now none but the default
587
return new String JavaDoc[] {};
588     }
589
590     public void reportTo(PrintWriter JavaDoc writer) {
591         reportTo(null, writer);
592     }
593
594     /* (non-Javadoc)
595      * @see org.archive.util.Reporter#singleLineReport()
596      */

597     public String JavaDoc singleLineReport() {
598         return ArchiveUtils.singleLineReport(this);
599     }
600
601     public void progressStatisticsLine(PrintWriter JavaDoc writer) {
602         writer.print(getController().getStatistics()
603             .getProgressStatisticsLine());
604         writer.print("\n");
605     }
606
607     public void progressStatisticsLegend(PrintWriter JavaDoc writer) {
608         writer.print(getController().getStatistics()
609             .progressStatisticsLegend());
610         writer.print("\n");
611     }
612     
613     public String JavaDoc getCurrentProcessorName() {
614         return currentProcessorName;
615     }
616 }
617
Popular Tags