KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > io > MapFile


1 /* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3
4 package net.nutch.io;
5
6 import java.io.*;
7 import java.util.Arrays JavaDoc;
8 import net.nutch.fs.*;
9 import net.nutch.util.*;
10
11 /** A file-based map from keys to values.
12  *
13  * <p>A map is a directory containing two files, the <code>data</code> file,
14  * containing all keys and values in the map, and a smaller <code>index</code>
15  * file, containing a fraction of the keys. The fraction is determined by
16  * {@link Writer#getIndexInterval()}.
17  *
18  * <p>The index file is read entirely into memory. Thus key implementations
19  * should try to keep themselves small.
20  *
21  * <p>Map files are created by adding entries in-order. To maintain a large
22  * database, perform updates by copying the previous version of a database and
23  * merging in a sorted change list, to create a new version of the database in
24  * a new file. Sorting large change lists can be done with {@link
25  * SequenceFile.Sorter}.
26  */

27 public class MapFile {
28   /** The name of the index file. */
29   public static final String JavaDoc INDEX_FILE_NAME = "index";
30
31   /** The name of the data file. */
32   public static final String JavaDoc DATA_FILE_NAME = "data";
33
34   protected MapFile() {} // no public ctor
35

36   /** Writes a new map. */
37   public static class Writer {
38     private SequenceFile.Writer data;
39     private SequenceFile.Writer index;
40
41     private int indexInterval = 128;
42
43     private long size;
44     private LongWritable position = new LongWritable();
45
46     // the following fields are used only for checking key order
47
private WritableComparator comparator;
48     private DataInputBuffer inBuf = new DataInputBuffer();
49     private DataOutputBuffer outBuf = new DataOutputBuffer();
50     private WritableComparable lastKey;
51
52
53     /** Create the named map for keys of the named class. */
54     public Writer(NutchFileSystem nfs, String JavaDoc dirName, Class JavaDoc keyClass, Class JavaDoc valClass)
55       throws IOException {
56       this(nfs, dirName, new WritableComparator(keyClass), valClass);
57     }
58
59     /** Create the named map using the named key comparator. */
60     public Writer(NutchFileSystem nfs, String JavaDoc dirName, WritableComparator
61                   comparator, Class JavaDoc valClass) throws IOException {
62       this.comparator = comparator;
63       this.lastKey = comparator.newKey();
64
65       File dir = new File(dirName);
66       if (nfs.exists(dir)) {
67           throw new IOException("already exists: " + dir);
68       }
69       nfs.mkdirs(dir);
70
71       File dataFile = new File(dir, DATA_FILE_NAME);
72       File indexFile = new File(dir, INDEX_FILE_NAME);
73
74       Class JavaDoc keyClass = comparator.getKeyClass();
75       this.data =
76         new SequenceFile.Writer(nfs, dataFile.getPath(), keyClass, valClass);
77       this.index =
78         new SequenceFile.Writer(nfs, indexFile.getPath(),
79                                 keyClass, LongWritable.class);
80     }
81     
82     /** The number of entries that are added before an index entry is added.*/
83     public int getIndexInterval() { return indexInterval; }
84
85     /** Sets the index interval.
86      * @see #getIndexInterval()
87      */

88     public void setIndexInterval(int interval) { indexInterval = interval; }
89
90     /** Close the map. */
91     public synchronized void close() throws IOException {
92       data.close();
93       index.close();
94     }
95
96     /** Append a key/value pair to the map. The key must be strictly greater
97      * than the previous key added to the map. */

98     public synchronized void append(WritableComparable key, Writable val)
99       throws IOException {
100
101       checkKey(key);
102       
103       if (size % indexInterval == 0) { // add an index entry
104
position.set(data.getLength()); // point to current eof
105
index.append(key, position);
106       }
107
108       data.append(key, val); // append key/value to data
109
size++;
110     }
111
112     private void checkKey(WritableComparable key) throws IOException {
113       // check that keys are well-ordered
114
if (size != 0 && comparator.compare(lastKey, key) >= 0)
115         throw new IOException("key out of order: "+key+" after "+lastKey);
116           
117       // update lastKey with a copy of key by writing and reading
118
outBuf.reset();
119       key.write(outBuf); // write new key
120

121       inBuf.reset(outBuf.getData(), outBuf.getLength());
122       lastKey.readFields(inBuf); // read into lastKey
123
}
124
125   }
126   
127   /** Provide access to an existing map. */
128   public static class Reader {
129     private WritableComparator comparator;
130
131     private DataOutputBuffer keyBuf = new DataOutputBuffer();
132     private DataOutputBuffer nextBuf = new DataOutputBuffer();
133     private int nextKeyLen = -1;
134     private long seekPosition = -1;
135     private int seekIndex = -1;
136     private long firstPosition;
137
138     private WritableComparable getKey;
139
140     // the data, on disk
141
private SequenceFile.Reader data;
142     private SequenceFile.Reader index;
143
144     // whether the index Reader was closed
145
private boolean indexClosed = false;
146
147     // the index, in memory
148
private int count = -1;
149     private WritableComparable[] keys;
150     private long[] positions;
151
152     /** Returns the class of keys in this file. */
153     public Class JavaDoc getKeyClass() { return data.getKeyClass(); }
154
155     /** Returns the class of values in this file. */
156     public Class JavaDoc getValueClass() { return data.getValueClass(); }
157
158     /** Construct a map reader for the named map.*/
159     public Reader(NutchFileSystem nfs, String JavaDoc dirName) throws IOException {
160       this(nfs, dirName, null);
161     }
162
163     /** Construct a map reader for the named map using the named comparator.*/
164     public Reader(NutchFileSystem nfs, String JavaDoc dirName, WritableComparator comparator)
165       throws IOException {
166       File dir = new File(dirName);
167       File dataFile = new File(dir, DATA_FILE_NAME);
168       File indexFile = new File(dir, INDEX_FILE_NAME);
169
170       // open the data
171
this.data = new SequenceFile.Reader(nfs, dataFile.getPath());
172       this.firstPosition = data.getPosition();
173
174       if (comparator == null)
175         this.comparator = new WritableComparator(data.getKeyClass());
176       else
177         this.comparator = comparator;
178
179       this.getKey = this.comparator.newKey();
180
181       // open the index
182
this.index = new SequenceFile.Reader(nfs, indexFile.getPath());
183     }
184
185     private void readIndex() throws IOException {
186       // read the index entirely into memory
187
if (this.keys != null)
188         return;
189       this.count = 0;
190       this.keys = new WritableComparable[1024];
191       this.positions = new long[1024];
192       try {
193         LongWritable position = new LongWritable();
194         WritableComparable lastKey = null;
195         while (true) {
196           WritableComparable k = comparator.newKey();
197
198           if (!index.next(k, position))
199             break;
200
201           // check order to make sure comparator is compatible
202
if (lastKey != null && comparator.compare(lastKey, k) >= 0)
203             throw new IOException("key out of order: "+k+" after "+lastKey);
204           lastKey = k;
205           
206           if (count == keys.length) { // time to grow arrays
207
int newLength = (keys.length*3)/2;
208             WritableComparable[] newKeys = new WritableComparable[newLength];
209             long[] newPositions = new long[newLength];
210             System.arraycopy(keys, 0, newKeys, 0, count);
211             System.arraycopy(positions, 0, newPositions, 0, count);
212             keys = newKeys;
213             positions = newPositions;
214           }
215
216           keys[count] = k;
217           positions[count] = position.get();
218           count++;
219         }
220       } catch (EOFException e) {
221         SequenceFile.LOG.warning("Unexpected EOF reading " + index +
222                                  " at entry #" + count + ". Ignoring.");
223       } finally {
224     indexClosed = true;
225         index.close();
226       }
227     }
228
229     /** Re-positions the reader before its first key. */
230     public synchronized void reset() throws IOException {
231       data.seek(firstPosition);
232     }
233
234     /** Positions the reader at the named key, or if none such exists, at the
235      * first entry after the named key. Returns true iff the named key exists
236      * in this map.
237      */

238     public synchronized boolean seek(WritableComparable key)
239       throws IOException {
240       readIndex(); // make sure index is read
241
keyBuf.reset(); // write key to keyBuf
242
key.write(keyBuf);
243
244       if (seekIndex != -1 // seeked before
245
&& seekIndex+1 < count
246           && comparator.compare(key,keys[seekIndex+1])<0 // before next indexed
247
&& comparator.compare(keyBuf.getData(), 0, keyBuf.getLength(),
248                                 nextBuf.getData(), 0, nextKeyLen)
249           >= 0) { // but after last seeked
250
// do nothing
251
} else {
252         seekIndex = binarySearch(key);
253         if (seekIndex < 0) // decode insertion point
254
seekIndex = -seekIndex-2;
255
256         if (seekIndex == -1) // belongs before first entry
257
seekPosition = firstPosition; // use beginning of file
258
else
259           seekPosition = positions[seekIndex]; // else use index
260
}
261       data.seek(seekPosition);
262       
263       while ((nextKeyLen = data.next(nextBuf.reset())) != -1) {
264         int c = comparator.compare(keyBuf.getData(), 0, keyBuf.getLength(),
265                                    nextBuf.getData(), 0, nextKeyLen);
266         if (c <= 0) { // at or beyond desired
267
data.seek(seekPosition); // back off to previous
268
return c == 0;
269         }
270         seekPosition = data.getPosition();
271       }
272
273       return false;
274     }
275
276     private int binarySearch(WritableComparable key) {
277       int low = 0;
278       int high = count-1;
279
280       while (low <= high) {
281         int mid = (low + high) >> 1;
282         WritableComparable midVal = keys[mid];
283         int cmp = comparator.compare(midVal, key);
284
285         if (cmp < 0)
286           low = mid + 1;
287         else if (cmp > 0)
288           high = mid - 1;
289         else
290           return mid; // key found
291
}
292       return -(low + 1); // key not found.
293
}
294
295     /** Read the next key/value pair in the map into <code>key</code> and
296      * <code>val</code>. Returns true if such a pair exists and false when at
297      * the end of the map */

298     public synchronized boolean next(WritableComparable key, Writable val)
299       throws IOException {
300       return data.next(key, val);
301     }
302
303     /** Return the value for the named key, or null if none exists. */
304     public synchronized Writable get(WritableComparable key, Writable val)
305       throws IOException {
306       if (seek(key)) {
307         next(getKey, val); // don't smash key
308
return val;
309       } else
310         return null;
311     }
312
313     /** Close the map. */
314     public synchronized void close() throws IOException {
315       if (! indexClosed) {
316     index.close();
317       }
318       data.close();
319     }
320
321   }
322
323   /** Renames an existing map directory. */
324   public static void rename(NutchFileSystem nfs, String JavaDoc oldName, String JavaDoc newName)
325     throws IOException {
326     File oldDir = new File(oldName);
327     File newDir = new File(newName);
328     if (!nfs.rename(oldDir, newDir)) {
329       throw new IOException("Could not rename " + oldDir + " to " + newDir);
330     }
331   }
332
333   /** Deletes the named map file. */
334   public static void delete(NutchFileSystem nfs, String JavaDoc name) throws IOException {
335     File dir = new File(name);
336     File data = new File(dir, DATA_FILE_NAME);
337     File index = new File(dir, INDEX_FILE_NAME);
338
339     nfs.delete(data);
340     nfs.delete(index);
341     nfs.delete(dir);
342   }
343
344   /**
345    * This method attempts to fix a corrupt MapFile by re-creating its index.
346    * @param nfs filesystem
347    * @param dir directory containing the MapFile data and index
348    * @param keyClass key class (has to be a subclass of Writable)
349    * @param valueClass value class (has to be a subclass of Writable)
350    * @param dryrun do not perform any changes, just report what needs to be done
351    * @return number of valid entries in this MapFile, or -1 if no fixing was needed
352    * @throws Exception
353    */

354   public static long fix(NutchFileSystem nfs, File dir,
355           Class JavaDoc keyClass, Class JavaDoc valueClass, boolean dryrun) throws Exception JavaDoc {
356     String JavaDoc dr = (dryrun ? "[DRY RUN ] " : "");
357     File data = new File(dir, DATA_FILE_NAME);
358     File index = new File(dir, INDEX_FILE_NAME);
359     int indexInterval = 128;
360     if (!nfs.exists(data)) {
361       // there's nothing we can do to fix this!
362
throw new Exception JavaDoc(dr + "Missing data file in " + dir + ", impossible to fix this.");
363     }
364     if (nfs.exists(index)) {
365       // no fixing needed
366
return -1;
367     }
368     SequenceFile.Reader dataReader = new SequenceFile.Reader(nfs, data.toString());
369     if (!dataReader.getKeyClass().equals(keyClass)) {
370       throw new Exception JavaDoc(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
371               ", got " + dataReader.getKeyClass().getName());
372     }
373     if (!dataReader.getValueClass().equals(valueClass)) {
374       throw new Exception JavaDoc(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
375               ", got " + dataReader.getValueClass().getName());
376     }
377     long cnt = 0L;
378     Writable key = (Writable)keyClass.getConstructor(new Class JavaDoc[0]).newInstance(new Object JavaDoc[0]);
379     Writable value = (Writable)valueClass.getConstructor(new Class JavaDoc[0]).newInstance(new Object JavaDoc[0]);
380     SequenceFile.Writer indexWriter = null;
381     if (!dryrun) indexWriter = new SequenceFile.Writer(nfs, index.toString(), keyClass, LongWritable.class);
382     try {
383       long pos = 0L;
384       LongWritable position = new LongWritable();
385       while(dataReader.next(key, value)) {
386         cnt++;
387         if (cnt % indexInterval == 0) {
388           position.set(pos);
389           if (!dryrun) indexWriter.append(key, position);
390         }
391         pos = dataReader.getPosition();
392       }
393     } catch(Throwable JavaDoc t) {
394       // truncated data file. swallow it.
395
}
396     dataReader.close();
397     if (!dryrun) indexWriter.close();
398     return cnt;
399   }
400
401
402   public static void main(String JavaDoc[] args) throws Exception JavaDoc {
403     String JavaDoc usage = "Usage: MapFile inFile outFile";
404       
405     if (args.length != 2) {
406       System.err.println(usage);
407       System.exit(-1);
408     }
409       
410     String JavaDoc in = args[0];
411     String JavaDoc out = args[1];
412
413     NutchFileSystem nfs = new LocalFileSystem();
414     MapFile.Reader reader = new MapFile.Reader(nfs, in);
415     MapFile.Writer writer =
416       new MapFile.Writer(nfs, out, reader.getKeyClass(), reader.getValueClass());
417
418     WritableComparable key =
419       (WritableComparable)reader.getKeyClass().newInstance();
420     Writable value = (Writable)reader.getValueClass().newInstance();
421
422     while (reader.next(key, value)) // copy all entries
423
writer.append(key, value);
424
425     writer.close();
426   }
427
428 }
429
Popular Tags