KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > ndfs > FSDirectory


1 /* Copyright (c) 2004 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3 package net.nutch.ndfs;
4
5 import net.nutch.io.*;
6
7 import java.io.*;
8 import java.util.*;
9
10 /*************************************************
11  * FSDirectory stores the filesystem directory state.
12  * It handles writing/loading values to disk, and logging
13  * changes as we go.
14  *
15  * It keeps the filename->blockset mapping always-current
16  * and logged to disk.
17  *
18  * @author Mike Cafarella
19  *************************************************/

20 public class FSDirectory implements FSConstants {
21     static String JavaDoc FS_IMAGE = "fsimage";
22     static String JavaDoc NEW_FS_IMAGE = "fsimage.new";
23     static String JavaDoc OLD_FS_IMAGE = "fsimage.old";
24
25     private static final byte OP_ADD = 0;
26     private static final byte OP_RENAME = 1;
27     private static final byte OP_DELETE = 2;
28     private static final byte OP_MKDIR = 3;
29
30     /******************************************************
31      * We keep an in-memory representation of the file/block
32      * hierarchy.
33      ******************************************************/

34     class INode {
35         public String JavaDoc name;
36         public INode parent;
37         public Vector children = new Vector();
38         public Block blocks[];
39
40         /**
41          */

42         INode(String JavaDoc name, INode parent, Block blocks[]) {
43             this.name = name;
44             this.parent = parent;
45             this.blocks = blocks;
46         }
47
48         /**
49          */

50         INode getNode(String JavaDoc target) {
51             if (! target.startsWith("/")) {
52                 return null;
53             }
54
55             if (parent == null) {
56                 if ("/".equals(target)) {
57                     return this;
58                 } else {
59                     // Check with children
60
for (Iterator it = children.iterator(); it.hasNext(); ) {
61                         INode child = (INode) it.next();
62                         INode result = child.getNode(target);
63                         if (result != null) {
64                             return result;
65                         }
66                     }
67                 }
68             } else {
69                 // Strip the leading slash
70
if (target.length() > 1) {
71                     target = target.substring(1);
72                 }
73
74                 // Check if it's the current node
75
if (name.equals(target)) {
76                     return this;
77                 }
78
79                 // Get the chunk up to the next slash
80
String JavaDoc curComponent, remainder;
81                 int slash = target.indexOf('/');
82                 if (slash < 0) {
83                     return null;
84                 } else {
85                     curComponent = target.substring(0, slash);
86                     remainder = target.substring(slash);
87                 }
88
89                 // Make sure we're on the right track
90
if (! name.equals(curComponent)) {
91                     return null;
92                 }
93
94                 // Check with children
95
for (Iterator it = children.iterator(); it.hasNext(); ) {
96                     INode child = (INode) it.next();
97                     INode result = child.getNode(remainder);
98                     if (result != null) {
99                         return result;
100                     }
101                 }
102             }
103             return null;
104         }
105
106         /**
107          */

108         INode addNode(String JavaDoc target, Block blocks[]) {
109             if (getNode(target) != null) {
110                 return null;
111             } else {
112                 String JavaDoc parentName = new File(target).getParent();
113                 if (parentName == null) {
114                     return null;
115                 }
116
117                 INode parentNode = getNode(parentName);
118                 if (parentNode == null) {
119                     return null;
120                 } else {
121                     String JavaDoc targetName = new File(target).getName();
122                     INode newItem = new INode(targetName, parentNode, blocks);
123                     parentNode.children.add(newItem);
124                     return newItem;
125                 }
126             }
127         }
128
129         /**
130          */

131         INode removeNode(String JavaDoc target) {
132             INode targetNode = getNode(target);
133             if (targetNode == null) {
134                 return null;
135             } else {
136                 targetNode.parent.children.remove(targetNode);
137                 return targetNode;
138             }
139         }
140
141         /**
142          */

143         int numItemsInTree() {
144             int total = 0;
145             for (Iterator it = children.iterator(); it.hasNext(); ) {
146                 INode child = (INode) it.next();
147                 total += child.numItemsInTree();
148             }
149             return total + 1;
150         }
151
152         /**
153          */

154         String JavaDoc computeName() {
155             if (parent != null) {
156                 return parent.computeName() + "/" + name;
157             } else {
158                 return name;
159             }
160         }
161
162         /**
163          */

164         long computeFileLength() {
165             long total = 0;
166             if (blocks != null) {
167                 for (int i = 0; i < blocks.length; i++) {
168                     total += blocks[i].getNumBytes();
169                 }
170             }
171             return total;
172         }
173
174         /**
175          */

176         long computeContentsLength() {
177             long total = computeFileLength();
178             for (Iterator it = children.iterator(); it.hasNext(); ) {
179                 INode child = (INode) it.next();
180                 total += child.computeContentsLength();
181             }
182             return total;
183         }
184
185         /**
186          */

187         void listContents(Vector v) {
188             if (parent != null && blocks != null) {
189                 v.add(this);
190             }
191
192             for (Iterator it = children.iterator(); it.hasNext(); ) {
193                 INode child = (INode) it.next();
194                 v.add(child);
195             }
196         }
197
198         /**
199          */

200         void saveImage(String JavaDoc parentPrefix, DataOutputStream out) throws IOException {
201             String JavaDoc fullName = "";
202             if (parent != null) {
203                 fullName = parentPrefix + "/" + name;
204                 new UTF8(fullName).write(out);
205                 if (blocks == null) {
206                     out.writeInt(0);
207                 } else {
208                     out.writeInt(blocks.length);
209                     for (int i = 0; i < blocks.length; i++) {
210                         blocks[i].write(out);
211                     }
212                 }
213             }
214             for (Iterator it = children.iterator(); it.hasNext(); ) {
215                 INode child = (INode) it.next();
216                 child.saveImage(fullName, out);
217             }
218         }
219     }
220
221     INode rootDir = new INode("", null, null);
222     TreeSet activeBlocks = new TreeSet();
223     TreeMap activeLocks = new TreeMap();
224     DataOutputStream editlog = null;
225     boolean ready = false;
226
/**
 * Create a FileSystem directory, and load its info
 * from the indicated place.
 *
 * The image is kept in the "image" subdirectory of dir and the edit
 * log in the file "edits" next to it.  When edits were replayed during
 * loading, a consolidated image is saved right away.
 *
 * @param dir directory holding the image directory and the edit log
 * @throws IOException if loading or saving the image fails, or the
 *         edit log cannot be opened for writing
 */
public FSDirectory(File dir) throws IOException {
    File fullimage = new File(dir, "image");
    if (! fullimage.exists()) {
        fullimage.mkdirs();
    }
    File edits = new File(dir, "edits");
    if (loadFSImage(fullimage, edits)) {
        saveFSImage(fullimage, edits);
    }

    synchronized (this) {
        // Open the log BEFORE publishing readiness: nobody who sees
        // ready == true may find editlog still null, and a failed open
        // must not leave the object flagged as ready.
        // FileOutputStream(File) starts the file from scratch; at this
        // point any edits that were present have been folded into the
        // image by the save above.
        this.editlog = new DataOutputStream(new FileOutputStream(edits));
        this.ready = true;
        this.notifyAll();
    }
}
247
248     /**
249      * Shutdown the filestore
250      */

251     public void close() throws IOException {
252         editlog.close();
253     }
254
255     /**
256      * Block until the object is ready to be used.
257      */

258     void waitForReady() {
259         if (! ready) {
260             synchronized (this) {
261                 while (!ready) {
262                     try {
263                         this.wait(5000);
264                     } catch (InterruptedException JavaDoc ie) {
265                     }
266                 }
267             }
268         }
269     }
270
271     /**
272      * Load in the filesystem image. It's a big list of
273      * filenames and blocks. Return whether we should
274      * "re-save" and consolidate the edit-logs
275      */

276     boolean loadFSImage(File fsdir, File edits) throws IOException {
277         //
278
// Atomic move sequence, to recover from interrupted save
279
//
280
File curFile = new File(fsdir, FS_IMAGE);
281         File newFile = new File(fsdir, NEW_FS_IMAGE);
282         File oldFile = new File(fsdir, OLD_FS_IMAGE);
283
284         // Maybe we were interrupted between 2 and 4
285
if (oldFile.exists() && curFile.exists()) {
286             oldFile.delete();
287             if (edits.exists()) {
288                 edits.delete();
289             }
290         } else if (oldFile.exists() && newFile.exists()) {
291             // Or maybe between 1 and 2
292
newFile.renameTo(curFile);
293             oldFile.delete();
294         } else if (curFile.exists() && newFile.exists()) {
295             // Or else before stage 1, in which case we lose the edits
296
newFile.delete();
297         }
298
299         //
300
// Load in bits
301
//
302
if (curFile.exists()) {
303             DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(curFile)));
304             try {
305                 int numFiles = in.readInt();
306                 for (int i = 0; i < numFiles; i++) {
307                     UTF8 name = new UTF8();
308                     name.readFields(in);
309                     int numBlocks = in.readInt();
310                     if (numBlocks == 0) {
311                         unprotectedAddFile(name, null);
312                     } else {
313                         Block blocks[] = new Block[numBlocks];
314                         for (int j = 0; j < numBlocks; j++) {
315                             blocks[j] = new Block();
316                             blocks[j].readFields(in);
317                         }
318                         unprotectedAddFile(name, blocks);
319                     }
320                 }
321             } finally {
322                 in.close();
323             }
324         }
325
326         if (edits.exists() && loadFSEdits(edits) > 0) {
327             return true;
328         } else {
329             return false;
330         }
331     }
332
333     /**
334      * Load an edit log, and apply the changes to the in-memory structure
335      *
336      * This is where we apply edits that we've been writing to disk all
337      * along.
338      */

