KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > ndfs > NDFSClient


1 /* Copyright (c) 2004 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3 package net.nutch.ndfs;
4
5 import net.nutch.io.*;
6 import net.nutch.fs.*;
7 import net.nutch.ipc.*;
8 import net.nutch.util.*;
9
10 import java.io.*;
11 import java.net.*;
12 import java.util.*;
13 import java.util.logging.*;
14
15 /********************************************************
16  * NDFSClient does what's necessary to connect to a Nutch Filesystem
17  * and perform basic file tasks.
18  *
19  * @author Mike Cafarella, Tessa MacDuff
20  ********************************************************/

21 public class NDFSClient implements FSConstants {
22     public static final Logger LOG = LogFormatter.getLogger("net.nutch.fs.NDFSClient");
23     static int BUFFER_SIZE = 4096;
24     NameNodeCaller nameNodeCaller;
25     boolean running = true;
26     Random r = new Random();
27     UTF8 clientName;
28     Thread JavaDoc leaseChecker;
29
30
31     /**
32      */

33     public NDFSClient(InetSocketAddress namenode) {
34         this.nameNodeCaller = new NameNodeCaller(namenode);
35         this.clientName = new UTF8("NDFSClient_" + r.nextInt());
36         this.leaseChecker = new Thread JavaDoc(new LeaseChecker());
37         this.leaseChecker.start();
38     }
39
40     /**
41      */

42     public void close() throws IOException {
43         //nameNodeCaller.stop();
44
this.running = false;
45         try {
46             leaseChecker.join();
47         } catch (InterruptedException JavaDoc ie) {
48         }
49     }
50
51     /**
52      * Create an input stream that obtains a nodelist from the
53      * namenode, and then reads from all the right places. Creates
54      * inner subclass of InputStream that does the right out-of-band
55      * work.
56      */

57     public NFSInputStream open(UTF8 src) throws IOException {
58         // Get block info from namenode
59
Object JavaDoc results[] = nameNodeCaller.getBlocksNodes(src);
60         return new NDFSInputStream((Block[]) results[0], (DatanodeInfo[][]) results[1]);
61     }
62
63     /**
64      * Create an output stream that writes to all the right places.
65      * Basically creates instance of inner subclass of OutputStream
66      * that handles datanode/namenode negotiation.
67      */

68     public NFSOutputStream create(UTF8 src) throws IOException {
69         return create(src, false);
70     }
71     public NFSOutputStream create(UTF8 src, boolean overwrite) throws IOException {
72         return new NDFSOutputStream(src, overwrite);
73     }
74
75     /**
76      * Make a direct connection to namenode and manipulate structures
77      * there.
78      */

79     public boolean rename(UTF8 src, UTF8 dst) throws IOException {
80         return nameNodeCaller.rename(src, dst);
81     }
82
83     /**
84      * Make a direct connection to namenode and manipulate structures
85      * there.
86      */

87     public boolean delete(UTF8 src) throws IOException {
88         return nameNodeCaller.delete(src);
89     }
90
91     /**
92      */

93     public boolean exists(UTF8 src) throws IOException {
94         return nameNodeCaller.exists(src);
95     }
96
97     /**
98      */

99     public boolean isDirectory(UTF8 src) throws IOException {
100         return nameNodeCaller.isDirectory(src);
101     }
102
103     /**
104      */

105     public NDFSFileInfo[] listFiles(UTF8 src) throws IOException {
106         return nameNodeCaller.listing(src);
107     }
108
109     /**
110      */

111     public long totalRawCapacity() throws IOException {
112         long rawNums[] = nameNodeCaller.rawReport();
113         return rawNums[0];
114     }
115
116     /**
117      */

118     public long totalRawUsed() throws IOException {
119         long rawNums[] = nameNodeCaller.rawReport();
120         return rawNums[1];
121     }
122
123     public DatanodeInfo[] datanodeReport() throws IOException {
124         return nameNodeCaller.datanodeReport();
125     }
126
127     /**
128      */

129     public boolean mkdirs(UTF8 src) throws IOException {
130         return nameNodeCaller.mkdirs(src);
131     }
132
133     /**
134      */

135     public void lock(UTF8 src, boolean exclusive) throws IOException {
136         nameNodeCaller.lock(src, exclusive);
137     }
138
139     /**
140      */

141     public void release(UTF8 src) throws IOException {
142         nameNodeCaller.release(src);
143     }
144
145     /**
146      * Pick the best/closest node from which to stream the data.
147      * For now, just pick a random node that is not on the dead list.
148      */

149     private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
150         if ((nodes == null) ||
151             (nodes.length - deadNodes.size() < 1)) {
152             throw new IOException("No live nodes contain current block");
153         }
154         DatanodeInfo chosenNode = null;
155         do {
156             chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length];
157         } while (deadNodes.contains(chosenNode));
158         return chosenNode;
159     }
160
161     /***************************************************************
162      * If any leases are outstanding, periodically check in with the
163      * namenode and renew all the leases.
164      ***************************************************************/

