1 2 3 4 package net.nutch.mapReduce; 5 6 import java.io.IOException ; 7 import java.io.File ; 8 9 import java.util.ArrayList ; 10 11 import net.nutch.fs.NutchFileSystem; 12 import net.nutch.fs.NFSDataInputStream; 13 14 import net.nutch.io.Writable; 15 import net.nutch.io.LongWritable; 16 import net.nutch.io.UTF8; 17 18 21 public class TextInputFormat implements InputFormat { 22 23 private static final double SPLIT_SLOP = 0.1; 25 public Split[] getSplits(NutchFileSystem fs, File [] files, int numSplits) 26 throws IOException { 27 28 long totalSize = 0; 29 for (int i = 0; i < files.length; i++) { 30 totalSize += fs.getLength(files[i]); 31 } 32 33 long bytesPerSplit = totalSize / numSplits; 34 long maxPerSplit = bytesPerSplit + (long)(bytesPerSplit*SPLIT_SLOP); 35 36 ArrayList splits = new ArrayList (numSplits); 37 for (int i = 0; i < files.length; i++) { 38 File file = files[i]; 39 long length = fs.getLength(file); 40 41 long bytesRemaining = length; 42 while (bytesRemaining >= maxPerSplit) { 43 splits.add(new FileSplit(fs, file, length-bytesRemaining, 44 bytesPerSplit)); 45 bytesRemaining -= bytesPerSplit; 46 } 47 48 if (bytesRemaining != 0) { 49 splits.add(new FileSplit(fs, file, length-bytesRemaining, 50 bytesRemaining)); 51 } 52 } 53 return (Split[])splits.toArray(); 54 } 55 56 public RecordReader getRecordReader(Split s) throws IOException { 57 final FileSplit split = (FileSplit)s; 58 59 final long start = split.getStart(); 60 final long end = start + split.getLength(); 61 62 final NFSDataInputStream in = 64 new NFSDataInputStream(split.getFileSystem().open(split.getFile())); 65 in.seek(start); 66 67 if (start != 0) { 68 while (in.getPos() < end) { char c = (char)in.read(); if (c == '\r' || c == '\n') { 71 break; 72 } 73 } 74 } 75 76 return new RecordReader() { 77 78 79 public Writable createKey() { return new LongWritable(); } 80 81 82 public Writable createValue() { return new UTF8(); } 83 84 85 public boolean next(Writable key, Writable value) throws IOException { 86 long pos = in.getPos(); 87 if (pos >= end) 88 return false; 89 90 ((LongWritable)key).set(pos); ((UTF8)value).set(readLine(in)); return true; 93 } 94 95 }; 96 } 97 98 private static String readLine(NFSDataInputStream in) throws IOException { 99 StringBuffer buffer = new StringBuffer (); 100 while (true) { 101 102 int b = in.read(); 103 if (b == -1) 104 break; 105 106 char c = (char)b; if (c == '\r' || c == '\n') 108 break; 109 110 buffer.append(c); 111 } 112 113 return buffer.toString(); 114 } 115 116 } 117 118 | Popular Tags |