339     int loadFSEdits(File edits) throws IOException {
340         int numEdits = 0;
341
342         if (edits.exists()) {
343             DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(edits)));
344             try {
345                 while (in.available() > 0) {
346                     byte opcode = in.readByte();
347                     numEdits++;
348                     switch (opcode) {
349                     case OP_ADD: {
350                         UTF8 name = new UTF8();
351                         name.readFields(in);
352                         ArrayWritable aw = new ArrayWritable(Block.class);
353                         aw.readFields(in);
354                         Writable writables[] = (Writable[]) aw.get();
355                         Block blocks[] = new Block[writables.length];
356                         System.arraycopy(writables, 0, blocks, 0, blocks.length);
357                         unprotectedAddFile(name, blocks);
358                         break;
359                     }
360                     case OP_RENAME: {
361                         UTF8 src = new UTF8();
362                         UTF8 dst = new UTF8();
363                         src.readFields(in);
364                         dst.readFields(in);
365                         unprotectedRenameTo(src, dst);
366                         break;
367                     }
368                     case OP_DELETE: {
369                         UTF8 src = new UTF8();
370                         src.readFields(in);
371                         unprotectedDelete(src);
372                         break;
373                     }
374                     case OP_MKDIR: {
375                         UTF8 src = new UTF8();
376                         src.readFields(in);
377                         unprotectedMkdir(src.toString());
378                         break;
379                     }
380                     default: {
381                         throw new IOException("Never seen opcode " + opcode);
382                     }
383                     }
384                 }
385             } finally {
386                 in.close();
387             }
388         }
389         return numEdits;
390     }
391
392     /**
393      * Save the contents of the FS image
394      */

