KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > archive > crawler > framework > WriterPoolProcessor


/* WriterPoolProcessor
 *
 * $Id: WriterPoolProcessor.java,v 1.5.2.1 2007/01/13 01:31:22 stack-sf Exp $
 *
 * Created on July 19th, 2006
 *
 * Copyright (C) 2006 Internet Archive.
 *
 * This file is part of the Heritrix web crawler (crawler.archive.org).
 *
 * Heritrix is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * any later version.
 *
 * Heritrix is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser Public License for more details.
 *
 * You should have received a copy of the GNU Lesser Public License
 * along with Heritrix; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */

25 package org.archive.crawler.framework;
26
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.management.AttributeNotFoundException;
import javax.management.MBeanException;
import javax.management.ReflectionException;

import org.archive.crawler.datamodel.CoreAttributeConstants;
import org.archive.crawler.datamodel.CrawlHost;
import org.archive.crawler.datamodel.CrawlOrder;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.event.CrawlStatusListener;
import org.archive.crawler.settings.SimpleType;
import org.archive.crawler.settings.StringList;
import org.archive.crawler.settings.Type;
import org.archive.io.ObjectPlusFilesInputStream;
import org.archive.io.WriterPool;
import org.archive.io.WriterPoolMember;
59
60 /**
61  * Abstract implementation of a file pool processor.
62  * Subclass to implement for a particular {@link WriterPoolMember} instance.
63  * @author Parker Thompson
64  * @author stack
65  */

