KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > ndfs > NDFSClient


1 /* Copyright (c) 2004 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3 package net.nutch.ndfs;
4
5 import net.nutch.io.*;
6 import net.nutch.fs.*;
7 import net.nutch.ipc.*;
8 import net.nutch.util.*;
9
10 import java.io.*;
11 import java.net.*;
12 import java.util.*;
13 import java.util.logging.*;
14
15 /********************************************************
16  * NDFSClient does what's necessary to connect to a Nutch Filesystem
17  * and perform basic file tasks.
18  *
19  * @author Mike Cafarella, Tessa MacDuff
20  ********************************************************/

21 public class NDFSClient implements FSConstants {
22     public static final Logger LOG = LogFormatter.getLogger("net.nutch.fs.NDFSClient");
23     static int BUFFER_SIZE = 4096;
24     NameNodeCaller nameNodeCaller;
25     boolean running = true;
26     Random r = new Random();
27     UTF8 clientName;
28     Thread JavaDoc leaseChecker;
29
30
31     /**
32      */

33     public NDFSClient(InetSocketAddress namenode) {
34         this.nameNodeCaller = new NameNodeCaller(namenode);
35         this.clientName = new UTF8("NDFSClient_" + r.nextInt());
36         this.leaseChecker = new Thread JavaDoc(new LeaseChecker());
37         this.leaseChecker.start();
38     }
39
40     /**
41      */

42     public void close() throws IOException {
43         //nameNodeCaller.stop();
44
this.running = false;
45         try {
46             leaseChecker.join();
47         } catch (InterruptedException JavaDoc ie) {
48         }
49     }
50
51     /**
52      * Create an input stream that obtains a nodelist from the
53      * namenode, and then reads from all the right places. Creates
54      * inner subclass of InputStream that does the right out-of-band
55      * work.
56      */

57     public NFSInputStream open(UTF8 src) throws IOException {
58         // Get block info from namenode
59
Object JavaDoc results[] = nameNodeCaller.getBlocksNodes(src);
60         return new NDFSInputStream((Block[]) results[0], (DatanodeInfo[][]) results[1]);
61     }
62
63     /**
64      * Create an output stream that writes to all the right places.
65      * Basically creates instance of inner subclass of OutputStream
66      * that handles datanode/namenode negotiation.
67      */

68     public NFSOutputStream create(UTF8 src) throws IOException {
69         return create(src, false);
70     }
71     public NFSOutputStream create(UTF8 src, boolean overwrite) throws IOException {
72         return new NDFSOutputStream(src, overwrite);
73     }
74
75     /**
76      * Make a direct connection to namenode and manipulate structures
77      * there.
78      */

79     public boolean rename(UTF8 src, UTF8 dst) throws IOException {
80         return nameNodeCaller.rename(src, dst);
81     }
82
83     /**
84      * Make a direct connection to namenode and manipulate structures
85      * there.
86      */

87     public boolean delete(UTF8 src) throws IOException {
88         return nameNodeCaller.delete(src);
89     }
90
91     /**
92      */

93     public boolean exists(UTF8 src) throws IOException {
94         return nameNodeCaller.exists(src);
95     }
96
97     /**
98      */

99     public boolean isDirectory(UTF8 src) throws IOException {
100         return nameNodeCaller.isDirectory(src);
101     }
102
103     /**
104      */

105     public NDFSFileInfo[] listFiles(UTF8 src) throws IOException {
106         return nameNodeCaller.listing(src);
107     }
108
109     /**
110      */

111     public long totalRawCapacity() throws IOException {
112         long rawNums[] = nameNodeCaller.rawReport();
113         return rawNums[0];
114     }
115
116     /**
117      */

118     public long totalRawUsed() throws IOException {
119         long rawNums[] = nameNodeCaller.rawReport();
120         return rawNums[1];
121     }
122
123     public DatanodeInfo[] datanodeReport() throws IOException {
124         return nameNodeCaller.datanodeReport();
125     }
126
127     /**
128      */

129     public boolean mkdirs(UTF8 src) throws IOException {
130         return nameNodeCaller.mkdirs(src);
131     }
132
133     /**
134      */

135     public void lock(UTF8 src, boolean exclusive) throws IOException {
136         nameNodeCaller.lock(src, exclusive);
137     }
138
139     /**
140      */

141     public void release(UTF8 src) throws IOException {
142         nameNodeCaller.release(src);
143     }
144
/**
 * Pick a node from which to stream the data.
 * For now, just pick a random node on the list that is not known to be dead.
 */

149     private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
150         if ((nodes == null) ||
151             (nodes.length - deadNodes.size() < 1)) {
152             throw new IOException("No live nodes contain current block");
153         }
154         DatanodeInfo chosenNode = null;
155         do {
156             chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length];
157         } while (deadNodes.contains(chosenNode));
158         return chosenNode;
159     }
160
161     /***************************************************************
162      * If any leases are outstanding, periodically check in with the
163      * namenode and renew all the leases.
164      ***************************************************************/