395     void saveFSImage(File fullimage, File edits) throws IOException {
396         File curFile = new File(fullimage, FS_IMAGE);
397         File newFile = new File(fullimage, NEW_FS_IMAGE);
398         File oldFile = new File(fullimage, OLD_FS_IMAGE);
399
400         //
401
// Write out data
402
//
403
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(newFile)));
404         try {
405             out.writeInt(rootDir.numItemsInTree() - 1);
406             rootDir.saveImage("", out);
407         } finally {
408             out.close();
409         }
410
411         //
412
// Atomic move sequence
413
//
414
// 1. Move cur to old
415
curFile.renameTo(oldFile);
416         
417         // 2. Move new to cur
418
newFile.renameTo(curFile);
419
420         // 3. Remove pending-edits file (it's been integrated with newFile)
421
edits.delete();
422         
423         // 4. Delete old
424
oldFile.delete();
425     }
426
427     /**
428      * Write an operation to the edit log
429      */

430     void logEdit(byte op, Writable w1, Writable w2) {
431         synchronized (editlog) {
432             try {
433                 editlog.write(op);
434                 if (w1 != null) {
435                     w1.write(editlog);
436                 }
437                 if (w2 != null) {
438                     w2.write(editlog);
439                 }
440             } catch (IOException ie) {
441             }
442         }
443     }
444
445     /**
446      * Add the given filename to the fs.
447      */