66 public abstract class WriterPoolProcessor extends Processor
67 implements CoreAttributeConstants, CrawlStatusListener {
68     private final Logger JavaDoc logger = Logger.getLogger(this.getClass().getName());
69
70     /**
71      * Key to use asking settings for file compression value.
72      */

73     public static final String JavaDoc ATTR_COMPRESS = "compress";
74
75     /**
76      * Default as to whether we do compression of files.
77      */

78     public static final boolean DEFAULT_COMPRESS = true;
79
80     /**
81      * Key to use asking settings for file prefix value.
82      */

83     public static final String JavaDoc ATTR_PREFIX = "prefix";
84
85     /**
86      * Key to use asking settings for arc path value.
87      */

88     public static final String JavaDoc ATTR_PATH ="path";
89
90     /**
91      * Key to use asking settings for file suffix value.
92      */

93     public static final String JavaDoc ATTR_SUFFIX = "suffix";
94
95     /**
96      * Key to use asking settings for file max size value.
97      */

98     public static final String JavaDoc ATTR_MAX_SIZE_BYTES = "max-size-bytes";
99     
100     /**
101      * Key to get maximum pool size.
102      *
103      * This key is for maximum files active in the pool.
104      */

105     public static final String JavaDoc ATTR_POOL_MAX_ACTIVE = "pool-max-active";
106
107     /**
108      * Key to get maximum wait on pool object before we give up and
109      * throw IOException.
110      */

111     public static final String JavaDoc ATTR_POOL_MAX_WAIT = "pool-max-wait";
112
113     /***
114      * Key for the maximum bytes to write attribute.
115      */

116     public static final String JavaDoc ATTR_MAX_BYTES_WRITTEN =
117         "total-bytes-to-write";
118     
119     /**
120      * Default maximum file size.
121      * TODO: Check that subclasses can set a different MAX_FILE_SIZE and
122      * it will be used in the constructor as default.
123      */

124     private static final int DEFAULT_MAX_FILE_SIZE = 100000000;
125     
126     /**
127      * Default path list.
128      *
129      * TODO: Confirm this one gets picked up.
130      */

131     private static final String JavaDoc [] DEFAULT_PATH = {"crawl-store"};
132
133     /**
134      * Reference to pool.
135      */

136     transient private WriterPool pool = null;
137     
138     /**
139      * Total number of bytes written to disc.
140      */

141     private long totalBytesWritten = 0;
142
143
144     /**
145      * @param name Name of this processor.
146      */

147     public WriterPoolProcessor(String JavaDoc name) {
148         this(name, "Pool of files processor");
149     }
150         
151     /**
152      * @param name Name of this processor.
153      * @param description Description for this processor.
154      */

155     public WriterPoolProcessor(final String JavaDoc name,
156                 final String JavaDoc description) {
157         super(name, description);
158         Type e = addElementToDefinition(
159             new SimpleType(ATTR_COMPRESS, "Compress files when " +
160                 "writing to disk.", new Boolean JavaDoc(DEFAULT_COMPRESS)));
161         e.setOverrideable(false);
162         e = addElementToDefinition(
163             new SimpleType(ATTR_PREFIX,
164                 "File prefix. " +
165                 "The text supplied here will be used as a prefix naming " +
166                 "writer files. For example if the prefix is 'IAH', " +
167                 "then file names will look like " +
168                 "IAH-20040808101010-0001-HOSTNAME.arc.gz " +
169                 "...if writing ARCs (The prefix will be " +
170                 "separated from the date by a hyphen).",
171                 WriterPoolMember.DEFAULT_PREFIX));
172         e = addElementToDefinition(
173             new SimpleType(ATTR_SUFFIX, "Suffix to tag onto " +
174                 "files. If value is '${HOSTNAME}', will use hostname for " +
175                 "suffix. If empty, no suffix will be added.",
176                 WriterPoolMember.DEFAULT_SUFFIX));
177         e.setOverrideable(false);
178         e = addElementToDefinition(
179             new SimpleType(ATTR_MAX_SIZE_BYTES, "Max size of each file",
180                 new Integer JavaDoc(DEFAULT_MAX_FILE_SIZE)));
181         e.setOverrideable(false);
182         e = addElementToDefinition(
183             new StringList(ATTR_PATH, "Where to files. " +
184                 "Supply absolute or relative path. If relative, files " +
185                 "will be written relative to " +
186                 "the " + CrawlOrder.ATTR_DISK_PATH + "setting." +
187                 " If more than one path specified, we'll round-robin" +
188                 " dropping files to each. This setting is safe" +
189                 " to change midcrawl (You can remove and add new dirs" +
190                 " as the crawler progresses).", getDefaultPath()));
191         e.setOverrideable(false);
192         e = addElementToDefinition(new SimpleType(ATTR_POOL_MAX_ACTIVE,
193             "Maximum active files in pool. " +
194             "This setting cannot be varied over the life of a crawl.",
195             new Integer JavaDoc(WriterPool.DEFAULT_MAX_ACTIVE)));
196         e.setOverrideable(false);
197         e = addElementToDefinition(new SimpleType(ATTR_POOL_MAX_WAIT,
198             "Maximum time to wait on pool element" +
199             " (milliseconds). This setting cannot be varied over the life" +
200             " of a crawl.",
201             new Integer JavaDoc(WriterPool.DEFAULT_MAXIMUM_WAIT)));
202         e.setOverrideable(false);
203         e = addElementToDefinition(new SimpleType(ATTR_MAX_BYTES_WRITTEN,
204             "Total file bytes to write to disk." +
205             " Once the size of all files on disk has exceeded this " +
206             "limit, this processor will stop the crawler. " +
207             "A value of zero means no upper limit.", new Long JavaDoc(0)));
208         e.setOverrideable(false);
209         e.setExpertSetting(true);
210     }
211     
212     protected String JavaDoc [] getDefaultPath() {
213         return DEFAULT_PATH;
214     }
215
216     public synchronized void initialTasks() {
217         // Add this class to crawl state listeners and setup pool.
218
getSettingsHandler().getOrder().getController().
219             addCrawlStatusListener(this);
220         setupPool(new AtomicInteger JavaDoc());
221         // Run checkpoint recovery code.
222
if (getSettingsHandler().getOrder().getController().
223                 isCheckpointRecover()) {
224             checkpointRecover();
225         }
226     }
227     
228     protected AtomicInteger JavaDoc getSerialNo() {
229         return ((WriterPool)getPool()).getSerialNo();
230     }
231
232     /**
233      * Set up pool of files.
234      */

235     protected abstract void setupPool(final AtomicInteger JavaDoc serialNo);
236
237     /**
238      * Writes a CrawlURI and its associated data to store file.
239      *
240      * Currently this method understands the following uri types: dns, http,
241      * and https.
242      *
243      * @param curi CrawlURI to process.
244      */

245     protected abstract void innerProcess(CrawlURI curi);
246     
247     protected void checkBytesWritten() {
248         long max = getMaxToWrite();
249         if (max <= 0) {
250             return;
251         }
252         if (max <= this.totalBytesWritten) {
253             getController().requestCrawlStop("Finished - Maximum bytes (" +
254                 Long.toString(max) + ") written");
255         }
256     }
257     
258     protected String JavaDoc getHostAddress(CrawlURI curi) {
259         CrawlHost h = getController().getServerCache().getHostFor(curi);
260         if (h == null) {
261             throw new NullPointerException JavaDoc("Crawlhost is null for " +
262                 curi + " " + curi.getVia());
263         }
264         InetAddress JavaDoc a = h.getIP();
265         if (a == null) {
266             throw new NullPointerException JavaDoc("Address is null for " +
267                 curi + " " + curi.getVia() + ". Address " +
268                 ((h.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP)?
269                      "was never looked up.":
270                      (System.currentTimeMillis() - h.getIpFetched()) +
271                          " ms ago."));
272         }
273         return h.getIP().getHostAddress();
274     }
275     
276     /**
277      * Version of getAttributes that catches and logs exceptions
278      * and returns null if failure to fetch the attribute.
279      * @param name Attribute name.
280      * @return Attribute or null.
281      */

282     public Object JavaDoc getAttributeUnchecked(String JavaDoc name) {
283         Object JavaDoc result = null;
284         try {
285             result = super.getAttribute(name);
286         } catch (AttributeNotFoundException JavaDoc e) {
287             logger.warning(e.getLocalizedMessage());
288         } catch (MBeanException JavaDoc e) {
289             logger.warning(e.getLocalizedMessage());
290         } catch (ReflectionException JavaDoc e) {
291             logger.warning(e.getLocalizedMessage());
292         }
293         return result;
294     }
295
296    /**
297     * Max size we want files to be (bytes).
298     *
299     * Default is ARCConstants.DEFAULT_MAX_ARC_FILE_SIZE. Note that ARC
300     * files will usually be bigger than maxSize; they'll be maxSize + length
301     * to next boundary.
302     * @return ARC maximum size.
303     */

304     public int getMaxSize() {
305         Object JavaDoc obj = getAttributeUnchecked(ATTR_MAX_SIZE_BYTES);
306         return (obj == null)? DEFAULT_MAX_FILE_SIZE: ((Integer JavaDoc)obj).intValue();
307     }
308
309     public String JavaDoc getPrefix() {
310         Object JavaDoc obj = getAttributeUnchecked(ATTR_PREFIX);
311         return (obj == null)? WriterPoolMember.DEFAULT_PREFIX: (String JavaDoc)obj;
312     }
313
314     public List JavaDoc<File JavaDoc> getOutputDirs() {
315         Object JavaDoc obj = getAttributeUnchecked(ATTR_PATH);
316         List JavaDoc list = (obj == null)? Arrays.asList(DEFAULT_PATH): (StringList)obj;
317         ArrayList JavaDoc<File JavaDoc> results = new ArrayList JavaDoc<File JavaDoc>();
318         for (Iterator JavaDoc i = list.iterator(); i.hasNext();) {
319             String JavaDoc path = (String JavaDoc)i.next();
320             File JavaDoc f = new File JavaDoc(path);
321             if (!f.isAbsolute()) {
322                 f = new File JavaDoc(getController().getDisk(), path);
323             }
324             if (!f.exists()) {
325                 try {
326                     f.mkdirs();
327                 } catch (Exception JavaDoc e) {
328                     e.printStackTrace();
329                     continue;
330                 }
331             }
332             results.add(f);
333         }
334         return results;
335     }
336     
337     public boolean isCompressed() {
338         Object JavaDoc obj = getAttributeUnchecked(ATTR_COMPRESS);
339         return (obj == null)? DEFAULT_COMPRESS:
340             ((Boolean JavaDoc)obj).booleanValue();
341     }
342
343     /**
344      * @return Returns the poolMaximumActive.
345      */

346     public int getPoolMaximumActive() {
347         Object JavaDoc obj = getAttributeUnchecked(ATTR_POOL_MAX_ACTIVE);
348         return (obj == null)? WriterPool.DEFAULT_MAX_ACTIVE:
349             ((Integer JavaDoc)obj).intValue();
350     }
351
352     /**
353      * @return Returns the poolMaximumWait.
354      */

355     public int getPoolMaximumWait() {
356         Object JavaDoc obj = getAttributeUnchecked(ATTR_POOL_MAX_WAIT);
357         return (obj == null)? WriterPool.DEFAULT_MAXIMUM_WAIT:
358             ((Integer JavaDoc)obj).intValue();
359     }
360
361     public String JavaDoc getSuffix() {
362         Object JavaDoc obj = getAttributeUnchecked(ATTR_SUFFIX);
363         String JavaDoc sfx = (obj == null)?
364             WriterPoolMember.DEFAULT_SUFFIX: (String JavaDoc)obj;
365         if (sfx != null && sfx.trim().
366                 equals(WriterPoolMember.HOSTNAME_VARIABLE)) {
367             String JavaDoc str = "localhost.localdomain";
368             try {
369                 str = InetAddress.getLocalHost().getHostName();
370             } catch (UnknownHostException JavaDoc ue) {
371                 logger.severe("Failed getHostAddress for this host: " + ue);
372             }
373             sfx = str;
374         }
375         return sfx;
376     }
377     
378     public long getMaxToWrite() {
379         Object JavaDoc obj = getAttributeUnchecked(ATTR_MAX_BYTES_WRITTEN);
380         return (obj == null)? 0: ((Long JavaDoc)obj).longValue();
381     }
382
383     public void crawlEnding(String JavaDoc sExitMessage) {
384         this.pool.close();
385     }
386
387     public void crawlEnded(String JavaDoc sExitMessage) {
388         // sExitMessage is unused.
389
}
390
391     /* (non-Javadoc)
392      * @see org.archive.crawler.event.CrawlStatusListener#crawlStarted(java.lang.String)
393      */

394     public void crawlStarted(String JavaDoc message) {
395         // TODO Auto-generated method stub
396
}
397     
398     protected String JavaDoc getCheckpointStateFile() {
399         return this.getClass().getName() + ".state";
400     }
401     
402     public void crawlCheckpoint(File JavaDoc checkpointDir) throws IOException JavaDoc {
403         int serial = getSerialNo().get();
404         if (this.pool.getNumActive() > 0) {
405             // If we have open active Archive files, up the serial number
406
// so after checkpoint, we start at one past current number and
407
// so the number we serialize, is one past current serialNo.
408
// All this serial number manipulation should be fine in here since
409
// we're paused checkpointing (Revisit if this assumption changes).
410
serial = getSerialNo().incrementAndGet();
411         }
412         saveCheckpointSerialNumber(checkpointDir, serial);
413         // Close all ARCs on checkpoint.
414
try {
415             this.pool.close();
416         } finally {
417             // Reopen on checkpoint.
418
setupPool(new AtomicInteger JavaDoc(serial));
419         }
420     }
421     
422     public void crawlPausing(String JavaDoc statusMessage) {
423         // sExitMessage is unused.
424
}
425
426     public void crawlPaused(String JavaDoc statusMessage) {
427         // sExitMessage is unused.
428
}
429
430     public void crawlResuming(String JavaDoc statusMessage) {
431         // sExitMessage is unused.
432
}
433     
434     private void readObject(ObjectInputStream JavaDoc stream)
435     throws IOException JavaDoc, ClassNotFoundException JavaDoc {
436         stream.defaultReadObject();
437         ObjectPlusFilesInputStream coistream =
438             (ObjectPlusFilesInputStream)stream;
439         coistream.registerFinishTask( new Runnable JavaDoc() {
440             public void run() {
441                 setupPool(new AtomicInteger JavaDoc());
442             }
443         });
444     }
445
446     protected WriterPool getPool() {
447         return pool;
448     }
449
450     protected void setPool(WriterPool pool) {
451         this.pool = pool;
452     }
453
454     protected long getTotalBytesWritten() {
455         return totalBytesWritten;
456     }
457
458     protected void setTotalBytesWritten(long totalBytesWritten) {
459         this.totalBytesWritten = totalBytesWritten;
460     }
461     
462     /**
463      * Called out of {@link #initialTasks()} when recovering a checkpoint.
464      * Restore state.
465      */

466     protected void checkpointRecover() {
467         int serialNo = loadCheckpointSerialNumber();
468         if (serialNo != -1) {
469             getSerialNo().set(serialNo);
470         }
471     }
472
473     /**
474      * @return Serial number from checkpoint state file or if unreadable, -1
475      * (Client should check for -1).
476      */

477     protected int loadCheckpointSerialNumber() {
478         int result = -1;
479         
480         // If in recover mode, read in the Writer serial number saved
481
// off when we checkpointed.
482
File JavaDoc stateFile = new File JavaDoc(getSettingsHandler().getOrder()
483                 .getController().getCheckpointRecover().getDirectory(),
484                 getCheckpointStateFile());
485         if (!stateFile.exists()) {
486             logger.info(stateFile.getAbsolutePath()
487                     + " doesn't exist so cannot restore Writer serial number.");
488         } else {
489             DataInputStream JavaDoc dis = null;
490             try {
491                 dis = new DataInputStream JavaDoc(new FileInputStream JavaDoc(stateFile));
492                 result = dis.readShort();
493             } catch (FileNotFoundException JavaDoc e) {
494                 e.printStackTrace();
495             } catch (IOException JavaDoc e) {
496                 e.printStackTrace();
497             } finally {
498                 try {
499                     if (dis != null) {
500                         dis.close();
501                     }
502                 } catch (IOException JavaDoc e) {
503                     e.printStackTrace();
504                 }
505             }
506         }
507         return result;
508     }
509     
510     protected void saveCheckpointSerialNumber(final File JavaDoc checkpointDir,
511             final int serialNo)
512     throws IOException JavaDoc {
513         // Write out the current state of the ARCWriter serial number.
514
File JavaDoc f = new File JavaDoc(checkpointDir, getCheckpointStateFile());
515         DataOutputStream JavaDoc dos = new DataOutputStream JavaDoc(new FileOutputStream JavaDoc(f));
516         try {
517             dos.writeShort(serialNo);
518         } finally {
519             dos.close();
520         }
521     }
522 }
Popular Tags