165     class LeaseChecker implements Runnable JavaDoc {
166         /**
167          */

168         public void run() {
169             long lastRenewed = 0;
170             while (running) {
171                 if (System.currentTimeMillis() - lastRenewed > (LEASE_PERIOD / 2)) {
172                     try {
173                         FSParam p = new FSParam(OP_CLIENT_RENEW_LEASE, clientName);
174                         FSResults r = nameNodeCaller.call(p);
175                         lastRenewed = System.currentTimeMillis();
176                     } catch (IOException ie) {
177                     }
178                 }
179                 try {
180                     Thread.sleep(1000);
181                 } catch (InterruptedException JavaDoc ie) {
182                 }
183             }
184         }
185     }
186
187     /****************************************************************
188      * NDFSInputStream provides bytes from a named file. It handles
189      * negotiation of the namenode and various datanodes as necessary.
190      ****************************************************************/

191     class NDFSInputStream extends NFSInputStream {
192         boolean closed = false;
193
194         private DataInputStream blockStream;
195         private DataOutputStream partnerStream;
196         private Block blocks[];
197         private int curBlock = 0;
198         private DatanodeInfo nodes[][];
199         private long pos = 0;
200         private long bytesRemainingInBlock = 0, curBlockSize = 0;
201
202         private int memoryBuf[] = new int[32 * 1024];
203         private long memoryStartPos = 0;
204         private long openPoint = 0;
205         private int memoryBytes = 0;
206         private int memoryBytesStart = 0;
207
208         /**
209          */

/**
 * Records the blocks of the file and, per block, the datanodes to
 * read from. No connection is opened here; both stream fields start
 * out null.
 *
 * @param blocks blocks of the file
 * @param nodes  datanode list for each entry of {@code blocks}
 * @throws IOException declared for callers; not thrown by this body
 */
public NDFSInputStream(Block blocks[], DatanodeInfo nodes[][]) throws IOException {
    this.blockStream = null;
    this.partnerStream = null;
    this.nodes = nodes;
    this.blocks = blocks;
}
216
/**
 * Open a DataInputStream to a DataNode so that the current block can be
 * read from it. This happens whenever reading moves on to another block
 * of the file. The block and the datanodes that hold it were obtained
 * from the namenode when the file was opened.
 */

222         private synchronized void nextBlockInputStream() throws IOException {
223             nextBlockInputStream(0);
224         }
225         private synchronized void nextBlockInputStream(long preSkip) throws IOException {
226             if (curBlock >= blocks.length) {
227                 throw new IOException("Attempted to read past end of file");
228             }
229             if (bytesRemainingInBlock > 0) {
230                 throw new IOException("Trying to skip to next block without reading all data");
231             }
232
233             if (blockStream != null) {
234                 blockStream.close();
235                 partnerStream.close();
236             }
237
238             //
239
// Connect to best DataNode for current Block
240
//
241
InetSocketAddress target = null;
242             Socket s = null;
243             TreeSet deadNodes = new TreeSet();
244             while (s == null) {
245                 DatanodeInfo chosenNode;
246
247                 try {
248                     chosenNode = bestNode(nodes[curBlock], deadNodes);
249                     target = NDFS.createSocketAddr(chosenNode.getName().toString());
250                 } catch (IOException ie) {
251                     LOG.info("Could not obtain block from any node. Retrying...");
252                     try {
253                         Thread.sleep(10000);
254                     } catch (InterruptedException JavaDoc iex) {
255                     }
256                     deadNodes.clear();
257                     continue;
258                 }
259                 try {
260                     s = new Socket(target.getAddress(), target.getPort());
261                     LOG.info("Now downloading from " + target + ", block " + blocks[curBlock] + ", skipahead " + preSkip);
262
263                     //
264
// Xmit header info to datanode
265
//
266
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
267                     out.write(OP_READSKIP_BLOCK);
268                     blocks[curBlock].write(out);
269                     out.writeLong(preSkip);
270                     out.flush();
271
272                     //
273
// Get bytes in block, set streams
274
//
275
DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
276                     curBlockSize = in.readLong();
277                     long amtSkipped = in.readLong();
278
279                     pos += amtSkipped;
280                     bytesRemainingInBlock = curBlockSize - amtSkipped;
281
282                     if (amtSkipped > 0) {
283                         memoryStartPos = pos;
284                         memoryBytes = 0;
285                         memoryBytesStart = 0;
286                     }
287                     blockStream = in;
288                     partnerStream = out;
289                     curBlock++;
290                     openPoint = pos;
291                 } catch (IOException ex) {
292                     // Put chosen node into dead list, continue
293
LOG.info("Could not connect to " + target);
294                     deadNodes.add(chosenNode);
295                     s = null;
296                 }
297             }
298         }
299
300         /**
301          */

302         public synchronized void seek(long pos) throws IOException {
303             if (pos < 0) {
304                 throw new IOException("Cannot seek to negative position " + pos);
305             }
306             if (pos == this.pos) {
307                 return;
308             }
309
310             //
311
// If we have remembered enough bytes to seek backwards to the
312
// desired pos, we can do so easily
313
//
314
if ((pos >= memoryStartPos) && (memoryStartPos + memoryBytes > pos)) {
315                 this.pos = pos;
316             } else {
317                 //
318
// If we are seeking backwards (and *don't* have enough memory bytes)
319
// we need to reset the NDFS streams. They will be reopened upon the
320
// next call to nextBlockInputStream(). After this operation, all
321
// seeks will be "forwardSeeks".
322
//
323
if (pos < memoryStartPos && blockStream != null) {
324                     blockStream.close();
325                     blockStream = null;
326                     partnerStream.close();
327                     partnerStream = null;
328                     this.curBlock = 0;
329                     this.bytesRemainingInBlock = 0;
330                     this.pos = 0;
331                     this.memoryStartPos = 0;
332                     this.memoryBytes = 0;
333                     this.memoryBytesStart = 0;
334                     //
335
// REMIND - this could be made more efficient, to just
336
// skip back block-by-block
337
//
338
}
339
340                 //
341
// Now read ahead to the desired position.
342
//
343
long diff = pos - this.pos;
344                 while (diff > 0) {
345                     long skipped = skip(diff);
346                     if (skipped > 0) {
347                         diff -= skipped;
348                     }
349                 }
350                 // Pos will be incremented by skip()
351
}
352         }
353
354         /**
355          * Skip ahead some number of bytes
356          */

357         public synchronized long skip(long skip) throws IOException {
358             long toSkip = 0;
359             long toFastSkip = 0;
360             if (skip > memoryBuf.length) {
361                 toSkip = memoryBuf.length;
362                 toFastSkip = skip - toSkip;
363             } else {
364                 toSkip = skip;
365             }
366             long totalSkipped = 0;
367
368             //
369
// If there's a lot of fast-skipping to do within the current block,
370
// close it and reopen, so we can fast-skip to the target
371
//
372
/**
373             while (toFastSkip > 0) {
374                 long amtSkipped = super.skip(toFastSkip);
375                 toFastSkip -= amtSkipped;
376                 totalSkipped += amtSkipped;
377             }
378             **/

379             long realBytesRemaining = bytesRemainingInBlock + (memoryBytes - (pos - memoryStartPos));
380             if (toFastSkip > 0 && realBytesRemaining > 0 &&
381                 toFastSkip < realBytesRemaining) {
382
383                 blockStream.close();
384                 blockStream = null;
385                 partnerStream.close();
386                 partnerStream = null;
387
388                 long backwardsDistance = curBlockSize - realBytesRemaining;
389                 pos -= backwardsDistance;
390                 totalSkipped -= backwardsDistance;
391                 toFastSkip += backwardsDistance;
392                 bytesRemainingInBlock = 0;
393                 curBlock--;
394
395                 memoryStartPos = pos;
396                 memoryBytes = 0;
397                 memoryBytesStart = 0;
398             }
399
400             //
401
// If there's any fast-skipping to do, we do it by opening a
402
// new block and telling the datanode how many bytes to skip.
403
//
404
while (toFastSkip > 0 && curBlock < blocks.length) {
405
406                 if (bytesRemainingInBlock > 0) {
407                     blockStream.close();
408                     blockStream = null;
409                     partnerStream.close();
410                     partnerStream = null;
411
412                     pos += bytesRemainingInBlock;
413                     totalSkipped += bytesRemainingInBlock;
414                     toFastSkip -= bytesRemainingInBlock;
415                     bytesRemainingInBlock = 0;
416                 }
417
418                 long oldPos = pos;
419                 nextBlockInputStream(toFastSkip);
420                 long forwardDistance = (pos - oldPos);
421                 totalSkipped += forwardDistance;
422                 toFastSkip -= (pos - oldPos);
423
424                 memoryStartPos = pos;
425                 memoryBytes = 0;
426                 memoryBytesStart = 0;
427             }
428
429             //
430
// If there's any remaining toFastSkip, well, there's
431
// not much we can do about it. We're at the end of
432
// the stream!
433
//
434
if (toFastSkip > 0) {
435                 System.err.println("Trying to skip past end of file....");
436                 toFastSkip = 0;
437             }
438
439             //
440
// Do a slow skip as we approach, so we can fill the client
441
// history buffer
442
//
443
totalSkipped += super.skip(toSkip);
444             toSkip = 0;
445             return totalSkipped;
446         }
447
448         /**
449          */

450         public synchronized long getPos() throws IOException {
451             return pos;
452         }
453
454         /**
455          */

456         public synchronized int available() throws IOException {
457             if (closed) {
458                 throw new IOException("Stream closed");
459             }
460             return (int) Math.min((long) Integer.MAX_VALUE, bytesRemainingInBlock);
461         }
462
463         /**
464          */

465         public synchronized void close() throws IOException {
466             if (closed) {
467                 throw new IOException("Stream closed");
468             }
469
470             if (blockStream != null) {
471                 blockStream.close();
472                 blockStream = null;
473                 partnerStream.close();
474             }
475             super.close();
476             closed = true;
477         }
478
479         /**
480          * Other read() functions are implemented in terms of
481          * this one.
482          */

483         public synchronized int read() throws IOException {
484             if (closed) {
485                 throw new IOException("Stream closed");
486             }
487
488             int b = 0;
489             if (pos - memoryStartPos < memoryBytes) {
490                 //
491
// Move the memoryStartPos up to current pos, if necessary.
492
//
493
int diff = (int) (pos - memoryStartPos);
494
495                 //
496
// Fetch the byte
497
//
498
b = memoryBuf[(memoryBytesStart + diff) % memoryBuf.length];
499
500                 //
501
// Bump the pos
502
//
503
pos++;
504             } else {
505                 if (bytesRemainingInBlock == 0) {
506                     if (curBlock < blocks.length) {
507                         nextBlockInputStream();
508                     } else {
509                         return -1;
510                     }
511                 }
512                 b = blockStream.read();
513                 if (b >= 0) {
514                     //
515
// Remember byte so we can seek backwards at some later time
516
//
517
if (memoryBytes == memoryBuf.length) {
518                         memoryStartPos++;
519                     }
520
521                     if (memoryBuf.length > 0) {
522                         int target;
523                         if (memoryBytes == memoryBuf.length) {
524                             target = memoryBytesStart;
525                             memoryBytesStart = (memoryBytesStart + 1) % memoryBuf.length;
526                         } else {
527                             target = (memoryBytesStart + memoryBytes) % memoryBuf.length;
528                             memoryBytes++;
529                         }
530                         memoryBuf[target] = b;
531                     }
532                     bytesRemainingInBlock--;
533                     pos++;
534                 }
535             }
536             return b;
537         }
538
539         /**
540          * We definitely don't support marks
541          */

542         public boolean markSupported() {
543             return false;
544         }
545         public void mark(int readLimit) {
546         }
547         public void reset() throws IOException {
548             throw new IOException("Mark not supported");
549         }
550     }
551
552     /****************************************************************
553      * NDFSOutputStream creates files from a stream of bytes.
554      ****************************************************************/