448     public boolean addFile(UTF8 src, Block blocks[]) {
449         waitForReady();
450
451         // Always do an implicit mkdirs for parent directory tree
452
mkdirs(new File(src.toString()).getParent());
453         if (unprotectedAddFile(src, blocks)) {
454             logEdit(OP_ADD, src, new ArrayWritable(Block.class, blocks));
455             return true;
456         } else {
457             return false;
458         }
459     }
460     
461     /**
462      */

463     boolean unprotectedAddFile(UTF8 name, Block blocks[]) {
464         synchronized (rootDir) {
465             if (blocks != null) {
466                 // Add file->block mapping
467
for (int i = 0; i < blocks.length; i++) {
468                     activeBlocks.add(blocks[i]);
469                 }
470             }
471             return (rootDir.addNode(name.toString(), blocks) != null);
472         }
473     }
474
475     /**
476      * Change the filename
477      */

478     public boolean renameTo(UTF8 src, UTF8 dst) {
479         waitForReady();
480         if (unprotectedRenameTo(src, dst)) {
481             logEdit(OP_RENAME, src, dst);
482             return true;
483         } else {
484             return false;
485         }
486     }
487
488     /**
489      */

490     boolean unprotectedRenameTo(UTF8 src, UTF8 dst) {
491         synchronized(rootDir) {
492             INode removedNode = rootDir.removeNode(src.toString());
493             if (removedNode == null) {
494                 return false;
495             }
496
497             INode newNode = rootDir.addNode(dst.toString(), removedNode.blocks);
498             if (newNode != null) {
499                 newNode.children = removedNode.children;
500                 for (Iterator it = newNode.children.iterator(); it.hasNext(); ) {
501                     INode child = (INode) it.next();
502                     child.parent = newNode;
503                 }
504                 return true;
505             } else {
506                 removedNode.parent.children.add(removedNode);
507                 return false;
508             }
509         }
510     }
511
512     /**
513      * Remove the file from management, return blocks
514      */

515     public Block[] delete(UTF8 src) {
516         waitForReady();
517         logEdit(OP_DELETE, src, null);
518         return unprotectedDelete(src);
519     }
520
521     /**
522      */

523     Block[] unprotectedDelete(UTF8 src) {
524         synchronized (rootDir) {
525             INode targetNode = rootDir.getNode(src.toString());
526             if (targetNode == null) {
527                 return null;
528             } else {
529                 Vector allBlocks = new Vector();
530                 Vector contents = new Vector();
531                 targetNode.listContents(contents);
532
533                 for (Iterator it = contents.iterator(); it.hasNext(); ) {
534                     INode cur = (INode) it.next();
535                     INode removedNode = rootDir.removeNode(cur.computeName());
536                     if (removedNode != null) {
537                         Block blocks[] = removedNode.blocks;
538                         if (blocks != null) {
539                             for (int i = 0; i < blocks.length; i++) {
540                                 activeBlocks.remove(blocks[i]);
541                                 allBlocks.add(blocks[i]);
542                             }
543                         }
544                     }
545                 }
546                 rootDir.removeNode(src.toString());
547                 return (Block[]) allBlocks.toArray(new Block[0]);
548             }
549         }
550     }
551
552     /**
553      */

554     public int obtainLock(UTF8 src, UTF8 holder, boolean exclusive) {
555         TreeSet holders = (TreeSet) activeLocks.get(src);
556         if (holders == null) {
557             holders = new TreeSet();
558             activeLocks.put(src, holders);
559         }
560         if (exclusive && holders.size() > 0) {
561             return STILL_WAITING;
562         } else {
563             holders.add(holder);
564             return COMPLETE_SUCCESS;
565         }
566     }
567
568     /**
569      */

570     public int releaseLock(UTF8 src, UTF8 holder) {
571         TreeSet holders = (TreeSet) activeLocks.get(src);
572         if (holders != null && holders.contains(holder)) {
573             holders.remove(holder);
574             if (holders.size() == 0) {
575                 activeLocks.remove(src);
576             }
577             return COMPLETE_SUCCESS;
578         } else {
579             return OPERATION_FAILED;
580         }
581     }
582
583     /**
584      * Get a listing of files given path 'src'
585      *
586      * This function is admittedly very inefficient right now. We'll
587      * make it better later.
588      */

