1 25 package org.archive.crawler.framework; 26 27 import java.io.File ; 28 import java.io.FileOutputStream ; 29 import java.io.IOException ; 30 import java.io.Serializable ; 31 import java.text.DecimalFormat ; 32 import java.util.LinkedList ; 33 import java.util.List ; 34 import java.util.Timer ; 35 import java.util.TimerTask ; 36 import java.util.logging.Level ; 37 import java.util.logging.Logger ; 38 39 import org.archive.crawler.datamodel.Checkpoint; 40 import org.archive.util.ArchiveUtils; 41 42 51 public class Checkpointer implements Serializable { 52 private static final long serialVersionUID = 7610078446694353173L; 53 54 private final static Logger LOGGER = 55 Logger.getLogger(Checkpointer.class.getName()); 56 57 private static final String DEFAULT_PREFIX = ""; 58 59 62 private String checkpointPrefix = DEFAULT_PREFIX; 63 64 67 private int nextCheckpoint = 1; 68 69 73 private List predecessorCheckpoints = new LinkedList (); 74 75 79 private transient File checkpointInProgressDir = null; 80 81 84 private transient boolean checkpointErrors = false; 85 86 89 private transient Thread checkpointThread = null; 90 91 private transient CrawlController controller; 92 93 96 private transient Timer timerThread = null; 97 98 public static final DecimalFormat INDEX_FORMAT = new DecimalFormat ("00000"); 99 100 105 public Checkpointer(final CrawlController cc, final File checkpointDir) { 106 this(cc, DEFAULT_PREFIX); 107 } 108 109 115 public Checkpointer(final CrawlController cc, final String prefix) { 116 super(); 117 initialize(cc, prefix); 118 119 } 120 121 protected void initialize(final CrawlController cc, final String prefix) { 122 this.controller = cc; 123 this.checkpointPrefix = prefix; 124 int period = Integer.parseInt(System.getProperties().getProperty( 126 this.getClass().getName() + ".period", "-1")); 127 if (period <= 0) { 128 return; 129 } 130 long periodMs = period * (1000 * 60 * 60); 132 TimerTask tt = new TimerTask () { 133 private CrawlController cController = cc; 134 public void run() { 135 if (isCheckpointing()) { 136 LOGGER.info("CheckpointTimerThread skipping checkpoint, " + 137 "already checkpointing: State: " + 138 this.cController.getState()); 139 return; 140 } 141 LOGGER.info("TimerThread request checkpoint"); 142 this.cController.requestCrawlCheckpoint(); 143 } 144 }; 145 this.timerThread = new Timer (true); 146 this.timerThread.schedule(tt, periodMs, periodMs); 147 LOGGER.info("Installed Checkpoint TimerThread to checkpoint every " + 148 period + " hour(s)."); 149 } 150 151 void cleanup() { 152 if (this.timerThread != null) { 153 LOGGER.info("Cleanedup Checkpoint TimerThread."); 154 this.timerThread.cancel(); 155 } 156 } 157 158 161 public int getNextCheckpoint() { 162 return this.nextCheckpoint; 163 } 164 165 168 public void checkpoint() { 169 String name = "Checkpoint-" + getNextCheckpointName(); 170 this.checkpointThread = new CheckpointingThread(name); 171 this.checkpointThread.setDaemon(true); 172 this.checkpointThread.start(); 173 } 174 175 179 public class CheckpointingThread extends Thread { 180 public CheckpointingThread(final String name) { 181 super(name); 182 } 183 184 public CrawlController getController() { 185 return Checkpointer.this.controller; 186 } 187 188 public void run() { 189 LOGGER.info("Started"); 190 final boolean alreadyPaused = getController().isPaused() || 193 getController().isPausing(); 194 try { 195 getController().requestCrawlPause(); 196 setCheckpointErrors(false); 198 if (!waitOnPaused()) { 199 checkpointFailed("Failed wait for complete pause."); 200 } else { 201 createCheckpointInProgressDirectory(); 202 this.getController().checkpoint(); 203 } 204 } catch (Exception e) { 205 checkpointFailed(e); 206 } finally { 207 if (!isCheckpointErrors()) { 208 writeValidity(); 209 } 210 Checkpointer.this.nextCheckpoint++; 211 clearCheckpointInProgressDirectory(); 212 LOGGER.info("Finished"); 213 getController().completePause(); 214 if (!alreadyPaused) { 215 getController().requestCrawlResume(); 216 } 217 } 218 } 219 220 private synchronized boolean waitOnPaused() { 221 while(!getController().isPaused() && !getController().isRunning()) { 224 try { 225 wait(1000 * 3); 226 } catch (InterruptedException e) { 227 } 229 } 230 return getController().isPaused(); 231 } 232 } 233 234 protected File createCheckpointInProgressDirectory() { 235 this.checkpointInProgressDir = 236 new File (Checkpointer.this.controller.getCheckpointsDisk(), 237 getNextCheckpointName()); 238 this.checkpointInProgressDir.mkdirs(); 239 return this.checkpointInProgressDir; 240 } 241 242 protected void clearCheckpointInProgressDirectory() { 243 this.checkpointInProgressDir = null; 244 } 245 246 protected CrawlController getController() { 247 return this.controller; 248 } 249 250 253 public String getNextCheckpointName() { 254 return formatCheckpointName(this.checkpointPrefix, this.nextCheckpoint); 255 } 256 257 public static String formatCheckpointName(final String prefix, 258 final int index) { 259 return prefix + INDEX_FORMAT.format(index); 260 } 261 262 protected void writeValidity() { 263 File valid = new File (this.checkpointInProgressDir, 264 Checkpoint.VALIDITY_STAMP_FILENAME); 265 try { 266 FileOutputStream fos = new FileOutputStream (valid); 267 fos.write(ArchiveUtils.get14DigitDate().getBytes()); 268 fos.close(); 269 } catch (IOException e) { 270 valid.delete(); 271 } 272 } 273 274 278 public File getCheckpointInProgressDirectory() { 279 return this.checkpointInProgressDir; 280 } 281 282 285 public boolean isCheckpointing() { 286 return this.checkpointThread != null && this.checkpointThread.isAlive(); 287 } 288 289 294 protected void checkpointFailed(Exception e) { 295 LOGGER.log(Level.WARNING, " Checkpoint failed", e); 296 checkpointFailed(); 297 } 298 299 protected void checkpointFailed(final String message) { 300 LOGGER.warning(message); 301 checkpointFailed(); 302 } 303 304 protected void checkpointFailed() { 305 this.checkpointErrors = true; 306 } 307 308 311 public boolean isCheckpointFailed() { 312 return this.checkpointErrors; 313 } 314 315 319 public boolean isAtBeginning() { 320 return nextCheckpoint == 1; 321 } 322 323 330 public void recover(final CrawlController cc) { 331 initialize(cc, 'r' + this.checkpointPrefix); 335 } 336 337 340 public List getPredecessorCheckpoints() { 341 return this.predecessorCheckpoints; 342 } 343 344 protected boolean isCheckpointErrors() { 345 return this.checkpointErrors; 346 } 347 348 protected void setCheckpointErrors(boolean checkpointErrors) { 349 this.checkpointErrors = checkpointErrors; 350 } 351 } 352 | Popular Tags |