555     class NDFSOutputStream extends NFSOutputStream {
556         boolean closed = false;
557
558         private byte outBuf[] = new byte[BUFFER_SIZE];
559         private int pos = 0;
560
561         private UTF8 src;
562         private boolean overwrite;
563         private boolean blockStreamWorking;
564         private DataOutputStream blockStream;
565         private DataInputStream blockReplyStream;
566         private File backupFile;
567         private OutputStream backupStream;
568         private Block block;
569         private DatanodeInfo targets[];
570         private long filePos = 0;
571         private int bytesWrittenToBlock = 0;
572
573         /**
574          * Create a new output stream to the given DataNode.
575          */

576         public NDFSOutputStream(UTF8 src, boolean overwrite) throws IOException {
577             this.src = src;
578             this.overwrite = overwrite;
579             this.blockStream = null;
580             this.blockReplyStream = null;
581             this.blockStreamWorking = false;
582             this.backupFile = File.createTempFile("ndfsout", "bak");
583             this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
584             nextBlockOutputStream(true);
585         }
586
587         /**
588          * Open a DataOutputStream to a DataNode so that it can be written to.
589          * This happens when a file is created and each time a new block is allocated.
590          * Must get block ID and the IDs of the destinations from the namenode.
591          */

592         private synchronized void nextBlockOutputStream(boolean firstTime) throws IOException {
593             if (! firstTime && blockStreamWorking) {
594                 blockStream.close();
595                 blockReplyStream.close();
596                 blockStreamWorking = false;
597             }
598
599             boolean retry = false;
600             long start = System.currentTimeMillis();
601             do {
602                 retry = false;
603                 Object JavaDoc results[] = nameNodeCaller.getNewOutputBlock(firstTime, overwrite, src);
604                 block = (Block) results[0];
605                 DatanodeInfo nodes[] = (DatanodeInfo[]) results[1];
606
607                 //
608
// Connect to first DataNode in the list. Abort if this fails.
609
//
610
InetSocketAddress target = NDFS.createSocketAddr(nodes[0].getName().toString());
611                 Socket s = null;
612                 try {
613                     s = new Socket(target.getAddress(), target.getPort());
614                 } catch (IOException ie) {
615                     // Connection failed. Let's wait a little bit and retry
616
try {
617                         if (System.currentTimeMillis() - start > 5000) {
618                             LOG.info("Waiting to find target node");
619                         }
620                         Thread.sleep(6000);
621                     } catch (InterruptedException JavaDoc iex) {
622                     }
623                     nameNodeCaller.abandonBlock(block, src);
624                     retry = true;
625                     continue;
626                 }
627
628                 //
629
// Xmit header info to datanode
630
//
631
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
632                 out.write(OP_WRITE_BLOCK);
633                 block.write(out);
634                 out.writeInt(nodes.length);
635                 for (int i = 0; i < nodes.length; i++) {
636                     nodes[i].write(out);
637                 }
638                 out.write(CHUNKED_ENCODING);
639                 bytesWrittenToBlock = 0;
640                 blockStream = out;
641                 blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
642                 blockStreamWorking = true;
643             } while (retry);
644         }
645
646         /**
647          * We're referring to the file pos here
648          */

649         public synchronized long getPos() throws IOException {
650             return filePos;
651         }
652             
653         /**
654          * Writes the specified byte to this output stream.
655          * This is the only write method that needs to be implemented.
656          */

657         public synchronized void write(int b) throws IOException {
658             if (closed) {
659                 throw new IOException("Stream closed");
660             }
661
662             if ((bytesWrittenToBlock + pos == BLOCK_SIZE) ||
663                 (pos >= BUFFER_SIZE)) {
664                 flush();
665             }
666             outBuf[pos++] = (byte) b;
667             filePos++;
668         }
669
670         /**
671          * Flush the buffer, getting a stream to a new block if necessary.
672          */

673         public synchronized void flush() throws IOException {
674             if (closed) {
675                 throw new IOException("Stream closed");
676             }
677
678             if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {
679                 flushData(BLOCK_SIZE - bytesWrittenToBlock);
680             }
681             if (bytesWrittenToBlock == BLOCK_SIZE) {
682                 endBlock();
683                 nextBlockOutputStream(false);
684             }
685             flushData(pos);
686         }
687
688         /**
689          * Actually flush the accumulated bytes to the remote node,
690          * but no more bytes than the indicated number.
691          */

692         private synchronized void flushData(int maxPos) throws IOException {
693             int workingPos = Math.min(pos, maxPos);
694             
695             if (workingPos >= 0) {
696                 //
697
// To the blockStream, write length, then bytes
698
//
699
if (blockStreamWorking) {
700                     try {
701                         blockStream.writeLong(workingPos);
702                         blockStream.write(outBuf, 0, workingPos);
703                     } catch (IOException ie) {
704                         try {
705                             blockStream.close();
706                         } catch (IOException ie2) {
707                         }
708                         try {
709                             blockReplyStream.close();
710                         } catch (IOException ie2) {
711                         }
712                         nameNodeCaller.abandonBlock(block, src);
713                         blockStreamWorking = false;
714                     }
715                 }
716                 //
717
// To the local block backup, write just the bytes
718
//
719
backupStream.write(outBuf, 0, workingPos);
720
721                 //
722
// Track position
723
//
724
bytesWrittenToBlock += workingPos;
725                 System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
726                 pos -= workingPos;
727             }
728         }
729
730         /**
731          */

732         private synchronized void endBlock() throws IOException {
733             boolean mustRecover = ! blockStreamWorking;
734
735             //
736
// A zero-length set of data indicates the end of the block
737
//
738
if (blockStreamWorking) {
739                 try {
740                     blockStream.writeLong(0);
741                     blockStream.flush();
742
743                     long complete = blockReplyStream.readLong();
744                     if (complete != WRITE_COMPLETE) {
745                         LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
746                         throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
747                     }
748                     blockStream.close();
749                     blockReplyStream.close();
750                 } catch (IOException ie) {
751                     try {
752                         blockStream.close();
753                     } catch (IOException ie2) {
754                     }
755                     try {
756                         blockReplyStream.close();
757                     } catch (IOException ie2) {
758                     }
759                     nameNodeCaller.abandonBlock(block, src);
760                     mustRecover = true;
761                 } finally {
762                     blockStreamWorking = false;
763                 }
764             }
765
766             //
767
// Done with local copy
768
//
769
backupStream.close();
770
771             //
772
// If necessary, recover from a failed datanode connection.
773
//
774
while (mustRecover) {
775                 nextBlockOutputStream(false);
776                 InputStream in = new FileInputStream(backupFile);
777                 try {
778                     byte buf[] = new byte[4096];
779                     int bytesRead = in.read(buf);
780                     while (bytesRead >= 0) {
781                         blockStream.writeLong((long) bytesRead);
782                         blockStream.write(buf, 0, bytesRead);
783                         bytesRead = in.read(buf);
784                     }
785                     blockStream.writeLong(0);
786                     blockStream.close();
787                     LOG.info("Recovered from failed datanode connection");
788                     mustRecover = false;
789                 } catch (IOException ie) {
790                     try {
791                         blockStream.close();
792                     } catch (IOException ie2) {
793                     }
794                     try {
795                         blockReplyStream.close();
796                     } catch (IOException ie2) {
797                     }
798                     nameNodeCaller.abandonBlock(block, src);
799                     blockStreamWorking = false;
800                 }
801             }
802
803             //
804
// Delete local backup, start new one
805
//
806
backupFile.delete();
807             backupFile = File.createTempFile("ndfsout", "bak");
808             backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
809         }
810
811         /**
812          * Closes this output stream and releases any system
813          * resources associated with this stream.
814          */

815         public synchronized void close() throws IOException {
816             if (closed) {
817                 throw new IOException("Stream closed");
818             }
819
820             flush();
821             endBlock();
822
823             backupStream.close();
824             backupFile.delete();
825
826             if (blockStreamWorking) {
827                 blockStream.close();
828                 blockReplyStream.close();
829                 blockStreamWorking = false;
830             }
831             super.close();
832
833             nameNodeCaller.completeFile(src);
834             closed = true;
835         }
836     }
837
838     /******************************************************************
839      * Handles the IPC calls to the nameNode.
840      * This keeps the IPC methods hidden from the users of FSClient.
841      *
842      * I imagine that there will be a NameNodeCaller in the FSClient as
843      * well as each Stream.
844      *******************************************************************/