589     public NDFSFileInfo[] getListing(UTF8 src) {
590         String JavaDoc srcs = normalizePath(src);
591
592         synchronized (rootDir) {
593             INode targetNode = rootDir.getNode(srcs);
594             if (targetNode == null) {
595                 return null;
596             } else {
597                 Vector contents = new Vector();
598                 targetNode.listContents(contents);
599
600                 NDFSFileInfo listing[] = new NDFSFileInfo[contents.size()];
601                 int i = 0;
602                 for (Iterator it = contents.iterator(); it.hasNext(); i++) {
603                     INode cur = (INode) it.next();
604                     UTF8 curName = new UTF8(cur.computeName());
605                     listing[i] = new NDFSFileInfo(curName, cur.computeFileLength(), cur.computeContentsLength(), isDir(curName));
606                     //listing[i] = new NDFSFileInfo(curName, cur.computeFileLength(), 0, isDir(curName));
607
//listing[i] = new NDFSFileInfo(curName, cur.computeFileLength(), 0, false);
608
}
609                 return listing;
610             }
611         }
612     }
613
614     /**
615      * Get the blocks associated with the file
616      */

617     public Block[] getFile(UTF8 src) {
618         waitForReady();
619         synchronized (rootDir) {
620             INode targetNode = rootDir.getNode(src.toString());
621             if (targetNode == null) {
622                 return null;
623             } else {
624                 return targetNode.blocks;
625             }
626         }
627     }
628
629     /**
630      * Check whether the filepath could be created
631      */

632     public boolean isValidToCreate(UTF8 src) {
633         String JavaDoc srcs = normalizePath(src);
634         synchronized (rootDir) {
635             if (srcs.startsWith("/") &&
636                 ! srcs.endsWith("/") &&
637                 rootDir.getNode(srcs) == null) {
638                 return true;
639             } else {
640                 return false;
641             }
642         }
643     }
644
645     /**
646      * Check whether it's a directory
647      */

648     public boolean isDir(UTF8 src) {
649         synchronized (rootDir) {
650             INode node = rootDir.getNode(normalizePath(src));
651             if (node != null && node.blocks == null) {
652                 return true;
653             } else {
654                 return false;
655             }
656         }
657     }
658
659     /**
660      * Create the given directory and all its parent dirs.
661      */

662     public boolean mkdirs(UTF8 src) {
663         return mkdirs(src.toString());
664     }
665
666     /**
667      * Create directory entries for every item
668      */

669     boolean mkdirs(String JavaDoc src) {
670         src = normalizePath(new UTF8(src));
671
672         // Use this to collect all the dirs we need to construct
673
Vector v = new Vector();
674
675         // The dir itself
676
File f = new File(src);
677         v.add(f.getPath());
678
679         // All its parents
680
while (f.getParent() != null) {
681             f = new File(f.getParent());
682             v.add(f.getPath());
683         }
684
685         // Now go backwards through list of dirs, creating along
686
// the way
687
boolean lastSuccess = false;
688         int numElts = v.size();
689         for (int i = numElts - 1; i >= 0; i--) {
690             String JavaDoc cur = (String JavaDoc) v.elementAt(i);
691             INode inserted = unprotectedMkdir(cur);
692             if (inserted != null) {
693                 logEdit(OP_MKDIR, new UTF8(inserted.computeName()), null);
694                 lastSuccess = true;
695             } else {
696                 lastSuccess = false;
697             }
698         }
699         return lastSuccess;
700     }
701
702     /**
703      */

704     INode unprotectedMkdir(String JavaDoc src) {
705         synchronized (rootDir) {
706             return rootDir.addNode(src, null);
707         }
708     }
709
710     /**
711      */

712     String JavaDoc normalizePath(UTF8 src) {
713         String JavaDoc srcs = src.toString();
714         if (srcs.length() > 1 && srcs.endsWith("/")) {
715             srcs = srcs.substring(0, srcs.length() - 1);
716         }
717         return srcs;
718     }
719
720     /**
721      * Returns whether the given block is one pointed-to by a file.
722      */

723     public boolean isValidBlock(Block b) {
724         synchronized (rootDir) {
725             if (activeBlocks.contains(b)) {
726                 return true;
727             } else {
728                 return false;
729             }
730         }
731     }
732 }
733
Popular Tags