KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > mapReduce > TextInputFormat


1 /* Copyright (c) 2005 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3
4 package net.nutch.mapReduce;
5
6 import java.io.IOException JavaDoc;
7 import java.io.File JavaDoc;
8
9 import java.util.ArrayList JavaDoc;
10
11 import net.nutch.fs.NutchFileSystem;
12 import net.nutch.fs.NFSDataInputStream;
13
14 import net.nutch.io.Writable;
15 import net.nutch.io.LongWritable;
16 import net.nutch.io.UTF8;
17
18 /** An {@link InputFormat} for plain text files. Files are broken into lines.
19  * Either linefeed or carriage-return are used to signal end of line. Keys are
20  * the position in the file, and values are the line of text.. */

21 public class TextInputFormat implements InputFormat {
22
23   private static final double SPLIT_SLOP = 0.1; // 10% slop
24

25   public Split[] getSplits(NutchFileSystem fs, File JavaDoc[] files, int numSplits)
26     throws IOException JavaDoc {
27
28     long totalSize = 0;
29     for (int i = 0; i < files.length; i++) {
30       totalSize += fs.getLength(files[i]);
31     }
32
33     long bytesPerSplit = totalSize / numSplits;
34     long maxPerSplit = bytesPerSplit + (long)(bytesPerSplit*SPLIT_SLOP);
35
36     ArrayList JavaDoc splits = new ArrayList JavaDoc(numSplits);
37     for (int i = 0; i < files.length; i++) {
38       File JavaDoc file = files[i];
39       long length = fs.getLength(file);
40
41       long bytesRemaining = length;
42       while (bytesRemaining >= maxPerSplit) {
43         splits.add(new FileSplit(fs, file, length-bytesRemaining,
44                                  bytesPerSplit));
45         bytesRemaining -= bytesPerSplit;
46       }
47
48       if (bytesRemaining != 0) {
49         splits.add(new FileSplit(fs, file, length-bytesRemaining,
50                                  bytesRemaining));
51       }
52     }
53     return (Split[])splits.toArray();
54   }
55
56   public RecordReader getRecordReader(Split s) throws IOException JavaDoc {
57     final FileSplit split = (FileSplit)s;
58
59     final long start = split.getStart();
60     final long end = start + split.getLength();
61
62     // open the file and seek to the start of the split
63
final NFSDataInputStream in =
64       new NFSDataInputStream(split.getFileSystem().open(split.getFile()));
65     in.seek(start);
66     
67     if (start != 0) {
68       while (in.getPos() < end) { // scan to the next newline in the file
69
char c = (char)in.read(); // bug: this assumes eight-bit characters.
70
if (c == '\r' || c == '\n') {
71           break;
72         }
73       }
74     }
75
76     return new RecordReader() {
77
78         /** Keys are longs. */
79         public Writable createKey() { return new LongWritable(); }
80
81         /** Values are lines. */
82         public Writable createValue() { return new UTF8(); }
83
84         /** Read a line. */
85         public boolean next(Writable key, Writable value) throws IOException JavaDoc {
86           long pos = in.getPos();
87           if (pos >= end)
88             return false;
89
90           ((LongWritable)key).set(pos); // key is position
91
((UTF8)value).set(readLine(in)); // value is line
92
return true;
93         }
94         
95       };
96   }
97
98   private static String JavaDoc readLine(NFSDataInputStream in) throws IOException JavaDoc {
99     StringBuffer JavaDoc buffer = new StringBuffer JavaDoc();
100     while (true) {
101
102       int b = in.read();
103       if (b == -1)
104         break;
105
106       char c = (char)b; // bug: this assumes eight-bit characters.
107
if (c == '\r' || c == '\n')
108         break;
109
110       buffer.append(c);
111     }
112     
113     return buffer.toString();
114   }
115
116 }
117
118
Popular Tags