KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > ndfs > FSDataset


1 /* Copyright (c) 2004 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3 package net.nutch.ndfs;
4
5 import java.io.*;
6 import java.util.*;
7
8 import net.nutch.fs.*;
9 import net.nutch.util.*;
10
11 /**************************************************
12  * FSDataset manages a set of data blocks. Each block
13  * has a unique name and an extent on disk.
14  *
15  * @author Mike Cafarella
16  ***************************************************/

17 public class FSDataset implements FSConstants {
18     /**
19      * A node type that can be built into a tree reflecting the
20      * hierarchy of blocks on the local disk.
21      */

22     class FSDir {
23         File dir;
24         FSDir children[];
25
26         /**
27          */

28         public FSDir(File dir) {
29             this.dir = dir;
30             this.children = null;
31         }
32
33         /**
34          */

35         public File getDirName() {
36             return dir;
37         }
38
39         /**
40          */

41         public FSDir[] getChildren() {
42             return children;
43         }
44
45         /**
46          */

47         public void addBlock(Block b, File src) {
48             addBlock(b, src, b.getBlockId(), 0);
49         }
50
51         /**
52          */

53         void addBlock(Block b, File src, long blkid, int depth) {
54             //
55
// Add to the local dir, if no child dirs
56
//
57
if (children == null) {
58                 src.renameTo(new File(dir, b.getBlockName()));
59
60                 //
61
// Test whether this dir's contents should be busted
62
// up into subdirs.
63
//
64

65                 // REMIND - mjc - sometime soon, we'll want this code
66
// working. It prevents the datablocks from all going
67
// into a single huge directory.
68
/**
69                 File localFiles[] = dir.listFiles();
70                 if (localFiles.length == 16) {
71                     //
72                     // Create all the necessary subdirs
73                     //
74                     this.children = new FSDir[16];
75                     for (int i = 0; i < children.length; i++) {
76                         String str = Integer.toBinaryString(i);
77                         try {
78                             File subdir = new File(dir, "dir_" + str);
79                             subdir.mkdir();
80                             children[i] = new FSDir(subdir);
81                         } catch (StringIndexOutOfBoundsException excep) {
82                             excep.printStackTrace();
83                             System.out.println("Ran into problem when i == " + i + " an str = " + str);
84                         }
85                     }
86
87                     //
88                     // Move existing files into new dirs
89                     //
90                     for (int i = 0; i < localFiles.length; i++) {
91                         Block srcB = new Block(localFiles[i]);
92                         File dst = getBlockFilename(srcB, blkid, depth);
93                         if (!src.renameTo(dst)) {
94                             System.out.println("Unexpected problem in renaming " + src);
95                         }
96                     }
97                 }
98                 **/

99             } else {
100                 // Find subdir
101
children[getHalfByte(blkid, depth)].addBlock(b, src, blkid, depth+1);
102             }
103         }
104
105         /**
106          * Fill in the given blockSet with any child blocks
107          * found at this node.
108          */

109         public void getBlockInfo(TreeSet blockSet) {
110             if (children != null) {
111                 for (int i = 0; i < children.length; i++) {
112                     children[i].getBlockInfo(blockSet);
113                 }
114             }
115
116             File blockFiles[] = dir.listFiles();
117             for (int i = 0; i < blockFiles.length; i++) {
118                 if (Block.isBlockFilename(blockFiles[i])) {
119                     blockSet.add(new Block(blockFiles[i], blockFiles[i].length()));
120                 }
121             }
122         }
123
124         /**
125          * Find the file that corresponds to the given Block
126          */

127         public File getBlockFilename(Block b) {
128             return getBlockFilename(b, b.getBlockId(), 0);
129         }
130
131         /**
132          * Return how many bytes are on disk
133          */

134         public long getTotalUsedBytes() {
135             long total = 0;
136             File blocks[] = dir.listFiles();
137             for (int i = 0; i < blocks.length; i++) {
138                 total += blocks[i].length();
139             }
140             return total;
141         }
142
143         /**
144          * Helper method to find file for a Block
145          */

146         private File getBlockFilename(Block b, long blkid, int depth) {
147             if (children == null) {
148                 return new File(dir, b.getBlockName());
149             } else {
150                 //
151
// Lift the 4 bits starting at depth, going left->right.
152
// That means there are 2^4 possible children, or 16.
153
// The max depth is thus ((len(long) / 4) == 16).
154
//
155
return children[getHalfByte(blkid, depth)].getBlockFilename(b, blkid, depth+1);
156             }
157         }
158
159         /**
160          * Returns a number 0-15, inclusive. Pulls out the right
161          * half-byte from the indicated long.
162          */

163         private int getHalfByte(long blkid, int halfByteIndex) {
164             blkid = blkid >> ((15 - halfByteIndex) * 4);
165             return (int) ((0x000000000000000F) & blkid);
166         }
167     }
168
169     //////////////////////////////////////////////////////
170
//
171
// FSDataSet
172
//
173
//////////////////////////////////////////////////////
174

175     File data = null, tmp = null;
176     long capacity = 0, used = 0, reserved = 0;
177     FSDir dirTree;
178     TreeSet ongoingCreates = new TreeSet();
179
180     /**
181      * An FSDataset has a directory where it loads its data files.
182      */

183     public FSDataset(File dir, long capacity) throws IOException {
184         this.capacity = capacity;
185         this.data = new File(dir, "data");
186         if (! data.exists()) {
187             data.mkdirs();
188         }
189         this.tmp = new File(dir, "tmp");
190         if (tmp.exists()) {
191             FileUtil.fullyDelete(tmp);
192         }
193         this.tmp.mkdirs();
194         this.dirTree = new FSDir(data);
195         this.used = dirTree.getTotalUsedBytes();
196     }
197
198     /**
199      * Return total capacity, used and unused
200      */

201     public long getCapacity() {
202         return capacity;
203     }
204
205     /**
206      * Return how many bytes can still be stored in the FSDataset
207      */

208     public long getRemaining() {
209         long remaining = capacity - used - reserved;
210         if (remaining < 0) {
211             remaining = 0;
212         }
213         return remaining;
214     }
215
216     /**
217      * Find the block's on-disk length
218      */

219     public long getLength(Block b) throws IOException {
220         if (! isValidBlock(b)) {
221             throw new IOException("Block " + b + " is not valid.");
222         }
223         File f = getFile(b);
224         return f.length();
225     }
226
227     /**
228      * Get a stream of data from the indicated block.
229      */

230     public InputStream getBlockData(Block b) throws IOException {
231         if (! isValidBlock(b)) {
232             throw new IOException("Block " + b + " is not valid.");
233         }
234         return new FileInputStream(getFile(b));
235     }
236
237     /**
238      * A Block b will be coming soon!
239      */

240     public boolean startBlock(Block b) throws IOException {
241         //
242
// Make sure the block isn't 'valid'
243
//
244
if (isValidBlock(b)) {
245             throw new IOException("Block " + b + " is valid, and cannot be created.");
246         }
247         return true;
248     }
249
250     /**
251      * Start writing to a block file
252      */

253     public OutputStream writeToBlock(Block b) throws IOException {
254         //
255
// Make sure the block isn't a valid one - we're still creating it!
256
//
257
if (isValidBlock(b)) {
258             throw new IOException("Block " + b + " is valid, and cannot be written to.");
259         }
260
261         //
262
// Serialize access to /tmp, and check if file already there.
263
//
264
File f = null;
265         synchronized (ongoingCreates) {
266             //
267
// Is it already in the create process?
268
//
269
if (ongoingCreates.contains(b)) {
270                 throw new IOException("Block " + b + " has already been started (though not completed), and thus cannot be created.");
271             }
272
273             //
274
// Check if we have too little space
275
//
276
if (capacity - used - reserved < BLOCK_SIZE) {
277                 throw new IOException("Insufficient space for an additional block");
278             }
279
280             //
281
// OK, all's well. Register the create, adjust
282
// 'reserved' size, & create file
283
//
284
ongoingCreates.add(b);
285             reserved += BLOCK_SIZE;
286             f = getTmpFile(b);
287
288             if (f.exists()) {
289                 throw new IOException("Unexpected problem in startBlock() for " + b + ". File " + f + " should not be present, but is.");
290             }
291         }
292
293         //
294
// Create the zero-length temp file
295
//
296
if (!f.createNewFile()) {
297             throw new IOException("Unexpected problem in startBlock() for " + b + ". File " + f + " should be creatable, but is already present.");
298         }
299
300         //
301
// Finally, allow a writer to the block file
302
// REMIND - mjc - make this a filter stream that enforces a max
303
// block size, so clients can't go crazy
304
//
305
return new FileOutputStream(f);
306     }
307
308     //
309
// REMIND - mjc - eventually we should have a timeout system
310
// in place to clean up block files left by abandoned clients.
311
// We should have some timer in place, so that if a blockfile
312
// is created but non-valid, and has been idle for >48 hours,
313
// we can GC it safely.
314
//
315

316     /**
317      * Complete the block write!
318      */

319     public void finalizeBlock(Block b) throws IOException {
320         File f = getTmpFile(b);
321         if (! f.exists()) {
322             throw new IOException("No temporary file " + f + " for block " + b);
323         }
324         
325         synchronized (ongoingCreates) {
326             //
327
// Make sure still registered as ongoing
328
//
329
if (! ongoingCreates.contains(b)) {
330                 throw new IOException("Tried to finalize block " + b + ", but not in ongoingCreates table");
331             }
332
333             long finalLen = f.length();
334             b.setNumBytes(finalLen);
335
336             //
337
// Move the file
338
// (REMIND - mjc - shame to move the file within a synch
339
// section! Maybe remove this?)
340
//
341
dirTree.addBlock(b, f);
342
343             //
344
// Done, so deregister from ongoingCreates
345
//
346
if (! ongoingCreates.remove(b)) {
347                 throw new IOException("Tried to finalize block " + b + ", but could not find it in ongoingCreates after file-move!");
348             }
349             reserved -= BLOCK_SIZE;
350             used += finalLen;
351         }
352     }
353
354     /**
355      * Return a table of block data
356      */

357     public Block[] getBlockReport() {
358         TreeSet blockSet = new TreeSet();
359         dirTree.getBlockInfo(blockSet);
360         Block blockTable[] = new Block[blockSet.size()];
361         int i = 0;
362         for (Iterator it = blockSet.iterator(); it.hasNext(); i++) {
363             blockTable[i] = (Block) it.next();
364         }
365         return blockTable;
366     }
367
368     /**
369      * Check whether the given block is a valid one.
370      */

371     public boolean isValidBlock(Block b) {
372         File f = getFile(b);
373         if (f.exists()) {
374             return true;
375         } else {
376             return false;
377         }
378     }
379
380     /**
381      * We're informed that a block is no longer valid. We
382      * could lazily garbage-collect the block, but why bother?
383      * just get rid of it.
384      */

385     public void invalidate(Block invalidBlks[]) throws IOException {
386         for (int i = 0; i < invalidBlks.length; i++) {
387             File f = getFile(invalidBlks[i]);
388
389             long len = f.length();
390             if (!f.delete()) {
391                 throw new IOException("Unexpected error trying to delete block " + invalidBlks[i] + " at file " + f);
392             }
393             used -= len;
394         }
395     }
396
397     /**
398      * Turn the block identifier into a filename.
399      */

400     File getFile(Block b) {
401         // REMIND - mjc - should cache this result for performance
402
return dirTree.getBlockFilename(b);
403     }
404
405     /**
406      * Get the temp file, if this block is still being created.
407      */

408     File getTmpFile(Block b) {
409         // REMIND - mjc - should cache this result for performance
410
return new File(tmp, b.getBlockName());
411     }
412 }
413
Popular Tags