1 2 3 4 package net.nutch.io; 5 6 import java.io.*; 7 import java.util.*; 8 import java.util.logging.*; 9 import java.nio.channels.*; 10 import org.apache.lucene.util.PriorityQueue; 11 import net.nutch.fs.*; 12 import net.nutch.util.*; 13 14 15 public class SequenceFile { 16 public static final Logger LOG = 17 LogFormatter.getLogger("net.nutch.io.SequenceFile"); 18 19 private SequenceFile() {} 21 private static byte[] VERSION = new byte[] { 22 (byte)'S', (byte)'E', (byte)'Q', 1 23 }; 24 25 26 public static class Writer { 27 private NFSDataOutputStream out; 28 private DataOutputBuffer buffer = new DataOutputBuffer(); 29 private NutchFileSystem nfs = null; 30 private File target = null; 31 32 private Class keyClass; 33 private Class valClass; 34 35 36 public Writer(NutchFileSystem nfs, String name, 37 Class keyClass, Class valClass) 38 throws IOException { 39 this.nfs = nfs; 40 this.target = new File(name); 41 if (nfs.exists(target)) { 42 throw new IOException("already exists: " + target); 43 } 44 init(new NFSDataOutputStream(nfs.create(target)), 45 keyClass, valClass); 46 } 47 48 49 private Writer(NFSDataOutputStream out, 50 Class keyClass, Class valClass) throws IOException { 51 init(out, keyClass, valClass); 52 } 53 54 private void init(NFSDataOutputStream out, 55 Class keyClass, Class valClass) throws IOException { 56 this.out = out; 57 this.out.write(VERSION); 58 59 this.keyClass = keyClass; 60 this.valClass = valClass; 61 62 new UTF8(WritableName.getName(keyClass)).write(this.out); 63 new UTF8(WritableName.getName(valClass)).write(this.out); 64 65 this.out.flush(); } 67 68 69 70 public Class getKeyClass() { return keyClass; } 71 72 73 public Class getValueClass() { return valClass; } 74 75 76 77 public void close() throws IOException { 78 if (out != null) { 79 out.close(); 80 out = null; 81 } 82 } 83 84 85 public void append(Writable key, Writable val) throws IOException { 86 if (key.getClass() != keyClass) 87 throw new IOException("wrong key class: "+key+" is not "+keyClass); 88 if (val.getClass() != valClass) 89 throw new IOException("wrong value class: "+val+" is not "+valClass); 90 91 buffer.reset(); 92 93 key.write(buffer); 94 int keyLength = buffer.getLength(); 95 if (keyLength == 0) 96 throw new IOException("zero length keys not allowed: " + key); 97 98 val.write(buffer); 99 append(buffer.getData(), 0, buffer.getLength(), keyLength); 100 } 101 102 103 public void append(byte[] data, int start, int length, int keyLength) 104 throws IOException { 105 if (keyLength == 0) 106 throw new IOException("zero length keys not allowed"); 107 108 out.writeInt(length); out.writeInt(keyLength); out.write(data, start, length); } 112 113 114 public long getLength() throws IOException { 115 return out.getPos(); 116 } 117 118 } 119 120 121 public static class Reader { 122 private String file; 123 private NFSDataInputStream in; 124 private DataOutputBuffer outBuf = new DataOutputBuffer(); 125 private DataInputBuffer inBuf = new DataInputBuffer(); 126 private NutchFileSystem nfs = null; 127 128 private Class keyClass; 129 private Class valClass; 130 131 private long end; 132 private int keyLength; 133 134 135 public Reader(NutchFileSystem nfs, String file) throws IOException { 136 this(nfs, file, NutchConf.getInt("io.file.buffer.size", 4096)); 137 } 138 139 private Reader(NutchFileSystem nfs, String name, int bufferSize) throws IOException { 140 this.nfs = nfs; 141 this.file = name; 142 File file = new File(name); 143 this.in = new NFSDataInputStream(nfs.open(file), bufferSize); 144 this.end = nfs.getLength(file); 145 init(); 146 } 147 148 private Reader(NutchFileSystem nfs, String file, int bufferSize, long start, long length) 149 throws IOException { 150 this.nfs = nfs; 151 this.file = file; 152 this.in = new NFSDataInputStream(nfs.open(new File(file)), bufferSize); 153 seek(start); 154 init(); 155 156 this.end = in.getPos() + length; 157 } 158 159 private void init() throws IOException { 160 byte[] version = new byte[VERSION.length]; 161 in.readFully(version); 162 if (!Arrays.equals(version, VERSION)) { 163 throw new VersionMismatchException(VERSION[3], version[3]); 164 } 165 166 UTF8 className = new UTF8(); 167 168 className.readFields(in); this.keyClass = WritableName.getClass(className.toString()); 170 171 className.readFields(in); this.valClass = WritableName.getClass(className.toString()); 173 } 174 175 176 public synchronized void close() throws IOException { 177 in.close(); 178 } 179 180 181 public Class getKeyClass() { return keyClass; } 182 183 184 public Class getValueClass() { return valClass; } 185 186 188 public synchronized boolean next(Writable key) throws IOException { 189 if (key.getClass() != keyClass) 190 throw new IOException("wrong key class: "+key+" is not "+keyClass); 191 192 outBuf.reset(); 193 194 keyLength = next(outBuf); 195 if (keyLength < 0) 196 return false; 197 198 inBuf.reset(outBuf.getData(), outBuf.getLength()); 199 200 key.readFields(inBuf); 201 if (inBuf.getPosition() != keyLength) 202 throw new IOException(key + " read " + inBuf.getPosition() 203 + " bytes, should read " + keyLength); 204 205 return true; 206 } 207 208 211 public synchronized boolean next(Writable key, Writable val) 212 throws IOException { 213 if (val.getClass() != valClass) 214 throw new IOException("wrong value class: "+val+" is not "+valClass); 215 216 boolean more = next(key); 217 218 if (more) { 219 val.readFields(inBuf); 220 if (inBuf.getPosition() != outBuf.getLength()) 221 throw new IOException(val+" read "+(inBuf.getPosition()-keyLength) 222 + " bytes, should read " + 223 (outBuf.getLength()-keyLength)); 224 } 225 226 return more; 227 } 228 229 233 public synchronized int next(DataOutputBuffer buffer) throws IOException { 234 if (in.getPos() >= end) 235 return -1; 236 237 int length = in.readInt(); 238 int keyLength = in.readInt(); 239 buffer.write(in, length); 240 return keyLength; 241 } 242 243 244 public synchronized void seek(long position) throws IOException { 245 in.seek(position); 246 } 247 248 249 public synchronized long getPosition() throws IOException { 250 return in.getPos(); 251 } 252 253 254 public String toString() { 255 return file; 256 } 257 258 } 259 260 266 public static class Sorter { 267 private static final int FACTOR = NutchConf.getInt("io.sort.factor", 100); 268 private static final int MEGABYTES = NutchConf.getInt("io.sort.mb", 100); 269 270 private WritableComparator comparator; 271 272 private String inFile; private String [] inFiles; 275 private String outFile; 276 277 private int memory = MEGABYTES * 1024*1024; private int factor = FACTOR; 280 private NutchFileSystem nfs = null; 281 282 private Class keyClass; 283 private Class valClass; 284 285 286 public Sorter(NutchFileSystem nfs, Class keyClass, Class valClass) { 287 this(nfs, new WritableComparator(keyClass), valClass); 288 } 289 290 291 public Sorter(NutchFileSystem nfs, WritableComparator comparator, Class valClass) { 292 this.nfs = nfs; 293 this.comparator = comparator; 294 this.keyClass = comparator.getKeyClass(); 295 this.valClass = valClass; 296 } 297 298 299 public void setFactor(int factor) { this.factor = factor; } 300 301 302 public int getFactor() { return factor; } 303 304 305 public void setMemory(int memory) { this.memory = memory; } 306 307 308 public int getMemory() { return memory; } 309 310 311 public void sort(String inFile, String outFile) throws IOException { 312 if (nfs.exists(new File(outFile))) { 313 throw new IOException("already exists: " + outFile); 314 } 315 316 this.inFile = inFile; 317 this.outFile = outFile; 318 319 int segments = sortPass(); 320 int pass = 1; 321 while (segments > 1) { 322 segments = mergePass(pass, segments <= factor); 323 pass++; 324 } 325 } 326 327 private int sortPass() throws IOException { 328 LOG.fine("running sort pass"); 329 SortPass sortPass = new SortPass(); try { 331 return sortPass.run(); } finally { 333 sortPass.close(); } 335 } 336 337 private class SortPass { 338 private int limit = memory/4; 339 private DataOutputBuffer buffer = new DataOutputBuffer(); 340 private byte[] rawBuffer; 341 342 private int[] starts = new int[1024]; 343 private int[] pointers = new int[starts.length]; 344 private int[] pointersCopy = new int[starts.length]; 345 private int[] keyLengths = new int[starts.length]; 346 private int[] lengths = new int[starts.length]; 347 348 private Reader in; 349 private NFSDataOutputStream out; 350 private String outName; 351 352 public SortPass() throws IOException { 353 in = new Reader(nfs, inFile); 354 } 355 356 public int run() throws IOException { 357 int segments = 0; 358 boolean atEof = false; 359 while (!atEof) { 360 int count = 0; 361 buffer.reset(); 362 while (!atEof && buffer.getLength() < limit) { 363 364 int start = buffer.getLength(); int keyLength = in.next(buffer); 366 int length = buffer.getLength() - start; 367 368 if (keyLength == -1) { 369 atEof = true; 370 break; 371 } 372 373 if (count == starts.length) 374 grow(); 375 376 starts[count] = start; pointers[count] = count; 378 lengths[count] = length; 379 keyLengths[count] = keyLength; 380 381 count++; 382 } 383 384 LOG.finer("flushing segment " + segments); 386 rawBuffer = buffer.getData(); 387 sort(count); 388 flush(count, segments==0 && atEof); 389 segments++; 390 } 391 return segments; 392 } 393 394 public void close() throws IOException { 395 in.close(); 396 397 if (out != null) { 398 out.close(); 399 } 400 } 401 402 private void grow() { 403 int newLength = starts.length * 3 / 2; 404 starts = grow(starts, newLength); 405 pointers = grow(pointers, newLength); 406 pointersCopy = new int[newLength]; 407 keyLengths = grow(keyLengths, newLength); 408 lengths = grow(lengths, newLength); 409 } 410 411 private int[] grow(int[] old, int newLength) { 412 int[] result = new int[newLength]; 413 System.arraycopy(old, 0, result, 0, old.length); 414 return result; 415 } 416 417 private void flush(int count, boolean done) throws IOException { 418 if (out == null) { 419 outName = done ? outFile : outFile+".0"; 420 out = new NFSDataOutputStream(nfs.create(new File(outName))); 421 } 422 423 if (!done) { long length = buffer.getLength() + count*8; 425 out.writeLong(length); } 427 428 Writer writer = new Writer(out, keyClass, valClass); 429 430 for (int i = 0; i < count; i++) { int p = pointers[i]; 432 writer.append(rawBuffer, starts[p], lengths[p], keyLengths[p]); 433 } 434 } 435 436 private void sort(int count) { 437 System.arraycopy(pointers, 0, pointersCopy, 0, count); 438 mergeSort(pointersCopy, pointers, 0, count); 439 } 440 441 private int compare(int i, int j) { 442 return comparator.compare(rawBuffer, starts[i], keyLengths[i], 443 rawBuffer, starts[j], keyLengths[j]); 444 } 445 446 private void mergeSort(int src[], int dest[], int low, int high) { 447 int length = high - low; 448 449 if (length < 7) { 451 for (int i=low; i<high; i++) 452 for (int j=i; j>low && compare(dest[j-1], dest[j])>0; j--) 453 swap(dest, j, j-1); 454 return; 455 } 456 457 int mid = (low + high) >> 1; 459 mergeSort(dest, src, low, mid); 460 mergeSort(dest, src, mid, high); 461 462 if (compare(src[mid-1], src[mid]) <= 0) { 465 System.arraycopy(src, low, dest, low, length); 466 return; 467 } 468 469 for(int i = low, p = low, q = mid; i < high; i++) { 471 if (q>=high || p<mid && compare(src[p], src[q]) <= 0) 472 dest[i] = src[p++]; 473 else 474 dest[i] = src[q++]; 475 } 476 } 477 478 private void swap(int x[], int a, int b) { 479 int t = x[a]; 480 x[a] = x[b]; 481 x[b] = t; 482 } 483 } 484 485 private int mergePass(int pass, boolean last) throws IOException { 486 LOG.fine("running merge pass=" + pass); 487 MergePass mergePass = new MergePass(pass, last); 488 try { return mergePass.run(); } finally { 491 mergePass.close(); } 493 } 494 495 private class MergePass { 496 private int pass; 497 private boolean last; 498 499 private MergeQueue queue; 500 private NFSDataInputStream in; 501 private String inName; 502 503 public MergePass(int pass, boolean last) throws IOException { 504 this.pass = pass; 505 this.last = last; 506 507 this.queue = new MergeQueue(factor, last ? outFile : outFile+"."+pass); 508 509 this.inName = outFile+"."+(pass-1); 510 this.in = new NFSDataInputStream(nfs.open(new File(inName))); 511 } 512 513 public void close() throws IOException { 514 in.close(); nfs.delete(new File(inName)); 516 517 queue.close(); } 519 520 public int run() throws IOException { 521 int segments = 0; 522 long end = nfs.getLength(new File(inName)); 523 524 while (in.getPos() < end) { 525 LOG.finer("merging segment " + segments); 526 long totalLength = 0; 527 while (in.getPos() < end && queue.size() < factor) { 528 long length = in.readLong(); 529 totalLength += length; 530 Reader reader = new Reader(nfs, inName, memory/(factor+1), 531 in.getPos(), length); 532 MergeStream ms = new MergeStream(reader); if (ms.next()) { 534 queue.put(ms); 535 } 536 in.seek(reader.end); 537 } 538 539 if (!last) queue.out.writeLong(totalLength); 542 queue.merge(); 544 segments++; 545 } 546 547 return segments; 548 } 549 } 550 551 552 public void merge(String [] inFiles, String outFile) throws IOException { 553 this.inFiles = inFiles; 554 this.outFile = outFile; 555 this.factor = inFiles.length; 556 557 if (new File(outFile).exists()) { 558 throw new IOException("already exists: " + outFile); 559 } 560 561 MergeFiles mergeFiles = new MergeFiles(); 562 try { mergeFiles.run(); } finally { 565 mergeFiles.close(); } 567 568 } 569 570 private class MergeFiles { 571 private MergeQueue queue; 572 573 public MergeFiles() throws IOException { 574 this.queue = new MergeQueue(factor, outFile); 575 } 576 577 public void close() throws IOException { 578 queue.close(); 579 } 580 581 public void run() throws IOException { 582 LOG.finer("merging files=" + inFiles.length); 583 for (int i = 0; i < inFiles.length; i++) { 584 String inFile = inFiles[i]; 585 MergeStream ms = 586 new MergeStream(new Reader(nfs, inFile, memory/(factor+1))); 587 if (ms.next()) 588 queue.put(ms); 589 } 590 591 queue.merge(); 592 } 593 } 594 595 private class MergeStream { 596 private Reader in; 597 598 private DataOutputBuffer buffer = new DataOutputBuffer(); 599 private int keyLength; 600 601 public MergeStream(Reader reader) throws IOException { 602 if (reader.keyClass != keyClass) 603 throw new IOException("wrong key class: " + reader.getKeyClass() + 604 " is not " + keyClass); 605 if (reader.valClass != valClass) 606 throw new IOException("wrong value class: "+reader.getValueClass()+ 607 " is not " + valClass); 608 this.in = reader; 609 } 610 611 public boolean next() throws IOException { 612 buffer.reset(); 613 keyLength = in.next(buffer); 614 return keyLength >= 0; 615 } 616 } 617 618 private class MergeQueue extends PriorityQueue { 619 private NFSDataOutputStream out; 620 621 public MergeQueue(int size, String outName) throws IOException { 622 initialize(size); 623 this.out = 624 new NFSDataOutputStream(nfs.create(new File(outName)), 625 memory/(factor+1)); 626 } 627 628 protected boolean lessThan(Object a, Object b) { 629 MergeStream msa = (MergeStream)a; 630 MergeStream msb = (MergeStream)b; 631 return comparator.compare(msa.buffer.getData(), 0, msa.keyLength, 632 msb.buffer.getData(), 0, msb.keyLength) < 0; 633 } 634 635 public void merge() throws IOException { 636 Writer writer = new Writer(out, keyClass, valClass); 637 638 while (size() != 0) { 639 MergeStream ms = (MergeStream)top(); 640 DataOutputBuffer buffer = ms.buffer; writer.append(buffer.getData(), 0, buffer.getLength(), ms.keyLength); 642 643 if (ms.next()) { adjustTop(); 645 } else { 646 pop(); ms.in.close(); 648 } 649 } 650 } 651 652 public void close() throws IOException { 653 MergeStream ms; while ((ms = (MergeStream)pop()) != null) { 655 ms.in.close(); 656 } 657 out.close(); } 659 } 660 } 661 662 } 663 | Popular Tags |