165     class LeaseChecker implements Runnable JavaDoc {
166         /**
167          */

168         public void run() {
169             long lastRenewed = 0;
170             while (running) {
171                 if (System.currentTimeMillis() - lastRenewed > (LEASE_PERIOD / 2)) {
172                     try {
173                         FSParam p = new FSParam(OP_CLIENT_RENEW_LEASE, clientName);
174                         FSResults r = nameNodeCaller.call(p);
175                         lastRenewed = System.currentTimeMillis();
176                     } catch (IOException ie) {
177                     }
178                 }
179                 try {
180                     Thread.sleep(1000);
181                 } catch (InterruptedException JavaDoc ie) {
182                 }
183             }
184         }
185     }
186
187     /****************************************************************
188      * NDFSInputStream provides bytes from a named file. It handles
189      * negotiation of the namenode and various datanodes as necessary.
190      ****************************************************************/

191     class NDFSInputStream extends NFSInputStream {
// Set to true by close(); read(), available() and close() refuse to run once it is set.
192         boolean closed = false;
193
// Input side of the connection to the datanode serving the currently open block;
// null when no block is open.
194         private DataInputStream blockStream;
// Output side of that same connection; the read request header is sent over it.
195         private DataOutputStream partnerStream;
// Blocks of the file, as handed to the constructor.
196         private Block blocks[];
// Index of the next block to open; advanced each time a block is opened.
197         private int curBlock = 0;
// Candidate datanodes, indexed by block as nodes[curBlock].
198         private DatanodeInfo nodes[][];
// Current read position within the file.
199         private long pos = 0;
// Unread bytes left in the open block, and the size the datanode reported for it.
200         private long bytesRemainingInBlock = 0, curBlockSize = 0;
201
// Ring buffer of the most recently read bytes, kept so short backwards seeks
// need not reopen a block.
202         private int memoryBuf[] = new int[32 * 1024];
// File position of the oldest byte held in memoryBuf.
203         private long memoryStartPos = 0;
// File position at which the current block stream was opened (assigned only; not read in this excerpt).
204         private long openPoint = 0;
// Number of valid bytes currently held in memoryBuf.
205         private int memoryBytes = 0;
// Index within memoryBuf of the oldest remembered byte.
206         private int memoryBytesStart = 0;
207
208         /**
209          */

210         public NDFSInputStream(Block blocks[], DatanodeInfo nodes[][]) throws IOException {
211             this.blocks = blocks;
212             this.nodes = nodes;
213             this.blockStream = null;
214             this.partnerStream = null;
215         }
216
217         /**
218          * Open a DataInputStream to a DataNode so that it can be read from.
219          * This happens on the first read and each time the reader moves on to the next block.
220          * The block and its candidate datanodes were already obtained from the namenode when the file was opened.
221          */

222         private synchronized void nextBlockInputStream() throws IOException {
223             nextBlockInputStream(0);
224         }
225         private synchronized void nextBlockInputStream(long preSkip) throws IOException {
226             if (curBlock >= blocks.length) {
227                 throw new IOException("Attempted to read past end of file");
228             }
229             if (bytesRemainingInBlock > 0) {
230                 throw new IOException("Trying to skip to next block without reading all data");
231             }
232
233             if (blockStream != null) {
234                 blockStream.close();
235                 partnerStream.close();
236             }
237
238             //
239
// Connect to best DataNode for current Block
240
//
241
InetSocketAddress target = null;
242             Socket s = null;
243             TreeSet deadNodes = new TreeSet();
244             while (s == null) {
245                 DatanodeInfo chosenNode;
246
247                 try {
248                     chosenNode = bestNode(nodes[curBlock], deadNodes);
249                     target = NDFS.createSocketAddr(chosenNode.getName().toString());
250                 } catch (IOException ie) {
251                     LOG.info("Could not obtain block from any node. Retrying...");
252                     try {
253                         Thread.sleep(10000);
254                     } catch (InterruptedException JavaDoc iex) {
255                     }
256                     deadNodes.clear();
257                     continue;
258                 }
259                 try {
260                     s = new Socket(target.getAddress(), target.getPort());
261                     LOG.info("Now downloading from " + target + ", block " + blocks[curBlock] + ", skipahead " + preSkip);
262
263                     //
264
// Xmit header info to datanode
265
//
266
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
267                     out.write(OP_READSKIP_BLOCK);
268                     blocks[curBlock].write(out);
269                     out.writeLong(preSkip);
270                     out.flush();
271
272                     //
273
// Get bytes in block, set streams
274
//
275
DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
276                     curBlockSize = in.readLong();
277                     long amtSkipped = in.readLong();
278
279                     pos += amtSkipped;
280                     bytesRemainingInBlock = curBlockSize - amtSkipped;
281
282                     if (amtSkipped > 0) {
283                         memoryStartPos = pos;
284                         memoryBytes = 0;
285                         memoryBytesStart = 0;
286                     }
287                     blockStream = in;
288                     partnerStream = out;
289                     curBlock++;
290                     openPoint = pos;
291                 } catch (IOException ex) {
292                     // Put chosen node into dead list, continue
293
LOG.info("Could not connect to " + target);
294                     deadNodes.add(chosenNode);
295                     s = null;
296                 }
297             }
298         }
299
300         /**
301          */

302         public synchronized void seek(long pos) throws IOException {
303             if (pos < 0) {
304                 throw new IOException("Cannot seek to negative position " + pos);
305             }
306             if (pos == this.pos) {
307                 return;
308             }
309
310             //
311
// If we have remembered enough bytes to seek backwards to the
312
// desired pos, we can do so easily
313
//
314
if ((pos >= memoryStartPos) && (memoryStartPos + memoryBytes > pos)) {
315                 this.pos = pos;
316             } else {
317                 //
318
// If we are seeking backwards (and *don't* have enough memory bytes)
319
// we need to reset the NDFS streams. They will be reopened upon the
320
// next call to nextBlockInputStream(). After this operation, all
321
// seeks will be "forwardSeeks".
322
//
323
if (pos < memoryStartPos && blockStream != null) {
324                     blockStream.close();
325                     blockStream = null;
326                     partnerStream.close();
327                     partnerStream = null;
328                     this.curBlock = 0;
329                     this.bytesRemainingInBlock = 0;
330                     this.pos = 0;
331                     this.memoryStartPos = 0;
332                     this.memoryBytes = 0;
333                     this.memoryBytesStart = 0;
334                     //
335
// REMIND - this could be made more efficient, to just
336
// skip back block-by-block
337
//
338
}
339
340                 //
341
// Now read ahead to the desired position.
342
//
343
long diff = pos - this.pos;
344                 while (diff > 0) {
345                     long skipped = skip(diff);
346                     if (skipped > 0) {
347                         diff -= skipped;
348                     }
349                 }
350                 // Pos will be incremented by skip()
351
}
352         }
353
354         /**
355          * Skip ahead some number of bytes
356          */

357         public synchronized long skip(long skip) throws IOException {
358             long toSkip = 0;
359             long toFastSkip = 0;
360             if (skip > memoryBuf.length) {
361                 toSkip = memoryBuf.length;
362                 toFastSkip = skip - toSkip;
363             } else {
364                 toSkip = skip;
365             }
366             long totalSkipped = 0;
367
368             //
369
// If there's a lot of fast-skipping to do within the current block,
370
// close it and reopen, so we can fast-skip to the target
371
//
372
/**
373             while (toFastSkip > 0) {
374                 long amtSkipped = super.skip(toFastSkip);
375                 toFastSkip -= amtSkipped;
376                 totalSkipped += amtSkipped;
377             }
378             **/

379             long realBytesRemaining = bytesRemainingInBlock + (memoryBytes - (pos - memoryStartPos));
380             if (toFastSkip > 0 && realBytesRemaining > 0 &&
381                 toFastSkip < realBytesRemaining) {
382
383                 blockStream.close();
384                 blockStream = null;
385                 partnerStream.close();
386                 partnerStream = null;
387
388                 long backwardsDistance = curBlockSize - realBytesRemaining;
389                 pos -= backwardsDistance;
390                 totalSkipped -= backwardsDistance;
391                 toFastSkip += backwardsDistance;
392                 bytesRemainingInBlock = 0;
393                 curBlock--;
394
395                 memoryStartPos = pos;
396                 memoryBytes = 0;
397                 memoryBytesStart = 0;
398             }
399
400             //
401
// If there's any fast-skipping to do, we do it by opening a
402
// new block and telling the datanode how many bytes to skip.
403
//
404
while (toFastSkip > 0 && curBlock < blocks.length) {
405
406                 if (bytesRemainingInBlock > 0) {
407                     blockStream.close();
408                     blockStream = null;
409                     partnerStream.close();
410                     partnerStream = null;
411
412                     pos += bytesRemainingInBlock;
413                     totalSkipped += bytesRemainingInBlock;
414                     toFastSkip -= bytesRemainingInBlock;
415                     bytesRemainingInBlock = 0;
416                 }
417
418                 long oldPos = pos;
419                 nextBlockInputStream(toFastSkip);
420                 long forwardDistance = (pos - oldPos);
421                 totalSkipped += forwardDistance;
422                 toFastSkip -= (pos - oldPos);
423
424                 memoryStartPos = pos;
425                 memoryBytes = 0;
426                 memoryBytesStart = 0;
427             }
428
429             //
430
// If there's any remaining toFastSkip, well, there's
431
// not much we can do about it. We're at the end of
432
// the stream!
433
//
434
if (toFastSkip > 0) {
435                 System.err.println("Trying to skip past end of file....");
436                 toFastSkip = 0;
437             }
438
439             //
440
// Do a slow skip as we approach, so we can fill the client
441
// history buffer
442
//
443
totalSkipped += super.skip(toSkip);
444             toSkip = 0;
445             return totalSkipped;
446         }
447
448         /**
449          */

450         public synchronized long getPos() throws IOException {
451             return pos;
452         }
453
454         /**
455          */

456         public synchronized int available() throws IOException {
457             if (closed) {
458                 throw new IOException("Stream closed");
459             }
460             return (int) Math.min((long) Integer.MAX_VALUE, bytesRemainingInBlock);
461         }
462
463         /**
464          */

465         public synchronized void close() throws IOException {
466             if (closed) {
467                 throw new IOException("Stream closed");
468             }
469
470             if (blockStream != null) {
471                 blockStream.close();
472                 blockStream = null;
473                 partnerStream.close();
474             }
475             super.close();
476             closed = true;
477         }
478
479         /**
480          * Other read() functions are implemented in terms of
481          * this one.
482          */

483         public synchronized int read() throws IOException {
484             if (closed) {
485                 throw new IOException("Stream closed");
486             }
487
488             int b = 0;
489             if (pos - memoryStartPos < memoryBytes) {
490                 //
491
// Move the memoryStartPos up to current pos, if necessary.
492
//
493
int diff = (int) (pos - memoryStartPos);
494
495                 //
496
// Fetch the byte
497
//
498
b = memoryBuf[(memoryBytesStart + diff) % memoryBuf.length];
499
500                 //
501
// Bump the pos
502
//
503
pos++;
504             } else {
505                 if (bytesRemainingInBlock == 0) {
506                     if (curBlock < blocks.length) {
507                         nextBlockInputStream();
508                     } else {
509                         return -1;
510                     }
511                 }
512                 b = blockStream.read();
513                 if (b >= 0) {
514                     //
515
// Remember byte so we can seek backwards at some later time
516
//
517
if (memoryBytes == memoryBuf.length) {
518                         memoryStartPos++;
519                     }
520
521                     if (memoryBuf.length > 0) {
522                         int target;
523                         if (memoryBytes == memoryBuf.length) {
524                             target = memoryBytesStart;
525                             memoryBytesStart = (memoryBytesStart + 1) % memoryBuf.length;
526                         } else {
527                             target = (memoryBytesStart + memoryBytes) % memoryBuf.length;
528                             memoryBytes++;
529                         }
530                         memoryBuf[target] = b;
531                     }
532                     bytesRemainingInBlock--;
533                     pos++;
534                 }
535             }
536             return b;
537         }
538
539         /**
540          * We definitely don't support marks
541          */

542         public boolean markSupported() {
543             return false;
544         }
545         public void mark(int readLimit) {
546         }
547         public void reset() throws IOException {
548             throw new IOException("Mark not supported");
549         }
550     }
551
552     /****************************************************************
553      * NDFSOutputStream creates files from a stream of bytes.
554      ****************************************************************/

