1 24 package org.archive.crawler.framework; 25 26 import java.io.File ; 27 import java.io.FileOutputStream ; 28 import java.io.FilenameFilter ; 29 import java.io.IOException ; 30 import java.io.ObjectInputStream ; 31 import java.io.PrintWriter ; 32 import java.io.Serializable ; 33 import java.util.ArrayList ; 34 import java.util.Arrays ; 35 import java.util.Collections ; 36 import java.util.Date ; 37 import java.util.EventObject ; 38 import java.util.HashMap ; 39 import java.util.HashSet ; 40 import java.util.Hashtable ; 41 import java.util.Iterator ; 42 import java.util.LinkedList ; 43 import java.util.List ; 44 import java.util.Map ; 45 import java.util.Set ; 46 import java.util.TreeSet ; 47 import java.util.logging.FileHandler ; 48 import java.util.logging.Formatter ; 49 import java.util.logging.Level ; 50 import java.util.logging.Logger ; 51 52 import javax.management.AttributeNotFoundException ; 53 import javax.management.InvalidAttributeValueException ; 54 import javax.management.MBeanException ; 55 import javax.management.ReflectionException ; 56 57 import org.apache.commons.httpclient.URIException; 58 import org.archive.crawler.admin.CrawlJob; 59 import org.archive.crawler.admin.StatisticsTracker; 60 import org.archive.crawler.datamodel.Checkpoint; 61 import org.archive.crawler.datamodel.CrawlOrder; 62 import org.archive.crawler.datamodel.CrawlURI; 63 import org.archive.crawler.datamodel.ServerCache; 64 import org.archive.crawler.event.CrawlStatusListener; 65 import org.archive.crawler.event.CrawlURIDispositionListener; 66 import org.archive.crawler.framework.exceptions.FatalConfigurationException; 67 import org.archive.crawler.framework.exceptions.InitializationException; 68 import org.archive.crawler.io.LocalErrorFormatter; 69 import org.archive.crawler.io.RuntimeErrorFormatter; 70 import org.archive.crawler.io.StatisticsLogFormatter; 71 import org.archive.crawler.io.UriErrorFormatter; 72 import org.archive.crawler.io.UriProcessingFormatter; 73 import 
org.archive.crawler.settings.MapType; 74 import org.archive.crawler.settings.SettingsHandler; 75 import org.archive.crawler.util.CheckpointUtils; 76 import org.archive.io.GenerationFileHandler; 77 import org.archive.net.UURI; 78 import org.archive.net.UURIFactory; 79 import org.archive.util.ArchiveUtils; 80 import org.archive.util.CachedBdbMap; 81 import org.archive.util.FileUtils; 82 import org.archive.util.Reporter; 83 import org.xbill.DNS.DClass; 84 import org.xbill.DNS.Lookup; 85 import org.xbill.DNS.Type; 86 87 import com.sleepycat.bind.serial.StoredClassCatalog; 88 import com.sleepycat.je.CheckpointConfig; 89 import com.sleepycat.je.Database; 90 import com.sleepycat.je.DatabaseConfig; 91 import com.sleepycat.je.DatabaseException; 92 import com.sleepycat.je.DbInternal; 93 import com.sleepycat.je.Environment; 94 import com.sleepycat.je.EnvironmentConfig; 95 import com.sleepycat.je.dbi.EnvironmentImpl; 96 import com.sleepycat.je.utilint.DbLsn; 97 98 import java.util.concurrent.locks.ReentrantLock ; 99 100 110 public class CrawlController implements Serializable , Reporter { 111 private static final long serialVersionUID = 113 ArchiveUtils.classnameBasedUID(CrawlController.class,1); 114 115 120 private final static Logger LOGGER = 121 Logger.getLogger(CrawlController.class.getName()); 122 123 125 public static final char MANIFEST_CONFIG_FILE = 'C'; 126 127 public static final char MANIFEST_REPORT_FILE = 'R'; 128 129 public static final char MANIFEST_LOG_FILE = 'L'; 130 131 private static final String LOGNAME_PROGRESS_STATISTICS = 133 "progress-statistics"; 134 private static final String LOGNAME_URI_ERRORS = "uri-errors"; 135 private static final String LOGNAME_RUNTIME_ERRORS = "runtime-errors"; 136 private static final String LOGNAME_LOCAL_ERRORS = "local-errors"; 137 private static final String LOGNAME_CRAWL = "crawl"; 138 139 private transient CrawlOrder order; 141 private transient CrawlScope scope; 142 private transient ProcessorChainList processorChains; 
143 144 private transient Frontier frontier; 145 146 private transient ToePool toePool; 147 148 private transient ServerCache serverCache; 149 150 private transient SettingsHandler settingsHandler; 152 153 154 private volatile transient boolean singleThreadMode = false; 156 private transient ReentrantLock singleThreadLock = null; 157 158 private transient LinkedList <char[]> reserveMemory; 160 private static final int RESERVE_BLOCKS = 1; 161 private static final int RESERVE_BLOCK_SIZE = 6*2^20; 163 165 168 private transient String sExit; 169 170 private static final Object NASCENT = "NASCENT".intern(); 171 private static final Object RUNNING = "RUNNING".intern(); 172 private static final Object PAUSED = "PAUSED".intern(); 173 private static final Object PAUSING = "PAUSING".intern(); 174 private static final Object CHECKPOINTING = "CHECKPOINTING".intern(); 175 private static final Object STOPPING = "STOPPING".intern(); 176 private static final Object FINISHED = "FINISHED".intern(); 177 private static final Object STARTED = "STARTED".intern(); 178 private static final Object PREPARING = "PREPARING".intern(); 179 180 transient private Object state = NASCENT; 181 182 private transient File disk; private transient File logsDisk; 186 189 private transient File stateDisk; 190 191 194 private transient File scratchDisk; 195 196 199 private transient File checkpointsDisk; 200 201 206 private Checkpointer checkpointer; 207 208 212 private transient Checkpoint checkpointRecover = null; 213 214 private long maxBytes; 216 private long maxDocument; 217 private long maxTime; 218 219 223 private StringBuffer manifest; 224 225 229 transient private Map <Logger ,FileHandler > fileHandlers; 230 231 232 public static final String CURRENT_LOG_SUFFIX = ".log"; 233 234 239 public transient Logger uriProcessing; 240 241 247 public transient Logger runtimeErrors; 248 249 255 public transient Logger localErrors; 256 257 260 public transient Logger uriErrors; 261 262 265 private transient 
Logger progressStats; 266 267 273 public transient Logger reports; 274 275 protected StatisticsTracking statistics = null; 276 277 284 private transient List <CrawlStatusListener> registeredCrawlStatusListeners = 285 Collections.synchronizedList(new ArrayList <CrawlStatusListener>()); 286 287 private transient CrawlURIDispositionListener 290 registeredCrawlURIDispositionListener; 291 292 protected transient ArrayList <CrawlURIDispositionListener> 294 registeredCrawlURIDispositionListeners; 295 296 297 private transient Environment bdbEnvironment = null; 300 301 305 private transient Database classCatalogDB = null; 306 307 311 private transient StoredClassCatalog classCatalog = null; 312 313 317 private transient Map <String ,CachedBdbMap<?,?>> bigmaps = null; 318 319 322 public CrawlController() { 323 super(); 324 } 326 327 334 public void initialize(SettingsHandler sH) 335 throws InitializationException { 336 sendCrawlStateChangeEvent(PREPARING, CrawlJob.STATUS_PREPARING); 337 338 this.singleThreadLock = new ReentrantLock (); 339 this.settingsHandler = sH; 340 this.order = settingsHandler.getOrder(); 341 this.order.setController(this); 342 this.bigmaps = new Hashtable <String ,CachedBdbMap<?,?>>(); 343 sExit = ""; 344 this.manifest = new StringBuffer (); 345 String onFailMessage = ""; 346 try { 347 onFailMessage = "You must set the User-Agent and From HTTP" + 348 " header values to acceptable strings. 
\n" + 349 " User-Agent: [software-name](+[info-url])[misc]\n" + 350 " From: [email-address]\n"; 351 order.checkUserAgentAndFrom(); 352 353 onFailMessage = "Unable to setup disk"; 354 if (disk == null) { 355 setupDisk(); 356 } 357 358 onFailMessage = "Unable to create log file(s)"; 359 setupLogs(); 360 361 onFailMessage = "Unable to test/run checkpoint recover"; 378 this.checkpointRecover = getCheckpointRecover(); 379 if (this.checkpointRecover == null) { 380 this.checkpointer = 381 new Checkpointer(this, this.checkpointsDisk); 382 } else { 383 setupCheckpointRecover(); 384 } 385 386 onFailMessage = "Unable to setup bdb environment."; 387 setupBdb(); 388 389 onFailMessage = "Unable to setup statistics"; 390 setupStatTracking(); 391 392 onFailMessage = "Unable to setup crawl modules"; 393 setupCrawlModules(); 394 } catch (Exception e) { 395 String tmp = "On crawl: " 396 + settingsHandler.getSettingsObject(null).getName() + " " + 397 onFailMessage; 398 LOGGER.log(Level.SEVERE, tmp, e); 399 throw new InitializationException(tmp, e); 400 } 401 402 Lookup.getDefaultCache(DClass.IN).setMaxEntries(1); 405 407 setupToePool(); 408 setThresholds(); 409 410 reserveMemory = new LinkedList <char[]>(); 411 for(int i = 1; i < RESERVE_BLOCKS; i++) { 412 reserveMemory.add(new char[RESERVE_BLOCK_SIZE]); 413 } 414 } 415 416 421 protected void setupCheckpointRecover() 422 throws IOException { 423 long started = System.currentTimeMillis();; 424 if (LOGGER.isLoggable(Level.FINE)) { 425 LOGGER.fine("Starting recovery setup -- copying into place " + 426 "bdbje log files -- for checkpoint named " + 427 this.checkpointRecover.getDisplayName()); 428 } 429 this.checkpointer.recover(this); 431 this.progressStats.info("CHECKPOINT RECOVER " + 432 this.checkpointRecover.getDisplayName()); 433 File bdbSubDir = CheckpointUtils. 
440 getBdbSubDirectory(this.checkpointRecover.getDirectory()); 441 FileUtils.copyFiles(bdbSubDir, CheckpointUtils.getJeLogsFilter(), 442 getStateDisk(), true, 443 false); 444 if (LOGGER.isLoggable(Level.INFO)) { 445 LOGGER.info("Finished recovery setup for checkpoint named " + 446 this.checkpointRecover.getDisplayName() + " in " + 447 (System.currentTimeMillis() - started) + "ms."); 448 } 449 } 450 451 protected boolean getCheckpointCopyBdbjeLogs() { 452 return ((Boolean )this.order.getUncheckedAttribute(null, 453 CrawlOrder.ATTR_CHECKPOINT_COPY_BDBJE_LOGS)).booleanValue(); 454 } 455 456 private void setupBdb() 457 throws FatalConfigurationException, AttributeNotFoundException { 458 EnvironmentConfig envConfig = new EnvironmentConfig(); 459 envConfig.setAllowCreate(true); 460 int bdbCachePercent = ((Integer )this.order. 461 getAttribute(null, CrawlOrder.ATTR_BDB_CACHE_PERCENT)).intValue(); 462 if(bdbCachePercent > 0) { 463 envConfig.setCachePercent(bdbCachePercent); 466 } 467 envConfig.setLockTimeout(5000000); if (LOGGER.isLoggable(Level.FINEST)) { 469 envConfig.setConfigParam("java.util.logging.level", "SEVERE"); 470 envConfig.setConfigParam("java.util.logging.level.evictor", 471 "SEVERE"); 472 envConfig.setConfigParam("java.util.logging.ConsoleHandler.on", 473 "true"); 474 } 475 476 if (!getCheckpointCopyBdbjeLogs()) { 477 envConfig.setConfigParam("je.cleaner.expunge", "false"); 482 } 483 484 try { 485 this.bdbEnvironment = new Environment(getStateDisk(), envConfig); 486 if (LOGGER.isLoggable(Level.FINE)) { 487 envConfig = bdbEnvironment.getConfig(); 489 LOGGER.fine("BdbConfiguration: Cache percentage " + 490 envConfig.getCachePercent() + 491 ", cache size " + envConfig.getCacheSize()); 492 } 493 DatabaseConfig dbConfig = new DatabaseConfig(); 496 dbConfig.setAllowCreate(true); 497 this.classCatalogDB = this.bdbEnvironment. 
498 openDatabase(null, "classes", dbConfig); 499 this.classCatalog = new StoredClassCatalog(classCatalogDB); 500 } catch (DatabaseException e) { 501 e.printStackTrace(); 502 throw new FatalConfigurationException(e.getMessage()); 503 } 504 } 505 506 public Environment getBdbEnvironment() { 507 return this.bdbEnvironment; 508 } 509 510 public StoredClassCatalog getClassCatalog() { 511 return this.classCatalog; 512 } 513 514 521 public void addCrawlStatusListener(CrawlStatusListener cl) { 522 synchronized (this.registeredCrawlStatusListeners) { 523 this.registeredCrawlStatusListeners.add(cl); 524 } 525 } 526 527 534 public void addCrawlURIDispositionListener(CrawlURIDispositionListener cl) { 535 registeredCrawlURIDispositionListener = null; 536 if (registeredCrawlURIDispositionListeners == null) { 537 registeredCrawlURIDispositionListener = cl; 539 registeredCrawlURIDispositionListeners 541 = new ArrayList <CrawlURIDispositionListener>(1); 542 } 544 registeredCrawlURIDispositionListeners.add(cl); 545 } 546 547 556 public void fireCrawledURISuccessfulEvent(CrawlURI curi) { 557 if (registeredCrawlURIDispositionListener != null) { 558 registeredCrawlURIDispositionListener.crawledURISuccessful(curi); 560 } else { 561 if (registeredCrawlURIDispositionListeners != null 563 && registeredCrawlURIDispositionListeners.size() > 0) { 564 Iterator it = registeredCrawlURIDispositionListeners.iterator(); 565 while (it.hasNext()) { 566 ( 567 (CrawlURIDispositionListener) it 568 .next()) 569 .crawledURISuccessful( 570 curi); 571 } 572 } 573 } 574 } 575 576 585 public void fireCrawledURINeedRetryEvent(CrawlURI curi) { 586 if (registeredCrawlURIDispositionListener != null) { 587 registeredCrawlURIDispositionListener.crawledURINeedRetry(curi); 589 return; 590 } 591 592 if (registeredCrawlURIDispositionListeners != null 594 && registeredCrawlURIDispositionListeners.size() > 0) { 595 for (Iterator i = registeredCrawlURIDispositionListeners.iterator(); 596 i.hasNext();) { 597 
((CrawlURIDispositionListener)i.next()).crawledURINeedRetry(curi); 598 } 599 } 600 } 601 602 612 public void fireCrawledURIDisregardEvent(CrawlURI curi) { 613 if (registeredCrawlURIDispositionListener != null) { 614 registeredCrawlURIDispositionListener.crawledURIDisregard(curi); 616 } else { 617 if (registeredCrawlURIDispositionListeners != null 619 && registeredCrawlURIDispositionListeners.size() > 0) { 620 Iterator it = registeredCrawlURIDispositionListeners.iterator(); 621 while (it.hasNext()) { 622 ( 623 (CrawlURIDispositionListener) it 624 .next()) 625 .crawledURIDisregard( 626 curi); 627 } 628 } 629 } 630 } 631 632 640 public void fireCrawledURIFailureEvent(CrawlURI curi) { 641 if (registeredCrawlURIDispositionListener != null) { 642 registeredCrawlURIDispositionListener.crawledURIFailure(curi); 644 } else { 645 if (registeredCrawlURIDispositionListeners != null 647 && registeredCrawlURIDispositionListeners.size() > 0) { 648 Iterator it = registeredCrawlURIDispositionListeners.iterator(); 649 while (it.hasNext()) { 650 ((CrawlURIDispositionListener)it.next()) 651 .crawledURIFailure(curi); 652 } 653 } 654 } 655 } 656 657 private void setupCrawlModules() throws FatalConfigurationException, 658 AttributeNotFoundException , MBeanException , ReflectionException { 659 if (scope == null) { 660 scope = (CrawlScope) order.getAttribute(CrawlScope.ATTR_NAME); 661 scope.initialize(this); 662 } 663 try { 664 this.serverCache = new ServerCache(this); 665 } catch (Exception e) { 666 throw new FatalConfigurationException("Unable to" + 667 " initialize frontier (Failed setup of ServerCache) " + e); 668 } 669 670 if (this.frontier == null) { 671 this.frontier = (Frontier)order.getAttribute(Frontier.ATTR_NAME); 672 try { 673 frontier.initialize(this); 674 frontier.pause(); if (!isCheckpointRecover()) { 679 runFrontierRecover((String )order. 
getAttribute(CrawlOrder.ATTR_RECOVER_PATH));
                }
            } catch (IOException e) {
                // Keep the IOException as the cause, not just its toString().
                throw (FatalConfigurationException) new FatalConfigurationException(
                    "unable to initialize frontier: " + e).initCause(e);
            }
        }

        if (processorChains == null) {
            processorChains = new ProcessorChainList(order);
        }
    }

    /**
     * Replays a frontier recover log into the frontier.
     *
     * @param recoverPath path of the recover log; null or empty means there
     *        is nothing to replay
     * @throws FatalConfigurationException if the frontier fails to import
     *         the log
     */
    protected void runFrontierRecover(String recoverPath)
            throws AttributeNotFoundException, MBeanException,
            ReflectionException, FatalConfigurationException {
        if (recoverPath == null || recoverPath.length() <= 0) {
            return;
        }
        File f = new File(recoverPath);
        if (!f.exists()) {
            LOGGER.severe("Recover file does not exist " + recoverPath);
            return;
        }
        if (!f.isFile()) {
            // Not a regular file. A directory recover path is handled as a
            // checkpoint by getCheckpointRecover(CrawlOrder), not replayed here.
            return;
        }
        boolean retainFailures = ((Boolean) order.
            getAttribute(CrawlOrder.ATTR_RECOVER_RETAIN_FAILURES)).booleanValue();
        try {
            frontier.importRecoverLog(recoverPath, retainFailures);
        } catch (IOException e) {
            e.printStackTrace();
            throw (FatalConfigurationException) new FatalConfigurationException(
                "Recover.log " + recoverPath + " problem: " + e).initCause(e);
        }
    }

    /**
     * Resolves the order's disk path through
     * SettingsHandler.getPathRelativeToWorkingDirectory and creates it. Then
     * resolves (and creates) the logs, checkpoints, state and scratch
     * directories.
     */
    private void setupDisk() throws AttributeNotFoundException {
        String diskPath
            = (String) order.getAttribute(null, CrawlOrder.ATTR_DISK_PATH);
        this.disk = getSettingsHandler().
724 getPathRelativeToWorkingDirectory(diskPath); 725 this.disk.mkdirs(); 726 this.logsDisk = getSettingsDir(CrawlOrder.ATTR_LOGS_PATH); 727 this.checkpointsDisk = getSettingsDir(CrawlOrder.ATTR_CHECKPOINTS_PATH); 728 this.stateDisk = getSettingsDir(CrawlOrder.ATTR_STATE_PATH); 729 this.scratchDisk = getSettingsDir(CrawlOrder.ATTR_SCRATCH_PATH); 730 } 731 732 735 public File getLogsDir() { 736 File f = null; 737 try { 738 f = getSettingsDir(CrawlOrder.ATTR_LOGS_PATH); 739 } catch (AttributeNotFoundException e) { 740 LOGGER.severe("Failed get of logs directory: " + e.getMessage()); 741 } 742 return f; 743 } 744 745 754 public File getSettingsDir(String key) 755 throws AttributeNotFoundException { 756 String path = (String )order.getAttribute(null, key); 757 File f = new File (path); 758 if (!f.isAbsolute()) { 759 f = new File (disk.getPath(), path); 760 } 761 if (!f.exists()) { 762 f.mkdirs(); 763 } 764 return f; 765 } 766 767 775 private void setupStatTracking() 776 throws InvalidAttributeValueException , FatalConfigurationException { 777 MapType loggers = order.getLoggers(); 778 final String cstName = "crawl-statistics"; 779 if (loggers.isEmpty(null)) { 780 if (!isCheckpointRecover() && this.statistics == null) { 781 this.statistics = new StatisticsTracker(cstName); 782 } 783 loggers.addElement(null, (StatisticsTracker)this.statistics); 784 } 785 786 if (isCheckpointRecover()) { 787 restoreStatisticsTracker(loggers, cstName); 788 } 789 790 for (Iterator it = loggers.iterator(null); it.hasNext();) { 791 StatisticsTracking tracker = (StatisticsTracking)it.next(); 792 tracker.initialize(this); 793 if (this.statistics == null) { 794 this.statistics = tracker; 795 } 796 } 797 } 798 799 protected void restoreStatisticsTracker(MapType loggers, 800 String replaceName) 801 throws FatalConfigurationException { 802 try { 803 loggers.removeElement(loggers.globalSettings(), replaceName); 805 loggers.addElement(loggers.globalSettings(), 806 (StatisticsTracker)this.statistics); 
} catch (Exception e) {
            throw convertToFatalConfigurationException(e);
        }
    }

    /**
     * Wraps an exception in a FatalConfigurationException. The wrapper takes
     * on the original's stack trace and records it as the cause.
     *
     * @param e exception to convert
     * @return the wrapping FatalConfigurationException (not thrown)
     */
    protected FatalConfigurationException
            convertToFatalConfigurationException(Exception e) {
        FatalConfigurationException fce =
            new FatalConfigurationException("Converted exception: " +
               e.getMessage());
        fce.setStackTrace(e.getStackTrace());
        // Keep the original exception (type and nested causes) reachable,
        // not just its message and stack trace.
        fce.initCause(e);
        return fce;
    }

    /**
     * Creates the crawl's loggers and attaches a log file to each. Logger
     * names are the log name plus the absolute logs directory path.
     *
     * @throws IOException if a log file cannot be opened
     */
    private void setupLogs() throws IOException {
        String logsPath = logsDisk.getAbsolutePath() + File.separatorChar;
        uriProcessing = Logger.getLogger(LOGNAME_CRAWL + "." + logsPath);
        runtimeErrors = Logger.getLogger(LOGNAME_RUNTIME_ERRORS + "." +
            logsPath);
        localErrors = Logger.getLogger(LOGNAME_LOCAL_ERRORS + "." + logsPath);
        uriErrors = Logger.getLogger(LOGNAME_URI_ERRORS + "." + logsPath);
        progressStats = Logger.getLogger(LOGNAME_PROGRESS_STATISTICS + "." +
            logsPath);

        this.fileHandlers = new HashMap<Logger, FileHandler>();

        setupLogFile(uriProcessing,
            logsPath + LOGNAME_CRAWL + CURRENT_LOG_SUFFIX,
            new UriProcessingFormatter(), true);

        setupLogFile(runtimeErrors,
            logsPath + LOGNAME_RUNTIME_ERRORS + CURRENT_LOG_SUFFIX,
            new RuntimeErrorFormatter(), true);

        setupLogFile(localErrors,
            logsPath + LOGNAME_LOCAL_ERRORS + CURRENT_LOG_SUFFIX,
            new LocalErrorFormatter(), true);

        setupLogFile(uriErrors,
            logsPath + LOGNAME_URI_ERRORS + CURRENT_LOG_SUFFIX,
            new UriErrorFormatter(), true);

        setupLogFile(progressStats,
            logsPath + LOGNAME_PROGRESS_STATISTICS + CURRENT_LOG_SUFFIX,
            new StatisticsLogFormatter(), true);
    }

    /**
     * Attaches a GenerationFileHandler for filename to logger. Passes the
     * file to addToManifest and detaches the logger from its parent
     * handlers. The handler is remembered in fileHandlers.
     */
    private void setupLogFile(Logger logger, String filename, Formatter f,
            boolean shouldManifest) throws IOException, SecurityException {
        GenerationFileHandler fh = new GenerationFileHandler(filename, true,
            shouldManifest);
        fh.setFormatter(f);
        logger.addHandler(fh);
        addToManifest(filename, MANIFEST_LOG_FILE, shouldManifest);
logger.setUseParentHandlers(false); 863 this.fileHandlers.put(logger, fh); 864 } 865 866 protected void rotateLogFiles(String generationSuffix) 867 throws IOException { 868 if (this.state != PAUSED && this.state != CHECKPOINTING) { 869 throw new IllegalStateException ("Pause crawl before requesting " + 870 "log rotation."); 871 } 872 for (Iterator i = fileHandlers.keySet().iterator(); i.hasNext();) { 873 Logger l = (Logger )i.next(); 874 GenerationFileHandler gfh = 875 (GenerationFileHandler)fileHandlers.get(l); 876 GenerationFileHandler newGfh = 877 gfh.rotate(generationSuffix, CURRENT_LOG_SUFFIX); 878 if (gfh.shouldManifest()) { 879 addToManifest((String ) newGfh.getFilenameSeries().get(1), 880 MANIFEST_LOG_FILE, newGfh.shouldManifest()); 881 } 882 l.removeHandler(gfh); 883 l.addHandler(newGfh); 884 fileHandlers.put(l, newGfh); 885 } 886 } 887 888 891 public void closeLogFiles() { 892 for (Iterator i = fileHandlers.keySet().iterator(); i.hasNext();) { 893 Logger l = (Logger )i.next(); 894 GenerationFileHandler gfh = 895 (GenerationFileHandler)fileHandlers.get(l); 896 gfh.close(); 897 l.removeHandler(gfh); 898 } 899 } 900 901 904 private void setThresholds() { 905 try { 906 maxBytes = 907 ((Long ) order.getAttribute(CrawlOrder.ATTR_MAX_BYTES_DOWNLOAD)) 908 .longValue(); 909 } catch (Exception e) { 910 maxBytes = 0; 911 } 912 try { 913 maxDocument = 914 ((Long ) order 915 .getAttribute(CrawlOrder.ATTR_MAX_DOCUMENT_DOWNLOAD)) 916 .longValue(); 917 } catch (Exception e) { 918 maxDocument = 0; 919 } 920 try { 921 maxTime = 922 ((Long ) order.getAttribute(CrawlOrder.ATTR_MAX_TIME_SEC)) 923 .longValue(); 924 } catch (Exception e) { 925 maxTime = 0; 926 } 927 } 928 929 932 public StatisticsTracking getStatistics() { 933 return statistics==null ? 
934 new StatisticsTracker("crawl-statistics"): this.statistics; 935 } 936 937 944 protected void sendCrawlStateChangeEvent(Object newState, String message) { 945 synchronized (this.registeredCrawlStatusListeners) { 946 this.state = newState; 947 for (Iterator i = this.registeredCrawlStatusListeners.iterator(); 948 i.hasNext();) { 949 CrawlStatusListener l = (CrawlStatusListener)i.next(); 950 if (newState.equals(PAUSED)) { 951 l.crawlPaused(message); 952 } else if (newState.equals(RUNNING)) { 953 l.crawlResuming(message); 954 } else if (newState.equals(PAUSING)) { 955 l.crawlPausing(message); 956 } else if (newState.equals(STARTED)) { 957 l.crawlStarted(message); 958 } else if (newState.equals(STOPPING)) { 959 l.crawlEnding(message); 960 } else if (newState.equals(FINISHED)) { 961 l.crawlEnded(message); 962 } else if (newState.equals(PREPARING)) { 963 l.crawlResuming(message); 964 } else { 965 throw new RuntimeException ("Unknown state: " + newState); 966 } 967 if (LOGGER.isLoggable(Level.FINE)) { 968 LOGGER.fine("Sent " + newState + " to " + l); 969 } 970 } 971 LOGGER.fine("Sent " + newState); 972 } 973 } 974 975 984 protected void sendCheckpointEvent(File checkpointDir) throws Exception { 985 synchronized (this.registeredCrawlStatusListeners) { 986 if (this.state != PAUSED) { 987 throw new IllegalStateException ("Crawler must be completly " + 988 "paused before checkpointing can start"); 989 } 990 this.state = CHECKPOINTING; 991 for (Iterator i = this.registeredCrawlStatusListeners.iterator(); 992 i.hasNext();) { 993 CrawlStatusListener l = (CrawlStatusListener)i.next(); 994 l.crawlCheckpoint(checkpointDir); 995 if (LOGGER.isLoggable(Level.FINE)) { 996 LOGGER.fine("Sent " + CHECKPOINTING + " to " + l); 997 } 998 } 999 LOGGER.fine("Sent " + CHECKPOINTING); 1000 } 1001 } 1002 1003 1006 public void requestCrawlStart() { 1007 runProcessorInitialTasks(); 1008 1009 sendCrawlStateChangeEvent(STARTED, CrawlJob.STATUS_PENDING); 1010 String jobState; 1011 state = RUNNING; 
1012 jobState = CrawlJob.STATUS_RUNNING; 1013 sendCrawlStateChangeEvent(this.state, jobState); 1014 1015 this.sExit = CrawlJob.STATUS_FINISHED_ABNORMAL; 1017 1018 Thread statLogger = new Thread (statistics); 1019 statLogger.setName("StatLogger"); 1020 statLogger.start(); 1021 1022 frontier.start(); 1023 } 1024 1025 1028 protected void completeStop() { 1029 LOGGER.fine("Entered complete stop."); 1030 runProcessorFinalTasks(); 1032 sendCrawlStateChangeEvent(FINISHED, this.sExit); 1034 synchronized (this.registeredCrawlStatusListeners) { 1035 this.registeredCrawlStatusListeners. 1037 removeAll(this.registeredCrawlStatusListeners); 1038 this.registeredCrawlStatusListeners = null; 1039 } 1040 1041 closeLogFiles(); 1042 1043 this.fileHandlers = null; 1045 this.uriErrors = null; 1046 this.uriProcessing = null; 1047 this.localErrors = null; 1048 this.runtimeErrors = null; 1049 this.progressStats = null; 1050 this.reports = null; 1051 this.manifest = null; 1052 1053 this.statistics = null; 1055 this.frontier = null; 1056 this.disk = null; 1057 this.scratchDisk = null; 1058 this.order = null; 1059 this.scope = null; 1060 if (this.settingsHandler != null) { 1061 this.settingsHandler.cleanup(); 1062 } 1063 this.settingsHandler = null; 1064 this.reserveMemory = null; 1065 this.processorChains = null; 1066 if (this.serverCache != null) { 1067 this.serverCache.cleanup(); 1068 this.serverCache = null; 1069 } 1070 if (this.checkpointer != null) { 1071 this.checkpointer.cleanup(); 1072 this.checkpointer = null; 1073 } 1074 if (this.classCatalogDB != null) { 1075 try { 1076 this.classCatalogDB.close(); 1077 } catch (DatabaseException e) { 1078 e.printStackTrace(); 1079 } 1080 this.classCatalogDB = null; 1081 } 1082 if (this.bdbEnvironment != null) { 1083 try { 1084 this.bdbEnvironment.sync(); 1085 this.bdbEnvironment.close(); 1086 } catch (DatabaseException e) { 1087 e.printStackTrace(); 1088 } 1089 this.bdbEnvironment = null; 1090 } 1091 this.bigmaps = null; 1092 if (this.toePool != 
null) { 1093 this.toePool.cleanup(); 1094 } 1101 this.toePool = null; 1102 LOGGER.fine("Finished crawl."); 1103 } 1104 1105 synchronized void completePause() { 1106 notifyAll(); 1109 sendCrawlStateChangeEvent(PAUSED, CrawlJob.STATUS_PAUSED); 1110 } 1111 1112 private boolean shouldContinueCrawling() { 1113 if (frontier.isEmpty()) { 1114 this.sExit = CrawlJob.STATUS_FINISHED; 1115 return false; 1116 } 1117 1118 if (maxBytes > 0 && frontier.totalBytesWritten() >= maxBytes) { 1119 sExit = CrawlJob.STATUS_FINISHED_DATA_LIMIT; 1121 return false; 1122 } else if (maxDocument > 0 1123 && frontier.succeededFetchCount() >= maxDocument) { 1124 this.sExit = CrawlJob.STATUS_FINISHED_DOCUMENT_LIMIT; 1126 return false; 1127 } else if (maxTime > 0 && 1128 statistics.crawlDuration() >= maxTime * 1000) { 1129 this.sExit = CrawlJob.STATUS_FINISHED_TIME_LIMIT; 1131 return false; 1132 } 1133 return state == RUNNING; 1134 } 1135 1136 1142 public synchronized void requestCrawlCheckpoint() 1143 throws IllegalStateException { 1144 if (this.checkpointer == null) { 1145 return; 1146 } 1147 if (this.checkpointer.isCheckpointing()) { 1148 throw new IllegalStateException ("Checkpoint already running."); 1149 } 1150 this.checkpointer.checkpoint(); 1151 } 1152 1153 1156 public boolean isCheckpointing() { 1157 return this.state == CHECKPOINTING; 1158 } 1159 1160 1173 void checkpoint() 1174 throws Exception { 1175 sendCheckpointEvent(this.checkpointer. 1177 getCheckpointInProgressDirectory()); 1178 1179 LOGGER.fine("Rotating log files."); 1181 rotateLogFiles(CURRENT_LOG_SUFFIX + "." 
+ 1182 this.checkpointer.getNextCheckpointName()); 1183 1184 LOGGER.fine("BigMaps."); 1186 checkpointBigMaps(this.checkpointer.getCheckpointInProgressDirectory()); 1187 1188 1195 LOGGER.fine("Bdb environment."); 1197 checkpointBdb(this.checkpointer.getCheckpointInProgressDirectory()); 1198 1199 LOGGER.fine("Copying settings."); 1201 copySettings(this.checkpointer.getCheckpointInProgressDirectory()); 1202 1203 CheckpointUtils.writeObjectToFile(this, 1205 this.checkpointer.getCheckpointInProgressDirectory()); 1206 } 1207 1208 1213 protected void copySettings(final File checkpointDir) throws IOException { 1214 final List files = this.settingsHandler.getListOfAllFiles(); 1215 boolean copiedSettingsDir = false; 1216 final File settingsDir = new File (this.disk, "settings"); 1217 for (final Iterator i = files.iterator(); i.hasNext();) { 1218 File f = new File ((String )i.next()); 1219 if (f.getAbsolutePath().startsWith(settingsDir.getAbsolutePath())) { 1220 if (copiedSettingsDir) { 1221 continue; 1224 } 1225 copiedSettingsDir = true; 1227 FileUtils.copyFiles(settingsDir, 1228 new File (checkpointDir, settingsDir.getName())); 1229 continue; 1230 } 1231 FileUtils.copyFiles(f, f.isDirectory()? 
checkpointDir: 1232 new File (checkpointDir, f.getName())); 1233 } 1234 } 1235 1236 1261 protected void checkpointBdb(File checkpointDir) 1262 throws DatabaseException, IOException , RuntimeException { 1263 EnvironmentConfig envConfig = this.bdbEnvironment.getConfig(); 1264 final List bkgrdThreads = Arrays.asList(new String [] 1265 {"je.env.runCheckpointer", "je.env.runCleaner", 1266 "je.env.runINCompressor"}); 1267 try { 1268 setBdbjeBkgrdThreads(envConfig, bkgrdThreads, "false"); 1270 CheckpointConfig chkptConfig = new CheckpointConfig(); 1272 chkptConfig.setForce(true); 1273 1274 chkptConfig.setMinimizeRecoveryTime(true); 1288 this.bdbEnvironment.checkpoint(chkptConfig); 1289 LOGGER.fine("Finished bdb checkpoint."); 1290 1291 EnvironmentImpl envImpl = 1293 DbInternal.envGetEnvironmentImpl(this.bdbEnvironment); 1294 long firstFileInNextSet = 1295 DbLsn.getFileNumber(envImpl.forceLogFileFlip()); 1296 final String lastBdbCheckpointLog = 1299 getBdbLogFileName(firstFileInNextSet - 1); 1300 processBdbLogs(checkpointDir, lastBdbCheckpointLog); 1301 LOGGER.fine("Finished processing bdb log files."); 1302 } finally { 1303 setBdbjeBkgrdThreads(envConfig, bkgrdThreads, "true"); 1305 } 1306 } 1307 1308 protected void processBdbLogs(final File checkpointDir, 1309 final String lastBdbCheckpointLog) throws IOException { 1310 File bdbDir = CheckpointUtils.getBdbSubDirectory(checkpointDir); 1311 if (!bdbDir.exists()) { 1312 bdbDir.mkdir(); 1313 } 1314 PrintWriter pw = new PrintWriter (new FileOutputStream (new File ( 1315 checkpointDir, "bdbje-logs-manifest.txt"))); 1316 try { 1317 boolean pastLastLogFile = false; 1320 Set <String > srcFilenames = null; 1321 final boolean copyFiles = getCheckpointCopyBdbjeLogs(); 1322 do { 1323 FilenameFilter filter = CheckpointUtils.getJeLogsFilter(); 1324 srcFilenames = 1325 new HashSet <String >(Arrays.asList( 1326 getStateDisk().list(filter))); 1327 List tgtFilenames = Arrays.asList(bdbDir.list(filter)); 1328 if (tgtFilenames != null && 
tgtFilenames.size() > 0) { 1329 srcFilenames.removeAll(tgtFilenames); 1330 } 1331 if (srcFilenames.size() > 0) { 1332 srcFilenames = new TreeSet <String >(srcFilenames); 1334 int count = 0; 1335 for (final Iterator i = srcFilenames.iterator(); 1336 i.hasNext() && !pastLastLogFile;) { 1337 String name = (String ) i.next(); 1338 if (copyFiles) { 1339 FileUtils.copyFiles(new File (getStateDisk(), name), 1340 new File (bdbDir, name)); 1341 } 1342 pw.println(name); 1343 if (name.equals(lastBdbCheckpointLog)) { 1344 pastLastLogFile = true; 1346 } 1347 count++; 1348 } 1349 if (LOGGER.isLoggable(Level.FINE)) { 1350 LOGGER.fine("Copied " + count); 1351 } 1352 } 1353 } while (!pastLastLogFile && srcFilenames != null && 1354 srcFilenames.size() > 0); 1355 } finally { 1356 pw.close(); 1357 } 1358 } 1359 1360 protected String getBdbLogFileName(final long index) { 1361 String lastBdbLogFileHex = Long.toHexString(index); 1362 StringBuffer buffer = new StringBuffer (); 1363 for (int i = 0; i < (8 - lastBdbLogFileHex.length()); i++) { 1364 buffer.append('0'); 1365 } 1366 buffer.append(lastBdbLogFileHex); 1367 buffer.append(".jdb"); 1368 return buffer.toString(); 1369 } 1370 1371 protected void setBdbjeBkgrdThreads(final EnvironmentConfig config, 1372 final List threads, final String setting) { 1373 for (final Iterator i = threads.iterator(); i.hasNext();) { 1374 config.setConfigParam((String )i.next(), setting); 1375 } 1376 } 1377 1378 1388 public synchronized Checkpoint getCheckpointRecover() { 1389 if (this.checkpointRecover != null) { 1390 return this.checkpointRecover; 1391 } 1392 return getCheckpointRecover(this.order); 1393 } 1394 1395 public static Checkpoint getCheckpointRecover(final CrawlOrder order) { 1396 String path = (String )order.getUncheckedAttribute(null, 1397 CrawlOrder.ATTR_RECOVER_PATH); 1398 if (path == null || path.length() <= 0) { 1399 return null; 1400 } 1401 File rp = new File (path); 1402 Checkpoint result = null; 1404 if (rp.exists() && rp.isDirectory()) 
{ 1405 Checkpoint cp = new Checkpoint(rp); 1406 if (cp.isValid()) { 1407 result = cp; 1409 } 1410 } 1411 return result; 1412 } 1413 1414 public static boolean isCheckpointRecover(final CrawlOrder order) { 1415 return getCheckpointRecover(order) != null; 1416 } 1417 1418 1423 public boolean isCheckpointRecover() { 1424 return this.checkpointRecover != null; 1425 } 1426 1427 1430 public synchronized void requestCrawlStop() { 1431 requestCrawlStop(CrawlJob.STATUS_ABORTED); 1432 } 1433 1434 1438 public synchronized void requestCrawlStop(String message) { 1439 if (state == STOPPING || state == FINISHED) { 1440 return; 1441 } 1442 if (message == null) { 1443 throw new IllegalArgumentException ("Message cannot be null."); 1444 } 1445 this.sExit = message; 1446 beginCrawlStop(); 1447 } 1448 1449 1452 public void beginCrawlStop() { 1453 LOGGER.fine("Started."); 1454 sendCrawlStateChangeEvent(STOPPING, this.sExit); 1455 if (this.frontier != null) { 1456 this.frontier.terminate(); 1457 this.frontier.unpause(); 1458 } 1459 LOGGER.fine("Finished."); 1460 } 1461 1462 1465 public synchronized void requestCrawlPause() { 1466 if (state == PAUSING || state == PAUSED) { 1467 return; 1469 } 1470 sExit = CrawlJob.STATUS_WAITING_FOR_PAUSE; 1471 frontier.pause(); 1472 sendCrawlStateChangeEvent(PAUSING, this.sExit); 1473 if (toePool.getActiveToeCount() == 0) { 1474 completePause(); 1477 } 1478 } 1479 1480 1484 public boolean isPaused() { 1485 return state == PAUSED; 1486 } 1487 1488 public boolean isPausing() { 1489 return state == PAUSING; 1490 } 1491 1492 public boolean isRunning() { 1493 return state == RUNNING; 1494 } 1495 1496 1499 public synchronized void requestCrawlResume() { 1500 if (state != PAUSING && state != PAUSED && state != CHECKPOINTING) { 1501 return; 1504 } 1505 multiThreadMode(); 1506 frontier.unpause(); 1507 LOGGER.fine("Crawl resumed."); 1508 sendCrawlStateChangeEvent(RUNNING, CrawlJob.STATUS_RUNNING); 1509 } 1510 1511 1514 public int getActiveToeCount() { 1515 if 
(toePool == null) { 1516 return 0; 1517 } 1518 return toePool.getActiveToeCount(); 1519 } 1520 1521 private void setupToePool() { 1522 toePool = new ToePool(this); 1523 toePool.setSize(order.getMaxToes()); 1525 } 1526 1527 1530 public CrawlOrder getOrder() { 1531 return order; 1532 } 1533 1534 1537 public ServerCache getServerCache() { 1538 return serverCache; 1539 } 1540 1541 1544 public void setOrder(CrawlOrder o) { 1545 order = o; 1546 } 1547 1548 1549 1552 public Frontier getFrontier() { 1553 return frontier; 1554 } 1555 1556 1559 public CrawlScope getScope() { 1560 return scope; 1561 } 1562 1563 1567 public ProcessorChainList getProcessorChainList() { 1568 return processorChains; 1569 } 1570 1571 1575 public ProcessorChain getFirstProcessorChain() { 1576 return processorChains.getFirstChain(); 1577 } 1578 1579 1583 public ProcessorChain getPostprocessorChain() { 1584 return processorChains.getLastChain(); 1585 } 1586 1587 1591 public File getDisk() { 1592 return disk; 1593 } 1594 1595 1598 public File getScratchDisk() { 1599 return scratchDisk; 1600 } 1601 1602 1605 public File getStateDisk() { 1606 return stateDisk; 1607 } 1608 1609 1614 public int getToeCount() { 1615 return this.toePool == null? 
0: this.toePool.getToeCount(); 1616 } 1617 1618 1621 public ToePool getToePool() { 1622 return toePool; 1623 } 1624 1625 1628 public String oneLineReportThreads() { 1629 return toePool.singleLineReport(); 1631 } 1632 1633 1638 public void kickUpdate() { 1639 toePool.setSize(order.getMaxToes()); 1640 1641 this.scope.kickUpdate(); 1642 this.frontier.kickUpdate(); 1643 this.processorChains.kickUpdate(); 1644 1645 1648 setThresholds(); 1649 } 1650 1651 1654 public SettingsHandler getSettingsHandler() { 1655 return settingsHandler; 1656 } 1657 1658 1663 private void runProcessorInitialTasks(){ 1664 for (Iterator ic = processorChains.iterator(); ic.hasNext(); ) { 1665 for (Iterator ip = ((ProcessorChain) ic.next()).iterator(); 1666 ip.hasNext(); ) { 1667 ((Processor) ip.next()).initialTasks(); 1668 } 1669 } 1670 } 1671 1672 1677 private void runProcessorFinalTasks(){ 1678 for (Iterator ic = processorChains.iterator(); ic.hasNext(); ) { 1679 for (Iterator ip = ((ProcessorChain) ic.next()).iterator(); 1680 ip.hasNext(); ) { 1681 ((Processor) ip.next()).finalTasks(); 1682 } 1683 } 1684 } 1685 1686 1694 public void killThread(int threadNumber, boolean replace){ 1695 toePool.killThread(threadNumber, replace); 1696 } 1697 1698 1714 public void addToManifest(String file, char type, boolean bundle) { 1715 manifest.append(type + (bundle? 
"+": "-") + " " + file + "\n"); 1716 } 1717 1718 1721 public void checkFinish() { 1722 if(atFinish()) { 1723 beginCrawlStop(); 1724 } 1725 } 1726 1727 1733 public boolean atFinish() { 1734 return state == RUNNING && !shouldContinueCrawling(); 1735 } 1736 1737 private void readObject(ObjectInputStream stream) 1738 throws IOException , ClassNotFoundException { 1739 stream.defaultReadObject(); 1740 this.registeredCrawlStatusListeners = 1742 Collections.synchronizedList(new ArrayList <CrawlStatusListener>()); 1743 singleThreadMode = false; 1745 } 1746 1747 1754 public void singleThreadMode() { 1755 this.singleThreadLock.lock(); 1756 singleThreadMode = true; 1757 } 1758 1759 1763 public void multiThreadMode() { 1764 this.singleThreadLock.lock(); 1765 singleThreadMode = false; 1766 while(this.singleThreadLock.isHeldByCurrentThread()) { 1767 this.singleThreadLock.unlock(); 1768 } 1769 } 1770 1771 1775 public void acquireContinuePermission() { 1776 if (singleThreadMode) { 1777 this.singleThreadLock.lock(); 1778 if(!singleThreadMode) { 1779 while(this.singleThreadLock.isHeldByCurrentThread()) { 1781 this.singleThreadLock.unlock(); 1782 } 1783 } 1784 } } 1786 1787 1791 public void releaseContinuePermission() { 1792 if (singleThreadMode) { 1793 while(this.singleThreadLock.isHeldByCurrentThread()) { 1794 this.singleThreadLock.unlock(); 1795 } 1796 } } 1798 1799 public void freeReserveMemory() { 1800 if(!reserveMemory.isEmpty()) { 1801 reserveMemory.removeLast(); 1802 System.gc(); 1803 } 1804 } 1805 1806 1810 public synchronized void toePaused() { 1811 releaseContinuePermission(); 1812 if (state == PAUSING && toePool.getActiveToeCount() == 0) { 1813 completePause(); 1814 } 1815 } 1816 1817 1820 public synchronized void toeEnded() { 1821 if (state == STOPPING && toePool.getActiveToeCount() == 0) { 1822 completeStop(); 1823 } 1824 } 1825 1826 1834 public void addOrderToManifest() { 1835 for (Iterator it = getSettingsHandler().getListOfAllFiles().iterator(); 1836 it.hasNext();) { 
1837 addToManifest((String )it.next(), 1838 CrawlController.MANIFEST_CONFIG_FILE, true); 1839 } 1840 } 1841 1842 1850 public void logUriError(URIException e, UURI u, CharSequence l) { 1851 if (e.getReasonCode() == UURIFactory.IGNORED_SCHEME) { 1852 return; 1854 } 1855 Object [] array = {u, l}; 1856 uriErrors.log(Level.INFO, e.getMessage(), array); 1857 } 1858 1859 public final static String PROCESSORS_REPORT = "processors"; 1863 public final static String MANIFEST_REPORT = "manifest"; 1864 protected final static String [] REPORTS = {PROCESSORS_REPORT, MANIFEST_REPORT}; 1865 1866 1869 public String [] getReports() { 1870 return REPORTS; 1871 } 1872 1873 1876 public void reportTo(PrintWriter writer) { 1877 reportTo(null,writer); 1878 } 1879 1880 public String singleLineReport() { 1881 return ArchiveUtils.singleLineReport(this); 1882 } 1883 1884 public void reportTo(String name, PrintWriter writer) { 1885 if(PROCESSORS_REPORT.equals(name)) { 1886 reportProcessorsTo(writer); 1887 return; 1888 } else if (MANIFEST_REPORT.equals(name)) { 1889 reportManifestTo(writer); 1890 return; 1891 } else if (name!=null) { 1892 writer.println("requested report unknown: "+name); 1893 } 1894 singleLineReportTo(writer); 1895 } 1896 1897 1900 protected void reportManifestTo(PrintWriter writer) { 1901 writer.print(manifest.toString()); 1902 } 1903 1904 1909 protected void reportProcessorsTo(PrintWriter writer) { 1910 writer.print( 1911 "Processors report - " 1912 + ArchiveUtils.TIMESTAMP12.format(new Date ()) 1913 + "\n"); 1914 writer.print(" Job being crawled: " + getOrder().getCrawlOrderName() 1915 + "\n"); 1916 1917 writer.print(" Number of Processors: " + 1918 processorChains.processorCount() + "\n"); 1919 writer.print(" NOTE: Some processors may not return a report!\n\n"); 1920 1921 for (Iterator ic = processorChains.iterator(); ic.hasNext(); ) { 1922 for (Iterator ip = ((ProcessorChain) ic.next()).iterator(); 1923 ip.hasNext(); ) { 1924 writer.print(((Processor) ip.next()).report()); 
1925 } 1926 } 1927 } 1928 1929 public void singleLineReportTo(PrintWriter writer) { 1930 writer.write("[Crawl Controller]\n"); 1932 } 1933 1934 public String singleLineLegend() { 1935 return "nothingYet"; 1937 } 1938 1939 1952 public <K,V> Map <K,V> getBigMap(final String dbName, 1953 final Class <? super K> keyClass, 1954 final Class <? super V> valueClass) 1955 throws Exception { 1956 CachedBdbMap<K,V> result = new CachedBdbMap<K,V>(dbName); 1957 if (isCheckpointRecover()) { 1958 File baseDir = getCheckpointRecover().getDirectory(); 1959 @SuppressWarnings ("unchecked") 1960 CachedBdbMap<K,V> temp = CheckpointUtils. 1961 readObjectFromFile(result.getClass(), dbName, baseDir); 1962 result = temp; 1963 } 1964 result.initialize(getBdbEnvironment(), keyClass, valueClass, 1965 getClassCatalog()); 1966 this.bigmaps.put(dbName, result); 1969 return result; 1970 } 1971 1972 protected void checkpointBigMaps(final File cpDir) 1973 throws Exception { 1974 for (final Iterator i = this.bigmaps.keySet().iterator(); i.hasNext();) { 1975 Object key = i.next(); 1976 Object obj = this.bigmaps.get(key); 1977 ((CachedBdbMap)obj).sync(); 1982 CheckpointUtils.writeObjectToFile(obj, (String )key, cpDir); 1983 } 1984 } 1985 1986 1990 public void progressStatisticsEvent(final EventObject e) { 1991 } 1996 1997 2001 public void logProgressStatistics(final String msg) { 2002 this.progressStats.info(msg); 2003 } 2004 2005 2008 public Object getState() { 2009 return this.state; 2010 } 2011 2012 public File getCheckpointsDisk() { 2013 return this.checkpointsDisk; 2014 } 2015} 2016 | Popular Tags |