package net.nutch.io;

import java.io.*;
import java.util.Arrays;
import net.nutch.fs.*;
import net.nutch.util.*;

/**
 * A file-based map from keys to values.
 *
 * <p>A map is a directory holding two sequence files: the data file
 * ({@link #DATA_FILE_NAME}), which receives every key/value pair, and the
 * index file ({@link #INDEX_FILE_NAME}), which receives one key out of every
 * {@link Writer#getIndexInterval()} together with the position of that entry
 * in the data file.
 *
 * <p>Maps are written with {@link Writer}, which requires keys to be appended
 * in strictly increasing order, and read with {@link Reader}, which loads the
 * index into memory on the first {@link Reader#seek(WritableComparable)}.
 */
public class MapFile {

  /** The name of the index file inside a map directory. */
  public static final String INDEX_FILE_NAME = "index";

  /** The name of the data file inside a map directory. */
  public static final String DATA_FILE_NAME = "data";

  /** Not instantiable from outside; only the nested classes and statics are used. */
  protected MapFile() {}

  /** Writes a new map. */
  public static class Writer {
    private SequenceFile.Writer data;
    private SequenceFile.Writer index;

    private int indexInterval = 128;

    /** Number of entries appended so far. */
    private long size;
    private LongWritable position = new LongWritable();

    // State used by checkKey() to verify ordering and to copy the last key.
    private WritableComparator comparator;
    private DataInputBuffer inBuf = new DataInputBuffer();
    private DataOutputBuffer outBuf = new DataOutputBuffer();
    private WritableComparable lastKey;

    /**
     * Creates the named map for keys of the given class, ordered by a
     * {@link WritableComparator} built from that class.
     *
     * @throws IOException if the directory already exists or a file cannot be created
     */
    public Writer(NutchFileSystem nfs, String dirName, Class keyClass, Class valClass)
      throws IOException {
      this(nfs, dirName, new WritableComparator(keyClass), valClass);
    }

    /**
     * Creates the named map using the given key comparator.
     *
     * @throws IOException if the directory already exists or a file cannot be created
     */
    public Writer(NutchFileSystem nfs, String dirName, WritableComparator comparator,
                  Class valClass) throws IOException {
      this.comparator = comparator;
      this.lastKey = comparator.newKey();

      File dir = new File(dirName);
      if (nfs.exists(dir)) {
        throw new IOException("already exists: " + dir);
      }
      nfs.mkdirs(dir);

      File dataFile = new File(dir, DATA_FILE_NAME);
      File indexFile = new File(dir, INDEX_FILE_NAME);

      Class keyClass = comparator.getKeyClass();
      this.data =
        new SequenceFile.Writer(nfs, dataFile.getPath(), keyClass, valClass);

      // Do not leak the already-open data writer if the index cannot be created.
      boolean indexOpened = false;
      try {
        this.index =
          new SequenceFile.Writer(nfs, indexFile.getPath(),
                                  keyClass, LongWritable.class);
        indexOpened = true;
      } finally {
        if (!indexOpened) {
          try {
            this.data.close();
          } catch (IOException ignored) {
            // The failure that brought us here is the one worth reporting.
          }
        }
      }
    }

    /** Returns the number of entries appended between two index entries. */
    public int getIndexInterval() { return indexInterval; }

    /**
     * Sets the number of entries appended between two index entries.
     * The interval is used as a modulus in {@link #append}, so zero makes
     * every later append fail with an {@link ArithmeticException}.
     */
    public void setIndexInterval(int interval) { indexInterval = interval; }

    /** Closes both files of the map; the index is closed even if closing the data fails. */
    public synchronized void close() throws IOException {
      try {
        data.close();
      } finally {
        index.close();
      }
    }

    /**
     * Appends a key/value pair to the map.
     *
     * @throws IOException if the key is not strictly greater than the previous
     *         key, or if writing fails
     */
    public synchronized void append(WritableComparable key, Writable val)
      throws IOException {

      checkKey(key);

      // Every indexInterval-th entry, starting with the first, is also indexed.
      if (size % indexInterval == 0) {
        position.set(data.getLength());
        index.append(key, position);
      }

      data.append(key, val);
      size++;
    }

