1 2 3 package net.nutch.ndfs; 4 5 import net.nutch.io.*; 6 import net.nutch.fs.*; 7 import net.nutch.ipc.*; 8 import net.nutch.util.*; 9 10 import java.io.*; 11 import java.net.*; 12 import java.util.*; 13 import java.util.logging.*; 14 15 21 public class NDFSClient implements FSConstants { 22 public static final Logger LOG = LogFormatter.getLogger("net.nutch.fs.NDFSClient"); 23 static int BUFFER_SIZE = 4096; 24 NameNodeCaller nameNodeCaller; 25 boolean running = true; 26 Random r = new Random(); 27 UTF8 clientName; 28 Thread leaseChecker; 29 30 31 33 public NDFSClient(InetSocketAddress namenode) { 34 this.nameNodeCaller = new NameNodeCaller(namenode); 35 this.clientName = new UTF8("NDFSClient_" + r.nextInt()); 36 this.leaseChecker = new Thread (new LeaseChecker()); 37 this.leaseChecker.start(); 38 } 39 40 42 public void close() throws IOException { 43 this.running = false; 45 try { 46 leaseChecker.join(); 47 } catch (InterruptedException ie) { 48 } 49 } 50 51 57 public NFSInputStream open(UTF8 src) throws IOException { 58 Object results[] = nameNodeCaller.getBlocksNodes(src); 60 return new NDFSInputStream((Block[]) results[0], (DatanodeInfo[][]) results[1]); 61 } 62 63 68 public NFSOutputStream create(UTF8 src) throws IOException { 69 return create(src, false); 70 } 71 public NFSOutputStream create(UTF8 src, boolean overwrite) throws IOException { 72 return new NDFSOutputStream(src, overwrite); 73 } 74 75 79 public boolean rename(UTF8 src, UTF8 dst) throws IOException { 80 return nameNodeCaller.rename(src, dst); 81 } 82 83 87 public boolean delete(UTF8 src) throws IOException { 88 return nameNodeCaller.delete(src); 89 } 90 91 93 public boolean exists(UTF8 src) throws IOException { 94 return nameNodeCaller.exists(src); 95 } 96 97 99 public boolean isDirectory(UTF8 src) throws IOException { 100 return nameNodeCaller.isDirectory(src); 101 } 102 103 105 public NDFSFileInfo[] listFiles(UTF8 src) throws IOException { 106 return nameNodeCaller.listing(src); 107 } 108 109 111 public long totalRawCapacity() throws IOException { 112 long rawNums[] = nameNodeCaller.rawReport(); 113 return rawNums[0]; 114 } 115 116 118 public long totalRawUsed() throws IOException { 119 long rawNums[] = nameNodeCaller.rawReport(); 120 return rawNums[1]; 121 } 122 123 public DatanodeInfo[] datanodeReport() throws IOException { 124 return nameNodeCaller.datanodeReport(); 125 } 126 127 129 public boolean mkdirs(UTF8 src) throws IOException { 130 return nameNodeCaller.mkdirs(src); 131 } 132 133 135 public void lock(UTF8 src, boolean exclusive) throws IOException { 136 nameNodeCaller.lock(src, exclusive); 137 } 138 139 141 public void release(UTF8 src) throws IOException { 142 nameNodeCaller.release(src); 143 } 144 145 149 private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException { 150 if ((nodes == null) || 151 (nodes.length - deadNodes.size() < 1)) { 152 throw new IOException("No live nodes contain current block"); 153 } 154 DatanodeInfo chosenNode = null; 155 do { 156 chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length]; 157 } while (deadNodes.contains(chosenNode)); 158 return chosenNode; 159 } 160 161 165 class LeaseChecker implements Runnable { 166 168 public void run() { 169 long lastRenewed = 0; 170 while (running) { 171 if (System.currentTimeMillis() - lastRenewed > (LEASE_PERIOD / 2)) { 172 try { 173 FSParam p = new FSParam(OP_CLIENT_RENEW_LEASE, clientName); 174 FSResults r = nameNodeCaller.call(p); 175 lastRenewed = System.currentTimeMillis(); 176 } catch (IOException ie) { 177 } 178 } 179 try { 180 Thread.sleep(1000); 181 } catch (InterruptedException ie) { 182 } 183 } 184 } 185 } 186 187 191 class NDFSInputStream extends NFSInputStream { 192 boolean closed = false; 193 194 private DataInputStream blockStream; 195 private DataOutputStream partnerStream; 196 private Block blocks[]; 197 private int curBlock = 0; 198 private DatanodeInfo nodes[][]; 199 private long pos = 0; 200 private long bytesRemainingInBlock = 0, curBlockSize = 0; 201 202 private int memoryBuf[] = new int[32 * 1024]; 203 private long memoryStartPos = 0; 204 private long openPoint = 0; 205 private int memoryBytes = 0; 206 private int memoryBytesStart = 0; 207 208 210 public NDFSInputStream(Block blocks[], DatanodeInfo nodes[][]) throws IOException { 211 this.blocks = blocks; 212 this.nodes = nodes; 213 this.blockStream = null; 214 this.partnerStream = null; 215 } 216 217 222 private synchronized void nextBlockInputStream() throws IOException { 223 nextBlockInputStream(0); 224 } 225 private synchronized void nextBlockInputStream(long preSkip) throws IOException { 226 if (curBlock >= blocks.length) { 227 throw new IOException("Attempted to read past end of file"); 228 } 229 if (bytesRemainingInBlock > 0) { 230 throw new IOException("Trying to skip to next block without reading all data"); 231 } 232 233 if (blockStream != null) { 234 blockStream.close(); 235 partnerStream.close(); 236 } 237 238 InetSocketAddress target = null; 242 Socket s = null; 243 TreeSet deadNodes = new TreeSet(); 244 while (s == null) { 245 DatanodeInfo chosenNode; 246 247 try { 248 chosenNode = bestNode(nodes[curBlock], deadNodes); 249 target = NDFS.createSocketAddr(chosenNode.getName().toString()); 250 } catch (IOException ie) { 251 LOG.info("Could not obtain block from any node. Retrying..."); 252 try { 253 Thread.sleep(10000); 254 } catch (InterruptedException iex) { 255 } 256 deadNodes.clear(); 257 continue; 258 } 259 try { 260 s = new Socket(target.getAddress(), target.getPort()); 261 LOG.info("Now downloading from " + target + ", block " + blocks[curBlock] + ", skipahead " + preSkip); 262 263 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); 267 out.write(OP_READSKIP_BLOCK); 268 blocks[curBlock].write(out); 269 out.writeLong(preSkip); 270 out.flush(); 271 272 DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream())); 276 curBlockSize = in.readLong(); 277 long amtSkipped = in.readLong(); 278 279 pos += amtSkipped; 280 bytesRemainingInBlock = curBlockSize - amtSkipped; 281 282 if (amtSkipped > 0) { 283 memoryStartPos = pos; 284 memoryBytes = 0; 285 memoryBytesStart = 0; 286 } 287 blockStream = in; 288 partnerStream = out; 289 curBlock++; 290 openPoint = pos; 291 } catch (IOException ex) { 292 LOG.info("Could not connect to " + target); 294 deadNodes.add(chosenNode); 295 s = null; 296 } 297 } 298 } 299 300 302 public synchronized void seek(long pos) throws IOException { 303 if (pos < 0) { 304 throw new IOException("Cannot seek to negative position " + pos); 305 } 306 if (pos == this.pos) { 307 return; 308 } 309 310 if ((pos >= memoryStartPos) && (memoryStartPos + memoryBytes > pos)) { 315 this.pos = pos; 316 } else { 317 if (pos < memoryStartPos && blockStream != null) { 324 blockStream.close(); 325 blockStream = null; 326 partnerStream.close(); 327 partnerStream = null; 328 this.curBlock = 0; 329 this.bytesRemainingInBlock = 0; 330 this.pos = 0; 331 this.memoryStartPos = 0; 332 this.memoryBytes = 0; 333 this.memoryBytesStart = 0; 334 } 339 340 long diff = pos - this.pos; 344 while (diff > 0) { 345 long skipped = skip(diff); 346 if (skipped > 0) { 347 diff -= skipped; 348 } 349 } 350 } 352 } 353 354 357 public synchronized long skip(long skip) throws IOException { 358 long toSkip = 0; 359 long toFastSkip = 0; 360 if (skip > memoryBuf.length) { 361 toSkip = memoryBuf.length; 362 toFastSkip = skip - toSkip; 363 } else { 364 toSkip = skip; 365 } 366 long totalSkipped = 0; 367 368 379 long realBytesRemaining = bytesRemainingInBlock + (memoryBytes - (pos - memoryStartPos)); 380 if (toFastSkip > 0 && realBytesRemaining > 0 && 381 toFastSkip < realBytesRemaining) { 382 383 blockStream.close(); 384 blockStream = null; 385 partnerStream.close(); 386 partnerStream = null; 387 388 long backwardsDistance = curBlockSize - realBytesRemaining; 389 pos -= backwardsDistance; 390 totalSkipped -= backwardsDistance; 391 toFastSkip += backwardsDistance; 392 bytesRemainingInBlock = 0; 393 curBlock--; 394 395 memoryStartPos = pos; 396 memoryBytes = 0; 397 memoryBytesStart = 0; 398 } 399 400 while (toFastSkip > 0 && curBlock < blocks.length) { 405 406 if (bytesRemainingInBlock > 0) { 407 blockStream.close(); 408 blockStream = null; 409 partnerStream.close(); 410 partnerStream = null; 411 412 pos += bytesRemainingInBlock; 413 totalSkipped += bytesRemainingInBlock; 414 toFastSkip -= bytesRemainingInBlock; 415 bytesRemainingInBlock = 0; 416 } 417 418 long oldPos = pos; 419 nextBlockInputStream(toFastSkip); 420 long forwardDistance = (pos - oldPos); 421 totalSkipped += forwardDistance; 422 toFastSkip -= (pos - oldPos); 423 424 memoryStartPos = pos; 425 memoryBytes = 0; 426 memoryBytesStart = 0; 427 } 428 429 if (toFastSkip > 0) { 435 System.err.println("Trying to skip past end of file...."); 436 toFastSkip = 0; 437 } 438 439 totalSkipped += super.skip(toSkip); 444 toSkip = 0; 445 return totalSkipped; 446 } 447 448 450 public synchronized long getPos() throws IOException { 451 return pos; 452 } 453 454 456 public synchronized int available() throws IOException { 457 if (closed) { 458 throw new IOException("Stream closed"); 459 } 460 return (int) Math.min((long) Integer.MAX_VALUE, bytesRemainingInBlock); 461 } 462 463 465 public synchronized void close() throws IOException { 466 if (closed) { 467 throw new IOException("Stream closed"); 468 } 469 470 if (blockStream != null) { 471 blockStream.close(); 472 blockStream = null; 473 partnerStream.close(); 474 } 475 super.close(); 476 closed = true; 477 } 478 479 483 public synchronized int read() throws IOException { 484 if (closed) { 485 throw new IOException("Stream closed"); 486 } 487 488 int b = 0; 489 if (pos - memoryStartPos < memoryBytes) { 490 int diff = (int) (pos - memoryStartPos); 494 495 b = memoryBuf[(memoryBytesStart + diff) % memoryBuf.length]; 499 500 pos++; 504 } else { 505 if (bytesRemainingInBlock == 0) { 506 if (curBlock < blocks.length) { 507 nextBlockInputStream(); 508 } else { 509 return -1; 510 } 511 } 512 b = blockStream.read(); 513 if (b >= 0) { 514 if (memoryBytes == memoryBuf.length) { 518 memoryStartPos++; 519 } 520 521 if (memoryBuf.length > 0) { 522 int target; 523 if (memoryBytes == memoryBuf.length) { 524 target = memoryBytesStart; 525 memoryBytesStart = (memoryBytesStart + 1) % memoryBuf.length; 526 } else { 527 target = (memoryBytesStart + memoryBytes) % memoryBuf.length; 528 memoryBytes++; 529 } 530 memoryBuf[target] = b; 531 } 532 bytesRemainingInBlock--; 533 pos++; 534 } 535 } 536 return b; 537 } 538 539 542 public boolean markSupported() { 543 return false; 544 } 545 public void mark(int readLimit) { 546 } 547 public void reset() throws IOException { 548 throw new IOException("Mark not supported"); 549 } 550 } 551 552 555 class NDFSOutputStream extends NFSOutputStream { 556 boolean closed = false; 557 558 private byte outBuf[] = new byte[BUFFER_SIZE]; 559 private int pos = 0; 560 561 private UTF8 src; 562 private boolean overwrite; 563 private boolean blockStreamWorking; 564 private DataOutputStream blockStream; 565 private DataInputStream blockReplyStream; 566 private File backupFile; 567 private OutputStream backupStream; 568 private Block block; 569 private DatanodeInfo targets[]; 570 private long filePos = 0; 571 private int bytesWrittenToBlock = 0; 572 573 576 public NDFSOutputStream(UTF8 src, boolean overwrite) throws IOException { 577 this.src = src; 578 this.overwrite = overwrite; 579 this.blockStream = null; 580 this.blockReplyStream = null; 581 this.blockStreamWorking = false; 582 this.backupFile = File.createTempFile("ndfsout", "bak"); 583 this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile)); 584 nextBlockOutputStream(true); 585 } 586 587 592 private synchronized void nextBlockOutputStream(boolean firstTime) throws IOException { 593 if (! firstTime && blockStreamWorking) { 594 blockStream.close(); 595 blockReplyStream.close(); 596 blockStreamWorking = false; 597 } 598 599 boolean retry = false; 600 long start = System.currentTimeMillis(); 601 do { 602 retry = false; 603 Object results[] = nameNodeCaller.getNewOutputBlock(firstTime, overwrite, src); 604 block = (Block) results[0]; 605 DatanodeInfo nodes[] = (DatanodeInfo[]) results[1]; 606 607 InetSocketAddress target = NDFS.createSocketAddr(nodes[0].getName().toString()); 611 Socket s = null; 612 try { 613 s = new Socket(target.getAddress(), target.getPort()); 614 } catch (IOException ie) { 615 try { 617 if (System.currentTimeMillis() - start > 5000) { 618 LOG.info("Waiting to find target node"); 619 } 620 Thread.sleep(6000); 621 } catch (InterruptedException iex) { 622 } 623 nameNodeCaller.abandonBlock(block, src); 624 retry = true; 625 continue; 626 } 627 628 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); 632 out.write(OP_WRITE_BLOCK); 633 block.write(out); 634 out.writeInt(nodes.length); 635 for (int i = 0; i < nodes.length; i++) { 636 nodes[i].write(out); 637 } 638 out.write(CHUNKED_ENCODING); 639 bytesWrittenToBlock = 0; 640 blockStream = out; 641 blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream())); 642 blockStreamWorking = true; 643 } while (retry); 644 } 645 646 649 public synchronized long getPos() throws IOException { 650 return filePos; 651 } 652 653 657 public synchronized void write(int b) throws IOException { 658 if (closed) { 659 throw new IOException("Stream closed"); 660 } 661 662 if ((bytesWrittenToBlock + pos == BLOCK_SIZE) || 663 (pos >= BUFFER_SIZE)) { 664 flush(); 665 } 666 outBuf[pos++] = (byte) b; 667 filePos++; 668 } 669 670 673 public synchronized void flush() throws IOException { 674 if (closed) { 675 throw new IOException("Stream closed"); 676 } 677 678 if (bytesWrittenToBlock + pos >= BLOCK_SIZE) { 679 flushData(BLOCK_SIZE - bytesWrittenToBlock); 680 } 681 if (bytesWrittenToBlock == BLOCK_SIZE) { 682 endBlock(); 683 nextBlockOutputStream(false); 684 } 685 flushData(pos); 686 } 687 688 692 private synchronized void flushData(int maxPos) throws IOException { 693 int workingPos = Math.min(pos, maxPos); 694 695 if (workingPos >= 0) { 696 if (blockStreamWorking) { 700 try { 701 blockStream.writeLong(workingPos); 702 blockStream.write(outBuf, 0, workingPos); 703 } catch (IOException ie) { 704 try { 705 blockStream.close(); 706 } catch (IOException ie2) { 707 } 708 try { 709 blockReplyStream.close(); 710 } catch (IOException ie2) { 711 } 712 nameNodeCaller.abandonBlock(block, src); 713 blockStreamWorking = false; 714 } 715 } 716 backupStream.write(outBuf, 0, workingPos); 720 721 bytesWrittenToBlock += workingPos; 725 System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos); 726 pos -= workingPos; 727 } 728 } 729 730 732 private synchronized void endBlock() throws IOException { 733 boolean mustRecover = ! blockStreamWorking; 734 735 if (blockStreamWorking) { 739 try { 740 blockStream.writeLong(0); 741 blockStream.flush(); 742 743 long complete = blockReplyStream.readLong(); 744 if (complete != WRITE_COMPLETE) { 745 LOG.info("Did not receive WRITE_COMPLETE flag: " + complete); 746 throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete); 747 } 748 blockStream.close(); 749 blockReplyStream.close(); 750 } catch (IOException ie) { 751 try { 752 blockStream.close(); 753 } catch (IOException ie2) { 754 } 755 try { 756 blockReplyStream.close(); 757 } catch (IOException ie2) { 758 } 759 nameNodeCaller.abandonBlock(block, src); 760 mustRecover = true; 761 } finally { 762 blockStreamWorking = false; 763 } 764 } 765 766 backupStream.close(); 770 771 while (mustRecover) { 775 nextBlockOutputStream(false); 776 InputStream in = new FileInputStream(backupFile); 777 try { 778 byte buf[] = new byte[4096]; 779 int bytesRead = in.read(buf); 780 while (bytesRead >= 0) { 781 blockStream.writeLong((long) bytesRead); 782 blockStream.write(buf, 0, bytesRead); 783 bytesRead = in.read(buf); 784 } 785 blockStream.writeLong(0); 786 blockStream.close(); 787 LOG.info("Recovered from failed datanode connection"); 788 mustRecover = false; 789 } catch (IOException ie) { 790 try { 791 blockStream.close(); 792 } catch (IOException ie2) { 793 } 794 try { 795 blockReplyStream.close(); 796 } catch (IOException ie2) { 797 } 798 nameNodeCaller.abandonBlock(block, src); 799 blockStreamWorking = false; 800 } 801 } 802 803 backupFile.delete(); 807 backupFile = File.createTempFile("ndfsout", "bak"); 808 backupStream = new BufferedOutputStream(new FileOutputStream(backupFile)); 809 } 810 811 815 public synchronized void close() throws IOException { 816 if (closed) { 817 throw new IOException("Stream closed"); 818 } 819 820 flush(); 821 endBlock(); 822 823 backupStream.close(); 824 backupFile.delete(); 825 826 if (blockStreamWorking) { 827 blockStream.close(); 828 blockReplyStream.close(); 829 blockStreamWorking = false; 830 } 831 super.close(); 832 833 nameNodeCaller.completeFile(src); 834 closed = true; 835 } 836 } 837 838 845 private class NameNodeCaller { 846 private net.nutch.ipc.Client client; 847 private InetSocketAddress namenode; 848 849 852 public NameNodeCaller(InetSocketAddress namenode) { 853 this.client = new net.nutch.ipc.Client(FSResults.class); 854 this.namenode = namenode; 855 } 856 857 860 public FSResults call(FSParam p) throws IOException { 861 return (FSResults) call(p, namenode); 862 } 863 864 private synchronized FSResults call(FSParam p, InetSocketAddress target) throws IOException { 865 FSResults results = null; 866 while (results == null) { 867 try { 868 results = (FSResults) client.call(p, target); 869 } catch (IOException ie) { 870 long start = System.currentTimeMillis(); 871 LOG.info("Problem making IPC call on " + target); 872 client.stop(); 873 long end = System.currentTimeMillis(); 874 if (end - start < 15000) { 875 try { 876 Thread.sleep(15000 - (end - start)); 877 } catch (InterruptedException iex) { 878 } 879 } 880 LOG.info("Restarting client"); 881 client = new net.nutch.ipc.Client(FSResults.class); 882 } 883 } 884 return results; 885 } 886 887 891 public Object [] getNewOutputBlock(boolean newFile, boolean overwrite, UTF8 src) throws IOException { 892 long start = System.currentTimeMillis(); 893 FSParam p = null; 894 FSResults r = null; 895 boolean blockComplete = false; 896 while (! blockComplete) { 897 UTF8 nameParams[] = new UTF8[2]; 898 nameParams[0] = src; 899 nameParams[1] = clientName; 900 if (newFile) { 901 p = new FSParam(OP_CLIENT_STARTFILE, new ArrayWritable(UTF8.class, nameParams), new BooleanWritable(overwrite)); 902 } else { 903 p = new FSParam(OP_CLIENT_ADDBLOCK, src); 904 } 905 r = (FSResults) call(p, namenode); 906 if (! r.success()) { 907 throw new IOException("Could not obtain new output block for file " + src); 908 } else if (r.tryagain()) { 909 try { 910 Thread.sleep(400); 911 if (System.currentTimeMillis() - start > 5000) { 912 LOG.info("Waiting to find new output block node for " + (System.currentTimeMillis() - start) + "ms"); 913 } 914 } catch (InterruptedException ie) { 915 } 916 } else { 917 blockComplete = true; 918 } 919 } 920 Block b = (Block) r.first; 921 DatanodeInfo targets[] = (DatanodeInfo[]) ((ArrayWritable) r.second).toArray(); 922 923 Object results[] = new Object [2]; 924 results[0] = b; 925 results[1] = targets; 926 return results; 927 } 928 929 931 public void abandonBlock(Block b, UTF8 src) throws IOException { 932 FSParam p = null; 933 FSResults r = null; 934 p = new FSParam(OP_CLIENT_ABANDONBLOCK, b, src); 935 r = (FSResults) call(p, namenode); 936 if (! r.success()) { 937 throw new IOException("Block " + b + " has already been committed."); 938 } 939 } 940 941 944 public Object [] getBlocksNodes(UTF8 src) throws IOException { 945 FSParam p = new FSParam(OP_CLIENT_OPEN, src); 946 FSResults r = (FSResults) call(p, namenode); 947 if (! r.success()) { 948 throw new IOException("Could not open file " + src); 949 } else { 950 Block blocks[] = (Block[]) ((ArrayWritable) r.first).toArray(); 951 DatanodeInfo nodes[][] = (DatanodeInfo[][]) ((TwoDArrayWritable) r.second).toArray(); 952 Object results[] = new Object [2]; 953 results[0] = blocks; 954 results[1] = nodes; 955 return results; 956 } 957 } 958 959 963 public boolean rename(UTF8 src, UTF8 dst) throws IOException{ 964 FSParam p = new FSParam(OP_CLIENT_RENAMETO, src, dst); 965 FSResults r = (FSResults) call(p, namenode); 966 return r.success(); 967 } 968 969 973 public boolean delete(UTF8 src) throws IOException { 974 FSParam p = new FSParam(OP_CLIENT_DELETE, src); 975 FSResults r = (FSResults) call(p, namenode); 976 return r.success(); 977 } 978 979 982 public boolean exists(UTF8 src) throws IOException { 983 FSParam p = new FSParam(OP_CLIENT_EXISTS, src); 984 FSResults r = (FSResults) call(p, namenode); 985 return r.success(); 986 } 987 988 991 public boolean isDirectory(UTF8 src) throws IOException { 992 FSParam p = new FSParam(OP_CLIENT_ISDIR, src); 993 FSResults r = (FSResults) call(p, namenode); 994 return r.success(); 995 } 996 997 999 public boolean mkdirs(UTF8 src) throws IOException { 1000 FSParam p = new FSParam(OP_CLIENT_MKDIRS, src); 1001 FSResults r = (FSResults) call(p, namenode); 1002 return r.success(); 1003 } 1004 1005 1009 public void lock(UTF8 src, boolean exclusive) throws IOException { 1010 long start = System.currentTimeMillis(); 1011 boolean complete = false; 1012 1013 while (! complete) { 1014 UTF8 nameParams[] = new UTF8[2]; 1015 nameParams[0] = src; 1016 nameParams[1] = clientName; 1017 FSParam p = new FSParam(OP_CLIENT_OBTAINLOCK, new ArrayWritable(UTF8.class, nameParams), new BooleanWritable(exclusive)); 1018 FSResults r = (FSResults) call(p, namenode); 1019 if (! r.success()) { 1020 throw new IOException("Could not obtain lock " + src); 1021 } else if (r.tryagain()) { 1022 try { 1023 Thread.sleep(400); 1024 if (System.currentTimeMillis() - start > 5000) { 1025 LOG.info("Waiting to retry lock for " + (System.currentTimeMillis() - start) + " ms."); 1026 Thread.sleep(2000); 1027 } 1028 } catch (InterruptedException ie) { 1029 } 1030 } else { 1031 complete = true; 1032 } 1033 } 1034 } 1035 1036 1039 public void release(UTF8 src) throws IOException { 1040 boolean complete = false; 1041 while (! complete) { 1042 UTF8 nameParams[] = new UTF8[2]; 1043 nameParams[0] = src; 1044 nameParams[1] = clientName; 1045 FSParam p = new FSParam(OP_CLIENT_RELEASELOCK, new ArrayWritable(UTF8.class, nameParams)); 1046 FSResults r = (FSResults) call(p, namenode); 1047 if (! r.success()) { 1048 throw new IOException("Could not release lock " + src); 1049 } else if (r.tryagain()) { 1050 LOG.info("Could not release. Retrying..."); 1051 try { 1052 Thread.sleep(2000); 1053 } catch (InterruptedException ie) { 1054 } 1055 } else { 1056 complete = true; 1057 } 1058 } 1059 } 1060 1061 1064 public NDFSFileInfo[] listing(UTF8 src) throws IOException { 1065 FSParam p = new FSParam(OP_CLIENT_LISTING, src); 1066 FSResults r = (FSResults) call(p, namenode); 1067 if (r.success()) { 1068 return (NDFSFileInfo[]) ((ArrayWritable) r.first).toArray(); 1069 } else { 1070 return null; 1071 } 1072 } 1073 1074 1077 public long[] rawReport() throws IOException { 1078 long results[] = null; 1079 FSParam p = new FSParam(OP_CLIENT_RAWSTATS); 1080 FSResults r = (FSResults) call(p, namenode); 1081 if (r.success()) { 1082 LongWritable report[] = (LongWritable[]) ((ArrayWritable) r.first).toArray(); 1083 results = new long[report.length]; 1084 for (int i = 0; i < report.length; i++) { 1085 results[i] = report[i].get(); 1086 } 1087 } 1088 return results; 1089 } 1090 1091 1093 public DatanodeInfo[] datanodeReport() throws IOException { 1094 FSParam p = new FSParam(OP_CLIENT_DATANODEREPORT); 1095 FSResults r = (FSResults) call(p, namenode); 1096 if (r.success()) { 1097 return (DatanodeInfo[]) ((ArrayWritable) r.first).toArray(); 1098 } else { 1099 return null; 1100 } 1101 } 1102 1103 1107 public void completeFile(UTF8 src) throws IOException { 1108 long start = System.currentTimeMillis(); 1109 boolean fileComplete = false; 1110 UTF8 nameParams[] = new UTF8[2]; 1111 nameParams[0] = src; 1112 nameParams[1] = clientName; 1113 1114 while (! fileComplete) { 1115 FSParam p = new FSParam(OP_CLIENT_COMPLETEFILE, new ArrayWritable(UTF8.class, nameParams)); 1116 FSResults r = (FSResults) call(p, namenode); 1117 if (! r.success()) { 1118 throw new IOException("Could not complete file " + src); 1119 } else if (r.tryagain()) { 1120 try { 1121 Thread.sleep(400); 1122 if (System.currentTimeMillis() - start > 5000) { 1123 LOG.info("Could not complete file, retrying..."); 1124 } 1125 } catch (InterruptedException ie) { 1126 } 1127 } else { 1128 fileComplete = true; 1129 } 1130 } 1131 } 1132 } 1133} 1134 | Popular Tags |