1 2 3 package net.nutch.ndfs; 4 5 import net.nutch.io.*; 6 import net.nutch.ipc.*; 7 import net.nutch.util.*; 8 9 import java.io.*; 10 import java.net.*; 11 import java.util.*; 12 import java.util.logging.*; 13 14 19 public class NDFS implements FSConstants { 20 public static final Logger LOG = LogFormatter.getLogger("net.nutch.fs.NDFS"); 21 private static final long GIGABYTE = 1024 * 1024 * 1024; 22 private static long numGigs = NutchConf.getLong("ndfs.datanode.maxgigs", 100); 23 24 private static final int MAX_BLOCKS_PER_ROUNDTRIP = 3; 29 30 33 public static InetSocketAddress createSocketAddr(String s) throws IOException { 34 String target = s; 35 int colonIndex = target.indexOf(':'); 36 String host = target.substring(0, colonIndex); 37 int port = Integer.parseInt(target.substring(colonIndex + 1)); 38 39 return new InetSocketAddress(host, port); 40 } 41 42 45 private NDFS() { 46 } 47 48 54 65 public static class NameNode extends net.nutch.ipc.Server { 66 FSNamesystem namesystem; 67 68 71 public NameNode(File dir, int port) throws IOException { 72 super(port, FSParam.class, 10); 73 this.namesystem = new FSNamesystem(dir); 74 } 75 76 80 public Writable call(Writable param) throws IOException { 81 FSParam p = (FSParam) param; 82 FSResults r = null; 83 84 switch (p.op) { 85 case OP_CLIENT_OPEN: { 89 UTF8 src = (UTF8) p.first; 90 Object results[] = namesystem.open(src); 91 if (results != null) { 92 Block blocks[] = (Block[]) results[0]; 93 DatanodeInfo sets[][] = (DatanodeInfo[][]) results[1]; 94 r = new FSResults(OP_CLIENT_OPEN_ACK, new ArrayWritable(Block.class, blocks), new TwoDArrayWritable(DatanodeInfo.class, sets)); 95 } else { 96 r = new FSResults(OP_FAILURE); 97 } 98 break; 99 } 100 case OP_CLIENT_STARTFILE: { 101 UTF8 nameParams[] = (UTF8[]) ((ArrayWritable) p.first).toArray(); 102 UTF8 src = nameParams[0]; 103 UTF8 clientName = nameParams[1]; 104 boolean overwrite = ((BooleanWritable) p.second).get(); 105 Object results[] = namesystem.startFile(src, clientName, overwrite); 106 if (results != null) { 107 Block b = (Block) results[0]; 108 DatanodeInfo targets[] = (DatanodeInfo[]) results[1]; 109 r = new FSResults(OP_CLIENT_STARTFILE_ACK, b, new ArrayWritable(DatanodeInfo.class, targets)); 110 } else { 111 r = new FSResults(OP_FAILURE); 112 } 113 break; 114 } 115 case OP_CLIENT_ADDBLOCK: { 116 UTF8 src = (UTF8) p.first; 117 Object results[] = namesystem.getAdditionalBlock(src); 118 if (results != null && results[0] == null) { 119 try { 120 Thread.sleep(50); 121 } catch (InterruptedException ie) { 122 } 123 results = namesystem.getAdditionalBlock(src); 124 } 125 126 if (results != null) { 127 if (results[0] != null) { 128 Block b = (Block) results[0]; 129 DatanodeInfo targets[] = (DatanodeInfo[]) results[1]; 130 r = new FSResults(OP_CLIENT_ADDBLOCK_ACK, b, new ArrayWritable(DatanodeInfo.class, targets)); 131 } else { 132 r = new FSResults(OP_CLIENT_TRYAGAIN); 133 } 134 } else { 135 r = new FSResults(OP_FAILURE); 136 } 137 break; 138 } 139 case OP_CLIENT_ABANDONBLOCK: { 140 Block b = (Block) p.first; 141 UTF8 src = (UTF8) p.second; 142 boolean success = namesystem.abandonBlock(b, src); 143 if (success) { 144 r = new FSResults(OP_CLIENT_ABANDONBLOCK_ACK); 145 } else { 146 r = new FSResults(OP_FAILURE); 147 } 148 break; 149 } 150 case OP_CLIENT_COMPLETEFILE: { 151 UTF8 nameParams[] = (UTF8[]) ((ArrayWritable) p.first).toArray(); 152 UTF8 src = nameParams[0]; 153 UTF8 clientName = nameParams[1]; 154 int returnCode = namesystem.completeFile(src, clientName); 155 if (returnCode == COMPLETE_SUCCESS) { 156 r = new FSResults(OP_CLIENT_COMPLETEFILE_ACK); 157 } else if (returnCode == STILL_WAITING) { 158 r = new FSResults(OP_CLIENT_TRYAGAIN); 159 } else { 160 r = new FSResults(OP_FAILURE); 161 } 162 break; 163 } 164 case OP_CLIENT_RENAMETO: { 165 UTF8 src = (UTF8) p.first; 166 UTF8 dst = (UTF8) p.second; 167 boolean success = namesystem.renameTo(src, dst); 168 if (success) { 169 r = new FSResults(OP_CLIENT_RENAMETO_ACK); 170 } else { 171 r = new FSResults(OP_FAILURE); 172 } 173 break; 174 } 175 case OP_CLIENT_DELETE: { 176 UTF8 src = (UTF8) p.first; 177 boolean success = namesystem.delete(src); 178 if (success) { 179 r = new FSResults(OP_CLIENT_DELETE_ACK); 180 } else { 181 r = new FSResults(OP_FAILURE); 182 } 183 break; 184 } 185 case OP_CLIENT_EXISTS: { 186 UTF8 src = (UTF8) p.first; 187 boolean success = namesystem.exists(src); 188 if (success) { 189 r = new FSResults(OP_CLIENT_EXISTS_ACK); 190 } else { 191 r = new FSResults(OP_FAILURE); 192 } 193 break; 194 } 195 case OP_CLIENT_ISDIR: { 196 UTF8 src = (UTF8) p.first; 197 boolean success = namesystem.isDir(src); 198 if (success) { 199 r = new FSResults(OP_CLIENT_ISDIR_ACK); 200 } else { 201 r = new FSResults(OP_FAILURE); 202 } 203 break; 204 } 205 case OP_CLIENT_MKDIRS: { 206 UTF8 src = (UTF8) p.first; 207 boolean success = namesystem.mkdirs(src); 208 if (success) { 209 r = new FSResults(OP_CLIENT_MKDIRS_ACK); 210 } else { 211 r = new FSResults(OP_FAILURE); 212 } 213 break; 214 } 215 case OP_CLIENT_OBTAINLOCK: { 216 UTF8 nameParams[] = (UTF8[]) ((ArrayWritable) p.first).toArray(); 217 UTF8 src = nameParams[0]; 218 UTF8 clientName = nameParams[1]; 219 boolean exclusive = ((BooleanWritable) p.second).get(); 220 int returnCode = namesystem.obtainLock(src, clientName, exclusive); 221 if (returnCode == COMPLETE_SUCCESS) { 222 r = new FSResults(OP_CLIENT_OBTAINLOCK_ACK); 223 } else if (returnCode == STILL_WAITING) { 224 r = new FSResults(OP_CLIENT_TRYAGAIN); 225 } else { 226 r = new FSResults(OP_FAILURE); 227 } 228 break; 229 } 230 case OP_CLIENT_RELEASELOCK: { 231 UTF8 nameParams[] = (UTF8[]) ((ArrayWritable) p.first).toArray(); 232 UTF8 src = nameParams[0]; 233 UTF8 clientName = nameParams[1]; 234 int returnCode = namesystem.releaseLock(src, clientName); 235 if (returnCode == COMPLETE_SUCCESS) { 236 r = new FSResults(OP_CLIENT_COMPLETEFILE_ACK); 237 } else if (returnCode == STILL_WAITING) { 238 r = new FSResults(OP_CLIENT_TRYAGAIN); 239 } else { 240 r = new FSResults(OP_FAILURE); 241 } 242 break; 243 } 244 case OP_CLIENT_RENEW_LEASE: { 245 UTF8 clientName = (UTF8) p.first; 246 namesystem.renewLease(clientName); 247 r = new FSResults(OP_CLIENT_RENEW_LEASE_ACK); 248 break; 249 } 250 case OP_CLIENT_LISTING: { 251 UTF8 src = (UTF8) p.first; 252 NDFSFileInfo results[] = namesystem.getListing(src); 253 if (results != null) { 254 r = new FSResults(OP_CLIENT_LISTING_ACK, new ArrayWritable(NDFSFileInfo.class, results)); 255 } else { 256 r = new FSResults(OP_FAILURE); 257 } 258 break; 259 } 260 case OP_CLIENT_RAWSTATS: { 261 long totalRaw = namesystem.totalCapacity(); 262 long remainingRaw = namesystem.totalRemaining(); 263 LongWritable results[] = new LongWritable[2]; 264 results[0] = new LongWritable(totalRaw); 265 results[1] = new LongWritable(totalRaw - remainingRaw); 266 r = new FSResults(OP_CLIENT_RAWSTATS_ACK, new ArrayWritable(LongWritable.class, results)); 267 break; 268 } 269 case OP_CLIENT_DATANODEREPORT: { 270 DatanodeInfo report[] = namesystem.datanodeReport(); 271 if (report != null) { 272 r = new FSResults(OP_CLIENT_DATANODEREPORT_ACK, new ArrayWritable(DatanodeInfo.class, report)); 273 } else { 274 r = new FSResults(OP_FAILURE); 275 } 276 break; 277 } 278 279 case OP_HEARTBEAT: 283 case OP_BLOCKREPORT: 284 case OP_BLOCKRECEIVED: 285 case OP_ERROR: { 286 UTF8 sender = null; 287 if (p.op == OP_HEARTBEAT) { 288 HeartbeatData hd = (HeartbeatData) p.first; 290 sender = hd.getName(); 291 namesystem.gotHeartbeat(sender, hd.getCapacity(), hd.getRemaining()); 292 293 } else if (p.op == OP_BLOCKREPORT) { 294 Block blocks[] = (Block[]) ((ArrayWritable) p.first).toArray(); 296 sender = (UTF8) p.second; 297 namesystem.processReport(blocks, sender); 298 299 } else if (p.op == OP_BLOCKRECEIVED) { 300 Writable blocks[] = ((ArrayWritable) p.first).get(); 302 sender = (UTF8) p.second; 303 for (int i = 0; i < blocks.length; i++) { 304 namesystem.blockReceived((Block) blocks[i], sender); 305 } 306 } else { 307 System.err.println("ERR from datanode! Op = " + p.op); 309 sender = (UTF8) p.first; 310 System.err.println("Datanode: " + sender); 311 System.err.println("Message: " + ((UTF8) p.second)); 312 } 313 314 Object xferResults[] = namesystem.pendingTransfers(new DatanodeInfo(sender), MAX_BLOCKS_PER_ROUNDTRIP); 318 if (xferResults != null) { 319 r = new FSResults(OP_TRANSFERBLOCKS, new ArrayWritable(Block.class, (Block[]) xferResults[0]), new TwoDArrayWritable(DatanodeInfo.class, (DatanodeInfo[][]) xferResults[1])); 320 } else { 321 Block blocks[] = namesystem.recentlyInvalidBlocks(sender); 322 if (blocks == null) { 323 blocks = namesystem.checkObsoleteBlocks(sender); 324 } 325 if (blocks != null) { 326 r = new FSResults(OP_INVALIDATE_BLOCKS, new ArrayWritable(Block.class, blocks)); 327 } else { 328 r = new FSResults(OP_ACK); 329 } 330 } 331 break; 332 } 333 default: 334 throw new RuntimeException ("Unknown op code: " + p.op); 335 } 336 return r; 337 } 338 339 341 public static void main(String argv[]) throws IOException, InterruptedException { 342 if (argv.length < 2) { 343 System.out.println("NDFS$NameNode <port> <namespace_dir>"); 344 System.exit(-1); 345 } 346 347 int port = Integer.parseInt(argv[0]); 348 File dir = new File(argv[1]); 349 350 NameNode namenode = new NameNode(dir, port); 351 namenode.start(); 352 namenode.join(); 353 } 354 } 355 356 362 373 public static class DataNode extends net.nutch.ipc.Client { 374 FSDataset data; 375 String localName; 376 InetSocketAddress nameNodeAddr; 377 378 Vector receivedBlockList = new Vector(); 379 382 public DataNode(String machineName, File dir, InetSocketAddress nameNodeAddr) throws IOException { 383 super(FSResults.class); 384 long capacity = numGigs * GIGABYTE; 385 this.data = new FSDataset(dir, capacity); 386 this.nameNodeAddr = nameNodeAddr; 387 388 ServerSocket ss = null; 389 int tmpPort = 7000; 390 while (ss == null) { 391 try { 392 ss = new ServerSocket(tmpPort); 393 LOG.info("Opened server at " + tmpPort); 394 } catch (IOException ie) { 395 LOG.info("Could not open server at " + tmpPort + ", trying new port"); 396 tmpPort++; 397 } 398 } 399 this.localName = machineName + ":" + tmpPort; 400 new Thread (new DataXceiveServer(ss)).start(); 401 } 402 403 406 public void offerService() throws Exception { 407 long wakeups = 0; 408 long lastHeartbeat = 0, lastBlockReport = 0; 409 long sendStart = System.currentTimeMillis(); 410 int heartbeatsSent = 0; 411 412 boolean shouldRun = true; 416 while (shouldRun) { 417 long now = System.currentTimeMillis(); 418 419 FSParam p = null; 423 FSResults r = null; 424 425 synchronized (receivedBlockList) { 426 if (now - lastHeartbeat > HEARTBEAT_INTERVAL) { 427 p = new FSParam(OP_HEARTBEAT, new HeartbeatData(localName, data.getCapacity(), data.getRemaining())); 435 lastHeartbeat = now; 436 } else if (now - lastBlockReport > BLOCKREPORT_INTERVAL) { 437 p = new FSParam(OP_BLOCKREPORT, new ArrayWritable(Block.class, data.getBlockReport()), new UTF8(localName)); 441 lastBlockReport = now; 442 } else if (receivedBlockList.size() > 0) { 443 Block blockArray[] = null; 447 blockArray = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]); 448 receivedBlockList.removeAllElements(); 449 p = new FSParam(OP_BLOCKRECEIVED, new ArrayWritable(Block.class, blockArray), new UTF8(localName)); 450 } else { 451 long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat); 456 if (waitTime > 0) { 457 try { 458 receivedBlockList.wait(waitTime); 459 } catch (InterruptedException ie) { 460 } 461 } 462 continue; 463 } 464 } 465 466 r = (FSResults) call(p, nameNodeAddr); 470 if (r == null) { 471 throw new IOException("No response to remote call to " + nameNodeAddr); 472 } 473 474 switch (r.op) { 487 case OP_ACK: { 488 break; 493 } 494 case OP_TRANSFERBLOCKS: { 495 Block blocks[] = (Block[]) ((ArrayWritable) r.first).toArray(); 500 DatanodeInfo xferTargets[][] = (DatanodeInfo[][]) ((TwoDArrayWritable) r.second).toArray(); 501 for (int i = 0; i < blocks.length; i++) { 502 if (!data.isValidBlock(blocks[i])) { 503 System.out.println("Invoking error! " + localName); 504 call(new FSParam(OP_ERROR, new UTF8(localName), new UTF8("Can't send invalid block " + blocks[i])), nameNodeAddr); 505 break; 506 } else { 507 if (xferTargets[i].length > 0) { 508 LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]); 509 new Thread (new DataTransfer(xferTargets[i], blocks[i])).start(); 510 } 511 } 512 } 513 break; 514 } 515 case OP_INVALIDATE_BLOCKS: { 516 ArrayWritable aw = (ArrayWritable) r.first; 521 Block b[] = (Block[]) aw.toArray(); 522 data.invalidate(b); 523 break; 524 } 525 default: 526 throw new RuntimeException ("Unknown op code: " + r.op); 527 } 528 } 529 } 530 531 534 class DataXceiveServer implements Runnable { 535 ServerSocket ss; 536 public DataXceiveServer(ServerSocket ss) { 537 this.ss = ss; 538 } 539 540 542 public void run() { 543 try { 544 while (true) { 545 Socket s = ss.accept(); 546 new Thread (new DataXceiver(s)).start(); 547 } 548 } catch (IOException ie) { 549 LOG.info("Exiting DataXceiveServer due to " + ie.toString()); 550 } 551 } 552 } 553 554 557 class DataXceiver implements Runnable { 558 Socket s; 559 public DataXceiver(Socket s) { 560 this.s = s; 561 } 562 563 565 public void run() { 566 try { 567 DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream())); 568 try { 569 byte op = (byte) in.read(); 570 if (op == OP_WRITE_BLOCK) { 571 DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); 575 try { 576 Block b = new Block(); 577 b.readFields(in); 578 int numTargets = in.readInt(); 579 if (numTargets <= 0) { 580 throw new IOException("Mislabelled incoming datastream."); 581 } 582 DatanodeInfo targets[] = new DatanodeInfo[numTargets]; 583 for (int i = 0; i < targets.length; i++) { 584 DatanodeInfo tmp = new DatanodeInfo(); 585 tmp.readFields(in); 586 targets[i] = tmp; 587 } 588 byte encodingType = (byte) in.read(); 589 long len = in.readLong(); 590 591 DatanodeInfo curTarget = targets[0]; 596 597 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b))); 601 InetSocketAddress mirrorTarget = null; 602 try { 603 DataInputStream in2 = null; 608 DataOutputStream out2 = null; 609 if (targets.length > 1) { 610 mirrorTarget = createSocketAddr(targets[1].getName().toString()); 612 try { 613 Socket s = new Socket(mirrorTarget.getAddress(), mirrorTarget.getPort()); 614 out2 = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); 615 in2 = new DataInputStream(new BufferedInputStream(s.getInputStream())); 616 617 out2.write(OP_WRITE_BLOCK); 619 b.write(out2); 620 out2.writeInt(targets.length - 1); 621 for (int i = 1; i < targets.length; i++) { 622 targets[i].write(out2); 623 } 624 out2.write(encodingType); 625 out2.writeLong(len); 626 } catch (IOException ie) { 627 if (out2 != null) { 628 try { 629 out2.close(); 630 in2.close(); 631 } catch (IOException out2close) { 632 } finally { 633 out2 = null; 634 in2 = null; 635 } 636 } 637 } 638 } 639 640 try { 645 boolean anotherChunk = true; 646 byte buf[] = new byte[2048]; 647 648 while (anotherChunk) { 649 while (len > 0) { 650 int bytesRead = in.read(buf, 0, Math.min(buf.length, (int) len)); 651 if (bytesRead >= 0) { 652 out.write(buf, 0, bytesRead); 653 if (out2 != null) { 654 try { 655 out2.write(buf, 0, bytesRead); 656 } catch (IOException out2e) { 657 try { 663 out2.close(); 664 in2.close(); 665 } catch (IOException out2close) { 666 } finally { 667 out2 = null; 668 in2 = null; 669 } 670 } 671 } 672 } 673 len -= bytesRead; 674 } 675 676 if (encodingType == RUNLENGTH_ENCODING) { 677 anotherChunk = false; 678 } else if (encodingType == CHUNKED_ENCODING) { 679 len = in.readLong(); 680 if (out2 != null) { 681 out2.writeLong(len); 682 } 683 if (len == 0) { 684 anotherChunk = false; 685 } 686 } 687 } 688 689 if (out2 == null) { 690 LOG.info("Received block " + b + " from " + s.getInetAddress()); 691 } else { 692 out2.flush(); 693 long complete = in2.readLong(); 694 if (complete != WRITE_COMPLETE) { 695 LOG.info("Conflicting value for WRITE_COMPLETE: " + complete); 696 } 697 LOG.info("Received block " + b + " from " + s.getInetAddress() + " and mirrored to " + mirrorTarget); 698 } 699 } finally { 700 if (out2 != null) { 701 out2.close(); 702 in2.close(); 703 } 704 } 705 } finally { 706 out.close(); 707 } 708 data.finalizeBlock(b); 709 710 synchronized (receivedBlockList) { 715 receivedBlockList.add(b); 716 receivedBlockList.notifyAll(); 717 } 718 719 reply.writeLong(WRITE_COMPLETE); 723 } finally { 724 reply.close(); 725 } 726 } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK) { 727 Block b = new Block(); 731 b.readFields(in); 732 733 long toSkip = 0; 734 if (op == OP_READSKIP_BLOCK) { 735 toSkip = in.readLong(); 736 } 737 738 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); 742 try { 743 if (! data.isValidBlock(b)) { 747 out.writeLong(-1); 748 } else { 749 long len = data.getLength(b); 753 DataInputStream in2 = new DataInputStream(data.getBlockData(b)); 754 out.writeLong(len); 755 756 if (op == OP_READSKIP_BLOCK) { 757 if (toSkip > len) { 758 toSkip = len; 759 } 760 long amtSkipped = in2.skip(toSkip); 761 out.writeLong(amtSkipped); 762 } 763 764 byte buf[] = new byte[4096]; 765 try { 766 int bytesRead = in2.read(buf); 767 while (bytesRead >= 0) { 768 out.write(buf, 0, bytesRead); 769 len -= bytesRead; 770 bytesRead = in2.read(buf); 771 } 772 } catch (SocketException se) { 773 } finally { 776 in2.close(); 777 } 778 } 779 LOG.info("Served block " + b + " to " + s.getInetAddress()); 780 } finally { 781 out.close(); 782 } 783 } else { 784 while (op >= 0) { 785 System.out.println("Faulty op: " + op); 786 op = (byte) in.read(); 787 } 788 throw new IOException("Unknown opcode for incoming data stream"); 789 } 790 } finally { 791 in.close(); 792 } 793 } catch (IOException ie) { 794 ie.printStackTrace(); 795 } finally { 796 try { 797 s.close(); 798 } catch (IOException ie2) { 799 } 800 } 801 } 802 } 803 804 807 class DataTransfer implements Runnable { 808 InetSocketAddress curTarget; 809 DatanodeInfo targets[]; 810 Block b; 811 byte buf[]; 812 813 817 public DataTransfer(DatanodeInfo targets[], Block b) throws IOException { 818 this.curTarget = createSocketAddr(targets[0].getName().toString()); 819 this.targets = targets; 820 this.b = b; 821 this.buf = new byte[2048]; 822 } 823 824 827 public void run() { 828 try { 829 Socket s = new Socket(curTarget.getAddress(), curTarget.getPort()); 830 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); 831 try { 832 long filelen = data.getLength(b); 833 DataInputStream in = new DataInputStream(new BufferedInputStream(data.getBlockData(b))); 834 try { 835 out.write(OP_WRITE_BLOCK); 839 b.write(out); 840 out.writeInt(targets.length); 841 for (int i = 0; i < targets.length; i++) { 842 targets[i].write(out); 843 } 844 out.write(RUNLENGTH_ENCODING); 845 out.writeLong(filelen); 846 847 while (filelen > 0) { 851 int bytesRead = in.read(buf, 0, (int) Math.min(filelen, buf.length)); 852 out.write(buf, 0, bytesRead); 853 filelen -= bytesRead; 854 } 855 } finally { 856 in.close(); 857 } 858 } finally { 859 out.close(); 860 } 861 LOG.info("Replicated block " + b + " to " + curTarget); 862 } catch (IOException ie) { 863 } 864 } 865 } 866 867 869 public static void main(String argv[]) throws IOException { 870 if (argv.length < 3) { 871 System.out.println("NDFS$DataNode <dataDir> <localMachine> <namenode:port>"); 872 System.exit(-1); 873 } 874 875 File dir = new File(argv[0]); 876 String localMachine = argv[1]; 877 String nameNodeStr = argv[2]; 878 int colon = nameNodeStr.indexOf(":"); 879 if (colon < 0) { 880 System.out.println("Incorrect <nameserver:port> param"); 881 System.exit(-1); 882 } 883 String nameNodeName = nameNodeStr.substring(0, colon); 884 int nameNodePort = Integer.parseInt(nameNodeStr.substring(colon + 1)); 885 InetSocketAddress nameNodeAddr = new InetSocketAddress(nameNodeName, nameNodePort); 886 887 DataNode datanode = new DataNode(localMachine, dir, nameNodeAddr); 888 while (true) { 889 try { 890 datanode.offerService(); 891 } catch (Exception ex) { 892 LOG.info("Lost connection to namenode [" + nameNodeAddr + "]. Retrying..."); 893 try { 894 Thread.sleep(5000); 895 } catch (InterruptedException ie) { 896 } 897 } 898 } 899 } 900 } 901 } 902 | Popular Tags |