    /**
     * Verifies that {@code key} sorts strictly after the previously appended
     * key, then remembers a copy of it for the next check.
     */
    private void checkKey(WritableComparable key) throws IOException {
      if (size != 0 && comparator.compare(lastKey, key) >= 0) {
        throw new IOException("key out of order: "+key+" after "+lastKey);
      }

      // Copy the key by serializing it, since the caller may reuse the instance.
      outBuf.reset();
      key.write(outBuf);

      inBuf.reset(outBuf.getData(), outBuf.getLength());
      lastKey.readFields(inBuf);
    }

  }

  /** Provides access to an existing map. */
  public static class Reader {
    private WritableComparator comparator;

    /** Serialized form of the key currently being sought. */
    private DataOutputBuffer keyBuf = new DataOutputBuffer();
    /** Serialized form of the last key read from the data file by seek(). */
    private DataOutputBuffer nextBuf = new DataOutputBuffer();
    private int nextKeyLen = -1;
    private long seekPosition = -1;
    private int seekIndex = -1;
    private long firstPosition;

    /** Scratch key that get() reads into. */
    private WritableComparable getKey;

    private SequenceFile.Reader data;
    private SequenceFile.Reader index;

    /** Set once readIndex() or close() has closed the index file. */
    private boolean indexClosed = false;

    // The in-memory index, filled by readIndex().
    private int count = -1;
    private WritableComparable[] keys;
    private long[] positions;

    /** Returns the class of keys in this map, as recorded in the data file. */
    public Class getKeyClass() { return data.getKeyClass(); }

    /** Returns the class of values in this map, as recorded in the data file. */
    public Class getValueClass() { return data.getValueClass(); }

    /** Opens the named map, ordering keys with a comparator built from the key class. */
    public Reader(NutchFileSystem nfs, String dirName) throws IOException {
      this(nfs, dirName, null);
    }

    /**
     * Opens the named map using the given comparator, or a comparator built
     * from the data file's key class when {@code comparator} is null.
     *
     * @throws IOException if either file of the map cannot be opened
     */
    public Reader(NutchFileSystem nfs, String dirName, WritableComparator comparator)
      throws IOException {
      File dir = new File(dirName);
      File dataFile = new File(dir, DATA_FILE_NAME);
      File indexFile = new File(dir, INDEX_FILE_NAME);

      // Open the data file first; entries start at the position it reports.
      this.data = new SequenceFile.Reader(nfs, dataFile.getPath());

      // Do not leak the data reader if the rest of the setup fails, e.g.
      // because the index file is missing.
      boolean opened = false;
      try {
        this.firstPosition = data.getPosition();

        if (comparator == null) {
          this.comparator = new WritableComparator(data.getKeyClass());
        } else {
          this.comparator = comparator;
        }

        this.getKey = this.comparator.newKey();

        // The index is only opened here; it is read on the first seek().
        this.index = new SequenceFile.Reader(nfs, indexFile.getPath());
        opened = true;
      } finally {
        if (!opened) {
          try {
            this.data.close();
          } catch (IOException ignored) {
            // The failure that brought us here is the one worth reporting.
          }
        }
      }
    }

    /**
     * Loads the index file into {@code keys}/{@code positions} on first use
     * and closes it. An unexpected end of file is logged and whatever was
     * read so far is kept.
     *
     * @throws IOException if index keys are not strictly increasing, or on read failure
     */
    private void readIndex() throws IOException {
      // Read the index at most once.
      if (this.keys != null) {
        return;
      }
      this.count = 0;
      this.keys = new WritableComparable[1024];
      this.positions = new long[1024];
      try {
        LongWritable position = new LongWritable();
        WritableComparable lastKey = null;
        while (true) {
          WritableComparable k = comparator.newKey();

          if (!index.next(k, position)) {
            break;
          }

          // Check order so that the binary search is valid.
          if (lastKey != null && comparator.compare(lastKey, k) >= 0) {
            throw new IOException("key out of order: "+k+" after "+lastKey);
          }
          lastKey = k;

          if (count == keys.length) {
            // Grow both arrays by half.
            int newLength = (keys.length*3)/2;
            WritableComparable[] newKeys = new WritableComparable[newLength];
            long[] newPositions = new long[newLength];
            System.arraycopy(keys, 0, newKeys, 0, count);
            System.arraycopy(positions, 0, newPositions, 0, count);
            keys = newKeys;
            positions = newPositions;
          }

          keys[count] = k;
          positions[count] = position.get();
          count++;
        }
      } catch (EOFException e) {
        SequenceFile.LOG.warning("Unexpected EOF reading " + index +
                                 " at entry #" + count + ".  Ignoring.");
      } finally {
        indexClosed = true;
        index.close();
      }
    }