555     class NDFSOutputStream extends NFSOutputStream {
// Checked by write() and flush(), which refuse to run once it is true.
// NOTE(review): presumably set by close(), which is outside this excerpt.
556         boolean closed = false;
557
// Bytes accepted by write() but not yet passed on by flushData().
558         private byte outBuf[] = new byte[BUFFER_SIZE];
// Number of valid bytes currently in outBuf.
559         private int pos = 0;
560
// Name of the file being written, and the overwrite flag passed to the namenode.
561         private UTF8 src;
562         private boolean overwrite;
// True while the connection to the datanode is usable; cleared when a write to it fails.
563         private boolean blockStreamWorking;
// Output and reply sides of the connection to the first datanode for the current block.
564         private DataOutputStream blockStream;
565         private DataInputStream blockReplyStream;
// Local temp-file copy of the current block's bytes; endBlock() re-sends the block from it after a failure.
566         private File backupFile;
567         private OutputStream backupStream;
// Block currently being written, as allocated by the namenode.
568         private Block block;
// Not referenced anywhere in this excerpt.
569         private DatanodeInfo targets[];
// Total number of bytes accepted by write() so far; returned by getPos().
570         private long filePos = 0;
// Bytes of the current block already passed on by flushData().
571         private int bytesWrittenToBlock = 0;
572
573         /**
574          * Create a new output stream to the given DataNode.
575          */

576         public NDFSOutputStream(UTF8 src, boolean overwrite) throws IOException {
577             this.src = src;
578             this.overwrite = overwrite;
579             this.blockStream = null;
580             this.blockReplyStream = null;
581             this.blockStreamWorking = false;
582             this.backupFile = File.createTempFile("ndfsout", "bak");
583             this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
584             nextBlockOutputStream(true);
585         }
586
587         /**
588          * Open a DataOutputStream to a DataNode so that it can be written to.
589          * This happens when a file is created and each time a new block is allocated.
590          * Must get block ID and the IDs of the destinations from the namenode.
591          */

592         private synchronized void nextBlockOutputStream(boolean firstTime) throws IOException {
593             if (! firstTime && blockStreamWorking) {
594                 blockStream.close();
595                 blockReplyStream.close();
596                 blockStreamWorking = false;
597             }
598
599             boolean retry = false;
600             long start = System.currentTimeMillis();
601             do {
602                 retry = false;
603                 Object JavaDoc results[] = nameNodeCaller.getNewOutputBlock(firstTime, overwrite, src);
604                 block = (Block) results[0];
605                 DatanodeInfo nodes[] = (DatanodeInfo[]) results[1];
606
607                 //
608
// Connect to first DataNode in the list. Abort if this fails.
609
//
610
InetSocketAddress target = NDFS.createSocketAddr(nodes[0].getName().toString());
611                 Socket s = null;
612                 try {
613                     s = new Socket(target.getAddress(), target.getPort());
614                 } catch (IOException ie) {
615                     // Connection failed. Let's wait a little bit and retry
616
try {
617                         if (System.currentTimeMillis() - start > 5000) {
618                             LOG.info("Waiting to find target node");
619                         }
620                         Thread.sleep(6000);
621                     } catch (InterruptedException JavaDoc iex) {
622                     }
623                     nameNodeCaller.abandonBlock(block, src);
624                     retry = true;
625                     continue;
626                 }
627
628                 //
629
// Xmit header info to datanode
630
//
631
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
632                 out.write(OP_WRITE_BLOCK);
633                 block.write(out);
634                 out.writeInt(nodes.length);
635                 for (int i = 0; i < nodes.length; i++) {
636                     nodes[i].write(out);
637                 }
638                 out.write(CHUNKED_ENCODING);
639                 bytesWrittenToBlock = 0;
640                 blockStream = out;
641                 blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
642                 blockStreamWorking = true;
643             } while (retry);
644         }
645
646         /**
647          * We're referring to the file pos here
648          */

649         public synchronized long getPos() throws IOException {
650             return filePos;
651         }
652             
653         /**
654          * Writes the specified byte to this output stream.
655          * This is the only write method that needs to be implemented.
656          */

657         public synchronized void write(int b) throws IOException {
658             if (closed) {
659                 throw new IOException("Stream closed");
660             }
661
662             if ((bytesWrittenToBlock + pos == BLOCK_SIZE) ||
663                 (pos >= BUFFER_SIZE)) {
664                 flush();
665             }
666             outBuf[pos++] = (byte) b;
667             filePos++;
668         }
669
670         /**
671          * Flush the buffer, getting a stream to a new block if necessary.
672          */

673         public synchronized void flush() throws IOException {
674             if (closed) {
675                 throw new IOException("Stream closed");
676             }
677
678             if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {
679                 flushData(BLOCK_SIZE - bytesWrittenToBlock);
680             }
681             if (bytesWrittenToBlock == BLOCK_SIZE) {
682                 endBlock();
683                 nextBlockOutputStream(false);
684             }
685             flushData(pos);
686         }
687
688         /**
689          * Actually flush the accumulated bytes to the remote node,
690          * but no more bytes than the indicated number.
691          */

692         private synchronized void flushData(int maxPos) throws IOException {
693             int workingPos = Math.min(pos, maxPos);
694             
695             if (workingPos >= 0) {
696                 //
697
// To the blockStream, write length, then bytes
698
//
699
if (blockStreamWorking) {
700                     try {
701                         blockStream.writeLong(workingPos);
702                         blockStream.write(outBuf, 0, workingPos);
703                     } catch (IOException ie) {
704                         try {
705                             blockStream.close();
706                         } catch (IOException ie2) {
707                         }
708                         try {
709                             blockReplyStream.close();
710                         } catch (IOException ie2) {
711                         }
712                         nameNodeCaller.abandonBlock(block, src);
713                         blockStreamWorking = false;
714                     }
715                 }
716                 //
717
// To the local block backup, write just the bytes
718
//
719
backupStream.write(outBuf, 0, workingPos);
720
721                 //
722
// Track position
723
//
724
bytesWrittenToBlock += workingPos;
725                 System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
726                 pos -= workingPos;
727             }
728         }
729
730         /**
731          */

732         private synchronized void endBlock() throws IOException {
733             boolean mustRecover = ! blockStreamWorking;
734
735             //
736
// A zero-length set of data indicates the end of the block
737
//
738
if (blockStreamWorking) {
739                 try {
740                     blockStream.writeLong(0);
741                     blockStream.flush();
742
743                     long complete = blockReplyStream.readLong();
744                     if (complete != WRITE_COMPLETE) {
745                         LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
746                         throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
747                     }
748                     blockStream.close();
749                     blockReplyStream.close();
750                 } catch (IOException ie) {
751                     try {
752                         blockStream.close();
753                     } catch (IOException ie2) {
754                     }
755                     try {
756                         blockReplyStream.close();
757                     } catch (IOException ie2) {
758                     }
759                     nameNodeCaller.abandonBlock(block, src);
760                     mustRecover = true;
761                 } finally {
762                     blockStreamWorking = false;
763                 }
764             }
765
766             //
767
// Done with local copy
768
//
769
backupStream.close();
770
771             //
772
// If necessary, recover from a failed datanode connection.
773
//
774
while (mustRecover) {
775                 nextBlockOutputStream(false);
776                 InputStream in = new FileInputStream(backupFile);
777                 try {
778                     byte buf[] = new byte[4096];
779                     int bytesRead = in.read(buf);
780                     while (bytesRead >= 0) {
781                         blockStream.writeLong((long) bytesRead);
782                         blockStream.write(buf, 0, bytesRead);
783                         bytesRead = in.read(buf);
784                     }
785                     blockStream.writeLong(0);
786                     blockStream.close();
787                     LOG.info("Recovered from failed datanode connection");
788                     mustRecover = false;
789                 } catch (IOException ie) {
790                     try {
791                         blockStream.close();
792                     } catch (IOException ie2) {
793                     }
794                     try {
795                         blockReplyStream.close();
796                     } catch (IOException ie2) {
797                     }
798                     nameNodeCaller.abandonBlock(block, src);
799                     blockStreamWorking = false;
800                 }
801             }
802
803             //
804
// Delete local backup, start new one
805
//
806
backupFile.delete();
807        &nb