1 23 package org.archive.io; 24 25 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream; 26 27 import java.io.File ; 28 import java.io.FileOutputStream ; 29 import java.io.IOException ; 30 import java.io.InputStream ; 31 import java.io.OutputStream ; 32 import java.text.DecimalFormat ; 33 import java.text.NumberFormat ; 34 import java.util.Iterator ; 35 import java.util.List ; 36 import java.util.concurrent.atomic.AtomicInteger ; 37 import java.util.logging.Logger ; 38 import java.util.zip.GZIPOutputStream ; 39 40 import org.archive.util.ArchiveUtils; 41 import org.archive.util.IoUtils; 42 import org.archive.util.TimestampSerialno; 43 44 45 53 public abstract class WriterPoolMember implements ArchiveFileConstants { 54 private final Logger logger = Logger.getLogger(this.getClass().getName()); 55 56 public static final String UTF8 = "UTF-8"; 57 58 63 public static final String DEFAULT_PREFIX = "IAH"; 64 65 68 public static final String HOSTNAME_VARIABLE = "${HOSTNAME}"; 69 70 73 public static final String DEFAULT_SUFFIX = HOSTNAME_VARIABLE; 74 75 78 private File f = null; 79 80 83 private OutputStream out = null; 84 85 89 private FileOutputStream fos; 90 91 private final boolean compressed; 92 private List <File > writeDirs = null; 93 private String prefix = DEFAULT_PREFIX; 94 private String suffix = DEFAULT_SUFFIX; 95 private final int maxSize; 96 private final String extension; 97 98 102 private String createTimestamp = "UNSET!!!"; 103 104 107 final private AtomicInteger serialNo; 108 109 112 private static int roundRobinIndex = 0; 113 114 119 private static NumberFormat serialNoFormatter = new DecimalFormat ("00000"); 120 121 133 protected WriterPoolMember(AtomicInteger serialNo, 134 final OutputStream out, final File file, 135 final boolean cmprs, String a14DigitDate) 136 throws IOException { 137 this(serialNo, null, null, cmprs, -1, null); 138 this.out = out; 139 this.f = file; 140 } 141 142 152 public WriterPoolMember(AtomicInteger serialNo, 153 final List <File > dirs, final String prefix, 154 final boolean cmprs, final int maxSize, final String extension) { 155 this(serialNo, dirs, prefix, "", cmprs, maxSize, extension); 156 } 157 158 169 public WriterPoolMember(AtomicInteger serialNo, 170 final List <File > dirs, final String prefix, 171 final String suffix, final boolean cmprs, 172 final int maxSize, final String extension) { 173 this.suffix = suffix; 174 this.prefix = prefix; 175 this.maxSize = maxSize; 176 this.writeDirs = dirs; 177 this.compressed = cmprs; 178 this.extension = extension; 179 this.serialNo = serialNo; 180 } 181 182 195 public void checkSize() throws IOException { 196 if (this.out == null || 197 (this.maxSize != -1 && (this.f.length() > this.maxSize))) { 198 createFile(); 199 } 200 } 201 202 209 protected String createFile() throws IOException { 210 TimestampSerialno tsn = getTimestampSerialNo(); 211 String name = this.prefix + '-' + getUniqueBasename(tsn) + 212 ((this.suffix == null || this.suffix.length() <= 0)? 213 "": "-" + this.suffix) + '.' + this.extension + 214 ((this.compressed)? '.' + COMPRESSED_FILE_EXTENSION: "") + 215 OCCUPIED_SUFFIX; 216 this.createTimestamp = tsn.getTimestamp(); 217 File dir = getNextDirectory(this.writeDirs); 218 return createFile(new File (dir, name)); 219 } 220 221 protected String createFile(final File file) throws IOException { 222 close(); 223 this.f = file; 224 this.fos = new FileOutputStream (this.f); 225 this.out = new FastBufferedOutputStream(this.fos); 226 logger.info("Opened " + this.f.getAbsolutePath()); 227 return this.f.getName(); 228 } 229 230 236 protected File getNextDirectory(List <File > dirs) 237 throws IOException { 238 if (WriterPoolMember.roundRobinIndex >= dirs.size()) { 239 WriterPoolMember.roundRobinIndex = 0; 240 } 241 File d = null; 242 try { 243 d = checkWriteable((File )dirs. 244 get(WriterPoolMember.roundRobinIndex)); 245 } catch (IndexOutOfBoundsException e) { 246 } 249 if (d == null && dirs.size() > 1) { 250 for (Iterator i = dirs.iterator(); d == null && i.hasNext();) { 251 d = checkWriteable((File )i.next()); 252 } 253 } else { 254 WriterPoolMember.roundRobinIndex++; 255 } 256 if (d == null) { 257 throw new IOException ("Directories unusable."); 258 } 259 return d; 260 } 261 262 protected File checkWriteable(File d) { 263 if (d == null) { 264 return d; 265 } 266 267 try { 268 IoUtils.ensureWriteableDirectory(d); 269 } catch(IOException e) { 270 logger.warning("Directory " + d.getPath() + " is not" + 271 " writeable or cannot be created: " + e.getMessage()); 272 d = null; 273 } 274 return d; 275 } 276 277 protected synchronized TimestampSerialno getTimestampSerialNo() { 278 return getTimestampSerialNo(null); 279 } 280 281 290 protected synchronized TimestampSerialno 291 getTimestampSerialNo(final String timestamp) { 292 return new TimestampSerialno((timestamp != null)? 293 timestamp: ArchiveUtils.get14DigitDate(), 294 serialNo.getAndIncrement()); 295 } 296 297 306 private String getUniqueBasename(TimestampSerialno tsn) { 307 return tsn.getTimestamp() + "-" + 308 WriterPoolMember.serialNoFormatter.format(tsn.getSerialNumber()); 309 } 310 311 312 317 protected String getBaseFilename() { 318 String name = this.f.getName(); 319 if (this.compressed && name.endsWith(DOT_COMPRESSED_FILE_EXTENSION)) { 320 return name.substring(0,name.length() - 3); 321 } else if(this.compressed && 322 name.endsWith(DOT_COMPRESSED_FILE_EXTENSION + 323 OCCUPIED_SUFFIX)) { 324 return name.substring(0, name.length() - 325 (3 + OCCUPIED_SUFFIX.length())); 326 } else { 327 return name; 328 } 329 } 330 331 339 public File getFile() { 340 return this.f; 341 } 342 343 353 protected void preWriteRecordTasks() 354 throws IOException { 355 checkSize(); 356 if (this.compressed) { 357 this.out = new CompressedStream(this.out); 361 } 362 } 363 364 371 protected void postWriteRecordTasks() 372 throws IOException { 373 if (this.compressed) { 374 CompressedStream o = (CompressedStream)this.out; 375 o.finish(); 376 o.flush(); 377 this.out = o.getWrappedStream(); 378 } 379 } 380 381 388 public long getPosition() throws IOException { 389 long position = 0; 390 if (this.out != null) { 391 this.out.flush(); 392 } 393 if (this.fos != null) { 394 this.fos.flush(); 397 position = this.fos.getChannel().position(); 398 } 399 return position; 400 } 401 402 public boolean isCompressed() { 403 return compressed; 404 } 405 406 protected void write(final byte [] b) throws IOException { 407 this.out.write(b); 408 } 409 410 protected void flush() throws IOException { 411 this.out.flush(); 412 } 413 414 protected void write(byte[] b, int off, int len) throws IOException { 415 this.out.write(b, off, len); 416 } 417 418 protected void write(int b) throws IOException { 419 this.out.write(b); 420 } 421 422 protected void readFullyFrom(final InputStream is, final long recordLength, 423 final byte [] b) 424 throws IOException { 425 int read = b.length; 426 int total = 0; 427 while((read = is.read(b)) != -1 && total < recordLength) { 428 total += read; 429 write(b, 0, read); 430 } 431 if (total != recordLength) { 432 throw new IOException ("Read " + total + " but expected " + 433 recordLength); 434 } 435 } 436 437 public void close() throws IOException { 438 if (this.out == null) { 439 return; 440 } 441 this.out.close(); 442 this.out = null; 443 this.fos = null; 444 if (this.f != null && this.f.exists()) { 445 String path = this.f.getAbsolutePath(); 446 if (path.endsWith(OCCUPIED_SUFFIX)) { 447 File f = new File (path.substring(0, 448 path.length() - OCCUPIED_SUFFIX.length())); 449 if (!this.f.renameTo(f)) { 450 logger.warning("Failed rename of " + path); 451 } 452 this.f = f; 453 } 454 455 logger.info("Closed " + this.f.getAbsolutePath() + 456 ", size " + this.f.length()); 457 } 458 } 459 460 protected OutputStream getOutputStream() { 461 return this.out; 462 } 463 464 protected String getCreateTimestamp() { 465 return createTimestamp; 466 } 467 468 469 473 private class CompressedStream extends GZIPOutputStream { 474 public CompressedStream(OutputStream out) 475 throws IOException { 476 super(out); 477 } 478 479 482 OutputStream getWrappedStream() { 483 return this.out; 484 } 485 } 486 } 487 | Popular Tags |