    /** Re-positions the reader before the first entry of the data file. */
    public synchronized void reset() throws IOException {
      data.seek(firstPosition);
    }

    /**
     * Positions the reader at the named key, or if none such exists, at the
     * first entry after the named key.
     *
     * @return true if the named key exists in this map
     */
    public synchronized boolean seek(WritableComparable key)
      throws IOException {
      readIndex();
      keyBuf.reset();
      key.write(keyBuf);

      // The position left by the previous seek can be reused when this key
      // lies before the next indexed key and at or after the last key read.
      boolean reusePosition =
        seekIndex != -1
        && seekIndex+1 < count
        && comparator.compare(key, keys[seekIndex+1]) < 0
        && comparator.compare(keyBuf.getData(), 0, keyBuf.getLength(),
                              nextBuf.getData(), 0, nextKeyLen)
           >= 0;

      if (!reusePosition) {
        seekIndex = binarySearch(key);
        if (seekIndex < 0) {
          // Not an indexed key: step back to the indexed entry preceding it.
          seekIndex = -seekIndex-2;
        }

        if (seekIndex == -1) {
          // The key sorts before every indexed key: scan from the start.
          seekPosition = firstPosition;
        } else {
          seekPosition = positions[seekIndex];
        }
      }
      data.seek(seekPosition);

      // Scan forward until a key at or after the sought key is read.
      while ((nextKeyLen = data.next(nextBuf.reset())) != -1) {
        int c = comparator.compare(keyBuf.getData(), 0, keyBuf.getLength(),
                                   nextBuf.getData(), 0, nextKeyLen);
        if (c <= 0) {
          // Back up to the start of the entry just read.
          data.seek(seekPosition);
          return c == 0;
        }
        seekPosition = data.getPosition();
      }

      return false;
    }

    /**
     * Binary search over the in-memory index.
     *
     * @return the index of {@code key} if it is an indexed key, otherwise
     *         {@code -(insertionPoint + 1)}
     */
    private int binarySearch(WritableComparable key) {
      int low = 0;
      int high = count-1;

      while (low <= high) {
        // Unsigned shift keeps the midpoint correct even if low + high overflows.
        int mid = (low + high) >>> 1;
        WritableComparable midVal = keys[mid];
        int cmp = comparator.compare(midVal, key);

        if (cmp < 0) {
          low = mid + 1;
        } else if (cmp > 0) {
          high = mid - 1;
        } else {
          return mid;
        }
      }
      return -(low + 1);
    }

    /**
     * Reads the next key/value pair from the data file into {@code key} and
     * {@code val}.
     *
     * @return the result of the underlying data reader's {@code next} call
     */
    public synchronized boolean next(WritableComparable key, Writable val)
      throws IOException {
      return data.next(key, val);
    }

    /**
     * Looks up the value stored for the named key.
     *
     * @return {@code val}, filled in, if the key exists; otherwise null
     */
    public synchronized Writable get(WritableComparable key, Writable val)
      throws IOException {
      if (seek(key)) {
        // seek() left the reader on the matching entry; read it.
        next(getKey, val);
        return val;
      } else {
        return null;
      }
    }

    /** Closes the map; the data file is closed even if closing the index fails. */
    public synchronized void close() throws IOException {
      try {
        if (!indexClosed) {
          indexClosed = true;
          index.close();
        }
      } finally {
        data.close();
      }
    }

  }

  /**
   * Renames an existing map directory.
   *
   * @throws IOException if the file system reports that the rename failed
   */
  public static void rename(NutchFileSystem nfs, String oldName, String newName)
    throws IOException {
    File oldDir = new File(oldName);
    File newDir = new File(newName);
    if (!nfs.rename(oldDir, newDir)) {
      throw new IOException("Could not rename " + oldDir + " to " + newDir);
    }
  }