845     private class NameNodeCaller {
846         private net.nutch.ipc.Client client;
847         private InetSocketAddress namenode;
848         
849         /**
850          * Constructor takes the Socket Address of the NameNode.
851          */

852         public NameNodeCaller(InetSocketAddress namenode) {
853             this.client = new net.nutch.ipc.Client(FSResults.class);
854             this.namenode = namenode;
855         }
856         
857         /**
858          * General-purpose call
859          */

860         public FSResults call(FSParam p) throws IOException {
861             return (FSResults) call(p, namenode);
862         }
863
864         private synchronized FSResults call(FSParam p, InetSocketAddress target) throws IOException {
865             FSResults results = null;
866             while (results == null) {
867                 try {
868                     results = (FSResults) client.call(p, target);
869                 } catch (IOException ie) {
870                     long start = System.currentTimeMillis();
871                     LOG.info("Problem making IPC call on " + target);
872                     client.stop();
873                     long end = System.currentTimeMillis();
874                     if (end - start < 15000) {
875                         try {
876                             Thread.sleep(15000 - (end - start));
877                         } catch (InterruptedException JavaDoc iex) {
878                         }
879                     }
880                     LOG.info("Restarting client");
881                     client = new net.nutch.ipc.Client(FSResults.class);
882                 }
883             }
884             return results;
885         }
886
887         /**
888          * Calls the nameNode to get a new block. Returns the blockID
889          * and resets the given destination nodes.
890          */

891         public Object JavaDoc[] getNewOutputBlock(boolean newFile, boolean overwrite, UTF8 src) throws IOException {
892             long start = System.currentTimeMillis();
893             FSParam p = null;
894             FSResults r = null;
895             boolean blockComplete = false;
896             while (! blockComplete) {
897                 UTF8 nameParams[] = new UTF8[2];
898                 nameParams[0] = src;
899                 nameParams[1] = clientName;
900                 if (newFile) {
901                     p = new FSParam(OP_CLIENT_STARTFILE, new ArrayWritable(UTF8.class, nameParams), new BooleanWritable(overwrite));
902                 } else {
903                     p = new FSParam(OP_CLIENT_ADDBLOCK, src);
904                 }
905                 r = (FSResults) call(p, namenode);
906                 if (! r.success()) {
907                     throw new IOException("Could not obtain new output block for file " + src);
908                 } else if (r.tryagain()) {
909                     try {
910                         Thread.sleep(400);
911                         if (System.currentTimeMillis() - start > 5000) {
912                             LOG.info("Waiting to find new output block node for " + (System.currentTimeMillis() - start) + "ms");
913                         }
914                     } catch (InterruptedException JavaDoc ie) {
915                     }
916                 } else {
917                     blockComplete = true;
918                 }
919             }
920             Block b = (Block) r.first;
921             DatanodeInfo targets[] = (DatanodeInfo[]) ((ArrayWritable) r.second).toArray();
922
923             Object JavaDoc results[] = new Object JavaDoc[2];
924             results[0] = b;
925             results[1] = targets;
926             return results;
927         }
928
929         /**
930          */

931         public void abandonBlock(Block b, UTF8 src) throws IOException {
932             FSParam p = null;
933             FSResults r = null;
934             p = new FSParam(OP_CLIENT_ABANDONBLOCK, b, src);
935             r = (FSResults) call(p, namenode);
936             if (! r.success()) {
937                 throw new IOException("Block " + b + " has already been committed.");
938             }
939         }
940
941         /**
942          * Get the block IDs and Nodes for the given file name (src).
943          */

944         public Object JavaDoc[] getBlocksNodes(UTF8 src) throws IOException {
945             FSParam p = new FSParam(OP_CLIENT_OPEN, src);
946             FSResults r = (FSResults) call(p, namenode);
947             if (! r.success()) {
948                 throw new IOException("Could not open file " + src);
949             } else {
950                 Block blocks[] = (Block[]) ((ArrayWritable) r.first).toArray();
951                 DatanodeInfo nodes[][] = (DatanodeInfo[][]) ((TwoDArrayWritable) r.second).toArray();
952                 Object JavaDoc results[] = new Object JavaDoc[2];
953                 results[0] = blocks;
954                 results[1] = nodes;
955                 return results;
956             }
957         }
958         
959         /**
960          * Rename details are kept within the NameNodeCaller.
961          * This causes an extra level of indirection which might be too costly.
962          */

963         public boolean rename(UTF8 src, UTF8 dst) throws IOException{
964             FSParam p = new FSParam(OP_CLIENT_RENAMETO, src, dst);
965             FSResults r = (FSResults) call(p, namenode);
966             return r.success();
967         }
968
969         /**
970          * Delete details are kept within the NameNodeCaller.
971          * This causes an extra level of indirection which might be too costly.
972          */

973         public boolean delete(UTF8 src) throws IOException {
974             FSParam p = new FSParam(OP_CLIENT_DELETE, src);
975             FSResults r = (FSResults) call(p, namenode);
976             return r.success();
977         }
978
979         /**
980          * Checks to see if the given path exists (as dir or file)
981          */

982         public boolean exists(UTF8 src) throws IOException {
983             FSParam p = new FSParam(OP_CLIENT_EXISTS, src);
984             FSResults r = (FSResults) call(p, namenode);
985             return r.success();
986         }
987
988         /**
989          * Checks to see if the given path is a dir.
990          */

991         public boolean isDirectory(UTF8 src) throws IOException {
992             FSParam p = new FSParam(OP_CLIENT_ISDIR, src);
993             FSResults r = (FSResults) call(p, namenode);
994             return r.success();
995         }
996
997         /**
998          */

999         public boolean mkdirs(UTF8 src) throws IOException {
1000            FSParam p = new FSParam(OP_CLIENT_MKDIRS, src);
1001            FSResults r = (FSResults) call(p, namenode);
1002            return r.success();
1003        }
1004
1005        /**
1006         * We try to obtain a lock (ex or not, as described). We
1007         * block until successful.
1008         */

1009        public void lock(UTF8 src, boolean exclusive) throws IOException {
1010            long start = System.currentTimeMillis();
1011            boolean complete = false;
1012
1013            while (! complete) {
1014                UTF8 nameParams[] = new UTF8[2];
1015                nameParams[0] = src;
1016                nameParams[1] = clientName;
1017                FSParam p = new FSParam(OP_CLIENT_OBTAINLOCK, new ArrayWritable(UTF8.class, nameParams), new BooleanWritable(exclusive));
1018                FSResults r = (FSResults) call(p, namenode);
1019                if (! r.success()) {
1020                    throw new IOException("Could not obtain lock " + src);
1021                } else if (r.tryagain()) {
1022                    try {
1023                        Thread.sleep(400);
1024                        if (System.currentTimeMillis() - start > 5000) {
1025                            LOG.info("Waiting to retry lock for " + (System.currentTimeMillis() - start) + " ms.");
1026                            Thread.sleep(2000);
1027                        }
1028                    } catch (InterruptedException JavaDoc ie) {
1029                    }
1030                } else {
1031                    complete = true;
1032                }
1033            }
1034        }
1035
1036        /**
1037         * Release the given lock
1038         */

1039        public void release(UTF8 src) throws IOException {
1040            boolean complete = false;
1041            while (! complete) {
1042                UTF8 nameParams[] = new UTF8[2];
1043                nameParams[0] = src;
1044                nameParams[1] = clientName;
1045                FSParam p = new FSParam(OP_CLIENT_RELEASELOCK, new ArrayWritable(UTF8.class, nameParams));
1046                FSResults r = (FSResults) call(p, namenode);
1047                if (! r.success()) {
1048                    throw new IOException("Could not release lock " + src);
1049                } else if (r.tryagain()) {
1050                    LOG.info("Could not release. Retrying...");
1051                    try {
1052                        Thread.sleep(2000);
1053                    } catch (InterruptedException JavaDoc ie) {
1054                    }
1055                } else {
1056                    complete = true;
1057                }
1058            }
1059        }
1060
1061        /**
1062         * List all the files at the given path
1063         */

1064        public NDFSFileInfo[] listing(UTF8 src) throws IOException {
1065            FSParam p = new FSParam(OP_CLIENT_LISTING, src);
1066            FSResults r = (FSResults) call(p, namenode);
1067            if (r.success()) {
1068                return (NDFSFileInfo[]) ((ArrayWritable) r.first).toArray();
1069            } else {
1070                return null;
1071            }
1072        }
1073
1074        /**
1075         * Report on raw bytes
1076         */

1077        public long[] rawReport() throws IOException {
1078            long results[] = null;
1079            FSParam p = new FSParam(OP_CLIENT_RAWSTATS);
1080            FSResults r = (FSResults) call(p, namenode);
1081            if (r.success()) {
1082                LongWritable report[] = (LongWritable[]) ((ArrayWritable) r.first).toArray();
1083                results = new long[report.length];
1084                for (int i = 0; i < report.length; i++) {
1085                    results[i] = report[i].get();
1086                }
1087            }
1088            return results;
1089        }
1090
1091        /**
1092         */

1093        public DatanodeInfo[] datanodeReport() throws IOException {
1094            FSParam p = new FSParam(OP_CLIENT_DATANODEREPORT);
1095            FSResults r = (FSResults) call(p, namenode);
1096            if (r.success()) {
1097                return (DatanodeInfo[]) ((ArrayWritable) r.first).toArray();
1098            } else {
1099                return null;
1100            }
1101        }
1102
1103        /**
1104         * Contacts the namenode repeatedly until file is wholly
1105         * committed. Blocks until that time.
1106         */

1107        public void completeFile(UTF8 src) throws IOException {
1108            long start = System.currentTimeMillis();
1109            boolean fileComplete = false;
1110            UTF8 nameParams[] = new UTF8[2];
1111            nameParams[0] = src;
1112            nameParams[1] = clientName;
1113            
1114            while (! fileComplete) {
1115                FSParam p = new FSParam(OP_CLIENT_COMPLETEFILE, new ArrayWritable(UTF8.class, nameParams));
1116                FSResults r = (FSResults) call(p, namenode);
1117                if (! r.success()) {
1118                    throw new IOException("Could not complete file " + src);
1119                } else if (r.tryagain()) {
1120                    try {
1121                        Thread.sleep(400);
1122                        if (System.currentTimeMillis() - start > 5000) {
1123                            LOG.info("Could not complete file, retrying...");
1124                        }
1125                    } catch (InterruptedException JavaDoc ie) {
1126                    }
1127                } else {
1128                    fileComplete = true;
1129                }
1130            }
1131        }
1132    }
1133}
1134
Popular Tags