| 1 2 3 package net.nutch.ndfs; 4 5 import net.nutch.io.*; 6 import net.nutch.fs.*; 7 import net.nutch.ipc.*; 8 import net.nutch.util.*; 9 10 import java.io.*; 11 import java.net.*; 12 import java.util.*; 13 import java.util.logging.*; 14 15 21 public class NDFSClient implements FSConstants { 22 public static final Logger LOG = LogFormatter.getLogger("net.nutch.fs.NDFSClient"); 23 static int BUFFER_SIZE = 4096; 24 NameNodeCaller nameNodeCaller; 25 boolean running = true; 26 Random r = new Random(); 27 UTF8 clientName; 28 Thread leaseChecker; 29 30 31 33 public NDFSClient(InetSocketAddress namenode) { 34 this.nameNodeCaller = new NameNodeCaller(namenode); 35 this.clientName = new UTF8("NDFSClient_" + r.nextInt()); 36 this.leaseChecker = new Thread (new LeaseChecker()); 37 this.leaseChecker.start(); 38 } 39 40 42 public void close() throws IOException { 43 this.running = false; 45 try { 46 leaseChecker.join(); 47 } catch (InterruptedException ie) { 48 } 49 } 50 51 57 public NFSInputStream open(UTF8 src) throws IOException { 58 Object results[] = nameNodeCaller.getBlocksNodes(src); 60 return new NDFSInputStream((Block[]) results[0], (DatanodeInfo[][]) results[1]); 61 } 62 63 68 public NFSOutputStream create(UTF8 src) throws IOException { 69 return create(src, false); 70 } 71 public NFSOutputStream create(UTF8 src, boolean overwrite) throws IOException { 72 return new NDFSOutputStream(src, overwrite); 73 } 74 75 79 public boolean rename(UTF8 src, UTF8 dst) throws IOException { 80 return nameNodeCaller.rename(src, dst); 81 } 82 83 87 public boolean delete(UTF8 src) throws IOException { 88 return nameNodeCaller.delete(src); 89 } 90 91 93 public boolean exists(UTF8 src) throws IOException { 94 return nameNodeCaller.exists(src); 95 } 96 97 99 public boolean isDirectory(UTF8 src) throws IOException { 100 return nameNodeCaller.isDirectory(src); 101 } 102 103 105 public NDFSFileInfo[] listFiles(UTF8 src) throws IOException { 106 return nameNodeCaller.listing(src); 107 } 108 109 111 public long totalRawCapacity() throws IOException { 112 long rawNums[] = nameNodeCaller.rawReport(); 113 return rawNums[0]; 114 } 115 116 118 public long totalRawUsed() throws IOException { 119 long rawNums[] = nameNodeCaller.rawReport(); 120 return rawNums[1]; 121 } 122 123 public DatanodeInfo[] datanodeReport() throws IOException { 124 return nameNodeCaller.datanodeReport(); 125 } 126 127 129 public boolean mkdirs(UTF8 src) throws IOException { 130 return nameNodeCaller.mkdirs(src); 131 } 132 133 135 public void lock(UTF8 src, boolean exclusive) throws IOException { 136 nameNodeCaller.lock(src, exclusive); 137 } 138 139 141 public void release(UTF8 src) throws IOException { 142 nameNodeCaller.release(src); 143 } 144 145 149 private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException { 150 if ((nodes == null) || 151 (nodes.length - deadNodes.size() < 1)) { 152 throw new IOException("No live nodes contain current block"); 153 } 154 DatanodeInfo chosenNode = null; 155 do { 156 chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length]; 157 } while (deadNodes.contains(chosenNode)); 158 return chosenNode; 159 } 160 161 165 class LeaseChecker implements Runnable { 166 168 public void run() { 169 long lastRenewed = 0; 170 while (running) { 171 if (System.currentTimeMillis() - lastRenewed > (LEASE_PERIOD / 2)) { 172 try { 173 FSParam p = new FSParam(OP_CLIENT_RENEW_LEASE, clientName); 174 FSResults r = nameNodeCaller.call(p); 175 lastRenewed = System.currentTimeMillis(); 176 } catch (IOException ie) { 177 } 178 } 179 try { 180 Thread.sleep(1000); 181 } catch (InterruptedException ie) { 182 } 183 } 184 } 185 } 186 187 191 class NDFSInputStream extends NFSInputStream { 192 boolean closed = false; 193 194 private DataInputStream blockStream; 195 private DataOutputStream partnerStream; 196 private Block blocks[]; 197 private int curBlock = 0; 198 private DatanodeInfo nodes[][]; 199 private long pos = 0; 200 private long bytesRemainingInBlock = 0, curBlockSize = 0; 201 202 private int memoryBuf[] = new int[32 * 1024]; 203 private long memoryStartPos = 0; 204 private long openPoint = 0; 205 private int memoryBytes = 0; 206 private int memoryBytesStart = 0; 207 208 210 public NDFSInputStream(Block blocks[], DatanodeInfo nodes[][]) throws IOException { 211 this.blocks = blocks; 212 this.nodes = nodes; 213 this.blockStream = null; 214 this.partnerStream = null; 215 } 216 217 222 private synchronized void nextBlockInputStream() throws IOException { 223 nextBlockInputStream(0); 224 } 225 private synchronized void nextBlockInputStream(long preSkip) throws IOException { 226 if (curBlock >= blocks.length) { 227 throw new IOException("Attempted to read past end of file"); 228 } 229 if (bytesRemainingInBlock > 0) { 230 throw new IOException("Trying to skip to next block without reading all data"); 231 } 232 233 if (blockStream != null) { 234 blockStream.close(); 235 partnerStream.close(); 236 } 237 238 InetSocketAddress target = null; 242 Socket s = null; 243 TreeSet deadNodes = new TreeSet(); 244 while (s == null) { 245 DatanodeInfo chosenNode; 246 247 try { 248 chosenNode = bestNode(nodes[curBlock], deadNodes); 249 target = NDFS.createSocketAddr(chosenNode.getName().toString()); 250 } catch (IOException ie) { 251 LOG.info("Could not obtain block from any node. Retrying..."); 252 try { 253 Thread.sleep(10000); 254 } catch (InterruptedException iex) { 255 } 256 deadNodes.clear(); 257 continue; 258 } 259 try { 260 s = new Socket(target.getAddress(), target.getPort()); 261 LOG.info("Now downloading from " + target + ", block " + blocks[curBlock] + ", skipahead " + preSkip); 262 263 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); 267 out.write(OP_READSKIP_BLOCK); 268 blocks[curBlock].write(out); 269 out.writeLong(preSkip); 270 out.flush(); 271 272 DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream())); 276 curBlockSize = in.readLong(); 277 long amtSkipped = in.readLong(); 278 279 pos += amtSkipped; 280 bytesRemainingInBlock = curBlockSize - amtSkipped; 281 282 if (amtSkipped > 0) { 283 memoryStartPos = pos; 284 memoryBytes = 0; 285 memoryBytesStart = 0; 286 } 287 blockStream = in; 288 partnerStream = out; 289 curBlock++; 290 openPoint = pos; 291 } catch (IOException ex) { 292 LOG.info("Could not connect to " + target); 294 deadNodes.add(chosenNode); 295 s = null; 296 } 297 } 298 } 299 300 302 public synchronized void seek(long pos) throws IOException { 303 if (pos < 0) { 304 throw new IOException("Cannot seek to negative position " + pos); 305 } 306 if (pos == this.pos) { 307 return; 308 } 309 310 if ((pos >= memoryStartPos) && (memoryStartPos + memoryBytes > pos)) { 315 this.pos = pos; 316 } else { 317 if (pos < memoryStartPos && blockStream != null) { 324 blockStream.close(); 325 blockStream = null; 326 partnerStream.close(); 327 partnerStream = null; 328 this.curBlock = 0; 329 this.bytesRemainingInBlock = 0; 330 this.pos = 0; 331 this.memoryStartPos = 0; 332 this.memoryBytes = 0; 333 this.memoryBytesStart = 0; 334 } 339 340 long diff = pos - this.pos; 344 while (diff > 0) { 345 long skipped = skip(diff); 346 if (skipped > 0) { 347 diff -= skipped; 348 } 349 } 350 } 352 } 353 354 357 public synchronized long skip(long skip) throws IOException { 358 long toSkip = 0; 359 long toFastSkip = 0; 360 if (skip > memoryBuf.length) { 361 toSkip = memoryBuf.length; 362 toFastSkip = skip - toSkip; 363 } else { 364 toSkip = skip; 365 } 366 long totalSkipped = 0; 367 368 379 long realBytesRemaining = bytesRemainingInBlock + (memoryBytes - (pos - memoryStartPos)); 380 if (toFastSkip > 0 && realBytesRemaining > 0 && 381 toFastSkip < realBytesRemaining) { 382 383 blockStream.close(); 384 blockStream = null; 385 partnerStream.close(); 386 partnerStream = null; 387 388 long backwardsDistance = curBlockSize - realBytesRemaining; 389 pos -= backwardsDistance; 390 totalSkipped -= backwardsDistance; 391 toFastSkip += backwardsDistance; 392 bytesRemainingInBlock = 0; 393 curBlock--; 394 395 memoryStartPos = pos; 396 memoryBytes = 0; 397 memoryBytesStart = 0; 398 } 399 400 while (toFastSkip > 0 && curBlock < blocks.length) { 405 406 if (bytesRemainingInBlock > 0) { 407 blockStream.close(); 408 blockStream = null; 409 partnerStream.close(); 410 partnerStream = null; 411 412 pos += bytesRemainingInBlock; 413 totalSkipped += bytesRemainingInBlock; 414 toFastSkip -= bytesRemainingInBlock; 415 bytesRemainingInBlock = 0; 416 } 417 418 long oldPos = pos; 419 nextBlockInputStream(toFastSkip); 420 long forwardDistance = (pos - oldPos); 421 totalSkipped += forwardDistance; 422 toFastSkip -= (pos - oldPos); 423 424 memoryStartPos = pos; 425 memoryBytes = 0; 426 memoryBytesStart = 0; 427 } 428 429 if (toFastSkip > 0) { 435 System.err.println("Trying to skip past end of file...."); 436 toFastSkip = 0; 437 } 438 439 totalSkipped += super.skip(toSkip); 444 toSkip = 0; 445 return totalSkipped; 446 } 447 448 450 public synchronized long getPos() throws IOException { 451 return pos; 452 } 453 454 456 public synchronized int available() throws IOException { 457 if (closed) { 458 throw new IOException("Stream closed"); 459 } 460 return (int) Math.min((long) Integer.MAX_VALUE, bytesRemainingInBlock); 461 } 462 463 465 public synchronized void close() throws IOException { 466 if (closed) { 467 throw new IOException("Stream closed"); 468 } 469 470 if (blockStream != null) { 471 blockStream.close(); 472 blockStream = null; 473 partnerStream.close(); 474 } 475 super.close(); 476 closed = true; 477 } 478 479 483 public synchronized int read() throws IOException { 484 if (closed) { 485 throw new IOException("Stream closed"); 486 } 487 488 int b = 0; 489 if (pos - memoryStartPos < memoryBytes) { 490 int diff = (int) (pos - memoryStartPos); 494 495 b = memoryBuf[(memoryBytesStart + diff) % memoryBuf.length]; 499 500 pos++; 504 } else { 505 if (bytesRemainingInBlock == 0) { 506 if (curBlock < blocks.length) { 507 nextBlockInputStream(); 508 } else { 509 return -1; 510 } 511 } 512 b = blockStream.read(); 513 if (b >= 0) { 514 if (memoryBytes == memoryBuf.length) { 518 memoryStartPos++; 519 } 520 521 if (memoryBuf.length > 0) { 522 int target; 523 if (memoryBytes == memoryBuf.length) { 524 target = memoryBytesStart; 525 memoryBytesStart = (memoryBytesStart + 1) % memoryBuf.length; 526 } else { 527 target = (memoryBytesStart + memoryBytes) % memoryBuf.length; 528 memoryBytes++; 529 } 530 memoryBuf[target] = b; 531 } 532 bytesRemainingInBlock--; 533 pos++; 534 } 535 } 536 return b; 537 } 538 539 542 public boolean markSupported() { 543 return false; 544 } 545 public void mark(int readLimit) { 546 } 547 public void reset() throws IOException { 548 throw new IOException("Mark not supported"); 549 } 550 } 551 552 555 class NDFSOutputStream extends NFSOutputStream { 556 boolean closed = false; 557 558 private byte outBuf[] = new byte[BUFFER_SIZE]; 559 private int pos = 0; 560 561 private UTF8 src; 562 private boolean overwrite; 563 private boolean blockStreamWorking; 564 private DataOutputStream blockStream; 565 private DataInputStream blockReplyStream; 566 private File backupFile; 567 private OutputStream backupStream; 568 private Block block; 569 private DatanodeInfo targets[]; 570 private long filePos = 0; 571 private int bytesWrittenToBlock = 0; 572 573 576 public NDFSOutputStream(UTF8 src, boolean overwrite) throws IOException { 577 this.src = src; 578 this.overwrite = overwrite; 579 this.blockStream = null; 580 this.blockReplyStream = null; 581 this.blockStreamWorking = false; 582 this.backupFile = File.createTempFile("ndfsout", "bak"); 583 this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile)); 584 nextBlockOutputStream(true); 585 } 586 587 592 private synchronized void nextBlockOutputStream(boolean firstTime) throws IOException { 593 if (! firstTime && blockStreamWorking) { 594 blockStream.close(); 595 blockReplyStream.close(); 596 blockStreamWorking = false; 597 } 598 599 boolean retry = false; 600 long start = System.currentTimeMillis(); 601 do { 602 retry = false; 603 Object results[] = nameNodeCaller.getNewOutputBlock(firstTime, overwrite, src); 604 block = (Block) results[0]; 605 DatanodeInfo nodes[] = (DatanodeInfo[]) results[1]; 606 607 InetSocketAddress target = NDFS.createSocketAddr(nodes[0].getName().toString()); 611 Socket s = null; 612 try { 613 s = new Socket(target.getAddress(), target.getPort()); 614 } catch (IOException ie) { 615 try { 617 if (System.currentTimeMillis() - start > 5000) { 618 LOG.info("Waiting to find target node"); 619 } 620 Thread.sleep(6000); 621 } catch (InterruptedException iex) { 622 } 623 nameNodeCaller.abandonBlock(block, src); 624 retry = true; 625 continue; 626 } 627 628 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); 632 out.write(OP_WRITE_BLOCK); 633 block.write(out); 634 out.writeInt(nodes.length); 635 for (int i = 0; i < nodes.length; i++) { 636 nodes[i].write(out); 637 } 638 out.write(CHUNKED_ENCODING); 639 bytesWrittenToBlock = 0; 640 blockStream = out; 641 blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream())); 642 blockStreamWorking = true; 643 } while (retry); 644 } 645 646 649 public synchronized long getPos() throws IOException { 650 return filePos; 651 } 652 653 657 public synchronized void write(int b) throws IOException { 658 if (closed) { 659 throw new IOException("Stream closed"); 660 } 661 662 if ((bytesWrittenToBlock + pos == BLOCK_SIZE) || 663 (pos >= BUFFER_SIZE)) { 664 flush(); 665 } 666 outBuf[pos++] = (byte) b; 667 filePos++; 668 } 669 670 673 public synchronized void flush() throws IOException { 674 if (closed) { 675 throw new IOException("Stream closed"); 676 } 677 678 if (bytesWrittenToBlock + pos >= BLOCK_SIZE) { 679 flushData(BLOCK_SIZE - bytesWrittenToBlock); 680 } 681 if (bytesWrittenToBlock == BLOCK_SIZE) { 682 endBlock(); 683 nextBlockOutputStream(false); 684 } 685 flushData(pos); 686 } 687 688 692 private synchronized void flushData(int maxPos) throws IOException { 693 int workingPos = Math.min(pos, maxPos); 694 695 if (workingPos >= 0) { 696 if (blockStreamWorking) { 700 try { 701 blockStream.writeLong(workingPos); 702 blockStream.write(outBuf, 0, workingPos); 703 } catch (IOException ie) { 704 try { 705 blockStream.close(); 706 } catch (IOException ie2) { 707 } 708 try { 709 blockReplyStream.close(); 710 } catch (IOException ie2) { 711 } 712 nameNodeCaller.abandonBlock(block, src); 713 blockStreamWorking = false; 714 } 715 } 716 backupStream.write(outBuf, 0, workingPos); 720 721 bytesWrittenToBlock += workingPos; 725 System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos); 726 pos -= workingPos; 727 } 728 } 729 730 732 private synchronized void endBlock() throws IOException { 733 boolean mustRecover = ! blockStreamWorking; 734 735 if (blockStreamWorking) { 739 try { 740 blockStream.writeLong(0); 741 blockStream.flush(); 742 743 long complete = blockReplyStream.readLong(); 744 if (complete != WRITE_COMPLETE) { 745 LOG.info("Did not receive WRITE_COMPLETE flag: " + complete); 746 throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete); 747 } 748 blockStream.close(); 749 blockReplyStream.close(); 750 } catch (IOException ie) { 751 try { 752 blockStream.close(); 753 } catch (IOException ie2) { 754 } 755 try { 756 blockReplyStream.close(); 757 } catch (IOException ie2) { 758 } 759 nameNodeCaller.abandonBlock(block, src); 760 mustRecover = true; 761 } finally { 762 blockStreamWorking = false; 763 } 764 } 765 766 backupStream.close(); 770 771 while (mustRecover) { 775 nextBlockOutputStream(false); 776 InputStream in = new FileInputStream(backupFile); 777 try { 778 byte buf[] = new byte[4096]; 779 int bytesRead = in.read(buf); 780 while (bytesRead >= 0) { 781 blockStream.writeLong((long) bytesRead); 782 blockStream.write(buf, 0, bytesRead); 783 bytesRead = in.read(buf); 784 } 785 blockStream.writeLong(0); 786 blockStream.close(); 787 LOG.info("Recovered from failed datanode connection"); 788 mustRecover = false; 789 } catch (IOException ie) { 790 try { 791 blockStream.close(); 792 } catch (IOException ie2) { 793 } 794 try { 795 blockReplyStream.close(); 796 } catch (IOException ie2) { 797 } 798 nameNodeCaller.abandonBlock(block, src); 799 blockStreamWorking = false; 800 } 801 } 802 803 backupFile.delete(); 807 &nb
|