  /** Deletes the named map: its data file, its index file, then the directory. */
  public static void delete(NutchFileSystem nfs, String name) throws IOException {
    File dir = new File(name);
    File data = new File(dir, DATA_FILE_NAME);
    File index = new File(dir, INDEX_FILE_NAME);

    nfs.delete(data);
    nfs.delete(index);
    nfs.delete(dir);
  }

  /**
   * Attempts to repair a map whose index file is missing by scanning the data
   * file and writing a new index.
   *
   * <p>The scan is best-effort: if reading the data file or writing the index
   * fails part-way, the failure is logged and the entries counted so far are
   * returned.
   *
   * @param nfs file system holding the map
   * @param dir directory containing the map
   * @param keyClass key class, which must equal the one recorded in the data file
   * @param valueClass value class, which must equal the one recorded in the data file
   * @param dryrun if true, scan and count entries but do not write an index
   * @return the number of entries read from the data file, or -1 if the index
   *         file already exists and nothing was done
   * @throws Exception if the data file is missing or a class does not match
   */
  public static long fix(NutchFileSystem nfs, File dir,
                         Class keyClass, Class valueClass, boolean dryrun) throws Exception {
    String dr = (dryrun ? "[DRY RUN ] " : "");
    File data = new File(dir, DATA_FILE_NAME);
    File index = new File(dir, INDEX_FILE_NAME);
    int indexInterval = 128;
    if (!nfs.exists(data)) {
      // There is nothing to rebuild the index from.
      throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
    }
    if (nfs.exists(index)) {
      // No fixing needed.
      return -1;
    }

    long cnt = 0L;
    SequenceFile.Reader dataReader = new SequenceFile.Reader(nfs, data.toString());
    try {
      if (!dataReader.getKeyClass().equals(keyClass)) {
        throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
                            ", got " + dataReader.getKeyClass().getName());
      }
      if (!dataReader.getValueClass().equals(valueClass)) {
        throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
                            ", got " + dataReader.getValueClass().getName());
      }

      Writable key = (Writable)keyClass.getConstructor(new Class[0]).newInstance(new Object[0]);
      Writable value = (Writable)valueClass.getConstructor(new Class[0]).newInstance(new Object[0]);

      SequenceFile.Writer indexWriter = null;
      if (!dryrun) {
        indexWriter =
          new SequenceFile.Writer(nfs, index.toString(), keyClass, LongWritable.class);
      }
      try {
        long pos = 0L;
        LongWritable position = new LongWritable();
        while (dataReader.next(key, value)) {
          cnt++;
          if (cnt % indexInterval == 0) {
            // pos is where the entry just read starts.
            position.set(pos);
            if (!dryrun) {
              indexWriter.append(key, position);
            }
          }
          pos = dataReader.getPosition();
        }
      } catch (Throwable t) {
        // Deliberately best-effort: keep what was indexed so far, but leave a
        // trace instead of dropping the failure silently.
        SequenceFile.LOG.warning(dr + "Stopped scanning " + data + " after " +
                                 cnt + " entries: " + t);
      } finally {
        if (indexWriter != null) {
          indexWriter.close();
        }
      }
    } finally {
      dataReader.close();
    }
    return cnt;
  }

  /** Copies the map named by the first argument to a new map named by the second. */
  public static void main(String[] args) throws Exception {
    String usage = "Usage: MapFile inFile outFile";

    if (args.length != 2) {
      System.err.println(usage);
      System.exit(-1);
    }

    String in = args[0];
    String out = args[1];

    NutchFileSystem nfs = new LocalFileSystem();
    MapFile.Reader reader = new MapFile.Reader(nfs, in);
    try {
      MapFile.Writer writer =
        new MapFile.Writer(nfs, out, reader.getKeyClass(), reader.getValueClass());
      try {
        WritableComparable key =
          (WritableComparable)reader.getKeyClass().newInstance();
        Writable value = (Writable)reader.getValueClass().newInstance();

        while (reader.next(key, value)) {
          writer.append(key, value);
        }
      } finally {
        writer.close();
      }
    } finally {
      reader.close();
    }
  }

}