1 2 3 package net.nutch.ndfs; 4 5 import net.nutch.io.*; 6 import net.nutch.util.*; 7 8 import java.io.*; 9 import java.util.*; 10 import java.util.logging.*; 11 12 21 public class FSNamesystem implements FSConstants { 22 public static final Logger LOG = LogFormatter.getLogger("net.nutch.fs.FSNamesystem"); 23 24 final static int DESIRED_REPLICATION = 2; 26 27 final static int MIN_REPLICATION = 1; 29 30 final static long HEARTBEAT_RECHECK = 1000; 32 33 FSDirectory dir; 37 38 TreeMap blocksMap = new TreeMap(); 43 44 TreeMap datanodeMap = new TreeMap(); 50 51 TreeMap recentInvalidateSets = new TreeMap(); 57 58 TreeMap pendingCreates = new TreeMap(); 63 64 TreeSet pendingCreateBlocks = new TreeSet(); 68 69 long totalCapacity = 0, totalRemaining = 0; 73 74 Random r = new Random(); 76 77 TreeSet heartbeats = new TreeSet(new Comparator() { 81 public int compare(Object o1, Object o2) { 82 DatanodeInfo d1 = (DatanodeInfo) o1; 83 DatanodeInfo d2 = (DatanodeInfo) o2; 84 long lu1 = d1.lastUpdate(); 85 long lu2 = d2.lastUpdate(); 86 if (lu1 < lu2) { 87 return -1; 88 } else if (lu1 > lu2) { 89 return 1; 90 } else { 91 return d1.getName().compareTo(d2.getName()); 92 } 93 } 94 }); 95 96 TreeSet neededReplications = new TreeSet(); 101 TreeSet pendingReplications = new TreeSet(); 102 103 TreeMap leases = new TreeMap(); 107 TreeSet sortedLeases = new TreeSet(); 108 109 HeartbeatMonitor hbmon = null; 114 LeaseMonitor lmon = null; 115 Thread hbthread = null, lmthread = null; 116 boolean fsRunning = true; 117 long systemStart = 0; 118 119 123 public FSNamesystem(File dir) throws IOException { 124 this.dir = new FSDirectory(dir); 125 this.hbthread = new Thread (new HeartbeatMonitor()); 126 this.lmthread = new Thread (new LeaseMonitor()); 127 hbthread.start(); 128 lmthread.start(); 129 this.systemStart = System.currentTimeMillis(); 130 } 131 132 134 public void close() { 135 fsRunning = false; 136 try { 137 hbthread.join(); 138 } catch (InterruptedException ie) { 139 } 140 try { 141 lmthread.join(); 142 } catch (InterruptedException ie) { 143 } 144 } 145 146 159 public Object [] open(UTF8 src) { 160 Object results[] = null; 161 Block blocks[] = dir.getFile(src); 162 if (blocks != null) { 163 results = new Object [2]; 164 DatanodeInfo machineSets[][] = new DatanodeInfo[blocks.length][]; 165 166 for (int i = 0; i < blocks.length; i++) { 167 TreeSet containingNodes = (TreeSet) blocksMap.get(blocks[i]); 168 if (containingNodes == null) { 169 machineSets[i] = new DatanodeInfo[0]; 170 } else { 171 machineSets[i] = new DatanodeInfo[containingNodes.size()]; 172 int j = 0; 173 for (Iterator it = containingNodes.iterator(); it.hasNext(); j++) { 174 machineSets[i][j] = (DatanodeInfo) it.next(); 175 } 176 } 177 } 178 179 results[0] = blocks; 180 results[1] = machineSets; 181 } 182 return results; 183 } 184 185 192 public synchronized Object [] startFile(UTF8 src, UTF8 holder, boolean overwrite) { 193 Object results[] = null; 194 if (pendingCreates.get(src) == null) { 195 boolean fileValid = dir.isValidToCreate(src); 196 if (overwrite && ! fileValid) { 197 delete(src); 198 fileValid = true; 199 } 200 201 if (fileValid) { 202 results = new Object [2]; 203 204 DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION, null); 206 if (targets.length < MIN_REPLICATION) { 207 return null; 208 } 209 210 pendingCreates.put(src, new Vector()); 212 synchronized (leases) { 213 Lease lease = (Lease) leases.get(holder); 214 if (lease == null) { 215 lease = new Lease(holder); 216 leases.put(holder, lease); 217 sortedLeases.add(lease); 218 } else { 219 sortedLeases.remove(lease); 220 lease.renew(); 221 sortedLeases.add(lease); 222 } 223 lease.startedCreate(src); 224 } 225 226 results[0] = allocateBlock(src); 228 results[1] = targets; 229 } 230 } 231 return results; 232 } 233 234 245 public synchronized Object [] getAdditionalBlock(UTF8 src) { 246 Object results[] = null; 247 if (dir.getFile(src) == null && pendingCreates.get(src) != null) { 248 results = new Object [2]; 249 250 if (checkFileProgress(src)) { 254 DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION, null); 256 if (targets.length < MIN_REPLICATION) { 257 return null; 258 } 259 260 results[0] = allocateBlock(src); 262 results[1] = targets; 263 } else { 264 LOG.info("File progress failure for " + src); 265 Vector v = (Vector) pendingCreates.get(src); 266 for (Iterator it = v.iterator(); it.hasNext(); ) { 267 Block b = (Block) it.next(); 268 TreeSet containingNodes = (TreeSet) blocksMap.get(b); 269 if (containingNodes == null || containingNodes.size() < MIN_REPLICATION) { 270 LOG.info("Problem with block " + b + ", with " + (containingNodes == null ? "0" : "" + containingNodes.size()) + " nodes reporting in."); 271 } 272 } 273 } 274 } 275 return results; 276 } 277 278 281 public synchronized boolean abandonBlock(Block b, UTF8 src) { 282 Vector pendingVector = (Vector) pendingCreates.get(src); 286 if (pendingVector != null) { 287 for (Iterator it = pendingVector.iterator(); it.hasNext(); ) { 288 Block cur = (Block) it.next(); 289 if (cur.compareTo(b) == 0) { 290 pendingCreateBlocks.remove(cur); 291 it.remove(); 292 return true; 293 } 294 } 295 } 296 return false; 297 } 298 299 305 public synchronized int completeFile(UTF8 src, UTF8 holder) { 306 if (dir.getFile(src) != null || pendingCreates.get(src) == null) { 307 return OPERATION_FAILED; 308 } else if (! checkFileProgress(src)) { 309 return STILL_WAITING; 310 } else { 311 Vector pendingVector = (Vector) pendingCreates.get(src); 312 Block pendingBlocks[] = (Block[]) pendingVector.toArray(new Block[pendingVector.size()]); 313 314 for (int i = 0; i < pendingBlocks.length; i++) { 324 Block b = pendingBlocks[i]; 325 TreeSet containingNodes = (TreeSet) blocksMap.get(b); 326 DatanodeInfo node = (DatanodeInfo) containingNodes.first(); 327 for (Iterator it = node.getBlockIterator(); it.hasNext(); ) { 328 Block cur = (Block) it.next(); 329 if (b.getBlockId() == cur.getBlockId()) { 330 b.setNumBytes(cur.getNumBytes()); 331 break; 332 } 333 } 334 } 335 336 if (dir.addFile(src, pendingBlocks)) { 340 pendingCreates.remove(src); 342 for (int i = 0; i < pendingBlocks.length; i++) { 343 pendingCreateBlocks.remove(pendingBlocks[i]); 344 } 345 346 synchronized (leases) { 347 Lease lease = (Lease) leases.get(holder); 348 if (lease != null) { 349 lease.completedCreate(src); 350 if (! lease.hasLocks()) { 351 leases.remove(holder); 352 sortedLeases.remove(lease); 353 } 354 } 355 } 356 357 for (int i = 0; i < pendingBlocks.length; i++) { 360 TreeSet containingNodes = (TreeSet) blocksMap.get(pendingBlocks[i]); 361 if (containingNodes.size() < DESIRED_REPLICATION) { 362 synchronized (neededReplications) { 363 neededReplications.add(pendingBlocks[i]); 364 } 365 } 366 } 367 return COMPLETE_SUCCESS; 368 } 369 } 370 371 return OPERATION_FAILED; 372 } 373 374 377 synchronized Block allocateBlock(UTF8 src) { 378 Block b = new Block(); 379 Vector v = (Vector) pendingCreates.get(src); 380 v.add(b); 381 pendingCreateBlocks.add(b); 382 return b; 383 } 384 385 389 synchronized boolean checkFileProgress(UTF8 src) { 390 Vector v = (Vector) pendingCreates.get(src); 391 392 for (Iterator it = v.iterator(); it.hasNext(); ) { 393 Block b = (Block) it.next(); 394 TreeSet containingNodes = (TreeSet) blocksMap.get(b); 395 if (containingNodes == null || containingNodes.size() < MIN_REPLICATION) { 396 return false; 397 } 398 } 399 return true; 400 } 401 402 413 416 public boolean renameTo(UTF8 src, UTF8 dst) { 417 return dir.renameTo(src, dst); 418 } 419 420 424 public synchronized boolean delete(UTF8 src) { 425 Block deletedBlocks[] = (Block[]) dir.delete(src); 426 if (deletedBlocks != null) { 427 for (int i = 0; i < deletedBlocks.length; i++) { 428 Block b = deletedBlocks[i]; 429 430 TreeSet containingNodes = (TreeSet) blocksMap.get(b); 431 if (containingNodes != null) { 432 for (Iterator it = containingNodes.iterator(); it.hasNext(); ) { 433 DatanodeInfo node = (DatanodeInfo) it.next(); 434 Vector invalidateSet = (Vector) recentInvalidateSets.get(node.getName()); 435 if (invalidateSet == null) { 436 invalidateSet = new Vector(); 437 recentInvalidateSets.put(node.getName(), invalidateSet); 438 } 439 invalidateSet.add(b); 440 } 441 } 442 } 443 } 444 445 return (deletedBlocks != null); 446 } 447 448 451 public boolean exists(UTF8 src) { 452 if (dir.getFile(src) != null || dir.isDir(src)) { 453 return true; 454 } else { 455 return false; 456 } 457 } 458 459 462 public boolean isDir(UTF8 src) { 463 return dir.isDir(src); 464 } 465 466 469 public boolean mkdirs(UTF8 src) { 470 return dir.mkdirs(src); 471 } 472 473 480 class Lease implements Comparable { 481 public UTF8 holder; 482 public long lastUpdate; 483 TreeSet locks = new TreeSet(); 484 TreeSet creates = new TreeSet(); 485 486 public Lease(UTF8 holder) { 487 LOG.info("New lease, holder " + holder); 488 this.holder = holder; 489 renew(); 490 } 491 public void renew() { 492 this.lastUpdate = System.currentTimeMillis(); 493 } 494 public boolean expired() { 495 if (System.currentTimeMillis() - lastUpdate > LEASE_PERIOD) { 496 return true; 497 } else { 498 return false; 499 } 500 } 501 public void obtained(UTF8 src) { 502 locks.add(src); 503 } 504 public void released(UTF8 src) { 505 locks.remove(src); 506 } 507 public void startedCreate(UTF8 src) { 508 creates.add(src); 509 } 510 public void completedCreate(UTF8 src) { 511 creates.remove(src); 512 } 513 public boolean hasLocks() { 514 return (locks.size() + creates.size()) > 0; 515 } 516 public void releaseLocks() { 517 for (Iterator it = locks.iterator(); it.hasNext(); ) { 518 UTF8 src = (UTF8) it.next(); 519 internalReleaseLock(src, holder); 520 } 521 locks.clear(); 522 internalReleaseCreates(creates); 523 creates.clear(); 524 } 525 526 528 public String toString() { 529 return "[Lease. Holder: " + holder.toString() + ", heldlocks: " + locks.size() + ", pendingcreates: " + creates.size() + "]"; 530 } 531 532 534 public int compareTo(Object o) { 535 Lease l1 = (Lease) this; 536 Lease l2 = (Lease) o; 537 long lu1 = l1.lastUpdate; 538 long lu2 = l2.lastUpdate; 539 if (lu1 < lu2) { 540 return -1; 541 } else if (lu1 > lu2) { 542 return 1; 543 } else { 544 return l1.holder.compareTo(l2.holder); 545 } 546 } 547 } 548 552 class LeaseMonitor implements Runnable { 553 public void run() { 554 while (fsRunning) { 555 synchronized (FSNamesystem.this) { 556 synchronized (leases) { 557 Lease top; 558 while ((sortedLeases.size() > 0) && 559 ((top = (Lease) sortedLeases.first()) != null)) { 560 if (top.expired()) { 561 top.releaseLocks(); 562 leases.remove(top.holder); 563 LOG.info("Removing lease " + top + ", leases remaining: " + sortedLeases.size()); 564 if (!sortedLeases.remove(top)) { 565 LOG.info("Unknown failure trying to remove " + top + " from lease set."); 566 } 567 } else { 568 break; 569 } 570 } 571 } 572 } 573 try { 574 Thread.sleep(2000); 575 } catch (InterruptedException ie) { 576 } 577 } 578 } 579 } 580 581 584 public synchronized int obtainLock(UTF8 src, UTF8 holder, boolean exclusive) { 585 int result = dir.obtainLock(src, holder, exclusive); 586 if (result == COMPLETE_SUCCESS) { 587 synchronized (leases) { 588 Lease lease = (Lease) leases.get(holder); 589 if (lease == null) { 590 lease = new Lease(holder); 591 leases.put(holder, lease); 592 sortedLeases.add(lease); 593 } else { 594 sortedLeases.remove(lease); 595 lease.renew(); 596 sortedLeases.add(lease); 597 } 598 lease.obtained(src); 599 } 600 } 601 return result; 602 } 603 604 607 public synchronized int releaseLock(UTF8 src, UTF8 holder) { 608 int result = internalReleaseLock(src, holder); 609 if (result == COMPLETE_SUCCESS) { 610 synchronized (leases) { 611 Lease lease = (Lease) leases.get(holder); 612 if (lease != null) { 613 lease.released(src); 614 if (! lease.hasLocks()) { 615 leases.remove(holder); 616 sortedLeases.remove(lease); 617 } 618 } 619 } 620 } 621 return result; 622 } 623 private int internalReleaseLock(UTF8 src, UTF8 holder) { 624 return dir.releaseLock(src, holder); 625 } 626 private void internalReleaseCreates(TreeSet creates) { 627 for (Iterator it = creates.iterator(); it.hasNext(); ) { 628 UTF8 src = (UTF8) it.next(); 629 Vector v = (Vector) pendingCreates.remove(src); 630 for (Iterator it2 = v.iterator(); it2.hasNext(); ) { 631 Block b = (Block) it2.next(); 632 pendingCreateBlocks.remove(b); 633 } 634 } 635 } 636 637 640 public void renewLease(UTF8 holder) { 641 synchronized (leases) { 642 Lease lease = (Lease) leases.get(holder); 643 if (lease != null) { 644 sortedLeases.remove(lease); 645 lease.renew(); 646 sortedLeases.add(lease); 647 LOG.info("Renewed lease " + lease); 648 } 649 } 650 } 651 652 656 public NDFSFileInfo[] getListing(UTF8 src) { 657 return dir.getListing(src); 658 } 659 660 670 public void gotHeartbeat(UTF8 name, long capacity, long remaining) { 671 synchronized (heartbeats) { 672 synchronized (datanodeMap) { 673 long capacityDiff = 0; 674 long remainingDiff = 0; 675 DatanodeInfo nodeinfo = (DatanodeInfo) datanodeMap.get(name); 676 677 if (nodeinfo == null) { 678 nodeinfo = new DatanodeInfo(name, capacity, remaining); 679 datanodeMap.put(name, nodeinfo); 680 capacityDiff = capacity; 681 remainingDiff = remaining; 682 } else { 683 capacityDiff = capacity - nodeinfo.getCapacity(); 684 remainingDiff = remaining - nodeinfo.getRemaining(); 685 heartbeats.remove(nodeinfo); 686 nodeinfo.updateHeartbeat(capacity, remaining); 687 } 688 heartbeats.add(nodeinfo); 689 totalCapacity += capacityDiff; 690 totalRemaining += remainingDiff; 691 } 692 } 693 } 694 695 698 class HeartbeatMonitor implements Runnable { 699 701 public void run() { 702 while (fsRunning) { 703 heartbeatCheck(); 704 try { 705 Thread.sleep(HEARTBEAT_RECHECK); 706 } catch (InterruptedException ie) { 707 } 708 } 709 } 710 } 711 712 716 synchronized void heartbeatCheck() { 717 synchronized (heartbeats) { 718 DatanodeInfo nodeInfo = null; 719 720 while ((heartbeats.size() > 0) && 721 ((nodeInfo = (DatanodeInfo) heartbeats.first()) != null) && 722 (nodeInfo.lastUpdate() < System.currentTimeMillis() - EXPIRE_INTERVAL)) { 723 LOG.info("Lost heartbeat for " + nodeInfo.getName()); 724 725 heartbeats.remove(nodeInfo); 726 synchronized (datanodeMap) { 727 datanodeMap.remove(nodeInfo.getName()); 728 } 729 totalCapacity -= nodeInfo.getCapacity(); 730 totalRemaining -= nodeInfo.getRemaining(); 731 732 Block deadblocks[] = nodeInfo.getBlocks(); 733 if (deadblocks != null) { 734 for (int i = 0; i < deadblocks.length; i++) { 735 removeStoredBlock(deadblocks[i], nodeInfo); 736 } 737 } 738 739 if (heartbeats.size() > 0) { 740 nodeInfo = (DatanodeInfo) heartbeats.first(); 741 } 742 } 743 } 744 } 745 746 750 public synchronized void processReport(Block newReport[], UTF8 name) { 751 DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name); 752 if (node == null) { 753 throw new IllegalArgumentException ("Unexpected exception. Received block report from node " + name + ", but there is no info for " + name); 754 } 755 756 int oldPos = 0, newPos = 0; 761 Block oldReport[] = node.getBlocks(); 762 while (oldReport != null && newReport != null && oldPos < oldReport.length && newPos < newReport.length) { 763 int cmp = oldReport[oldPos].compareTo(newReport[newPos]); 764 765 if (cmp == 0) { 766 oldPos++; 768 newPos++; 769 } else if (cmp < 0) { 770 removeStoredBlock(oldReport[oldPos], node); 772 oldPos++; 773 } else { 774 addStoredBlock(newReport[newPos], node); 776 newPos++; 777 } 778 } 779 while (oldReport != null && oldPos < oldReport.length) { 780 removeStoredBlock(oldReport[oldPos], node); 782 oldPos++; 783 } 784 while (newReport != null && newPos < newReport.length) { 785 addStoredBlock(newReport[newPos], node); 787 newPos++; 788 } 789 790 node.updateBlocks(newReport); 794 } 795 796 800 synchronized void addStoredBlock(Block block, DatanodeInfo node) { 801 TreeSet containingNodes = (TreeSet) blocksMap.get(block); 802 if (containingNodes == null) { 803 containingNodes = new TreeSet(); 804 blocksMap.put(block, containingNodes); 805 } 806 if (! containingNodes.contains(node)) { 807 containingNodes.add(node); 808 } else { 809 LOG.info("Redundant addStoredBlock request received for block " + block + " on node " + node); 810 } 811 812 synchronized (neededReplications) { 813 if (dir.isValidBlock(block)) { 814 if (containingNodes.size() >= DESIRED_REPLICATION) { 815 neededReplications.remove(block); 816 pendingReplications.remove(block); 817 } else if (containingNodes.size() < DESIRED_REPLICATION) { 818 if (! neededReplications.contains(block)) { 819 neededReplications.add(block); 820 } 821 } 822 } 823 } 824 } 825 826 830 synchronized void removeStoredBlock(Block block, DatanodeInfo node) { 831 TreeSet containingNodes = (TreeSet) blocksMap.get(block); 832 if (containingNodes == null || ! containingNodes.contains(node)) { 833 throw new IllegalArgumentException ("No machine mapping found for block " + block + ", which should be at node " + node); 834 } 835 containingNodes.remove(node); 836 837 if (dir.isValidBlock(block) && (containingNodes.size() < DESIRED_REPLICATION)) { 844 synchronized (neededReplications) { 845 neededReplications.add(block); 846 } 847 } 848 } 849 850 853 public synchronized void blockReceived(Block block, UTF8 name) { 854 DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name); 855 if (node == null) { 856 throw new IllegalArgumentException ("Unexpected exception. Got blockReceived message from node " + name + ", but there is no info for " + name); 857 } 858 addStoredBlock(block, node); 862 863 node.addBlock(block); 867 } 868 869 872 public long totalCapacity() { 873 return totalCapacity; 874 } 875 876 879 public long totalRemaining() { 880 return totalRemaining; 881 } 882 883 885 public DatanodeInfo[] datanodeReport() { 886 DatanodeInfo results[] = null; 887 synchronized (heartbeats) { 888 synchronized (datanodeMap) { 889 results = new DatanodeInfo[datanodeMap.size()]; 890 int i = 0; 891 for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) { 892 DatanodeInfo cur = (DatanodeInfo) it.next(); 893 results[i++] = cur; 894 } 895 } 896 } 897 return results; 898 } 899 900 907 912 public synchronized Block[] recentlyInvalidBlocks(UTF8 name) { 913 Vector invalidateSet = (Vector) recentInvalidateSets.remove(name); 914 if (invalidateSet == null) { 915 return null; 916 } else { 917 return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]); 918 } 919 } 920 921 935 public synchronized Block[] checkObsoleteBlocks(UTF8 name) { 936 DatanodeInfo nodeInfo = (DatanodeInfo) datanodeMap.get(name); 937 if (System.currentTimeMillis() - nodeInfo.lastObsoleteCheck() <= OBSOLETE_INTERVAL) { 938 return null; 939 } else { 940 nodeInfo.updateObsoleteCheck(); 941 Vector obsolete = new Vector(); 942 for (Iterator it = nodeInfo.getBlockIterator(); it.hasNext(); ) { 943 Block b = (Block) it.next(); 944 945 if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) { 946 LOG.info("Obsoleting block " + b); 947 obsolete.add(b); 948 } 949 } 950 return (Block[]) obsolete.toArray(new Block[obsolete.size()]); 951 } 952 } 953 954 964 public synchronized Object [] pendingTransfers(DatanodeInfo srcNode, int maxXfers) { 965 if (System.currentTimeMillis() - systemStart < SYSTEM_STARTUP_PERIOD) { 970 return null; 971 } 972 973 synchronized (neededReplications) { 974 Object results[] = null; 975 976 if (neededReplications.size() > 0) { 977 Vector replicateBlocks = new Vector(); 983 Vector replicateTargetSets = new Vector(); 984 for (Iterator it = neededReplications.iterator(); it.hasNext(); ) { 985 if (replicateBlocks.size() >= maxXfers) { 989 break; 990 } 991 992 Block block = (Block) it.next(); 993 if (! dir.isValidBlock(block)) { 994 it.remove(); 995 } else { 996 TreeSet containingNodes = (TreeSet) blocksMap.get(block); 997 998 if (containingNodes.contains(srcNode)) { 999 DatanodeInfo targets[] = chooseTargets(DESIRED_REPLICATION - containingNodes.size(), containingNodes); 1000 if (targets.length > 0) { 1001 replicateBlocks.add(block); 1003 replicateTargetSets.add(targets); 1004 } 1005 } 1006 } 1007 } 1008 1009 if (replicateBlocks.size() > 0) { 1016 int i = 0; 1017 for (Iterator it = replicateBlocks.iterator(); it.hasNext(); i++) { 1018 Block block = (Block) it.next(); 1019 DatanodeInfo targets[] = (DatanodeInfo[]) replicateTargetSets.elementAt(i); 1020 TreeSet containingNodes = (TreeSet) blocksMap.get(block); 1021 1022 if (containingNodes.size() + targets.length >= DESIRED_REPLICATION) { 1023 neededReplications.remove(block); 1024 pendingReplications.add(block); 1025 } 1026 } 1027 1028 DatanodeInfo targetMatrix[][] = new DatanodeInfo[replicateTargetSets.size()][]; 1032 LOG.info("Pending transfer from " + srcNode.getName() + " to " + targetMatrix.length + " destinations"); 1033 for (i = 0; i < targetMatrix.length; i++) { 1034 targetMatrix[i] = (DatanodeInfo[]) replicateTargetSets.elementAt(i); 1035 } 1036 1037 results = new Object [2]; 1038 results[0] = replicateBlocks.toArray(new Block[replicateBlocks.size()]); 1039 results[1] = targetMatrix; 1040 } 1041 } 1042 return results; 1043 } 1044 } 1045 1046 1047 1051 DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes) { 1052 TreeSet alreadyChosen = new TreeSet(); 1053 Vector targets = new Vector(); 1054 1055 for (int i = 0; i < desiredReplicates; i++) { 1056 DatanodeInfo target = chooseTarget(forbiddenNodes, alreadyChosen); 1057 if (target != null) { 1058 targets.add(target); 1059 alreadyChosen.add(target); 1060 } else { 1061 break; 1062 } 1063 } 1064 return (DatanodeInfo[]) targets.toArray(new DatanodeInfo[targets.size()]); 1065 } 1066 1067 1075 DatanodeInfo chooseTarget(TreeSet alreadyHasNode, TreeSet alreadyChosen) { 1076 int totalMachines = datanodeMap.size(); 1077 if (totalMachines == 0) { 1078 return null; 1079 } 1080 int freeMachines = totalMachines; 1081 for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) { 1082 DatanodeInfo node = (DatanodeInfo) it.next(); 1083 if ((alreadyHasNode != null && alreadyHasNode.contains(node)) || 1084 (alreadyChosen != null && alreadyChosen.contains(node))) { 1085 freeMachines--; 1086 } 1087 } 1088 1089 DatanodeInfo target = null; 1093 if (freeMachines > 0) { 1094 int i = 0; 1098 DatanodeInfo targetlist[] = new DatanodeInfo[totalMachines]; 1099 for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); i++) { 1100 targetlist[i] = (DatanodeInfo) it.next(); 1101 } 1102 1103 do { 1104 int index = r.nextInt(totalMachines); 1105 target = targetlist[index]; 1106 1107 if ((alreadyHasNode != null && alreadyHasNode.contains(target)) || 1108 (alreadyChosen != null && alreadyChosen.contains(target))) { 1109 target = null; 1110 } 1111 } while (target == null); 1112 } 1113 return target; 1114 1115 1118 1159 } 1160} 1161 | Popular Tags |