package org.archive.crawler.writer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.archive.crawler.datamodel.CoreAttributeConstants;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.FetchStatusCodes;
import org.archive.crawler.event.CrawlStatusListener;
import org.archive.crawler.extractor.Link;
import org.archive.crawler.framework.WriterPoolProcessor;
import org.archive.io.WriterPoolMember;
import org.archive.io.WriterPoolSettings;
import org.archive.io.warc.ExperimentalWARCWriter;
import org.archive.io.warc.WARCConstants;
import org.archive.io.warc.WARCWriterPool;
import org.archive.uid.GeneratorFactory;
import org.archive.util.ArchiveUtils;
import org.archive.util.anvl.ANVLRecord;


/**
 * Processor that writes crawled URIs out as WARC records using
 * {@link ExperimentalWARCWriter} instances borrowed from a
 * {@link WARCWriterPool}.
 *
 * <p>For http/https URIs a response, a request and a metadata record are
 * written; for dns URIs a single response record is written.
 */
public class ExperimentalWARCWriterProcessor extends WriterPoolProcessor
implements CoreAttributeConstants, CrawlStatusListener,
        WriterPoolSettings, FetchStatusCodes, WARCConstants {

    private static final long serialVersionUID = 188656957531675821L;

    private final Logger logger = Logger.getLogger(this.getClass().getName());

    /**
     * Default path list returned by {@link #getDefaultPath()}.
     */
    private static final String [] DEFAULT_PATH = {"warcs"};

    /**
     * @return the default path list ({@code "warcs"}).
     */
    protected String [] getDefaultPath() {
        return DEFAULT_PATH;
    }

    /**
     * @param name Name of this processor.
     */
    public ExperimentalWARCWriterProcessor(String name) {
        super(name, "Experimental WARCWriter processor");
    }

    /**
     * Installs a {@link WARCWriterPool} as this processor's writer pool.
     *
     * @param serialNo Serial number counter handed to the pool.
     */
    protected void setupPool(final AtomicInteger serialNo) {
        setPool(new WARCWriterPool(serialNo, this, getPoolMaximumActive(),
            getPoolMaximumWait()));
    }

    /**
     * Writes the passed URI to a WARC file when its scheme and fetch status
     * qualify; otherwise does nothing (or logs that the scheme is skipped).
     *
     * <p>An {@link IOException} raised while writing is recorded on the
     * CrawlURI as a localized error and logged at SEVERE; it is not
     * propagated.
     *
     * @param curi CrawlURI to process.
     */
    protected void innerProcess(CrawlURI curi) {
        // Non-positive fetch status: nothing is written.
        if (curi.getFetchStatus() <= 0) {
            return;
        }

        // Compare the size without narrowing it to int: a narrowing cast
        // wraps sizes of 2GiB and up to a negative (or zero) value, which
        // would make a large fetch look empty and be skipped.
        long recordLength = curi.getContentSize();
        if (recordLength <= 0) {
            // No content, so no record.
            return;
        }

        String scheme = curi.getUURI().getScheme().toLowerCase();
        try {
            // NOTE(review): "ftp" with status 200 passes this filter, but
            // write() has no ftp branch and only logs a warning -- confirm
            // whether ftp support is intended.
            if ((scheme.equals("dns") &&
                    curi.getFetchStatus() == S_DNS_SUCCESS) ||
                ((scheme.equals("http") || scheme.equals("https")) &&
                    curi.getFetchStatus() > 0 && curi.isHttpTransaction()) ||
                (scheme.equals("ftp") && curi.getFetchStatus() == 200)) {
                write(scheme, curi);
            } else {
                logger.info("This writer does not write out scheme " +
                    scheme + " content");
            }
        } catch (IOException e) {
            curi.addLocalizedError(this.getName(), e, "WriteRecord: " +
                curi.toString());
            logger.log(Level.SEVERE, "Failed write of Record: " +
                curi.toString(), e);
        }
    }

    /**
     * Borrows a writer from the pool, writes the record(s) for the passed
     * URI, and returns the writer to the pool. On {@link IOException} during
     * record writing the writer is invalidated instead of returned and the
     * exception is rethrown.
     *
     * @param lowerCaseScheme URI scheme, already lower-cased.
     * @param curi CrawlURI whose content is written.
     * @throws IOException if borrowing the writer or writing a record fails.
     */
    protected void write(final String lowerCaseScheme, final CrawlURI curi)
    throws IOException {
        WriterPoolMember writer = getPool().borrowFile();
        long position = writer.getPosition();
        // checkSize() may change the writer's position (presumably by
        // rolling over to a new file -- confirm in WriterPoolMember). Any
        // bytes it accounted for are added to the running total here so the
        // delta computed in the finally block covers only this URI's
        // records.
        // NOTE(review): checkSize() runs outside the try below, so a failure
        // here neither returns nor invalidates the borrowed writer.
        writer.checkSize();
        if (writer.getPosition() != position) {
            setTotalBytesWritten(getTotalBytesWritten() +
                (writer.getPosition() - position));
            position = writer.getPosition();
        }

        ExperimentalWARCWriter w = (ExperimentalWARCWriter)writer;
        try {
            // One base record id is shared by all records written for this
            // URI; request and metadata ids are qualified variants of it.
            final URI baseid = getRecordID();
            final String timestamp =
                ArchiveUtils.get14DigitDate(curi.getLong(A_FETCH_BEGAN_TIME));
            if (lowerCaseScheme.startsWith("http")) {
                ANVLRecord r = new ANVLRecord();
                if (curi.getContentDigest() != null) {
                    r.addLabelValue(NAMED_FIELD_CHECKSUM_LABEL,
                        curi.getContentDigestSchemeString());
                }
                r.addLabelValue(NAMED_FIELD_IP_LABEL, getHostAddress(curi));
                URI rid = writeResponse(w, timestamp, HTTP_RESPONSE_MIMETYPE,
                    baseid, curi, r);
                // Request and metadata records point back at the response.
                r = new ANVLRecord(1);
                r.addLabelValue(NAMED_FIELD_RELATED_LABEL, rid.toString());
                writeRequest(w, timestamp, HTTP_REQUEST_MIMETYPE,
                    baseid, curi, r);
                writeMetadata(w, timestamp, baseid, curi, r);
            } else if (lowerCaseScheme.equals("dns")) {
                String ip = curi.getString(A_DNS_SERVER_IP_LABEL);
                ANVLRecord r = null;
                if (ip != null && ip.length() > 0) {
                    r = new ANVLRecord();
                    r.addLabelValue(NAMED_FIELD_IP_LABEL, ip);
                }
                writeResponse(w, timestamp, curi.getContentType(), baseid,
                    curi, r);
            } else {
                logger.warning("No handler for scheme " + lowerCaseScheme);
            }
        } catch (IOException e) {
            // Invalidate rather than return the writer, and null the local
            // so the finally block does not also hand it back to the pool.
            getPool().invalidateFile(writer);
            writer = null;
            throw e;
        } finally {
            if (writer != null) {
                setTotalBytesWritten(getTotalBytesWritten() +
                    (writer.getPosition() - position));
                getPool().returnFile(writer);
            }
        }
        checkBytesWritten();
    }

    /**
     * Writes a request record whose body is the recorded output of the
     * URI's HTTP recorder.
     *
     * @param w Writer to write to.
     * @param timestamp Timestamp string for the record.
     * @param mimetype Mimetype of the record body.
     * @param baseid Base record id; the request id is derived from it.
     * @param curi CrawlURI being written.
     * @param namedFields Named fields to attach to the record.
     * @return the qualified id used for the request record.
     * @throws IOException if the id cannot be qualified or the write fails.
     */
    protected URI writeRequest(final ExperimentalWARCWriter w,
            final String timestamp, final String mimetype,
            final URI baseid, final CrawlURI curi,
            final ANVLRecord namedFields)
    throws IOException {
        final URI uid = qualifyRecordID(baseid, TYPE, REQUEST);
        w.writeRequestRecord(curi.toString(), timestamp, mimetype, uid,
            namedFields,
            curi.getHttpRecorder().getRecordedOutput().getReplayInputStream(),
            curi.getHttpRecorder().getRecordedOutput().getSize());
        return uid;
    }

    /**
     * Writes a response record whose body is the recorded input of the
     * URI's HTTP recorder.
     *
     * @param w Writer to write to.
     * @param timestamp Timestamp string for the record.
     * @param mimetype Mimetype of the record body.
     * @param baseid Record id; used unqualified for the response record.
     * @param curi CrawlURI being written.
     * @param namedFields Named fields to attach; the dns branch of
     * {@link #write(String, CrawlURI)} may pass null.
     * @return <code>baseid</code>, the id used for the response record.
     * @throws IOException if the write fails.
     */
    protected URI writeResponse(final ExperimentalWARCWriter w,
            final String timestamp, final String mimetype,
            final URI baseid, final CrawlURI curi,
            final ANVLRecord namedFields)
    throws IOException {
        w.writeResponseRecord(curi.toString(), timestamp, mimetype, baseid,
            namedFields,
            curi.getHttpRecorder().getRecordedInput().getReplayInputStream(),
            curi.getHttpRecorder().getRecordedInput().getSize());
        return baseid;
    }

    /**
     * Writes a metadata record whose ANVL body describes how the URI was
     * reached (seed, or via/pathFromSeed and optional force-fetch), its
     * outlinks, and the truncation reason if the fetch was truncated.
     *
     * @param w Writer to write to.
     * @param timestamp Timestamp string for the record.
     * @param baseid Base record id; the metadata id is derived from it.
     * @param curi CrawlURI being written.
     * @param namedFields Named fields to attach to the record.
     * @return the qualified id used for the metadata record.
     * @throws IOException if the id cannot be qualified or the write fails.
     */
    protected URI writeMetadata(final ExperimentalWARCWriter w,
            final String timestamp,
            final URI baseid, final CrawlURI curi,
            final ANVLRecord namedFields)
    throws IOException {
        final URI uid = qualifyRecordID(baseid, TYPE, METADATA);
        ANVLRecord r = new ANVLRecord();
        if (curi.isSeed()) {
            r.addLabel("seed");
        } else {
            if (curi.forceFetch()) {
                r.addLabel("force-fetch");
            }
            r.addLabelValue("via", curi.flattenVia());
            r.addLabelValue("pathFromSeed", curi.getPathFromSeed());
        }
        Collection<Link> links = curi.getOutLinks();
        // Both conditions must hold: an '||' here would evaluate
        // links.size() on a null collection and throw.
        if (links != null && !links.isEmpty()) {
            for (Link link : links) {
                r.addLabelValue("outlink", link.toString());
            }
        }
        if (curi.isTruncatedFetch()) {
            // First matching cause wins: time, then length, then header.
            String value = curi.isTimeTruncatedFetch()
                ? NAMED_FIELD_TRUNCATED_VALUE_TIME
                : curi.isLengthTruncatedFetch()
                    ? NAMED_FIELD_TRUNCATED_VALUE_LEN
                    : curi.isHeaderTruncatedFetch()
                        ? NAMED_FIELD_TRUNCATED_VALUE_HEAD
                        : NAMED_FIELD_TRUNCATED_VALUE_UNSPECIFIED;
            r.addLabelValue(NAMED_FIELD_TRUNCATED, value);
        }

        byte [] b = r.getUTF8Bytes();
        w.writeMetadataRecord(curi.toString(), timestamp, ANVLRecord.MIMETYPE,
            uid, namedFields, new ByteArrayInputStream(b), b.length);
        return uid;
    }

    /**
     * Obtains a new record id from the {@link GeneratorFactory}.
     *
     * @return a fresh record id.
     * @throws IOException wrapping the {@link URISyntaxException} if the
     * generator produces an invalid URI.
     */
    protected URI getRecordID() throws IOException {
        try {
            return GeneratorFactory.getFactory().getRecordID();
        } catch (URISyntaxException e) {
            throw asIOException(e);
        }
    }

    /**
     * Derives a record id from <code>base</code> qualified with a single
     * key/value pair.
     *
     * @param base Id to qualify.
     * @param key Qualifier name.
     * @param value Qualifier value.
     * @return the qualified record id.
     * @throws IOException wrapping the {@link URISyntaxException} if the
     * qualified id is not a valid URI.
     */
    protected URI qualifyRecordID(final URI base, final String key,
            final String value)
    throws IOException {
        Map<String, String> qualifiers = new HashMap<String, String>(1);
        qualifiers.put(key, value);
        try {
            return GeneratorFactory.getFactory().
                qualifyRecordID(base, qualifiers);
        } catch (URISyntaxException e) {
            throw asIOException(e);
        }
    }

    /**
     * Converts a URISyntaxException into an IOException carrying the same
     * message text (<code>e.toString()</code>) and the original as cause.
     */
    private static IOException asIOException(final URISyntaxException e) {
        IOException wrapped = new IOException(e.toString());
        // initCause rather than a (String, Throwable) constructor so this
        // compiles on the Java 5 level the rest of the file is written to.
        wrapped.initCause(e);
        return wrapped;
    }

    /**
     * @return always null in this implementation.
     */
    public List getMetadata() {
        // NOTE(review): returns null rather than an empty list; callers
        // must handle null.
        return null;
    }
}