1 25 package org.archive.crawler.framework; 26 27 import java.io.DataInputStream ; 28 import java.io.DataOutputStream ; 29 import java.io.File ; 30 import java.io.FileInputStream ; 31 import java.io.FileNotFoundException ; 32 import java.io.FileOutputStream ; 33 import java.io.IOException ; 34 import java.io.ObjectInputStream ; 35 import java.net.InetAddress ; 36 import java.net.UnknownHostException ; 37 import java.util.ArrayList ; 38 import java.util.Arrays ; 39 import java.util.Iterator ; 40 import java.util.List ; 41 import java.util.concurrent.atomic.AtomicInteger ; 42 import java.util.logging.Logger ; 43 44 import javax.management.AttributeNotFoundException ; 45 import javax.management.MBeanException ; 46 import javax.management.ReflectionException ; 47 48 import org.archive.crawler.datamodel.CoreAttributeConstants; 49 import org.archive.crawler.datamodel.CrawlHost; 50 import org.archive.crawler.datamodel.CrawlOrder; 51 import org.archive.crawler.datamodel.CrawlURI; 52 import org.archive.crawler.event.CrawlStatusListener; 53 import org.archive.crawler.settings.SimpleType; 54 import org.archive.crawler.settings.StringList; 55 import org.archive.crawler.settings.Type; 56 import org.archive.io.ObjectPlusFilesInputStream; 57 import org.archive.io.WriterPool; 58 import org.archive.io.WriterPoolMember; 59 60 66 public abstract class WriterPoolProcessor extends Processor 67 implements CoreAttributeConstants, CrawlStatusListener { 68 private final Logger logger = Logger.getLogger(this.getClass().getName()); 69 70 73 public static final String ATTR_COMPRESS = "compress"; 74 75 78 public static final boolean DEFAULT_COMPRESS = true; 79 80 83 public static final String ATTR_PREFIX = "prefix"; 84 85 88 public static final String ATTR_PATH ="path"; 89 90 93 public static final String ATTR_SUFFIX = "suffix"; 94 95 98 public static final String ATTR_MAX_SIZE_BYTES = "max-size-bytes"; 99 100 105 public static final String ATTR_POOL_MAX_ACTIVE = "pool-max-active"; 106 107 111 public static final String ATTR_POOL_MAX_WAIT = "pool-max-wait"; 112 113 116 public static final String ATTR_MAX_BYTES_WRITTEN = 117 "total-bytes-to-write"; 118 119 124 private static final int DEFAULT_MAX_FILE_SIZE = 100000000; 125 126 131 private static final String [] DEFAULT_PATH = {"crawl-store"}; 132 133 136 transient private WriterPool pool = null; 137 138 141 private long totalBytesWritten = 0; 142 143 144 147 public WriterPoolProcessor(String name) { 148 this(name, "Pool of files processor"); 149 } 150 151 155 public WriterPoolProcessor(final String name, 156 final String description) { 157 super(name, description); 158 Type e = addElementToDefinition( 159 new SimpleType(ATTR_COMPRESS, "Compress files when " + 160 "writing to disk.", new Boolean (DEFAULT_COMPRESS))); 161 e.setOverrideable(false); 162 e = addElementToDefinition( 163 new SimpleType(ATTR_PREFIX, 164 "File prefix. " + 165 "The text supplied here will be used as a prefix naming " + 166 "writer files. For example if the prefix is 'IAH', " + 167 "then file names will look like " + 168 "IAH-20040808101010-0001-HOSTNAME.arc.gz " + 169 "...if writing ARCs (The prefix will be " + 170 "separated from the date by a hyphen).", 171 WriterPoolMember.DEFAULT_PREFIX)); 172 e = addElementToDefinition( 173 new SimpleType(ATTR_SUFFIX, "Suffix to tag onto " + 174 "files. If value is '${HOSTNAME}', will use hostname for " + 175 "suffix. If empty, no suffix will be added.", 176 WriterPoolMember.DEFAULT_SUFFIX)); 177 e.setOverrideable(false); 178 e = addElementToDefinition( 179 new SimpleType(ATTR_MAX_SIZE_BYTES, "Max size of each file", 180 new Integer (DEFAULT_MAX_FILE_SIZE))); 181 e.setOverrideable(false); 182 e = addElementToDefinition( 183 new StringList(ATTR_PATH, "Where to files. " + 184 "Supply absolute or relative path. If relative, files " + 185 "will be written relative to " + 186 "the " + CrawlOrder.ATTR_DISK_PATH + "setting." + 187 " If more than one path specified, we'll round-robin" + 188 " dropping files to each. This setting is safe" + 189 " to change midcrawl (You can remove and add new dirs" + 190 " as the crawler progresses).", getDefaultPath())); 191 e.setOverrideable(false); 192 e = addElementToDefinition(new SimpleType(ATTR_POOL_MAX_ACTIVE, 193 "Maximum active files in pool. " + 194 "This setting cannot be varied over the life of a crawl.", 195 new Integer (WriterPool.DEFAULT_MAX_ACTIVE))); 196 e.setOverrideable(false); 197 e = addElementToDefinition(new SimpleType(ATTR_POOL_MAX_WAIT, 198 "Maximum time to wait on pool element" + 199 " (milliseconds). This setting cannot be varied over the life" + 200 " of a crawl.", 201 new Integer (WriterPool.DEFAULT_MAXIMUM_WAIT))); 202 e.setOverrideable(false); 203 e = addElementToDefinition(new SimpleType(ATTR_MAX_BYTES_WRITTEN, 204 "Total file bytes to write to disk." + 205 " Once the size of all files on disk has exceeded this " + 206 "limit, this processor will stop the crawler. " + 207 "A value of zero means no upper limit.", new Long (0))); 208 e.setOverrideable(false); 209 e.setExpertSetting(true); 210 } 211 212 protected String [] getDefaultPath() { 213 return DEFAULT_PATH; 214 } 215 216 public synchronized void initialTasks() { 217 getSettingsHandler().getOrder().getController(). 219 addCrawlStatusListener(this); 220 setupPool(new AtomicInteger ()); 221 if (getSettingsHandler().getOrder().getController(). 223 isCheckpointRecover()) { 224 checkpointRecover(); 225 } 226 } 227 228 protected AtomicInteger getSerialNo() { 229 return ((WriterPool)getPool()).getSerialNo(); 230 } 231 232 235 protected abstract void setupPool(final AtomicInteger serialNo); 236 237 245 protected abstract void innerProcess(CrawlURI curi); 246 247 protected void checkBytesWritten() { 248 long max = getMaxToWrite(); 249 if (max <= 0) { 250 return; 251 } 252 if (max <= this.totalBytesWritten) { 253 getController().requestCrawlStop("Finished - Maximum bytes (" + 254 Long.toString(max) + ") written"); 255 } 256 } 257 258 protected String getHostAddress(CrawlURI curi) { 259 CrawlHost h = getController().getServerCache().getHostFor(curi); 260 if (h == null) { 261 throw new NullPointerException ("Crawlhost is null for " + 262 curi + " " + curi.getVia()); 263 } 264 InetAddress a = h.getIP(); 265 if (a == null) { 266 throw new NullPointerException ("Address is null for " + 267 curi + " " + curi.getVia() + ". Address " + 268 ((h.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP)? 269 "was never looked up.": 270 (System.currentTimeMillis() - h.getIpFetched()) + 271 " ms ago.")); 272 } 273 return h.getIP().getHostAddress(); 274 } 275 276 282 public Object getAttributeUnchecked(String name) { 283 Object result = null; 284 try { 285 result = super.getAttribute(name); 286 } catch (AttributeNotFoundException e) { 287 logger.warning(e.getLocalizedMessage()); 288 } catch (MBeanException e) { 289 logger.warning(e.getLocalizedMessage()); 290 } catch (ReflectionException e) { 291 logger.warning(e.getLocalizedMessage()); 292 } 293 return result; 294 } 295 296 304 public int getMaxSize() { 305 Object obj = getAttributeUnchecked(ATTR_MAX_SIZE_BYTES); 306 return (obj == null)? DEFAULT_MAX_FILE_SIZE: ((Integer )obj).intValue(); 307 } 308 309 public String getPrefix() { 310 Object obj = getAttributeUnchecked(ATTR_PREFIX); 311 return (obj == null)? WriterPoolMember.DEFAULT_PREFIX: (String )obj; 312 } 313 314 public List <File > getOutputDirs() { 315 Object obj = getAttributeUnchecked(ATTR_PATH); 316 List list = (obj == null)? Arrays.asList(DEFAULT_PATH): (StringList)obj; 317 ArrayList <File > results = new ArrayList <File >(); 318 for (Iterator i = list.iterator(); i.hasNext();) { 319 String path = (String )i.next(); 320 File f = new File (path); 321 if (!f.isAbsolute()) { 322 f = new File (getController().getDisk(), path); 323 } 324 if (!f.exists()) { 325 try { 326 f.mkdirs(); 327 } catch (Exception e) { 328 e.printStackTrace(); 329 continue; 330 } 331 } 332 results.add(f); 333 } 334 return results; 335 } 336 337 public boolean isCompressed() { 338 Object obj = getAttributeUnchecked(ATTR_COMPRESS); 339 return (obj == null)? DEFAULT_COMPRESS: 340 ((Boolean )obj).booleanValue(); 341 } 342 343 346 public int getPoolMaximumActive() { 347 Object obj = getAttributeUnchecked(ATTR_POOL_MAX_ACTIVE); 348 return (obj == null)? WriterPool.DEFAULT_MAX_ACTIVE: 349 ((Integer )obj).intValue(); 350 } 351 352 355 public int getPoolMaximumWait() { 356 Object obj = getAttributeUnchecked(ATTR_POOL_MAX_WAIT); 357 return (obj == null)? WriterPool.DEFAULT_MAXIMUM_WAIT: 358 ((Integer )obj).intValue(); 359 } 360 361 public String getSuffix() { 362 Object obj = getAttributeUnchecked(ATTR_SUFFIX); 363 String sfx = (obj == null)? 364 WriterPoolMember.DEFAULT_SUFFIX: (String )obj; 365 if (sfx != null && sfx.trim(). 366 equals(WriterPoolMember.HOSTNAME_VARIABLE)) { 367 String str = "localhost.localdomain"; 368 try { 369 str = InetAddress.getLocalHost().getHostName(); 370 } catch (UnknownHostException ue) { 371 logger.severe("Failed getHostAddress for this host: " + ue); 372 } 373 sfx = str; 374 } 375 return sfx; 376 } 377 378 public long getMaxToWrite() { 379 Object obj = getAttributeUnchecked(ATTR_MAX_BYTES_WRITTEN); 380 return (obj == null)? 0: ((Long )obj).longValue(); 381 } 382 383 public void crawlEnding(String sExitMessage) { 384 this.pool.close(); 385 } 386 387 public void crawlEnded(String sExitMessage) { 388 } 390 391 394 public void crawlStarted(String message) { 395 } 397 398 protected String getCheckpointStateFile() { 399 return this.getClass().getName() + ".state"; 400 } 401 402 public void crawlCheckpoint(File checkpointDir) throws IOException { 403 int serial = getSerialNo().get(); 404 if (this.pool.getNumActive() > 0) { 405 serial = getSerialNo().incrementAndGet(); 411 } 412 saveCheckpointSerialNumber(checkpointDir, serial); 413 try { 415 this.pool.close(); 416 } finally { 417 setupPool(new AtomicInteger (serial)); 419 } 420 } 421 422 public void crawlPausing(String statusMessage) { 423 } 425 426 public void crawlPaused(String statusMessage) { 427 } 429 430 public void crawlResuming(String statusMessage) { 431 } 433 434 private void readObject(ObjectInputStream stream) 435 throws IOException , ClassNotFoundException { 436 stream.defaultReadObject(); 437 ObjectPlusFilesInputStream coistream = 438 (ObjectPlusFilesInputStream)stream; 439 coistream.registerFinishTask( new Runnable () { 440 public void run() { 441 setupPool(new AtomicInteger ()); 442 } 443 }); 444 } 445 446 protected WriterPool getPool() { 447 return pool; 448 } 449 450 protected void setPool(WriterPool pool) { 451 this.pool = pool; 452 } 453 454 protected long getTotalBytesWritten() { 455 return totalBytesWritten; 456 } 457 458 protected void setTotalBytesWritten(long totalBytesWritten) { 459 this.totalBytesWritten = totalBytesWritten; 460 } 461 462 466 protected void checkpointRecover() { 467 int serialNo = loadCheckpointSerialNumber(); 468 if (serialNo != -1) { 469 getSerialNo().set(serialNo); 470 } 471 } 472 473 477 protected int loadCheckpointSerialNumber() { 478 int result = -1; 479 480 File stateFile = new File (getSettingsHandler().getOrder() 483 .getController().getCheckpointRecover().getDirectory(), 484 getCheckpointStateFile()); 485 if (!stateFile.exists()) { 486 logger.info(stateFile.getAbsolutePath() 487 + " doesn't exist so cannot restore Writer serial number."); 488 } else { 489 DataInputStream dis = null; 490 try { 491 dis = new DataInputStream (new FileInputStream (stateFile)); 492 result = dis.readShort(); 493 } catch (FileNotFoundException e) { 494 e.printStackTrace(); 495 } catch (IOException e) { 496 e.printStackTrace(); 497 } finally { 498 try { 499 if (dis != null) { 500 dis.close(); 501 } 502 } catch (IOException e) { 503 e.printStackTrace(); 504 } 505 } 506 } 507 return result; 508 } 509 510 protected void saveCheckpointSerialNumber(final File checkpointDir, 511 final int serialNo) 512 throws IOException { 513 File f = new File (checkpointDir, getCheckpointStateFile()); 515 DataOutputStream dos = new DataOutputStream (new FileOutputStream (f)); 516 try { 517 dos.writeShort(serialNo); 518 } finally { 519 dos.close(); 520 } 521 } 522 } | Popular Tags |