1 26 package org.archive.crawler.postprocessor; 27 28 import java.io.IOException ; 29 import java.util.Arrays ; 30 import java.util.List ; 31 import java.util.logging.Level ; 32 import java.util.logging.Logger ; 33 import java.util.regex.Matcher ; 34 import java.util.regex.Pattern ; 35 36 import org.archive.crawler.datamodel.CrawlURI; 37 import org.archive.crawler.framework.Processor; 38 import org.archive.crawler.settings.SimpleType; 39 import org.archive.crawler.settings.Type; 40 import org.archive.util.IoUtils; 41 42 48 public class LowDiskPauseProcessor extends Processor { 49 50 private static final long serialVersionUID = 3338337700768396302L; 51 52 55 private static final Logger logger = 56 Logger.getLogger(LowDiskPauseProcessor.class.getName()); 57 58 61 public static final String ATTR_MONITOR_MOUNTS = "monitor-mounts"; 62 public static final String DEFAULT_MONITOR_MOUNTS = ""; 63 64 67 public static final String ATTR_PAUSE_THRESHOLD = "pause-threshold-kb"; 68 public static final int DEFAULT_PAUSE_THRESHOLD = 500 * 1024; 70 73 public static final String ATTR_RECHECK_THRESHOLD = "recheck-threshold-kb"; 74 public static final int DEFAULT_RECHECK_THRESHOLD = 200 * 1024; 76 protected int contentSinceCheck = 0; 77 78 public static final Pattern VALID_DF_OUTPUT = 79 Pattern.compile("(?s)^Filesystem\\s+1K-blocks\\s+Used\\s+Available\\s+Use%\\s+Mounted on\\n.*"); 80 public static final Pattern AVAILABLE_EXTRACTOR = 81 Pattern.compile("(?m)\\s(\\d+)\\s+\\d+%\\s+(\\S+)$"); 82 83 86 public LowDiskPauseProcessor(String name) { 87 super(name, "LowDiskPause processor"); 88 Type e = addElementToDefinition( 89 new SimpleType(ATTR_MONITOR_MOUNTS, 90 "Space-delimited list of filessystem mounts whose " + 91 "'available' space should be monitored via 'df' " + 92 "(if available).", 93 DEFAULT_MONITOR_MOUNTS)); 94 e.setOverrideable(false); 95 e = addElementToDefinition( 96 new SimpleType(ATTR_PAUSE_THRESHOLD, 97 "When available space on any monitored mounts falls " + 98 "below this threshold, the crawl will be paused. ", 99 new Integer (DEFAULT_PAUSE_THRESHOLD))); 100 e = addElementToDefinition( 101 new SimpleType(ATTR_RECHECK_THRESHOLD, 102 "Available space via 'df' is rechecked after every " + 103 "increment of this much content (uncompressed) is " + 104 "observed. ", 105 new Integer (DEFAULT_RECHECK_THRESHOLD))); 106 e.setOverrideable(false); 107 } 108 109 117 protected void innerProcess(CrawlURI curi) { 118 contentSinceCheck += curi.getContentSize(); 119 synchronized (this) { 120 if (contentSinceCheck/1024 > ((Integer ) getUncheckedAttribute(null, 121 ATTR_RECHECK_THRESHOLD)).intValue()) { 122 checkAvailableSpace(curi); 123 contentSinceCheck = 0; 124 } 125 } 126 } 127 128 129 135 private void checkAvailableSpace(CrawlURI curi) { 136 try { 137 String df = IoUtils.readFullyAsString(Runtime.getRuntime().exec( 138 "df -k").getInputStream()); 139 Matcher matcher = VALID_DF_OUTPUT.matcher(df); 140 if(!matcher.matches()) { 141 logger.severe("'df -k' output unacceptable for low-disk checking"); 142 return; 143 } 144 List monitoredMounts = Arrays.asList(((String ) getUncheckedAttribute(null, 145 ATTR_MONITOR_MOUNTS)).split("\\s*")); 146 matcher = AVAILABLE_EXTRACTOR.matcher(df); 147 while (matcher.find()) { 148 String mount = matcher.group(2); 149 if (monitoredMounts.contains(mount)) { 150 long availKilobytes = Long.parseLong(matcher.group(1)); 151 int thresholdKilobytes = ((Integer ) getUncheckedAttribute( 152 null, ATTR_PAUSE_THRESHOLD)).intValue(); 153 if (availKilobytes < thresholdKilobytes ) { 154 getController().requestCrawlPause(); 155 logger.log(Level.SEVERE, "Low Disk Pause", 156 availKilobytes + "K available on " + mount 157 + " (below threshold " 158 + thresholdKilobytes + "K)"); 159 break; 160 } 161 } 162 } 163 } catch (IOException e) { 164 curi.addLocalizedError(this.getName(), e, 165 "problem checking available space via 'df'"); 166 } 167 } 168 } 169 | Popular Tags |