1 2 3 package net.nutch.ndfs; 4 5 import net.nutch.io.*; 6 7 import java.io.*; 8 import java.util.*; 9 10 20 public class FSDirectory implements FSConstants { 21 static String FS_IMAGE = "fsimage"; 22 static String NEW_FS_IMAGE = "fsimage.new"; 23 static String OLD_FS_IMAGE = "fsimage.old"; 24 25 private static final byte OP_ADD = 0; 26 private static final byte OP_RENAME = 1; 27 private static final byte OP_DELETE = 2; 28 private static final byte OP_MKDIR = 3; 29 30 34 class INode { 35 public String name; 36 public INode parent; 37 public Vector children = new Vector(); 38 public Block blocks[]; 39 40 42 INode(String name, INode parent, Block blocks[]) { 43 this.name = name; 44 this.parent = parent; 45 this.blocks = blocks; 46 } 47 48 50 INode getNode(String target) { 51 if (! target.startsWith("/")) { 52 return null; 53 } 54 55 if (parent == null) { 56 if ("/".equals(target)) { 57 return this; 58 } else { 59 for (Iterator it = children.iterator(); it.hasNext(); ) { 61 INode child = (INode) it.next(); 62 INode result = child.getNode(target); 63 if (result != null) { 64 return result; 65 } 66 } 67 } 68 } else { 69 if (target.length() > 1) { 71 target = target.substring(1); 72 } 73 74 if (name.equals(target)) { 76 return this; 77 } 78 79 String curComponent, remainder; 81 int slash = target.indexOf('/'); 82 if (slash < 0) { 83 return null; 84 } else { 85 curComponent = target.substring(0, slash); 86 remainder = target.substring(slash); 87 } 88 89 if (! name.equals(curComponent)) { 91 return null; 92 } 93 94 for (Iterator it = children.iterator(); it.hasNext(); ) { 96 INode child = (INode) it.next(); 97 INode result = child.getNode(remainder); 98 if (result != null) { 99 return result; 100 } 101 } 102 } 103 return null; 104 } 105 106 108 INode addNode(String target, Block blocks[]) { 109 if (getNode(target) != null) { 110 return null; 111 } else { 112 String parentName = new File(target).getParent(); 113 if (parentName == null) { 114 return null; 115 } 116 117 INode parentNode = getNode(parentName); 118 if (parentNode == null) { 119 return null; 120 } else { 121 String targetName = new File(target).getName(); 122 INode newItem = new INode(targetName, parentNode, blocks); 123 parentNode.children.add(newItem); 124 return newItem; 125 } 126 } 127 } 128 129 131 INode removeNode(String target) { 132 INode targetNode = getNode(target); 133 if (targetNode == null) { 134 return null; 135 } else { 136 targetNode.parent.children.remove(targetNode); 137 return targetNode; 138 } 139 } 140 141 143 int numItemsInTree() { 144 int total = 0; 145 for (Iterator it = children.iterator(); it.hasNext(); ) { 146 INode child = (INode) it.next(); 147 total += child.numItemsInTree(); 148 } 149 return total + 1; 150 } 151 152 154 String computeName() { 155 if (parent != null) { 156 return parent.computeName() + "/" + name; 157 } else { 158 return name; 159 } 160 } 161 162 164 long computeFileLength() { 165 long total = 0; 166 if (blocks != null) { 167 for (int i = 0; i < blocks.length; i++) { 168 total += blocks[i].getNumBytes(); 169 } 170 } 171 return total; 172 } 173 174 176 long computeContentsLength() { 177 long total = computeFileLength(); 178 for (Iterator it = children.iterator(); it.hasNext(); ) { 179 INode child = (INode) it.next(); 180 total += child.computeContentsLength(); 181 } 182 return total; 183 } 184 185 187 void listContents(Vector v) { 188 if (parent != null && blocks != null) { 189 v.add(this); 190 } 191 192 for (Iterator it = children.iterator(); it.hasNext(); ) { 193 INode child = (INode) it.next(); 194 v.add(child); 195 } 196 } 197 198 200 void saveImage(String parentPrefix, DataOutputStream out) throws IOException { 201 String fullName = ""; 202 if (parent != null) { 203 fullName = parentPrefix + "/" + name; 204 new UTF8(fullName).write(out); 205 if (blocks == null) { 206 out.writeInt(0); 207 } else { 208 out.writeInt(blocks.length); 209 for (int i = 0; i < blocks.length; i++) { 210 blocks[i].write(out); 211 } 212 } 213 } 214 for (Iterator it = children.iterator(); it.hasNext(); ) { 215 INode child = (INode) it.next(); 216 child.saveImage(fullName, out); 217 } 218 } 219 } 220 221 INode rootDir = new INode("", null, null); 222 TreeSet activeBlocks = new TreeSet(); 223 TreeMap activeLocks = new TreeMap(); 224 DataOutputStream editlog = null; 225 boolean ready = false; 226 227 231 public FSDirectory(File dir) throws IOException { 232 File fullimage = new File(dir, "image"); 233 if (! fullimage.exists()) { 234 fullimage.mkdirs(); 235 } 236 File edits = new File(dir, "edits"); 237 if (loadFSImage(fullimage, edits)) { 238 saveFSImage(fullimage, edits); 239 } 240 241 synchronized (this) { 242 this.ready = true; 243 this.notifyAll(); 244 this.editlog = new DataOutputStream(new FileOutputStream(edits)); 245 } 246 } 247 248 251 public void close() throws IOException { 252 editlog.close(); 253 } 254 255 258 void waitForReady() { 259 if (! ready) { 260 synchronized (this) { 261 while (!ready) { 262 try { 263 this.wait(5000); 264 } catch (InterruptedException ie) { 265 } 266 } 267 } 268 } 269 } 270 271 276 boolean loadFSImage(File fsdir, File edits) throws IOException { 277 File curFile = new File(fsdir, FS_IMAGE); 281 File newFile = new File(fsdir, NEW_FS_IMAGE); 282 File oldFile = new File(fsdir, OLD_FS_IMAGE); 283 284 if (oldFile.exists() && curFile.exists()) { 286 oldFile.delete(); 287 if (edits.exists()) { 288 edits.delete(); 289 } 290 } else if (oldFile.exists() && newFile.exists()) { 291 newFile.renameTo(curFile); 293 oldFile.delete(); 294 } else if (curFile.exists() && newFile.exists()) { 295 newFile.delete(); 297 } 298 299 if (curFile.exists()) { 303 DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(curFile))); 304 try { 305 int numFiles = in.readInt(); 306 for (int i = 0; i < numFiles; i++) { 307 UTF8 name = new UTF8(); 308 name.readFields(in); 309 int numBlocks = in.readInt(); 310 if (numBlocks == 0) { 311 unprotectedAddFile(name, null); 312 } else { 313 Block blocks[] = new Block[numBlocks]; 314 for (int j = 0; j < numBlocks; j++) { 315 blocks[j] = new Block(); 316 blocks[j].readFields(in); 317 } 318 unprotectedAddFile(name, blocks); 319 } 320 } 321 } finally { 322 in.close(); 323 } 324 } 325 326 if (edits.exists() && loadFSEdits(edits) > 0) { 327 return true; 328 } else { 329 return false; 330 } 331 } 332 333 339 int loadFSEdits(File edits) throws IOException { 340 int numEdits = 0; 341 342 if (edits.exists()) { 343 DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(edits))); 344 try { 345 while (in.available() > 0) { 346 byte opcode = in.readByte(); 347 numEdits++; 348 switch (opcode) { 349 case OP_ADD: { 350 UTF8 name = new UTF8(); 351 name.readFields(in); 352 ArrayWritable aw = new ArrayWritable(Block.class); 353 aw.readFields(in); 354 Writable writables[] = (Writable[]) aw.get(); 355 Block blocks[] = new Block[writables.length]; 356 System.arraycopy(writables, 0, blocks, 0, blocks.length); 357 unprotectedAddFile(name, blocks); 358 break; 359 } 360 case OP_RENAME: { 361 UTF8 src = new UTF8(); 362 UTF8 dst = new UTF8(); 363 src.readFields(in); 364 dst.readFields(in); 365 unprotectedRenameTo(src, dst); 366 break; 367 } 368 case OP_DELETE: { 369 UTF8 src = new UTF8(); 370 src.readFields(in); 371 unprotectedDelete(src); 372 break; 373 } 374 case OP_MKDIR: { 375 UTF8 src = new UTF8(); 376 src.readFields(in); 377 unprotectedMkdir(src.toString()); 378 break; 379 } 380 default: { 381 throw new IOException("Never seen opcode " + opcode); 382 } 383 } 384 } 385 } finally { 386 in.close(); 387 } 388 } 389 return numEdits; 390 } 391 392 395 void saveFSImage(File fullimage, File edits) throws IOException { 396 File curFile = new File(fullimage, FS_IMAGE); 397 File newFile = new File(fullimage, NEW_FS_IMAGE); 398 File oldFile = new File(fullimage, OLD_FS_IMAGE); 399 400 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(newFile))); 404 try { 405 out.writeInt(rootDir.numItemsInTree() - 1); 406 rootDir.saveImage("", out); 407 } finally { 408 out.close(); 409 } 410 411 curFile.renameTo(oldFile); 416 417 newFile.renameTo(curFile); 419 420 edits.delete(); 422 423 oldFile.delete(); 425 } 426 427 430 void logEdit(byte op, Writable w1, Writable w2) { 431 synchronized (editlog) { 432 try { 433 editlog.write(op); 434 if (w1 != null) { 435 w1.write(editlog); 436 } 437 if (w2 != null) { 438 w2.write(editlog); 439 } 440 } catch (IOException ie) { 441 } 442 } 443 } 444 445 448 public boolean addFile(UTF8 src, Block blocks[]) { 449 waitForReady(); 450 451 mkdirs(new File(src.toString()).getParent()); 453 if (unprotectedAddFile(src, blocks)) { 454 logEdit(OP_ADD, src, new ArrayWritable(Block.class, blocks)); 455 return true; 456 } else { 457 return false; 458 } 459 } 460 461 463 boolean unprotectedAddFile(UTF8 name, Block blocks[]) { 464 synchronized (rootDir) { 465 if (blocks != null) { 466 for (int i = 0; i < blocks.length; i++) { 468 activeBlocks.add(blocks[i]); 469 } 470 } 471 return (rootDir.addNode(name.toString(), blocks) != null); 472 } 473 } 474 475 478 public boolean renameTo(UTF8 src, UTF8 dst) { 479 waitForReady(); 480 if (unprotectedRenameTo(src, dst)) { 481 logEdit(OP_RENAME, src, dst); 482 return true; 483 } else { 484 return false; 485 } 486 } 487 488 490 boolean unprotectedRenameTo(UTF8 src, UTF8 dst) { 491 synchronized(rootDir) { 492 INode removedNode = rootDir.removeNode(src.toString()); 493 if (removedNode == null) { 494 return false; 495 } 496 497 INode newNode = rootDir.addNode(dst.toString(), removedNode.blocks); 498 if (newNode != null) { 499 newNode.children = removedNode.children; 500 for (Iterator it = newNode.children.iterator(); it.hasNext(); ) { 501 INode child = (INode) it.next(); 502 child.parent = newNode; 503 } 504 return true; 505 } else { 506 removedNode.parent.children.add(removedNode); 507 return false; 508 } 509 } 510 } 511 512 515 public Block[] delete(UTF8 src) { 516 waitForReady(); 517 logEdit(OP_DELETE, src, null); 518 return unprotectedDelete(src); 519 } 520 521 523 Block[] unprotectedDelete(UTF8 src) { 524 synchronized (rootDir) { 525 INode targetNode = rootDir.getNode(src.toString()); 526 if (targetNode == null) { 527 return null; 528 } else { 529 Vector allBlocks = new Vector(); 530 Vector contents = new Vector(); 531 targetNode.listContents(contents); 532 533 for (Iterator it = contents.iterator(); it.hasNext(); ) { 534 INode cur = (INode) it.next(); 535 INode removedNode = rootDir.removeNode(cur.computeName()); 536 if (removedNode != null) { 537 Block blocks[] = removedNode.blocks; 538 if (blocks != null) { 539 for (int i = 0; i < blocks.length; i++) { 540 activeBlocks.remove(blocks[i]); 541 allBlocks.add(blocks[i]); 542 } 543 } 544 } 545 } 546 rootDir.removeNode(src.toString()); 547 return (Block[]) allBlocks.toArray(new Block[0]); 548 } 549 } 550 } 551 552 554 public int obtainLock(UTF8 src, UTF8 holder, boolean exclusive) { 555 TreeSet holders = (TreeSet) activeLocks.get(src); 556 if (holders == null) { 557 holders = new TreeSet(); 558 activeLocks.put(src, holders); 559 } 560 if (exclusive && holders.size() > 0) { 561 return STILL_WAITING; 562 } else { 563 holders.add(holder); 564 return COMPLETE_SUCCESS; 565 } 566 } 567 568 570 public int releaseLock(UTF8 src, UTF8 holder) { 571 TreeSet holders = (TreeSet) activeLocks.get(src); 572 if (holders != null && holders.contains(holder)) { 573 holders.remove(holder); 574 if (holders.size() == 0) { 575 activeLocks.remove(src); 576 } 577 return COMPLETE_SUCCESS; 578 } else { 579 return OPERATION_FAILED; 580 } 581 } 582 583 589 public NDFSFileInfo[] getListing(UTF8 src) { 590 String srcs = normalizePath(src); 591 592 synchronized (rootDir) { 593 INode targetNode = rootDir.getNode(srcs); 594 if (targetNode == null) { 595 return null; 596 } else { 597 Vector contents = new Vector(); 598 targetNode.listContents(contents); 599 600 NDFSFileInfo listing[] = new NDFSFileInfo[contents.size()]; 601 int i = 0; 602 for (Iterator it = contents.iterator(); it.hasNext(); i++) { 603 INode cur = (INode) it.next(); 604 UTF8 curName = new UTF8(cur.computeName()); 605 listing[i] = new NDFSFileInfo(curName, cur.computeFileLength(), cur.computeContentsLength(), isDir(curName)); 606 } 609 return listing; 610 } 611 } 612 } 613 614 617 public Block[] getFile(UTF8 src) { 618 waitForReady(); 619 synchronized (rootDir) { 620 INode targetNode = rootDir.getNode(src.toString()); 621 if (targetNode == null) { 622 return null; 623 } else { 624 return targetNode.blocks; 625 } 626 } 627 } 628 629 632 public boolean isValidToCreate(UTF8 src) { 633 String srcs = normalizePath(src); 634 synchronized (rootDir) { 635 if (srcs.startsWith("/") && 636 ! srcs.endsWith("/") && 637 rootDir.getNode(srcs) == null) { 638 return true; 639 } else { 640 return false; 641 } 642 } 643 } 644 645 648 public boolean isDir(UTF8 src) { 649 synchronized (rootDir) { 650 INode node = rootDir.getNode(normalizePath(src)); 651 if (node != null && node.blocks == null) { 652 return true; 653 } else { 654 return false; 655 } 656 } 657 } 658 659 662 public boolean mkdirs(UTF8 src) { 663 return mkdirs(src.toString()); 664 } 665 666 669 boolean mkdirs(String src) { 670 src = normalizePath(new UTF8(src)); 671 672 Vector v = new Vector(); 674 675 File f = new File(src); 677 v.add(f.getPath()); 678 679 while (f.getParent() != null) { 681 f = new File(f.getParent()); 682 v.add(f.getPath()); 683 } 684 685 boolean lastSuccess = false; 688 int numElts = v.size(); 689 for (int i = numElts - 1; i >= 0; i--) { 690 String cur = (String ) v.elementAt(i); 691 INode inserted = unprotectedMkdir(cur); 692 if (inserted != null) { 693 logEdit(OP_MKDIR, new UTF8(inserted.computeName()), null); 694 lastSuccess = true; 695 } else { 696 lastSuccess = false; 697 } 698 } 699 return lastSuccess; 700 } 701 702 704 INode unprotectedMkdir(String src) { 705 synchronized (rootDir) { 706 return rootDir.addNode(src, null); 707 } 708 } 709 710 712 String normalizePath(UTF8 src) { 713 String srcs = src.toString(); 714 if (srcs.length() > 1 && srcs.endsWith("/")) { 715 srcs = srcs.substring(0, srcs.length() - 1); 716 } 717 return srcs; 718 } 719 720 723 public boolean isValidBlock(Block b) { 724 synchronized (rootDir) { 725 if (activeBlocks.contains(b)) { 726 return true; 727 } else { 728 return false; 729 } 730 } 731 } 732 } 733 | Popular Tags |