KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > io > SequenceFile


1 /* Copyright (c) 2003-2004 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3
4 package net.nutch.io;
5
6 import java.io.*;
7 import java.util.*;
8 import java.util.logging.*;
9 import java.nio.channels.*;
10 import org.apache.lucene.util.PriorityQueue;
11 import net.nutch.fs.*;
12 import net.nutch.util.*;
13
/** Support for flat files of binary key/value pairs. */
15 public class SequenceFile {
/** Logger for this class. */
public static final Logger LOG =
  LogFormatter.getLogger("net.nutch.io.SequenceFile");

/** Not instantiable: all functionality lives in the nested classes. */
private SequenceFile() {} // no public ctor

/** File header magic: the bytes 'S', 'E', 'Q' followed by the format
 * version byte.  Readers compare the whole array and report the fourth
 * byte on a mismatch. */
private static final byte[] VERSION = new byte[] {
  (byte)'S', (byte)'E', (byte)'Q', 1
};
24
25   /** Write key/value pairs to a sequence-format file. */
26   public static class Writer {
27     private NFSDataOutputStream out;
28     private DataOutputBuffer buffer = new DataOutputBuffer();
29     private NutchFileSystem nfs = null;
30     private File target = null;
31
32     private Class JavaDoc keyClass;
33     private Class JavaDoc valClass;
34
35     /** Create the named file. */
36     public Writer(NutchFileSystem nfs, String JavaDoc name,
37                   Class JavaDoc keyClass, Class JavaDoc valClass)
38       throws IOException {
39       this.nfs = nfs;
40       this.target = new File(name);
41       if (nfs.exists(target)) {
42         throw new IOException("already exists: " + target);
43       }
44       init(new NFSDataOutputStream(nfs.create(target)),
45            keyClass, valClass);
46     }
47     
48     /** Write to an arbitrary stream using a specified buffer size. */
49     private Writer(NFSDataOutputStream out,
50                    Class JavaDoc keyClass, Class JavaDoc valClass) throws IOException {
51       init(out, keyClass, valClass);
52     }
53     
54     private void init(NFSDataOutputStream out,
55                       Class JavaDoc keyClass, Class JavaDoc valClass) throws IOException {
56       this.out = out;
57       this.out.write(VERSION);
58
59       this.keyClass = keyClass;
60       this.valClass = valClass;
61
62       new UTF8(WritableName.getName(keyClass)).write(this.out);
63       new UTF8(WritableName.getName(valClass)).write(this.out);
64
65       this.out.flush(); // flush header
66
}
67     
68
69     /** Returns the class of keys in this file. */
70     public Class JavaDoc getKeyClass() { return keyClass; }
71
72     /** Returns the class of values in this file. */
73     public Class JavaDoc getValueClass() { return valClass; }
74
75
76     /** Close the file. */
77     public void close() throws IOException {
78       if (out != null) {
79         out.close();
80         out = null;
81       }
82     }
83
84     /** Append a key/value pair. */
85     public void append(Writable key, Writable val) throws IOException {
86       if (key.getClass() != keyClass)
87         throw new IOException("wrong key class: "+key+" is not "+keyClass);
88       if (val.getClass() != valClass)
89         throw new IOException("wrong value class: "+val+" is not "+valClass);
90
91       buffer.reset();
92
93       key.write(buffer);
94       int keyLength = buffer.getLength();
95       if (keyLength == 0)
96         throw new IOException("zero length keys not allowed: " + key);
97
98       val.write(buffer);
99       append(buffer.getData(), 0, buffer.getLength(), keyLength);
100     }
101
102     /** Append a key/value pair. */
103     public void append(byte[] data, int start, int length, int keyLength)
104       throws IOException {
105       if (keyLength == 0)
106         throw new IOException("zero length keys not allowed");
107
108       out.writeInt(length); // total record length
109
out.writeInt(keyLength); // key portion length
110
out.write(data, start, length); // data
111
}
112
113     /** Returns the current length of the output file. */
114     public long getLength() throws IOException {
115       return out.getPos();
116     }
117
118   }
119
120   /** Writes key/value pairs from a sequence-format file. */
121   public static class Reader {
122     private String JavaDoc file;
123     private NFSDataInputStream in;
124     private DataOutputBuffer outBuf = new DataOutputBuffer();
125     private DataInputBuffer inBuf = new DataInputBuffer();
126     private NutchFileSystem nfs = null;
127
128     private Class JavaDoc keyClass;
129     private Class JavaDoc valClass;
130
131     private long end;
132     private int keyLength;
133
134     /** Open the named file. */
135     public Reader(NutchFileSystem nfs, String JavaDoc file) throws IOException {
136       this(nfs, file, NutchConf.getInt("io.file.buffer.size", 4096));
137     }
138
139     private Reader(NutchFileSystem nfs, String JavaDoc name, int bufferSize) throws IOException {
140       this.nfs = nfs;
141       this.file = name;
142       File file = new File(name);
143       this.in = new NFSDataInputStream(nfs.open(file), bufferSize);
144       this.end = nfs.getLength(file);
145       init();
146     }
147     
148     private Reader(NutchFileSystem nfs, String JavaDoc file, int bufferSize, long start, long length)
149       throws IOException {
150       this.nfs = nfs;
151       this.file = file;
152       this.in = new NFSDataInputStream(nfs.open(new File(file)), bufferSize);
153       seek(start);
154       init();
155
156       this.end = in.getPos() + length;
157     }
158     
159     private void init() throws IOException {
160       byte[] version = new byte[VERSION.length];
161       in.readFully(version);
162       if (!Arrays.equals(version, VERSION)) {
163         throw new VersionMismatchException(VERSION[3], version[3]);
164       }
165
166       UTF8 className = new UTF8();
167       
168       className.readFields(in); // read key class name
169
this.keyClass = WritableName.getClass(className.toString());
170       
171       className.readFields(in); // read val class name
172
this.valClass = WritableName.getClass(className.toString());
173     }
174     
175     /** Close the file. */
176     public synchronized void close() throws IOException {
177       in.close();
178     }
179
180     /** Returns the class of keys in this file. */
181     public Class JavaDoc getKeyClass() { return keyClass; }
182
183     /** Returns the class of values in this file. */
184     public Class JavaDoc getValueClass() { return valClass; }
185
186     /** Read the next key in the file into <code>key</code>, skipping its
187      * value. True if another entry exists, and false at end of file. */

188     public synchronized boolean next(Writable key) throws IOException {
189       if (key.getClass() != keyClass)
190         throw new IOException("wrong key class: "+key+" is not "+keyClass);
191
192       outBuf.reset();
193
194       keyLength = next(outBuf);
195       if (keyLength < 0)
196         return false;
197
198       inBuf.reset(outBuf.getData(), outBuf.getLength());
199
200       key.readFields(inBuf);
201       if (inBuf.getPosition() != keyLength)
202         throw new IOException(key + " read " + inBuf.getPosition()
203                               + " bytes, should read " + keyLength);
204
205       return true;
206     }
207
208     /** Read the next key/value pair in the file into <code>key</code> and
209      * <code>val</code>. Returns true if such a pair exists and false when at
210      * end of file */

211     public synchronized boolean next(Writable key, Writable val)
212       throws IOException {
213       if (val.getClass() != valClass)
214         throw new IOException("wrong value class: "+val+" is not "+valClass);
215
216       boolean more = next(key);
217
218       if (more) {
219         val.readFields(inBuf);
220         if (inBuf.getPosition() != outBuf.getLength())
221           throw new IOException(val+" read "+(inBuf.getPosition()-keyLength)
222                                 + " bytes, should read " +
223                                 (outBuf.getLength()-keyLength));
224       }
225
226       return more;
227     }
228
229     /** Read the next key/value pair in the file into <code>buffer</code>.
230      * Returns the length of the key read, or -1 if at end of file. The length
231      * of the value may be computed by calling buffer.getLength() before and
232      * after calls to this method. */

233     public synchronized int next(DataOutputBuffer buffer) throws IOException {
234       if (in.getPos() >= end)
235         return -1;
236
237       int length = in.readInt();
238       int keyLength = in.readInt();
239       buffer.write(in, length);
240       return keyLength;
241     }
242
243     /** Set the current byte position in the input file. */
244     public synchronized void seek(long position) throws IOException {
245       in.seek(position);
246     }
247
248     /** Return the current byte position in the input file. */
249     public synchronized long getPosition() throws IOException {
250       return in.getPos();
251     }
252
253     /** Returns the name of the file. */
254     public String JavaDoc toString() {
255       return file;
256     }
257
258   }
259
/** Sorts key/value pairs in a sequence-format file.
 *
 * <p>For best performance, applications should make sure that the {@link
 * Writable#readFields(DataInput)} implementation of their keys is
 * very efficient.  In particular, it should avoid allocating memory.
 */

266   public static class Sorter {
267     private static final int FACTOR = NutchConf.getInt("io.sort.factor", 100);
268     private static final int MEGABYTES = NutchConf.getInt("io.sort.mb", 100);
269
270     private WritableComparator comparator;
271
272     private String JavaDoc inFile; // when sorting
273
private String JavaDoc[] inFiles; // when merging
274

275     private String JavaDoc outFile;
276
277     private int memory = MEGABYTES * 1024*1024; // bytes
278
private int factor = FACTOR; // merged per pass
279

280     private NutchFileSystem nfs = null;
281
282     private Class JavaDoc keyClass;
283     private Class JavaDoc valClass;
284
/** Sort and merge files containing the named classes.  Keys are ordered by
 * a {@link WritableComparator} constructed for <code>keyClass</code>. */
public Sorter(NutchFileSystem nfs, Class keyClass, Class valClass) {
  this(nfs, new WritableComparator(keyClass), valClass);
}
289
/** Sort and merge using an arbitrary {@link WritableComparator}.  The key
 * class is taken from the comparator. */
public Sorter(NutchFileSystem nfs, WritableComparator comparator, Class valClass) {
  this.nfs = nfs;
  this.comparator = comparator;
  this.keyClass = comparator.getKeyClass();
  this.valClass = valClass;
}
297
298     /** Set the number of streams to merge at once.*/
299     public void setFactor(int factor) { this.factor = factor; }
300
301     /** Get the number of streams to merge at once.*/
302     public int getFactor() { return factor; }
303
304     /** Set the total amount of buffer memory, in bytes.*/
305     public void setMemory(int memory) { this.memory = memory; }
306
307     /** Get the total amount of buffer memory, in bytes.*/
308     public int getMemory() { return memory; }
309
310     /** Perform a file sort.*/
311     public void sort(String JavaDoc inFile, String JavaDoc outFile) throws IOException {
312       if (nfs.exists(new File(outFile))) {
313         throw new IOException("already exists: " + outFile);
314       }
315
316       this.inFile = inFile;
317       this.outFile = outFile;
318
319       int segments = sortPass();
320       int pass = 1;
321       while (segments > 1) {
322         segments = mergePass(pass, segments <= factor);
323         pass++;
324       }
325     }
326
327     private int sortPass() throws IOException {
328       LOG.fine("running sort pass");
329       SortPass sortPass = new SortPass(); // make the SortPass
330
try {
331         return sortPass.run(); // run it
332
} finally {
333         sortPass.close(); // close it
334
}
335     }
336
337     private class SortPass {
338       private int limit = memory/4;
339       private DataOutputBuffer buffer = new DataOutputBuffer();
340       private byte[] rawBuffer;
341
342       private int[] starts = new int[1024];
343       private int[] pointers = new int[starts.length];
344       private int[] pointersCopy = new int[starts.length];
345       private int[] keyLengths = new int[starts.length];
346       private int[] lengths = new int[starts.length];
347       
348       private Reader in;
349       private NFSDataOutputStream out;
350         private String JavaDoc outName;
351
352       public SortPass() throws IOException {
353         in = new Reader(nfs, inFile);
354       }
355       
356       public int run() throws IOException {
357         int segments = 0;
358         boolean atEof = false;
359         while (!atEof) {
360           int count = 0;
361           buffer.reset();
362           while (!atEof && buffer.getLength() < limit) {
363
364             int start = buffer.getLength(); // read an entry into buffer
365
int keyLength = in.next(buffer);
366             int length = buffer.getLength() - start;
367
368             if (keyLength == -1) {
369               atEof = true;
370               break;
371             }
372
373             if (count == starts.length)
374               grow();
375
376             starts[count] = start; // update pointers
377
pointers[count] = count;
378             lengths[count] = length;
379             keyLengths[count] = keyLength;
380
381             count++;
382           }
383
384           // buffer is full -- sort & flush it
385
LOG.finer("flushing segment " + segments);
386           rawBuffer = buffer.getData();
387           sort(count);
388           flush(count, segments==0 && atEof);
389           segments++;
390         }
391         return segments;
392       }
393
394       public void close() throws IOException {
395         in.close();
396
397         if (out != null) {
398           out.close();
399         }
400       }
401
402       private void grow() {
403         int newLength = starts.length * 3 / 2;
404         starts = grow(starts, newLength);
405         pointers = grow(pointers, newLength);
406         pointersCopy = new int[newLength];
407         keyLengths = grow(keyLengths, newLength);
408         lengths = grow(lengths, newLength);
409       }
410
411       private int[] grow(int[] old, int newLength) {
412         int[] result = new int[newLength];
413         System.arraycopy(old, 0, result, 0, old.length);
414         return result;
415       }
416
417       private void flush(int count, boolean done) throws IOException {
418         if (out == null) {
419           outName = done ? outFile : outFile+".0";
420           out = new NFSDataOutputStream(nfs.create(new File(outName)));
421         }
422
423         if (!done) { // an intermediate file
424
long length = buffer.getLength() + count*8;
425           out.writeLong(length); // write size
426
}
427
428         Writer writer = new Writer(out, keyClass, valClass);
429
430         for (int i = 0; i < count; i++) { // write in sorted order
431
int p = pointers[i];
432           writer.append(rawBuffer, starts[p], lengths[p], keyLengths[p]);
433         }
434       }
435
436       private void sort(int count) {
437         System.arraycopy(pointers, 0, pointersCopy, 0, count);
438         mergeSort(pointersCopy, pointers, 0, count);
439       }
440
441       private int compare(int i, int j) {
442         return comparator.compare(rawBuffer, starts[i], keyLengths[i],
443                                   rawBuffer, starts[j], keyLengths[j]);
444       }
445
446       private void mergeSort(int src[], int dest[], int low, int high) {
447         int length = high - low;
448
449         // Insertion sort on smallest arrays
450
if (length < 7) {
451           for (int i=low; i<high; i++)
452             for (int j=i; j>low && compare(dest[j-1], dest[j])>0; j--)
453               swap(dest, j, j-1);
454           return;
455         }
456
457         // Recursively sort halves of dest into src
458
int mid = (low + high) >> 1;
459         mergeSort(dest, src, low, mid);
460         mergeSort(dest, src, mid, high);
461
462         // If list is already sorted, just copy from src to dest. This is an
463
// optimization that results in faster sorts for nearly ordered lists.
464
if (compare(src[mid-1], src[mid]) <= 0) {
465           System.arraycopy(src, low, dest, low, length);
466           return;
467         }
468
469         // Merge sorted halves (now in src) into dest
470
for(int i = low, p = low, q = mid; i < high; i++) {
471           if (q>=high || p<mid && compare(src[p], src[q]) <= 0)
472             dest[i] = src[p++];
473           else
474             dest[i] = src[q++];
475         }
476       }
477
478       private void swap(int x[], int a, int b) {
479     int t = x[a];
480     x[a] = x[b];
481     x[b] = t;
482       }
483     }
484
485     private int mergePass(int pass, boolean last) throws IOException {
486       LOG.fine("running merge pass=" + pass);
487       MergePass mergePass = new MergePass(pass, last);
488       try { // make a merge pass
489
return mergePass.run(); // run it
490
} finally {
491         mergePass.close(); // close it
492
}
493     }
494
495     private class MergePass {
496       private int pass;
497       private boolean last;
498
499       private MergeQueue queue;
500       private NFSDataInputStream in;
501       private String JavaDoc inName;
502
503       public MergePass(int pass, boolean last) throws IOException {
504         this.pass = pass;
505         this.last = last;
506
507         this.queue = new MergeQueue(factor, last ? outFile : outFile+"."+pass);
508
509         this.inName = outFile+"."+(pass-1);
510         this.in = new NFSDataInputStream(nfs.open(new File(inName)));
511       }
512
513       public void close() throws IOException {
514         in.close(); // close and delete input
515
nfs.delete(new File(inName));
516
517         queue.close(); // close queue
518
}
519
520       public int run() throws IOException {
521         int segments = 0;
522         long end = nfs.getLength(new File(inName));
523
524         while (in.getPos() < end) {
525           LOG.finer("merging segment " + segments);
526           long totalLength = 0;
527           while (in.getPos() < end && queue.size() < factor) {
528             long length = in.readLong();
529             totalLength += length;
530             Reader reader = new Reader(nfs, inName, memory/(factor+1),
531                                        in.getPos(), length);
532             MergeStream ms = new MergeStream(reader); // add segment to queue
533
if (ms.next()) {
534               queue.put(ms);
535             }
536             in.seek(reader.end);
537           }
538
539           if (!last) // intermediate file
540
queue.out.writeLong(totalLength); // write sizes
541

542           queue.merge(); // do a merge
543

544           segments++;
545         }
546
547         return segments;
548       }
549     }
550
551     /** Merge the provided files.*/
552     public void merge(String JavaDoc[] inFiles, String JavaDoc outFile) throws IOException {
553       this.inFiles = inFiles;
554       this.outFile = outFile;
555       this.factor = inFiles.length;
556
557       if (new File(outFile).exists()) {
558         throw new IOException("already exists: " + outFile);
559       }
560
561       MergeFiles mergeFiles = new MergeFiles();
562       try { // make a merge pass
563
mergeFiles.run(); // run it
564
} finally {
565         mergeFiles.close(); // close it
566
}
567
568     }
569
570     private class MergeFiles {
571       private MergeQueue queue;
572
573       public MergeFiles() throws IOException {
574         this.queue = new MergeQueue(factor, outFile);
575       }
576
577       public void close() throws IOException {
578         queue.close();
579       }
580
581       public void run() throws IOException {
582         LOG.finer("merging files=" + inFiles.length);
583         for (int i = 0; i < inFiles.length; i++) {
584           String JavaDoc inFile = inFiles[i];
585           MergeStream ms =
586             new MergeStream(new Reader(nfs, inFile, memory/(factor+1)));
587           if (ms.next())
588             queue.put(ms);
589         }
590
591         queue.merge();
592       }
593     }
594
595     private class MergeStream {
596       private Reader in;
597
598       private DataOutputBuffer buffer = new DataOutputBuffer();
599       private int keyLength;
600       
601       public MergeStream(Reader reader) throws IOException {
602         if (reader.keyClass != keyClass)
603           throw new IOException("wrong key class: " + reader.getKeyClass() +
604                                 " is not " + keyClass);
605         if (reader.valClass != valClass)
606           throw new IOException("wrong value class: "+reader.getValueClass()+
607                                 " is not " + valClass);
608         this.in = reader;
609       }
610
611       public boolean next() throws IOException {
612         buffer.reset();
613         keyLength = in.next(buffer);
614         return keyLength >= 0;
615       }
616     }
617
618     private class MergeQueue extends PriorityQueue {
619       private NFSDataOutputStream out;
620
621       public MergeQueue(int size, String JavaDoc outName) throws IOException {
622         initialize(size);
623         this.out =
624           new NFSDataOutputStream(nfs.create(new File(outName)),
625                                   memory/(factor+1));
626       }
627
628       protected boolean lessThan(Object JavaDoc a, Object JavaDoc b) {
629         MergeStream msa = (MergeStream)a;
630         MergeStream msb = (MergeStream)b;
631         return comparator.compare(msa.buffer.getData(), 0, msa.keyLength,
632                                   msb.buffer.getData(), 0, msb.keyLength) < 0;
633       }
634
635       public void merge() throws IOException {
636         Writer writer = new Writer(out, keyClass, valClass);
637
638         while (size() != 0) {
639           MergeStream ms = (MergeStream)top();
640           DataOutputBuffer buffer = ms.buffer; // write top entry
641
writer.append(buffer.getData(), 0, buffer.getLength(), ms.keyLength);
642           
643           if (ms.next()) { // has another entry
644
adjustTop();
645           } else {
646             pop(); // done with this file
647
ms.in.close();
648           }
649         }
650       }
651
652       public void close() throws IOException {
653         MergeStream ms; // close inputs
654
while ((ms = (MergeStream)pop()) != null) {
655           ms.in.close();
656         }
657         out.close(); // close output
658
}
659     }
660   }
661
662 }
663
Popular Tags